ewmscp  ..
Public Member Functions | Private Member Functions | Static Private Attributes | List of all members
followKafkaRequestProvider Class Reference

#include <followKafkaRequestProvider.h>

Inheritance diagram for followKafkaRequestProvider:
[legend]
Collaboration diagram for followKafkaRequestProvider:
[legend]

Public Member Functions

 followKafkaRequestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
 ~followKafkaRequestProvider () override=default
 
void processSources (const std::vector< std::string > &sources) override
 
- Public Member Functions inherited from followJsonRequestProvider
 followJsonRequestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
 ~followJsonRequestProvider () override=default
 
- Public Member Functions inherited from followRequestProvider
 ~followRequestProvider () override=default
 
bool isFollowMode () const override
 
void processSources (const std::vector< std::string > &sources) override
 
- Public Member Functions inherited from requestProvider
 requestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 ! use only the file name part in the destination More...
 
virtual ~requestProvider ()=default
 
virtual void prepareMappings (std::vector< std::string > &sources, std::string &destination)
 
virtual void printMappings (std::ostream &stream)
 

Private Member Functions

void followStream (std::istream &stream) override
 

Static Private Attributes

static factoryTemplate< followKafkaRequestProviderfactory
 
static messageQueue::kafka kafkaInQueue
 

Additional Inherited Members

- Static Public Member Functions inherited from requestProvider
static requestProvidernewProvider (const std::string &choice, decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
static void addAllowedNamesToOption (options::single< std::string > &option)
 
- Protected Member Functions inherited from followJsonRequestProvider
void cleanupRenameEvents (copyRequest::clock_type::duration minAge)
 
virtual void processJson (const Json::Value &json)
 
- Protected Member Functions inherited from followRequestProvider
template<typename ... Types>
void enqueueOrAppend (const std::string &srcPath, copyRequest::fileInWork::slotTypes type, copyRequest::clock_type::time_point timestamp, Types ... args)
 
virtual void handleMove (const std::string &srcPath, const std::string &dstPath, const std::string &fromPath, const std::string &origPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp)
 
virtual void handleOther (const std::string &srcPath, const std::string &dstPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp, bool requestForRemoval)
 
 followRequestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
- Protected Member Functions inherited from requestProvider
virtual const singleMapgetDstPath (const std::string &source, std::string &destination, bool baseNameOnly=false)
 get detstination papth for a given source path More...
 
- Static Protected Member Functions inherited from followJsonRequestProvider
static std::string getRequiredJsonValue (const Json::Value &json, const std::string &key)
 
- Static Protected Member Functions inherited from requestProvider
static std::map< std::string, factoryClass * > & getFactoryMap ()
 
- Protected Attributes inherited from requestProvider
copyRequest::simpleQueuerequests
 
copyRequest::timedQueuedelayedRequests
 
bool parents
 
inputHandler::baseInputHandler
 
outputHandler::baseOutputHandler
 
- Static Protected Attributes inherited from followRequestProvider
static options::single< bool > mayDelete
 
static options::single< bool > nullDelimiter
 
- Static Protected Attributes inherited from requestProvider
static options::map< std::string, pathMapTypepathMap
 
static singleMap badMapEntry
 

Detailed Description

Definition at line 5 of file followKafkaRequestProvider.h.

Constructor & Destructor Documentation

◆ followKafkaRequestProvider()

followKafkaRequestProvider::followKafkaRequestProvider ( decltype(requests aRequests,
decltype(delayedRequests aDelayedRequests,
decltype(parents aParents,
decltype(InputHandler aInputHandler,
decltype(OutputHandler aOutputHandler 
)
inline

Definition at line 11 of file followKafkaRequestProvider.h.

15  :
16  followJsonRequestProvider(aRequests, aDelayedRequests, aParents,
17  aInputHandler, aOutputHandler) {
18  kafkaInQueue.init(true);
19  };

References messageQueue::kafka::init(), and kafkaInQueue.

Here is the call graph for this function:

◆ ~followKafkaRequestProvider()

followKafkaRequestProvider::~followKafkaRequestProvider ( )
overridedefault

Member Function Documentation

◆ followStream()

void followKafkaRequestProvider::followStream ( std::istream &  stream)
overrideprivatevirtual

Reimplemented from followJsonRequestProvider.

Definition at line 8 of file followKafkaRequestProvider.cpp.

8  {
9 }

◆ processSources()

void followKafkaRequestProvider::processSources ( const std::vector< std::string > &  sources)
overridevirtual

Implements requestProvider.

Definition at line 11 of file followKafkaRequestProvider.cpp.

11  {
12  Json::Value json;
13  while (true) {
14  try {
15  cleanupRenameEvents(std::chrono::seconds(30));
16  if (stopRequest::Requested()) {
17  break;
18  }
19  auto message = kafkaInQueue.receive(std::chrono::seconds(10));
20  if (! message.empty()) {
21  std::stringstream buffer(message);
22  try {
23  buffer >> json;
24  processJson(json);
25  } catch (const std::exception& e) {
27  "kafka receiver", "json decode", e.what(), " '", message, "'");
28  }
29  json.clear();
30  }
31  } catch (const std::exception& e) {
33  "kafka receiver", "receive", e.what());
34  break;
35  }
36  }
37  cleanupRenameEvents(std::chrono::seconds(-10));
38 }

References followJsonRequestProvider::cleanupRenameEvents(), errMsg::debug, errMsg::emit(), errMsg::info, kafkaInQueue, followJsonRequestProvider::processJson(), messageQueue::kafka::receive(), and stopRequest::Requested().

Here is the call graph for this function:

Member Data Documentation

◆ factory

factoryTemplate<followKafkaRequestProvider> followKafkaRequestProvider::factory
staticprivate

Definition at line 7 of file followKafkaRequestProvider.h.

◆ kafkaInQueue

messageQueue::kafka followKafkaRequestProvider::kafkaInQueue
staticprivate

Definition at line 8 of file followKafkaRequestProvider.h.

Referenced by followKafkaRequestProvider(), and processSources().


The documentation for this class was generated from the following files:
followJsonRequestProvider::processJson
virtual void processJson(const Json::Value &json)
Definition: followJsonRequestProvider.cpp:77
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
followKafkaRequestProvider::kafkaInQueue
static messageQueue::kafka kafkaInQueue
Definition: followKafkaRequestProvider.h:8
errMsg::level::info
@ info
errMsg::level::debug
@ debug
stopRequest::Requested
static bool Requested()
Definition: ewmscp.cpp:153
messageQueue::kafka::init
void init(bool isConsumer=false)
Definition: kafkaQueue.cpp:14
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
messageQueue::kafka::receive
std::string receive(std::chrono::system_clock::duration timeout) override
Definition: kafkaQueue.cpp:84
followJsonRequestProvider::followJsonRequestProvider
followJsonRequestProvider(decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
Definition: followJsonRequestProvider.h:34
followJsonRequestProvider::cleanupRenameEvents
void cleanupRenameEvents(copyRequest::clock_type::duration minAge)
Definition: followJsonRequestProvider.cpp:155