ewmscp
..
src
followKafkaRequestProvider.cpp
Go to the documentation of this file.
1
#include "
followKafkaRequestProvider.h
"
2
#include <json/json.h>
3
#include "
errMsgQueue.h
"
4
5
defineStatic
(
followKafkaRequestProvider::factory
,
"KafkaStream"
);
6
defineStatic
(
followKafkaRequestProvider::kafkaInQueue
,
"kafkaIn"
);
7
8
void
followKafkaRequestProvider::followStream
(std::istream&
/*stream*/
) {
9
}
10
11
void
followKafkaRequestProvider::processSources
(
const
std::vector<std::string>&
/*sources*/
) {
12
Json::Value json;
13
while
(
true
) {
14
try
{
15
cleanupRenameEvents
(std::chrono::seconds(30));
16
if
(
stopRequest::Requested
()) {
17
break
;
18
}
19
auto
message =
kafkaInQueue
.
receive
(std::chrono::seconds(10));
20
if
(! message.empty()) {
21
std::stringstream buffer(message);
22
try
{
23
buffer >> json;
24
processJson
(json);
25
}
catch
(
const
std::exception& e) {
26
errMsg::emit
(
errMsg::level::debug
,
errMsg::location
(),
27
"kafka receiver"
,
"json decode"
, e.what(),
" '"
, message,
"'"
);
28
}
29
json.clear();
30
}
31
}
catch
(
const
std::exception& e) {
32
errMsg::emit
(
errMsg::level::info
,
errMsg::location
(),
33
"kafka receiver"
,
"receive"
, e.what());
34
break
;
35
}
36
}
37
cleanupRenameEvents
(std::chrono::seconds(-10));
38
}
39
followJsonRequestProvider::processJson
virtual void processJson(const Json::Value &json)
Definition:
followJsonRequestProvider.cpp:77
followKafkaRequestProvider.h
errMsgQueue.h
errMsg::location
class for defining the location of a error message in the source code.
Definition:
errMsgQueue.h:14
followKafkaRequestProvider::kafkaInQueue
static messageQueue::kafka kafkaInQueue
Definition:
followKafkaRequestProvider.h:8
errMsg::level::info
@ info
errMsg::level::debug
@ debug
stopRequest::Requested
static bool Requested()
Definition:
ewmscp.cpp:153
followKafkaRequestProvider::processSources
void processSources(const std::vector< std::string > &sources) override
Definition:
followKafkaRequestProvider.cpp:11
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition:
errMsgQueue.h:148
followKafkaRequestProvider::followStream
void followStream(std::istream &stream) override
Definition:
followKafkaRequestProvider.cpp:8
followKafkaRequestProvider::factory
static factoryTemplate< followKafkaRequestProvider > factory
Definition:
followKafkaRequestProvider.h:7
messageQueue::kafka::receive
std::string receive(std::chrono::system_clock::duration timeout) override
Definition:
kafkaQueue.cpp:84
defineStatic
#define defineStatic(var,...)
defines a static variable and instatitates the constructor with the variable number of arguments.
Definition:
ewmscp.h:42
followJsonRequestProvider::cleanupRenameEvents
void cleanupRenameEvents(copyRequest::clock_type::duration minAge)
Definition:
followJsonRequestProvider.cpp:155
Generated by
1.8.17