ewmscp  ..
followJsonRequestProvider.cpp
Go to the documentation of this file.
1 #include <OptionsChrono.h>
3 #include "getlineWithTimeout.h"
4 #include <json/json.h>
5 #include <time.h>
6 
7 
8 
10 
// Static option definitions for followJsonRequestProvider: the record
// delimiter, the JSON key names, timestamp parsing settings and the operation
// values that are compared against the "operation" field in processJson().
// NOTE(review): in this listing the opening `defineStatic(...` line of several
// definitions is not visible; only their trailing arguments (presumably help
// text followed by the default value) remain - confirm against the real file.
12  "delimiter between json records, if not \\0 or \\n", 0);
14  "key for path data in json input", "path");
16  "key for operation in json input", "event");
18  "key for timestamp in json input", "eventTime");
// strptime-style format used by processJson(); the special value "%s" makes
// processJson() read the timestamp as a number instead of calling strptime.
19 defineStatic(followJsonRequestProvider::timestampFormat, '\0', "jsonTimestampFormat",
20  "format for timestamp in json input, like strptime",
21  "%Y-%m-%d_%H:%M:%S%z");
// Duration added to timestamps parsed via strptime in processJson().
22 defineStatic(followJsonRequestProvider::timestampOffset, '\0', "jsonTimestampOffset",
23  "offset of timestamp", copyRequest::clock_type::duration::zero());
25  "key for rename cookie in json input", "cookie");
27  "operation value triggering a copy", "IN_CLOSE_WRITE");
29  "operation value triggering a delete", "IN_DELETE");
31  "operation value triggering a move from", "IN_MOVED_FROM");
33  "operation value triggering a move to", "IN_MOVED_TO");
34 defineStatic(followJsonRequestProvider::sourceMatchRegex, '\0', "jsonSourceMatchRegex",
35  "regegx that must be matched if set to consider a path");
36 
37 void followJsonRequestProvider::followStream(std::istream& stream) {
38  char delimiter = nullDelimiter ? '\0' : '\n';
39  if (jsonDelimiter.fIsSet()) {
40  delimiter = jsonDelimiter;
41  }
42  Json::Value json;
43  getlineWithTimeout lineGetter(stream, delimiter);
44  while (lineGetter.active()) {
45  cleanupRenameEvents(std::chrono::seconds(30));
46  if (stopRequest::Requested()) {
47  break;
48  }
49  {
50  // scope to contain line and it's lock
51  getlineWithTimeout::nocopyLine line(lineGetter, std::chrono::seconds(5));
52  if (line.timedOut()) {
53  continue;
54  }
55  if (line.line().empty()) {
56  continue;
57  }
58  std::stringstream buffer(line.line());
59  buffer >> json;
60  }
61  processJson(json);
62  json.clear();
63  }
64  cleanupRenameEvents(std::chrono::seconds(-10));
65  lineGetter.active();
66 }
67 
68 std::string followJsonRequestProvider::getRequiredJsonValue(const Json::Value& json,
69  const std::string& key) {
70  auto jsonValue = json.find(key.c_str(), key.c_str() + key.size());
71  if (jsonValue == nullptr) {
72  return "";
73  }
74  return jsonValue->asString();
75 }
76 
// Handle one parsed JSON record: extract path, operation and (optional)
// timestamp, map the source path to its destination and dispatch to
// handleOther() for copy/delete or to the rename pairing logic for
// move-from/move-to. Records lacking a path or an operation, or whose path
// fails the optional sourceMatchRegex filter, are silently ignored.
77 void followJsonRequestProvider::processJson(const Json::Value& json) {
78  auto srcPath = getRequiredJsonValue(json, pathKey);
79  if (srcPath.empty()) { // ignore json without path data
80  return;
81  }
 // std::regex_match requires the whole path to match the configured regex.
82  if (sourceMatchRegex.fIsSet() && ! std::regex_match(srcPath, sourceMatchRegex)) {
83  return;
84  }
85  auto operation = getRequiredJsonValue(json, operationKey);
86  if (operation.empty()) { // ignore json without operation defined
87  return;
88  }
89 
 // Determine the event time: taken from the record when the timestamp key
 // is present, otherwise the current time of copyRequest::clock_type.
90  copyRequest::clock_type::time_point timestamp;
91  auto jsonTimestamp = json.find(timestampKey.c_str(), timestampKey.c_str() + timestampKey.size());
92  if (jsonTimestamp) {
93  if (timestampFormat == "%s") { // value is numeric: (fractional) seconds since the clock's epoch
 // NOTE(review): timestampOffset is not applied in this branch - confirm intended.
94  timestamp = copyRequest::clock_type::time_point(std::chrono::duration_cast<copyRequest::clock_type::duration>(std::chrono::duration<double>(jsonTimestamp->asDouble())));
95  } else {
 // Textual timestamp: parse with strptime, convert via mktime, add timestampOffset.
 // NOTE(review): strptime's return value is not checked, so a string that
 // does not match the format is not detected here.
 // NOTE(review): mktime interprets tmStruct as local time - confirm this is
 // the intended base for the zone correction below.
96  struct tm tmStruct{};
97  strptime(jsonTimestamp->asCString(), timestampFormat.c_str(), &tmStruct);
98  timestamp = copyRequest::clock_type::time_point(std::chrono::duration_cast<copyRequest::clock_type::duration>(std::chrono::duration<double>(mktime(&tmStruct)))) + timestampOffset;
 // Format ends in 'z' (e.g. "%z"): read the numeric zone suffix after the
 // last '+' or '-' as +-HHMM and subtract it as hours and minutes.
 // NOTE(review): if the text carries no zone suffix, find_last_of("+-")
 // would hit a '-' date separator instead - confirm inputs always have one.
99  if (timestampFormat.back() == 'z') {
100  auto tzStart=jsonTimestamp->asString().find_last_of("+-");
101  if (tzStart != std::string::npos) {
102  auto offset = std::stoi(jsonTimestamp->asString().substr(tzStart));
103  timestamp -= std::chrono::hours(offset/100);
104  timestamp -= std::chrono::minutes(offset%100);
105  }
106  }
107  }
108  } else {
109  timestamp = copyRequest::clock_type::now();
110  }
111 
 // Resolve the destination path; getDstPath fills dstPath and returns the map entry used.
112  std::string dstPath;
113  auto mapEntry = getDstPath(srcPath, dstPath);
 // The last argument of handleOther is requestForRemoval: false = copy, true = delete.
114  if (operation == copyOpValue) {
115  handleOther(srcPath, dstPath, mapEntry, timestamp, false);
116  } else if (operation == deleteOpValue) {
117  handleOther(srcPath, dstPath, mapEntry, timestamp, true);
118  } else if (operation == moveFromOpValue || operation == moveToOpValue) {
 // Rename halves are paired through their cookie; without one the event is dropped.
119  auto jsonCookie = getRequiredJsonValue(json, cookieKey);
120  if (jsonCookie.empty()) {
121  return;
122  }
 // std::stoull throws if the cookie text is not a number.
123  renameHalfEvent::cookieType cookie = std::stoull(jsonCookie);
 // NOTE(review): the declaration of moveType and its assignment inside the
 // two branches below are not visible in this listing (lines appear to be
 // missing); presumably moveFrom / moveTo respectively - confirm.
125  if (operation == moveFromOpValue) {
127  } else {
129  }
 // Hand this half to the rename handler; a non-null result is the other
 // half that was registered under the same cookie.
130  auto partner = renameHandler.getRenamePartner(cookie, srcPath, dstPath, moveType, mapEntry, timestamp);
131  if (partner != nullptr) { // we found a partner
 // handleMove takes (srcPath, dstPath, fromPath, origPath, ...): the
 // move-to half supplies srcPath/dstPath, the move-from half supplies
 // fromPath (its destination path) and origPath (its source path).
132  if (moveType == renameHalfEvent::moveTypeEnum::moveFrom) {
133  handleMove(partner->srcPath,
134  partner->dstPath,
135  dstPath,
136  srcPath,
137  mapEntry,
138  timestamp);
139  } else {
140  handleMove(srcPath,
141  dstPath,
142  partner->dstPath,
143  partner->srcPath,
144  mapEntry,
145  timestamp);
146  }
 // NOTE(review): a statement between the handleMove calls and this brace
 // appears to be missing from the listing - confirm against the real file.
148  }
149  }
150 }
151 
152 
153 
154 
// Process rename half events that never found their partner: repeatedly fetch
// the next stale single event from renameHandler (staleness threshold minAge)
// and treat it as a plain delete or copy.
// NOTE(review): the opening lines of both logging calls and one statement near
// the end of the loop body are not visible in this listing (lines appear to be
// missing) - confirm against the real file.
155 void followJsonRequestProvider::cleanupRenameEvents(copyRequest::clock_type::duration minAge) {
156  while (auto event = renameHandler.getNextStaleSingle(minAge)) {
157  if (event->moveType == renameHalfEvent::moveTypeEnum::moveFrom) {
 // Unpaired move-from: request removal of the path, but only when
 // deletions are permitted by the mayDelete option.
159  event->dstPath, "found stale moveFrom, cookie", event->cookie,
160  "from", std::fixed, event->timestamp);
161  if (mayDelete) {
162  handleOther(event->srcPath, event->dstPath, event->mapEntry, event->timestamp, true);
163  }
164  } else {
 // Unpaired move-to: handled like a copy (requestForRemoval = false).
166  event->dstPath, "found stale moveTo, cookie", event->cookie,
167  "from", std::fixed, event->timestamp);
168  handleOther(event->srcPath, event->dstPath, event->mapEntry, event->timestamp, false);
169  }
171  }
172 }
followJsonRequestProvider::processJson
virtual void processJson(const Json::Value &json)
Definition: followJsonRequestProvider.cpp:77
followJsonRequestProvider::cookieKey
static options::single< std::string > cookieKey
Definition: followJsonRequestProvider.h:19
followJsonRequestProvider::followStream
void followStream(std::istream &stream) override
Definition: followJsonRequestProvider.cpp:37
errMsg::location
class for defining the location of an error message in the source code.
Definition: errMsgQueue.h:14
renameEventHandler::forgetRenameEvent
void forgetRenameEvent(const renameHalfEvent *what)
Definition: renameEventHandler.h:66
followJsonRequestProvider::factory
static factoryTemplate< followJsonRequestProvider > factory
Definition: followJsonRequestProvider.h:11
renameHalfEvent::moveTypeEnum::moveTo
@ moveTo
followRequestProvider::handleMove
virtual void handleMove(const std::string &srcPath, const std::string &dstPath, const std::string &fromPath, const std::string &origPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp)
Definition: followRequestProvider.cpp:31
options::base::fIsSet
virtual bool fIsSet() const
check if this option was set, regardless of from command line or config file
Definition: Options.h:263
followRequestProvider::mayDelete
static options::single< bool > mayDelete
Definition: followRequestProvider.h:19
getlineWithTimeout
Definition: getlineWithTimeout.h:9
getlineWithTimeout::nocopyLine::line
std::string & line()
Definition: getlineWithTimeout.h:65
renameEventHandler::getNextStaleSingle
const renameHalfEvent * getNextStaleSingle(renameHalfEvent::timeType::duration timeout)
Definition: renameEventHandler.h:69
followJsonRequestProvider::moveFromOpValue
static options::single< std::string > moveFromOpValue
Definition: followJsonRequestProvider.h:22
followJsonRequestProvider::operationKey
static options::single< std::string > operationKey
Definition: followJsonRequestProvider.h:15
errMsg::level::debug
@ debug
followJsonRequestProvider::copyOpValue
static options::single< std::string > copyOpValue
Definition: followJsonRequestProvider.h:20
renameHalfEvent::moveTypeEnum::moveFrom
@ moveFrom
getlineWithTimeout::active
bool active()
Definition: getlineWithTimeout.h:45
getlineWithTimeout.h
getlineWithTimeout::nocopyLine
Definition: getlineWithTimeout.h:49
OptionsChrono.h
followRequestProvider::nullDelimiter
static options::single< bool > nullDelimiter
Definition: followRequestProvider.h:20
followJsonRequestProvider::sourceMatchRegex
static options::single< std::regex > sourceMatchRegex
Definition: followJsonRequestProvider.h:24
stopRequest::Requested
static bool Requested()
Definition: ewmscp.cpp:153
followJsonRequestProvider::pathKey
static options::single< std::string > pathKey
Definition: followJsonRequestProvider.h:14
followJsonRequestProvider::timestampFormat
static options::single< std::string > timestampFormat
Definition: followJsonRequestProvider.h:17
followJsonRequestProvider::timestampKey
static options::single< std::string > timestampKey
Definition: followJsonRequestProvider.h:16
followJsonRequestProvider::moveToOpValue
static options::single< std::string > moveToOpValue
Definition: followJsonRequestProvider.h:23
followJsonRequestProvider::timestampOffset
static options::single< copyRequest::clock_type::duration > timestampOffset
Definition: followJsonRequestProvider.h:18
followJsonRequestProvider::jsonDelimiter
static options::single< int > jsonDelimiter
Definition: followJsonRequestProvider.h:13
getlineWithTimeout::nocopyLine::timedOut
bool timedOut() const
Definition: getlineWithTimeout.h:62
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
renameEventHandler::getRenamePartner
const renameHalfEvent * getRenamePartner(renameHalfEvent::cookieType cookie, const renameHalfEvent::pathType &aSrcPath, const renameHalfEvent::pathType &aDstPath, renameHalfEvent::moveTypeEnum aIsMoveFrom, const singleMap &aMapEntry, renameHalfEvent::timeType::time_point aTimestamp)
Definition: renameEventHandler.h:43
followJsonRequestProvider.h
followJsonRequestProvider::renameHandler
renameEventHandler renameHandler
Definition: followJsonRequestProvider.h:25
renameHalfEvent::moveTypeEnum
moveTypeEnum
Definition: renameEventHandler.h:16
requestProvider::getDstPath
virtual const singleMap & getDstPath(const std::string &source, std::string &destination, bool baseNameOnly=false)
get destination path for a given source path
Definition: requestProvider.cpp:30
followRequestProvider::handleOther
virtual void handleOther(const std::string &srcPath, const std::string &dstPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp, bool requestForRemoval)
Definition: followRequestProvider.cpp:41
followJsonRequestProvider::getRequiredJsonValue
static std::string getRequiredJsonValue(const Json::Value &json, const std::string &key)
Definition: followJsonRequestProvider.cpp:68
renameHalfEvent::cookieType
long cookieType
Definition: renameEventHandler.h:8
followJsonRequestProvider::deleteOpValue
static options::single< std::string > deleteOpValue
Definition: followJsonRequestProvider.h:21
defineStatic
#define defineStatic(var,...)
defines a static variable and instantiates the constructor with the variable number of arguments.
Definition: ewmscp.h:42
followJsonRequestProvider::cleanupRenameEvents
void cleanupRenameEvents(copyRequest::clock_type::duration minAge)
Definition: followJsonRequestProvider.cpp:155