ewmscp  ..
followRequestProvider.h
Go to the documentation of this file.
1 #ifndef __followRequestProvider_h__
2 #define __followRequestProvider_h__
3 #include "copyRequestTypes.h"
4 #include "requestProvider.h"
5 #include "errMsgQueue.h"
6 #include "timer.h"
7 
18  protected:
21 
22 
23 
24  virtual void followStream(std::istream& stream) = 0;
25  template <typename ... Types> void enqueueOrAppend(const std::string& srcPath,
27  copyRequest::clock_type::time_point timestamp,
28  Types ... args) {
29  timerInst(enqueue);
30  auto now = copyRequest::clock_type::now();
31  if (timestamp > now) { // cannot be true, probably ntp uncertainty
32  if (timestamp - now > std::chrono::milliseconds(10)) {
34  srcPath, "createRequest",
35  "fixing future timestamp ",
36  std::chrono::duration_cast<std::chrono::duration<double>>(timestamp - now).count(), "s");
37  }
38  timestamp = now;
39  }
40 
41  copyRequest::fileInWork::inserter inserter(srcPath);
42  if (inserter.getFileInWork().getLatestRequestType() == type) { // just update the existing requests start time
43  std::string suffix;
44  copyRequest::base::getSuffix(srcPath, suffix);
45  auto& fileInWork = inserter.getFileInWork();
46  copyRequest::base::adviseDelay(timestamp - fileInWork.getEarliestprocessTime(), suffix);
47  fileInWork.updateExecTime(timestamp);
48  } else { // create a new request
49  std::unique_ptr<copyRequest::base> request(new copyRequest::base(InputHandler, srcPath, args..., timestamp));
50  if (inserter.getFileInWork().getLatestRequestType() == copyRequest::fileInWork::slotTypes::none) { // no resquest for this file yet
51  inserter.markForQueueing(request);
52  auto advisedDelay = request->getAdvisedDelay();
54  && advisedDelay != copyRequest::clock_type::duration::zero()) {
55  inserter.getFileInWork().setWaitTime(advisedDelay);
56  delayedRequests.enqueue(request, advisedDelay);
57  } else {
58  requests.enqueue(request);
59  }
60  } else { // add request to per-file list
61  inserter.enRegister(request);
62  }
63  }
64  }
65  virtual void handleMove(const std::string& srcPath,
66  const std::string& dstPath,
67  const std::string& fromPath,
68  const std::string& origPath,
69  const singleMap& mapEntry,
70  copyRequest::clock_type::time_point timestamp);
71  virtual void handleOther(const std::string& srcPath,
72  const std::string& dstPath,
73  const singleMap& mapEntry,
74  copyRequest::clock_type::time_point timestamp,
75  bool requestForRemoval);
76  followRequestProvider(decltype(requests) aRequests,
77  decltype(delayedRequests) aDelayedRequests,
78  decltype(parents) aParents,
79  decltype(InputHandler) aInputHandler,
80  decltype(OutputHandler) aOutputHandler);
81  public:
82  ~followRequestProvider() override = default;
83  bool isFollowMode() const override {
84  return true;
85  };
86  void processSources(const std::vector<std::string>& sources) override;
87 };
88 
89 
91 #endif
followRequestProvider::enqueueOrAppend
void enqueueOrAppend(const std::string &srcPath, copyRequest::fileInWork::slotTypes type, copyRequest::clock_type::time_point timestamp, Types ... args)
Definition: followRequestProvider.h:25
errMsgQueue.h
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
options::single< bool >
class specialisation for options of type bool
Definition: Options.h:595
copyRequest::fileInWork::slotTypes::none
@ none
copyRequestTypes.h
followRequestProvider::handleMove
virtual void handleMove(const std::string &srcPath, const std::string &dstPath, const std::string &fromPath, const std::string &origPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp)
Definition: followRequestProvider.cpp:31
copyRequest::base::getSuffix
const std::string & getSuffix() const
Definition: copyRequest.h:421
followRequestProvider::mayDelete
static options::single< bool > mayDelete
Definition: followRequestProvider.h:19
requestProvider
generic provider of copy requests
Definition: requestProvider.h:13
copyRequest::base
class for copy requests.
Definition: copyRequest.h:99
requestProvider::parents
bool parents
Definition: requestProvider.h:33
followRequestProvider::isFollowMode
bool isFollowMode() const override
Definition: followRequestProvider.h:83
requestProvider::delayedRequests
copyRequest::timedQueue & delayedRequests
Definition: requestProvider.h:32
copyRequest::fileInWork::slotTypes::unlink
@ unlink
errMsg::level::debug
@ debug
copyRequest::base::getSlotType
fileInWork::slotTypes getSlotType() const
Definition: copyRequest.h:348
copyRequest::fileInWork::inserter::enRegister
void enRegister(std::unique_ptr< copyRequest::base > &request)
register a request in the fileInWork list
Definition: fileInWork.cpp:50
copyRequest::fileInWork::inserter::getFileInWork
fileInWork & getFileInWork()
Definition: fileInWork.h:93
copyRequest::fileInWork::inserter
create a new fileInWork instance in the map.
Definition: fileInWork.h:79
copyRequest::fileInWork::inserter::markForQueueing
void markForQueueing(std::unique_ptr< copyRequest::base > &request)
Definition: fileInWork.cpp:57
timer.h
followRequestProvider::nullDelimiter
static options::single< bool > nullDelimiter
Definition: followRequestProvider.h:20
requestProvider.h
requestProvider::OutputHandler
outputHandler::base * OutputHandler
Definition: requestProvider.h:35
followRequestProvider::followRequestProvider
followRequestProvider(decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
Definition: followRequestProvider.cpp:9
waitQueues::simple::enqueue
void enqueue(std::unique_ptr< T > &item)
Definition: waitQueues.h:37
requestProvider::requests
copyRequest::simpleQueue & requests
Definition: requestProvider.h:31
followRequestProvider::processSources
void processSources(const std::vector< std::string > &sources) override
Definition: followRequestProvider.cpp:18
waitQueues::timed::enqueue
void enqueue(std::unique_ptr< T > &item, typename clock_type::time_point when)
Definition: waitQueues.h:216
copyRequest::base::getAdvisedDelay
clock_type::duration getAdvisedDelay() const
returns the advised delay
Definition: copyRequest.cpp:1059
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
timerInst
#define timerInst(subfunc)
Definition: timer.h:157
copyRequest::fileInWork::getLatestRequestType
slotTypes getLatestRequestType() const
Definition: fileInWork.cpp:41
singleMap
std::pair< std::string, std::string > singleMap
Definition: copyRequest.h:52
requestProvider::InputHandler
inputHandler::base * InputHandler
Definition: requestProvider.h:34
followRequestProvider
Definition: followRequestProvider.h:17
followRequestProvider::followStream
virtual void followStream(std::istream &stream)=0
copyRequest::base::adviseDelay
static bool adviseDelay(clock_type::duration dt, const std::string &suffix)
update the advised delay in the map.
Definition: copyRequest.cpp:1011
followRequestProvider::handleOther
virtual void handleOther(const std::string &srcPath, const std::string &dstPath, const singleMap &mapEntry, copyRequest::clock_type::time_point timestamp, bool requestForRemoval)
Definition: followRequestProvider.cpp:41
copyRequest::fileInWork::slotTypes
slotTypes
Definition: fileInWork.h:23
followRequestProvider::~followRequestProvider
~followRequestProvider() override=default
copyRequest::fileInWork::setWaitTime
void setWaitTime(clock_type::duration aWaitTime)
Definition: fileInWork.h:63