ewmscp  ..
Public Member Functions | Protected Member Functions | Static Protected Member Functions | Private Attributes | Static Private Attributes | List of all members
followJsonRequestProvider Class Reference

#include <followJsonRequestProvider.h>

Inheritance diagram for followJsonRequestProvider:
[legend]
Collaboration diagram for followJsonRequestProvider:
[legend]

Public Member Functions

 followJsonRequestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
 ~followJsonRequestProvider () override=default
 
- Public Member Functions inherited from followRequestProvider
 ~followRequestProvider () override=default
 
bool isFollowMode () const override
 
void processSources (const std::vector< std::string > &sources) override
 
- Public Member Functions inherited from requestProvider
 requestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 ! use only the file name part in the destination More...
 
virtual ~requestProvider ()=default
 
virtual void prepareMappings (std::vector< std::string > &sources, std::string &destination)
 
virtual void printMappings (std::ostream &stream)
 

Protected Member Functions

void cleanupRenameEvents (copyRequest::clock_type::duration minAge)
 
void followStream (std::istream &stream) override
 
virtual void processJson (const Json::Value &json)
 
- Protected Member Functions inherited from followRequestProvider
template<typename ... Types>
void enqueueOrAppend (const std::string &srcPath, copyRequest::fileInWork::slotTypes type, copyRequest::clock_type::time_point timestamp, Types ... args)
 
virtual void handleMove (const std::string &srcPath, const std::string &dstPath, const std::string &fromPath, const std::string &origPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp)
 
virtual void handleOther (const std::string &srcPath, const std::string &dstPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp, bool requestForRemoval)
 
 followRequestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
- Protected Member Functions inherited from requestProvider
virtual const singleMapgetDstPath (const std::string &source, std::string &destination, bool baseNameOnly=false)
 get detstination papth for a given source path More...
 

Static Protected Member Functions

static std::string getRequiredJsonValue (const Json::Value &json, const std::string &key)
 
- Static Protected Member Functions inherited from requestProvider
static std::map< std::string, factoryClass * > & getFactoryMap ()
 

Private Attributes

renameEventHandler renameHandler
 

Static Private Attributes

static factoryTemplate< followJsonRequestProviderfactory
 
static options::single< int > jsonDelimiter
 
static options::single< std::string > pathKey
 
static options::single< std::string > operationKey
 
static options::single< std::string > timestampKey
 
static options::single< std::string > timestampFormat
 
static options::single< copyRequest::clock_type::duration > timestampOffset
 
static options::single< std::string > cookieKey
 
static options::single< std::string > copyOpValue
 
static options::single< std::string > deleteOpValue
 
static options::single< std::string > moveFromOpValue
 
static options::single< std::string > moveToOpValue
 
static options::single< std::regex > sourceMatchRegex
 

Additional Inherited Members

- Static Public Member Functions inherited from requestProvider
static requestProvidernewProvider (const std::string &choice, decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
static void addAllowedNamesToOption (options::single< std::string > &option)
 
- Protected Attributes inherited from requestProvider
copyRequest::simpleQueuerequests
 
copyRequest::timedQueuedelayedRequests
 
bool parents
 
inputHandler::baseInputHandler
 
outputHandler::baseOutputHandler
 
- Static Protected Attributes inherited from followRequestProvider
static options::single< bool > mayDelete
 
static options::single< bool > nullDelimiter
 
- Static Protected Attributes inherited from requestProvider
static options::map< std::string, pathMapTypepathMap
 
static singleMap badMapEntry
 

Detailed Description

Definition at line 9 of file followJsonRequestProvider.h.

Constructor & Destructor Documentation

◆ followJsonRequestProvider()

followJsonRequestProvider::followJsonRequestProvider ( decltype(requests aRequests,
decltype(delayedRequests aDelayedRequests,
decltype(parents aParents,
decltype(InputHandler aInputHandler,
decltype(OutputHandler aOutputHandler 
)
inline

Definition at line 34 of file followJsonRequestProvider.h.

38  :
39  followRequestProvider(aRequests, aDelayedRequests, aParents,
40  aInputHandler, aOutputHandler) {};

◆ ~followJsonRequestProvider()

followJsonRequestProvider::~followJsonRequestProvider ( )
overridedefault

Member Function Documentation

◆ cleanupRenameEvents()

void followJsonRequestProvider::cleanupRenameEvents ( copyRequest::clock_type::duration  minAge)
protected

Definition at line 155 of file followJsonRequestProvider.cpp.

155  {
156  while (auto event = renameHandler.getNextStaleSingle(minAge)) {
157  if (event->moveType == renameHalfEvent::moveTypeEnum::moveFrom) {
159  event->dstPath, "found stale moveFrom, cookie", event->cookie,
160  "from", std::fixed, event->timestamp);
161  if (mayDelete) {
162  handleOther(event->srcPath, event->dstPath, event->mapEntry, event->timestamp, true);
163  }
164  } else {
166  event->dstPath, "found stale moveTo, cookie", event->cookie,
167  "from", std::fixed, event->timestamp);
168  handleOther(event->srcPath, event->dstPath, event->mapEntry, event->timestamp, false);
169  }
171  }
172 }

References errMsg::debug, errMsg::emit(), renameEventHandler::forgetRenameEvent(), renameEventHandler::getNextStaleSingle(), followRequestProvider::handleOther(), followRequestProvider::mayDelete, renameHalfEvent::moveFrom, and renameHandler.

Referenced by followStream(), and followKafkaRequestProvider::processSources().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ followStream()

void followJsonRequestProvider::followStream ( std::istream &  stream)
overrideprotectedvirtual

Implements followRequestProvider.

Reimplemented in followKafkaRequestProvider.

Definition at line 37 of file followJsonRequestProvider.cpp.

37  {
38  char delimiter = nullDelimiter ? '\0' : '\n';
39  if (jsonDelimiter.fIsSet()) {
40  delimiter = jsonDelimiter;
41  }
42  Json::Value json;
43  getlineWithTimeout lineGetter(stream, delimiter);
44  while (lineGetter.active()) {
45  cleanupRenameEvents(std::chrono::seconds(30));
46  if (stopRequest::Requested()) {
47  break;
48  }
49  {
50  // scope to contain line and it's lock
51  getlineWithTimeout::nocopyLine line(lineGetter, std::chrono::seconds(5));
52  if (line.timedOut()) {
53  continue;
54  }
55  if (line.line().empty()) {
56  continue;
57  }
58  std::stringstream buffer(line.line());
59  buffer >> json;
60  }
61  processJson(json);
62  json.clear();
63  }
64  cleanupRenameEvents(std::chrono::seconds(-10));
65  lineGetter.active();
66 }

References getlineWithTimeout::active(), cleanupRenameEvents(), options::base::fIsSet(), jsonDelimiter, getlineWithTimeout::nocopyLine::line(), followRequestProvider::nullDelimiter, processJson(), stopRequest::Requested(), and getlineWithTimeout::nocopyLine::timedOut().

Here is the call graph for this function:

◆ getRequiredJsonValue()

std::string followJsonRequestProvider::getRequiredJsonValue ( const Json::Value &  json,
const std::string &  key 
)
staticprotected

Definition at line 68 of file followJsonRequestProvider.cpp.

69  {
70  auto jsonValue = json.find(key.c_str(), key.c_str() + key.size());
71  if (jsonValue == nullptr) {
72  return "";
73  }
74  return jsonValue->asString();
75 }

Referenced by processJson().

Here is the caller graph for this function:

◆ processJson()

void followJsonRequestProvider::processJson ( const Json::Value &  json)
protectedvirtual

Definition at line 77 of file followJsonRequestProvider.cpp.

77  {
78  auto srcPath = getRequiredJsonValue(json, pathKey);
79  if (srcPath.empty()) { // ignore json without path data
80  return;
81  }
82  if (sourceMatchRegex.fIsSet() && ! std::regex_match(srcPath, sourceMatchRegex)) {
83  return;
84  }
85  auto operation = getRequiredJsonValue(json, operationKey);
86  if (operation.empty()) { // ignore json without operation defined
87  return;
88  }
89 
90  copyRequest::clock_type::time_point timestamp;
91  auto jsonTimestamp = json.find(timestampKey.c_str(), timestampKey.c_str() + timestampKey.size());
92  if (jsonTimestamp) {
93  if (timestampFormat == "%s") { // use
94  timestamp = copyRequest::clock_type::time_point(std::chrono::duration_cast<copyRequest::clock_type::duration>(std::chrono::duration<double>(jsonTimestamp->asDouble())));
95  } else {
96  struct tm tmStruct{};
97  strptime(jsonTimestamp->asCString(), timestampFormat.c_str(), &tmStruct);
98  timestamp = copyRequest::clock_type::time_point(std::chrono::duration_cast<copyRequest::clock_type::duration>(std::chrono::duration<double>(mktime(&tmStruct)))) + timestampOffset;
99  if (timestampFormat.back() == 'z') {
100  auto tzStart=jsonTimestamp->asString().find_last_of("+-");
101  if (tzStart != std::string::npos) {
102  auto offset = std::stoi(jsonTimestamp->asString().substr(tzStart));
103  timestamp -= std::chrono::hours(offset/100);
104  timestamp -= std::chrono::minutes(offset%100);
105  }
106  }
107  }
108  } else {
109  timestamp = copyRequest::clock_type::now();
110  }
111 
112  std::string dstPath;
113  auto mapEntry = getDstPath(srcPath, dstPath);
114  if (operation == copyOpValue) {
115  handleOther(srcPath, dstPath, mapEntry, timestamp, false);
116  } else if (operation == deleteOpValue) {
117  handleOther(srcPath, dstPath, mapEntry, timestamp, true);
118  } else if (operation == moveFromOpValue || operation == moveToOpValue) {
119  auto jsonCookie = getRequiredJsonValue(json, cookieKey);
120  if (jsonCookie.empty()) {
121  return;
122  }
123  renameHalfEvent::cookieType cookie = std::stoull(jsonCookie);
125  if (operation == moveFromOpValue) {
127  } else {
129  }
130  auto partner = renameHandler.getRenamePartner(cookie, srcPath, dstPath, moveType, mapEntry, timestamp);
131  if (partner != nullptr) { // we found a partner
132  if (moveType == renameHalfEvent::moveTypeEnum::moveFrom) {
133  handleMove(partner->srcPath,
134  partner->dstPath,
135  dstPath,
136  srcPath,
137  mapEntry,
138  timestamp);
139  } else {
140  handleMove(srcPath,
141  dstPath,
142  partner->dstPath,
143  partner->srcPath,
144  mapEntry,
145  timestamp);
146  }
148  }
149  }
150 }

References cookieKey, copyOpValue, deleteOpValue, options::base::fIsSet(), renameEventHandler::forgetRenameEvent(), requestProvider::getDstPath(), renameEventHandler::getRenamePartner(), getRequiredJsonValue(), followRequestProvider::handleMove(), followRequestProvider::handleOther(), renameHalfEvent::moveFrom, moveFromOpValue, renameHalfEvent::moveTo, moveToOpValue, operationKey, pathKey, renameHandler, sourceMatchRegex, timestampFormat, timestampKey, and timestampOffset.

Referenced by followStream(), and followKafkaRequestProvider::processSources().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ cookieKey

options::single<std::string> followJsonRequestProvider::cookieKey
staticprivate

Definition at line 19 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ copyOpValue

options::single<std::string> followJsonRequestProvider::copyOpValue
staticprivate

Definition at line 20 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ deleteOpValue

options::single<std::string> followJsonRequestProvider::deleteOpValue
staticprivate

Definition at line 21 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ factory

factoryTemplate<followJsonRequestProvider> followJsonRequestProvider::factory
staticprivate

Definition at line 11 of file followJsonRequestProvider.h.

◆ jsonDelimiter

options::single<int> followJsonRequestProvider::jsonDelimiter
staticprivate

Definition at line 13 of file followJsonRequestProvider.h.

Referenced by followStream().

◆ moveFromOpValue

options::single<std::string> followJsonRequestProvider::moveFromOpValue
staticprivate

Definition at line 22 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ moveToOpValue

options::single<std::string> followJsonRequestProvider::moveToOpValue
staticprivate

Definition at line 23 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ operationKey

options::single<std::string> followJsonRequestProvider::operationKey
staticprivate

Definition at line 15 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ pathKey

options::single<std::string> followJsonRequestProvider::pathKey
staticprivate

Definition at line 14 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ renameHandler

renameEventHandler followJsonRequestProvider::renameHandler
private

Definition at line 25 of file followJsonRequestProvider.h.

Referenced by cleanupRenameEvents(), and processJson().

◆ sourceMatchRegex

options::single<std::regex> followJsonRequestProvider::sourceMatchRegex
staticprivate

Definition at line 24 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ timestampFormat

options::single<std::string> followJsonRequestProvider::timestampFormat
staticprivate

Definition at line 17 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ timestampKey

options::single<std::string> followJsonRequestProvider::timestampKey
staticprivate

Definition at line 16 of file followJsonRequestProvider.h.

Referenced by processJson().

◆ timestampOffset

options::single<copyRequest::clock_type::duration> followJsonRequestProvider::timestampOffset
staticprivate

Definition at line 18 of file followJsonRequestProvider.h.

Referenced by processJson().


The documentation for this class was generated from the following files:
followJsonRequestProvider::processJson
virtual void processJson(const Json::Value &json)
Definition: followJsonRequestProvider.cpp:77
followJsonRequestProvider::cookieKey
static options::single< std::string > cookieKey
Definition: followJsonRequestProvider.h:19
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
renameEventHandler::forgetRenameEvent
void forgetRenameEvent(const renameHalfEvent *what)
Definition: renameEventHandler.h:66
renameHalfEvent::moveTypeEnum::moveTo
@ moveTo
followRequestProvider::handleMove
virtual void handleMove(const std::string &srcPath, const std::string &dstPath, const std::string &fromPath, const std::string &origPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp)
Definition: followRequestProvider.cpp:31
options::base::fIsSet
virtual bool fIsSet() const
check if this option was set, regardless of from command line or config file
Definition: Options.h:263
followRequestProvider::mayDelete
static options::single< bool > mayDelete
Definition: followRequestProvider.h:19
getlineWithTimeout
Definition: getlineWithTimeout.h:9
renameEventHandler::getNextStaleSingle
const renameHalfEvent * getNextStaleSingle(renameHalfEvent::timeType::duration timeout)
Definition: renameEventHandler.h:69
followJsonRequestProvider::moveFromOpValue
static options::single< std::string > moveFromOpValue
Definition: followJsonRequestProvider.h:22
followJsonRequestProvider::operationKey
static options::single< std::string > operationKey
Definition: followJsonRequestProvider.h:15
errMsg::level::debug
@ debug
followJsonRequestProvider::copyOpValue
static options::single< std::string > copyOpValue
Definition: followJsonRequestProvider.h:20
renameHalfEvent::moveTypeEnum::moveFrom
@ moveFrom
getlineWithTimeout::nocopyLine
Definition: getlineWithTimeout.h:49
followRequestProvider::nullDelimiter
static options::single< bool > nullDelimiter
Definition: followRequestProvider.h:20
followJsonRequestProvider::sourceMatchRegex
static options::single< std::regex > sourceMatchRegex
Definition: followJsonRequestProvider.h:24
stopRequest::Requested
static bool Requested()
Definition: ewmscp.cpp:153
followJsonRequestProvider::pathKey
static options::single< std::string > pathKey
Definition: followJsonRequestProvider.h:14
followJsonRequestProvider::timestampFormat
static options::single< std::string > timestampFormat
Definition: followJsonRequestProvider.h:17
followRequestProvider::followRequestProvider
followRequestProvider(decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
Definition: followRequestProvider.cpp:9
followJsonRequestProvider::timestampKey
static options::single< std::string > timestampKey
Definition: followJsonRequestProvider.h:16
followJsonRequestProvider::moveToOpValue
static options::single< std::string > moveToOpValue
Definition: followJsonRequestProvider.h:23
followJsonRequestProvider::timestampOffset
static options::single< copyRequest::clock_type::duration > timestampOffset
Definition: followJsonRequestProvider.h:18
followJsonRequestProvider::jsonDelimiter
static options::single< int > jsonDelimiter
Definition: followJsonRequestProvider.h:13
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
renameEventHandler::getRenamePartner
const renameHalfEvent * getRenamePartner(renameHalfEvent::cookieType cookie, const renameHalfEvent::pathType &aSrcPath, const renameHalfEvent::pathType &aDstPath, renameHalfEvent::moveTypeEnum aIsMoveFrom, const singleMap &aMapEntry, renameHalfEvent::timeType::time_point aTimestamp)
Definition: renameEventHandler.h:43
followJsonRequestProvider::renameHandler
renameEventHandler renameHandler
Definition: followJsonRequestProvider.h:25
renameHalfEvent::moveTypeEnum
moveTypeEnum
Definition: renameEventHandler.h:16
requestProvider::getDstPath
virtual const singleMap & getDstPath(const std::string &source, std::string &destination, bool baseNameOnly=false)
get detstination papth for a given source path
Definition: requestProvider.cpp:30
followRequestProvider::handleOther
virtual void handleOther(const std::string &srcPath, const std::string &dstPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp, bool requestForRemoval)
Definition: followRequestProvider.cpp:41
followJsonRequestProvider::getRequiredJsonValue
static std::string getRequiredJsonValue(const Json::Value &json, const std::string &key)
Definition: followJsonRequestProvider.cpp:68
renameHalfEvent::cookieType
long cookieType
Definition: renameEventHandler.h:8
followJsonRequestProvider::deleteOpValue
static options::single< std::string > deleteOpValue
Definition: followJsonRequestProvider.h:21
followJsonRequestProvider::cleanupRenameEvents
void cleanupRenameEvents(copyRequest::clock_type::duration minAge)
Definition: followJsonRequestProvider.cpp:155