ewmscp  ..
Public Member Functions | Protected Member Functions | Static Protected Attributes | List of all members
followRequestProvider Class Reference (abstract)

#include <followRequestProvider.h>

Inheritance diagram for followRequestProvider:
[legend]
Collaboration diagram for followRequestProvider:
[legend]

Public Member Functions

 ~followRequestProvider () override=default
 
bool isFollowMode () const override
 
void processSources (const std::vector< std::string > &sources) override
 
- Public Member Functions inherited from requestProvider
 requestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 use only the file name part in the destination More...
 
virtual ~requestProvider ()=default
 
virtual void prepareMappings (std::vector< std::string > &sources, std::string &destination)
 
virtual void printMappings (std::ostream &stream)
 

Protected Member Functions

virtual void followStream (std::istream &stream)=0
 
template<typename ... Types>
void enqueueOrAppend (const std::string &srcPath, copyRequest::fileInWork::slotTypes type, copyRequest::clock_type::time_point timestamp, Types ... args)
 
virtual void handleMove (const std::string &srcPath, const std::string &dstPath, const std::string &fromPath, const std::string &origPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp)
 
virtual void handleOther (const std::string &srcPath, const std::string &dstPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp, bool requestForRemoval)
 
 followRequestProvider (decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
- Protected Member Functions inherited from requestProvider
virtual const singleMap & getDstPath (const std::string &source, std::string &destination, bool baseNameOnly=false)
 get destination path for a given source path More...
 

Static Protected Attributes

static options::single< bool > mayDelete
 
static options::single< bool > nullDelimiter
 
- Static Protected Attributes inherited from requestProvider
static options::map< std::string, pathMapType > pathMap
 
static singleMap badMapEntry
 

Additional Inherited Members

- Static Public Member Functions inherited from requestProvider
static requestProvider * newProvider (const std::string &choice, decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
 
static void addAllowedNamesToOption (options::single< std::string > &option)
 
- Static Protected Member Functions inherited from requestProvider
static std::map< std::string, factoryClass * > & getFactoryMap ()
 
- Protected Attributes inherited from requestProvider
copyRequest::simpleQueue & requests
 
copyRequest::timedQueue & delayedRequests
 
bool parents
 
inputHandler::base * InputHandler
 
outputHandler::base * OutputHandler
 

Detailed Description

Definition at line 17 of file followRequestProvider.h.

Constructor & Destructor Documentation

◆ followRequestProvider()

followRequestProvider::followRequestProvider ( decltype(requests) aRequests,
decltype(delayedRequests) aDelayedRequests,
decltype(parents) aParents,
decltype(InputHandler) aInputHandler,
decltype(OutputHandler) aOutputHandler
)
protected

Definition at line 9 of file followRequestProvider.cpp.

13  :
14  requestProvider(aRequests, aDelayedRequests, aParents,
15  aInputHandler, aOutputHandler) {
16 }

◆ ~followRequestProvider()

followRequestProvider::~followRequestProvider ( )
override default

Member Function Documentation

◆ enqueueOrAppend()

template<typename ... Types>
void followRequestProvider::enqueueOrAppend ( const std::string &  srcPath,
copyRequest::fileInWork::slotTypes  type,
copyRequest::clock_type::time_point  timestamp,
Types ...  args 
)
inline protected

Definition at line 25 of file followRequestProvider.h.

28  {
29  timerInst(enqueue);
30  auto now = copyRequest::clock_type::now();
31  if (timestamp > now) { // cannot be true, probably ntp uncertainty
32  if (timestamp - now > std::chrono::milliseconds(10)) {
34  srcPath, "createRequest",
35  "fixing future timestamp ",
36  std::chrono::duration_cast<std::chrono::duration<double>>(timestamp - now).count(), "s");
37  }
38  timestamp = now;
39  }
40 
41  copyRequest::fileInWork::inserter inserter(srcPath);
42  if (inserter.getFileInWork().getLatestRequestType() == type) { // just update the existing requests start time
43  std::string suffix;
44  copyRequest::base::getSuffix(srcPath, suffix);
45  auto& fileInWork = inserter.getFileInWork();
46  copyRequest::base::adviseDelay(timestamp - fileInWork.getEarliestprocessTime(), suffix);
47  fileInWork.updateExecTime(timestamp);
48  } else { // create a new request
49  std::unique_ptr<copyRequest::base> request(new copyRequest::base(InputHandler, srcPath, args..., timestamp));
50  if (inserter.getFileInWork().getLatestRequestType() == copyRequest::fileInWork::slotTypes::none) { // no resquest for this file yet
51  inserter.markForQueueing(request);
52  auto advisedDelay = request->getAdvisedDelay();
53  if (request->getSlotType() != copyRequest::fileInWork::slotTypes::unlink
54  && advisedDelay != copyRequest::clock_type::duration::zero()) {
55  inserter.getFileInWork().setWaitTime(advisedDelay);
56  delayedRequests.enqueue(request, advisedDelay);
57  } else {
58  requests.enqueue(request);
59  }
60  } else { // add request to per-file list
61  inserter.enRegister(request);
62  }
63  }
64  }

References copyRequest::base::adviseDelay(), errMsg::debug, requestProvider::delayedRequests, errMsg::emit(), waitQueues::simple< T >::enqueue(), waitQueues::timed< T, clock_type >::enqueue(), copyRequest::fileInWork::inserter::enRegister(), copyRequest::base::getAdvisedDelay(), copyRequest::fileInWork::inserter::getFileInWork(), copyRequest::fileInWork::getLatestRequestType(), copyRequest::base::getSlotType(), copyRequest::base::getSuffix(), requestProvider::InputHandler, copyRequest::fileInWork::inserter::markForQueueing(), copyRequest::fileInWork::none, requestProvider::requests, copyRequest::fileInWork::setWaitTime(), timerInst, and copyRequest::fileInWork::unlink.

Referenced by handleMove(), and handleOther().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ followStream()

virtual void followRequestProvider::followStream ( std::istream &  stream)
protected pure virtual

Implemented in followJsonRequestProvider, followKafkaRequestProvider, followInotifyWatchRequestProvider, and followBeeGfsRequestProvider.

Referenced by processSources().

Here is the caller graph for this function:

◆ handleMove()

void followRequestProvider::handleMove ( const std::string &  srcPath,
const std::string &  dstPath,
const std::string &  fromPath,
const std::string &  origPath,
const singleMap & mapEntry,
copyRequest::clock_type::time_point  timestamp 
)
protected virtual

Definition at line 31 of file followRequestProvider.cpp.

36  {
38  dstPath, fromPath, origPath, mapEntry);
39 }

References enqueueOrAppend(), and copyRequest::fileInWork::move.

Referenced by followInotifyWatchRequestProvider::followStream(), followJsonRequestProvider::processJson(), and followBeeGfsRequestProvider::processSources().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ handleOther()

void followRequestProvider::handleOther ( const std::string &  srcPath,
const std::string &  dstPath,
const singleMap & mapEntry,
copyRequest::clock_type::time_point  timestamp,
bool  requestForRemoval 
)
protected virtual

Definition at line 41 of file followRequestProvider.cpp.

45  {
46  if (mayDelete || !requestForRemoval) {
47  enqueueOrAppend(srcPath,
48  requestForRemoval ? copyRequest::fileInWork::slotTypes::unlink :
50  timestamp,
51  dstPath, mapEntry, requestForRemoval);
52  }
53 }

References copyRequest::fileInWork::copy, enqueueOrAppend(), mayDelete, and copyRequest::fileInWork::unlink.

Referenced by followJsonRequestProvider::cleanupRenameEvents(), followInotifyWatchRequestProvider::followStream(), followJsonRequestProvider::processJson(), and followBeeGfsRequestProvider::processSources().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ isFollowMode()

bool followRequestProvider::isFollowMode ( ) const
inline override virtual

Implements requestProvider.

Definition at line 83 of file followRequestProvider.h.

83  {
84  return true;
85  };

◆ processSources()

void followRequestProvider::processSources ( const std::vector< std::string > &  sources)
override virtual

Implements requestProvider.

Definition at line 18 of file followRequestProvider.cpp.

18  {
19  for (auto& source : sources) {
20  if (source == "-") {
21  followStream(std::cin);
22  } else {
23  std::ifstream stream(source);
24  followStream(stream);
25  }
26  }
27 }

References followStream().

Here is the call graph for this function:

Member Data Documentation

◆ mayDelete

options::single<bool> followRequestProvider::mayDelete
static protected

◆ nullDelimiter

options::single<bool> followRequestProvider::nullDelimiter
static protected

The documentation for this class was generated from the following files:
followRequestProvider.h
followRequestProvider.cpp
copyRequest::fileInWork::slotTypes::copy
@ copy
copyRequest::fileInWork::slotTypes::move
@ move
followRequestProvider::enqueueOrAppend
void enqueueOrAppend(const std::string &srcPath, copyRequest::fileInWork::slotTypes type, copyRequest::clock_type::time_point timestamp, Types ... args)
Definition: followRequestProvider.h:25
errMsg::location
class for defining the location of an error message in the source code.
Definition: errMsgQueue.h:14
copyRequest::fileInWork::slotTypes::none
@ none
copyRequest::base::getSuffix
const std::string & getSuffix() const
Definition: copyRequest.h:421
followRequestProvider::mayDelete
static options::single< bool > mayDelete
Definition: followRequestProvider.h:19
copyRequest::base
class for copy requests.
Definition: copyRequest.h:99
requestProvider::delayedRequests
copyRequest::timedQueue & delayedRequests
Definition: requestProvider.h:32
copyRequest::fileInWork::slotTypes::unlink
@ unlink
errMsg::level::debug
@ debug
copyRequest::fileInWork::inserter
create a new fileInWork instance in the map.
Definition: fileInWork.h:79
waitQueues::simple::enqueue
void enqueue(std::unique_ptr< T > &item)
Definition: waitQueues.h:37
requestProvider::requests
copyRequest::simpleQueue & requests
Definition: requestProvider.h:31
waitQueues::timed::enqueue
void enqueue(std::unique_ptr< T > &item, typename clock_type::time_point when)
Definition: waitQueues.h:216
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
timerInst
#define timerInst(subfunc)
Definition: timer.h:157
requestProvider::InputHandler
inputHandler::base * InputHandler
Definition: requestProvider.h:34
followRequestProvider::followStream
virtual void followStream(std::istream &stream)=0
copyRequest::base::adviseDelay
static bool adviseDelay(clock_type::duration dt, const std::string &suffix)
update the advised delay in the map.
Definition: copyRequest.cpp:1011
requestProvider::requestProvider
requestProvider(decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
use only the file name part in the destination
Definition: requestProvider.h:74