ewmscp  ..
followKafkaRequestProvider.cpp
Go to the documentation of this file.
2 #include <json/json.h>
3 #include "errMsgQueue.h"
4 
7 
8 void followKafkaRequestProvider::followStream(std::istream& /*stream*/) {
9 }
10 
11 void followKafkaRequestProvider::processSources(const std::vector<std::string>& /*sources*/) {
12  Json::Value json;
13  while (true) {
14  try {
15  cleanupRenameEvents(std::chrono::seconds(30));
16  if (stopRequest::Requested()) {
17  break;
18  }
19  auto message = kafkaInQueue.receive(std::chrono::seconds(10));
20  if (! message.empty()) {
21  std::stringstream buffer(message);
22  try {
23  buffer >> json;
24  processJson(json);
25  } catch (const std::exception& e) {
27  "kafka receiver", "json decode", e.what(), " '", message, "'");
28  }
29  json.clear();
30  }
31  } catch (const std::exception& e) {
33  "kafka receiver", "receive", e.what());
34  break;
35  }
36  }
37  cleanupRenameEvents(std::chrono::seconds(-10));
38 }
39 
followJsonRequestProvider::processJson
virtual void processJson(const Json::Value &json)
Definition: followJsonRequestProvider.cpp:77
followKafkaRequestProvider.h
errMsgQueue.h
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
followKafkaRequestProvider::kafkaInQueue
static messageQueue::kafka kafkaInQueue
Definition: followKafkaRequestProvider.h:8
errMsg::level::info
@ info
errMsg::level::debug
@ debug
stopRequest::Requested
static bool Requested()
Definition: ewmscp.cpp:153
followKafkaRequestProvider::processSources
void processSources(const std::vector< std::string > &sources) override
Definition: followKafkaRequestProvider.cpp:11
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
followKafkaRequestProvider::followStream
void followStream(std::istream &stream) override
Definition: followKafkaRequestProvider.cpp:8
followKafkaRequestProvider::factory
static factoryTemplate< followKafkaRequestProvider > factory
Definition: followKafkaRequestProvider.h:7
messageQueue::kafka::receive
std::string receive(std::chrono::system_clock::duration timeout) override
Definition: kafkaQueue.cpp:84
defineStatic
#define defineStatic(var,...)
defines a static variable and instatitates the constructor with the variable number of arguments.
Definition: ewmscp.h:42
followJsonRequestProvider::cleanupRenameEvents
void cleanupRenameEvents(copyRequest::clock_type::duration minAge)
Definition: followJsonRequestProvider.cpp:155