ewmscp  ..
Classes | Functions | Variables
ewmscp.cpp File Reference

(v0.19-24-g0617ca1 with changes)

#include <atomic>
#include <condition_variable>
#include <csignal>
#include <deque>
#include <forward_list>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <dirent.h>
#include <grp.h>
#include <libgen.h>
#include <pwd.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>
#include "timer.h"
#include "copyRequest.h"
#include "ewmscp.h"
#include "scoped.h"
#include "syslogstream.h"
#include "errMsgQueue.h"
#include "throwcall.h"
#include "requestProvider.h"
#include "git-rev.h"
Include dependency graph for ewmscp.cpp:

Go to the source code of this file.

Classes

class  queuesAndThreads
 class to make sure all threads are joined before thread object is deleted in case of an exception to do so, all queues must be signalled as done More...
 

Functions

decltype(uid) uid ('\0', "uid", "uid of created files and directories", -1)
 
decltype(gid) gid ('\0', "gid", "gid of created files and directories", -1)
 
decltype(noCopy) noCopy ('\0', "noCopy", "do not create a copy", false)
 
decltype(dereference) dereference ( 'L', "dereference", "always follow symbolic links in SOURCE", false)
 
decltype(verbose) verbose ( 'v', "verbose", "explain what is being done", false)
 
decltype(quiet) quiet ( 'q', "quiet", "be as quiet as possible", false)
 
decltype(workDir) workDir ('\0', "workDir", "use DIRECTORY as work dir")
 
decltype(continueOnError) continueOnError ('\0', "continueOnError", "don't stop on errors in non-follow mode", false)
 
decltype(nThreads) nThreads ('\0', "nThreads", "number of simultaneous copys", 1)
 
decltype(writeRateLimit) writeRateLimit ("writeRateLimit")
 
decltype(fileRateLimit) fileRateLimit ("fileRateLimit")
 
decltype(readRateLimit) readRateLimit ("readRateLimit")
 
decltype(modeBits) modeBits ('\0', "mode", "access mode for copied files", 0644)
 
decltype(statPrefix) statPrefix ('\0', "statPrefix", "prefix for statistics log lines", "")
 
decltype(requestProviderName) requestProviderName ('\0', "requestProvider", "name of request provider", "cmdLine")
 
decltype(printStatAnyway) printStatAnyway ('\0', "printStatAnyway", "print stat values at the end anyway")
 
decltype(outputHandlerName) outputHandlerName ('\0', "outputHandler", "name of outputHandler", "posixFile")
 
decltype(inputHandlerName) inputHandlerName ('\0', "inputHandler", "name of inputHandler", "posixFile")
 
decltype(resultOutput) resultOutput (nullptr)
 
static void pidFileRemover ()
 
static volatile std::atomic< bool > statPrintRequested (false)
 
static volatile std::atomic< bool > statResetRequested (false)
 
void sigUsrHandler (int sigNum)
 
void delayRequest (copyRequest::timedQueue &delayedRequests, copyRequest::simpleQueue &requests)
 runs as it's own thread to move delayed requests from the delay queue to the requests queue as soon as they become available. More...
 
void printResults (copyRequest::simpleQueue &results, copyRequest::simpleQueue &requests, std::ostream &hashStream, std::ostream &logStream, std::ostream &statStream)
 
void printErrors (std::ostream &errStream, messageQueue::queue *errQueue)
 

Variables

decltype(preserve) preserve
 set of properties to preserve in the copy More...
 
decltype(checksumCreators) checksumCreators
 
static options::single< std::string > pidFileName ('\0', "pidFile", "name of PID file")
 
static options::single< bool > printErrorLocation ('\0', "printErrorLocation", "print error location (file,line,v fct) instead of prefix")
 

Function Documentation

◆ delayRequest()

void delayRequest ( copyRequest::timedQueue delayedRequests,
copyRequest::simpleQueue requests 
)

runs as it's own thread to move delayed requests from the delay queue to the requests queue as soon as they become available.

The delaying itself is hidden inside the delayedRequests queue, see waitQueues::timed

Definition at line 224 of file ewmscp.cpp.

225  {
226  while (auto request = delayedRequests.dequeue()) {
227  requests.enqueue(request);
228  }
229 }

References waitQueues::timed< T, clock_type >::dequeue(), and waitQueues::simple< T >::enqueue().

Here is the call graph for this function:

◆ pidFileRemover()

static void pidFileRemover ( )
static

Definition at line 120 of file ewmscp.cpp.

120  {
121  if (! pidFileName.empty()) {
122  throwcall::good0(unlink(pidFileName.c_str()), "can't unlink pid file ", pidFileName);
123  }
124 }

References throwcall::good0(), and pidFileName.

Here is the call graph for this function:

◆ printErrors()

void printErrors ( std::ostream &  errStream,
messageQueue::queue errQueue 
)

Definition at line 291 of file ewmscp.cpp.

291  {
292  try {
293  while (auto msg = errMsg::message::getQueue().dequeue()) {
294  timerInst(produceLogMsg);
295  if (printErrorLocation) {
296  errStream << msg->getLoc().getFile() << ":" << msg->getLoc().getLine()
297  << ": in " << msg->getLoc().getFunc() << "(): ";
298  } else {
299  errStream << msg->getLogLevel() << statPrefix << ": ";
300  }
301  errStream << msg->getObject() << " " << msg->getAction() << " " << msg->getMessage() << "\n";
302  if (errQueue) {
303 #ifdef WithJsonCpp
304  Json::Value root;
305  if (!prefixJsonName.empty()) {
306  root[prefixJsonName] = statPrefix;
307  }
308  for (auto& item : jsonExtraFields) {
309  root[item.first] = item.second;
310  }
311  root["level"] = logstream::namedLevel::getNameByLevel(msg->getLogLevel());
312  root["path"] = msg->getObject();
313  root["operation"] = msg->getAction();
314  root["message"] = msg->getMessage();
315  root["time"] = std::chrono::duration_cast<std::chrono::duration<double>>(msg->getTimeStamp().time_since_epoch()).count();
316  static Json::FastWriter jsonWriter;
317  errQueue->send(jsonWriter.write(root));
318 #else
319  errQueue->send(msg->getMessage());
320 #endif
321  }
322  }
323  } catch (const std::exception& e) {
324  stopRequest::RequestStop(e.what());
325  } catch (...) {
326  stopRequest::RequestStop("unknown exception");
327  }
328 }

References errStream(), logstream::namedLevel::getNameByLevel(), errMsg::message::getQueue(), printErrorLocation, stopRequest::RequestStop(), messageQueue::queue::send(), statPrefix(), and timerInst.

Here is the call graph for this function:

◆ printResults()

void printResults ( copyRequest::simpleQueue results,
copyRequest::simpleQueue requests,
std::ostream &  hashStream,
std::ostream &  logStream,
std::ostream &  statStream 
)

Definition at line 231 of file ewmscp.cpp.

235  {
236  try {
237  bool timedOut = true;
238  auto lastQueueSize = requests.size();
239  auto lastQueueSizePrint = std::chrono::system_clock::now();
240 
241  while (timedOut == true) {
242  while (auto request = results.dequeue(std::chrono::seconds(1), timedOut)) {
243  request->printResults(hashStream, logStream);
244  // delete request;
245  }
246 
247  if (timedOut) {
248  logStream.flush();
249  }
250 
251  auto now = std::chrono::system_clock::now();
252 
253  if (statPrintRequested) {
254  statStream << statPrefix << "statistics values at " << std::fixed
255  << std::chrono::duration<double>(now.time_since_epoch()).count() << "\n";
256  statStream << statPrefix << "outstanding requests: " << requests.size() << "\n";
257  copyRequest::base::getStatPrinter()(statStream);
258  statStream.flush();
259  statPrintRequested = false;
260  } else {
261  auto queueSize = requests.size();
262 
263  if (lastQueueSize != queueSize &&
264  queueSize > 1 &&
265  now - lastQueueSizePrint > std::chrono::seconds(1)) {
266  statStream << statPrefix << "outstanding requests: " << requests.size() << "\n";
267  lastQueueSize = queueSize;
268  lastQueueSizePrint = now;
269  }
270 
271  }
272  if (statResetRequested) {
274  statResetRequested = false;
275  }
276  }
277 
278  copyRequest::base::getStatPrinter()(statStream);
279  } catch (const std::exception& e) {
281  "printThread","caught exception ",e.what());
282  stopRequest::RequestStop(e.what());
283  } catch (...) {
284  stopRequest::RequestStop("unknown exception");
285  }
286 }

References waitQueues::simple< T >::dequeue(), errMsg::emit(), copyRequest::base::getStatPrinter(), stopRequest::RequestStop(), copyRequest::base::resetStats(), waitQueues::simple< T >::size(), statPrefix(), statPrintRequested(), statResetRequested(), and errMsg::warning.

Here is the call graph for this function:

◆ resultOutput()

decltype( resultOutput ) resultOutput ( nullptr  )

◆ sigUsrHandler()

void sigUsrHandler ( int  sigNum)

Definition at line 129 of file ewmscp.cpp.

129  {
130  statPrintRequested = true;
131  if (sigNum == SIGUSR2) {
132  statResetRequested = true;
133  }
134 }

References statPrintRequested(), and statResetRequested().

Here is the call graph for this function:

◆ statPrintRequested()

static volatile std::atomic<bool> statPrintRequested ( false  )
static

Referenced by printResults(), and sigUsrHandler().

Here is the caller graph for this function:

◆ statResetRequested()

static volatile std::atomic<bool> statResetRequested ( false  )
static

Referenced by printResults(), and sigUsrHandler().

Here is the caller graph for this function:

Variable Documentation

◆ checksumCreators

decltype( checksumCreators ) checksumCreators

◆ pidFileName

options::single<std::string> pidFileName('\0', "pidFile", "name of PID file")
static

Referenced by pidFileRemover().

◆ preserve

decltype( preserve ) preserve

◆ printErrorLocation

options::single<bool> printErrorLocation('\0', "printErrorLocation", "print error location (file,line,v fct) instead of prefix")
static

Referenced by printErrors().

pidFileName
static options::single< std::string > pidFileName('\0', "pidFile", "name of PID file")
errMsg::level::warning
@ warning
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
waitQueues::simple::size
decltype(queue.size()) size() const
Definition: waitQueues.h:99
errStream
std::ostream * errStream(nullptr)
errMsg::message::getQueue
static waitQueues::simple< message > & getQueue()
get reference to teh message queue
Definition: errMsgQueue.h:90
copyRequest::base::getStatPrinter
static std::function< void(std::ostream &)> & getStatPrinter()
we can't have static functions virtual, so we explicitly use a function to give us the proper print f...
Definition: copyRequest.cpp:1816
messageQueue::queue::send
virtual void send(const std::string &aMessage, const std::string &aTopic="")=0
statPrintRequested
static volatile std::atomic< bool > statPrintRequested(false)
logstream::namedLevel::getNameByLevel
static const std::string & getNameByLevel(level aLevel)
Definition: syslogstream.h:44
waitQueues::simple::enqueue
void enqueue(std::unique_ptr< T > &item)
Definition: waitQueues.h:37
printErrorLocation
static options::single< bool > printErrorLocation('\0', "printErrorLocation", "print error location (file,line,v fct) instead of prefix")
statResetRequested
static volatile std::atomic< bool > statResetRequested(false)
statPrefix
decltype(statPrefix) statPrefix('\0', "statPrefix", "prefix for statistics log lines", "")
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
timerInst
#define timerInst(subfunc)
Definition: timer.h:157
waitQueues::timed::dequeue
std::unique_ptr< T > dequeue()
Definition: waitQueues.h:240
throwcall::good0
void good0(T call, const Args &... args)
template function to wrap system calls that return 0 on success
Definition: throwcall.h:40
waitQueues::simple::dequeue
std::unique_ptr< T > dequeue(bool mayCreateNew, Types ... args)
Definition: waitQueues.h:59
stopRequest::RequestStop
static void RequestStop(const std::string &aReason)
Definition: ewmscp.cpp:162
copyRequest::base::resetStats
static void resetStats()
Definition: copyRequest.cpp:1852