ewmscp  ..
ewmscp.cpp
Go to the documentation of this file.
1 /*
2  ewmscp: copy program with extended functionality
3  Copyright (C) 2018 Juergen Hannappel
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
19 #include <atomic>
20 #include <condition_variable>
21 #include <csignal>
22 #include <deque>
23 #include <forward_list>
24 #include <iomanip>
25 #include <iostream>
26 #include <mutex>
27 #include <thread>
28 #include <vector>
29 
30 
31 #include <dirent.h>
32 #include <grp.h>
33 #include <libgen.h>
34 #include <pwd.h>
35 #include <string.h>
36 #include <sys/resource.h>
37 #include <sys/time.h>
38 #include <unistd.h>
39 
40 #include "timer.h"
41 #include "copyRequest.h"
42 #include "ewmscp.h"
43 #include "scoped.h"
44 #include "syslogstream.h"
45 #include "errMsgQueue.h"
46 #include "throwcall.h"
47 #ifdef WithKafka
48 #include "kafkaQueue.h"
49 #endif
50 #ifdef WithJsonCpp
51 #include <json/json.h>
52 #endif
53 #include "requestProvider.h"
54 #include "git-rev.h"
55 
68 defineStatic(uid, '\0', "uid", "uid of created files and directories", -1);
69 defineStatic(gid, '\0', "gid", "gid of created files and directories", -1);
70 defineStatic(noCopy, '\0', "noCopy", "do not create a copy", false);
71 defineStatic(dereference, 'L', "dereference",
72  "always follow symbolic links in SOURCE", false);
73 defineStatic(verbose, 'v', "verbose", "explain what is being done", false);
74 defineStatic(quiet, 'q', "quiet", "be as quiet as possible", false);
75 defineStatic(workDir, '\0', "workDir", "use DIRECTORY as work dir");
76 defineStatic(continueOnError, '\0', "continueOnError", "don't stop on errors in non-follow mode", false);
77 defineStatic(nThreads, '\0', "nThreads", "number of simultaneous copys", 1);
78 
79 defineStatic(writeRateLimit, "writeRateLimit");
80 defineStatic(fileRateLimit, "fileRateLimit");
81 defineStatic(readRateLimit, "readRateLimit");
82 
83 defineStatic(modeBits, '\0', "mode",
84  "access mode for copied files", 0644);
85 
86 defineStatic(statPrefix, '\0', "statPrefix",
87  "prefix for statistics log lines", "");
88 
89 defineStatic(requestProviderName, '\0', "requestProvider",
90  "name of request provider",
91  "cmdLine");
92 defineStatic(printStatAnyway, '\0', "printStatAnyway",
93  "print stat values at the end anyway");
94 
95 #ifdef WithJsonCpp
96 defineStatic(prefixJsonName, '\0', "prefixJsonName",
97  "name of statPrefix in json output", "");
98 defineStatic(jsonExtraFields, '\0', "jsonExtraFields",
99  "extra field to add to json messages");
100 #endif
101 
102 
103 defineStatic(outputHandlerName, '\0', "outputHandler",
104  "name of outputHandler", "posixFile");
105 defineStatic(inputHandlerName, '\0', "inputHandler",
106  "name of inputHandler", "posixFile");
107 
108 
110 
112 
114 
115 defineStatic(resultOutput, nullptr);
116 
117 static options::single<std::string> pidFileName('\0', "pidFile",
118  "name of PID file");
119 
120 static void pidFileRemover() {
121  if (! pidFileName.empty()) {
122  throwcall::good0(unlink(pidFileName.c_str()), "can't unlink pid file ", pidFileName);
123  }
124 }
125 
126 
127 static volatile std::atomic<bool> statPrintRequested(false);
128 static volatile std::atomic<bool> statResetRequested(false);
129 void sigUsrHandler(int sigNum) {
130  statPrintRequested = true;
131  if (sigNum == SIGUSR2) {
132  statResetRequested = true;
133  }
134 }
135 
140  stopRequested = true;
141  // static_assert(std::atomic_is_lock_free(&stopRequested), "atomic bool is not always lock free"); // assert that stopRequested is always lock-free
142 }
145  throw std::logic_error("stop request handler cannot be instatiated twice");
146  }
148  throw std::logic_error("illegal handler id value");
149  }
150  std::signal(SIGTERM, sigHandler);
151  handlerId = id;
152 }
155  return stopRequested;
156  }
157  return false;
158 }
160  return handlerId == id && stopRequested;
161 }
162 void stopRequest::RequestStop(const std::string& aReason) {
164  static std::mutex instanceProtector;
165  std::unique_lock<decltype(instanceProtector)> lock(instanceProtector);
166  stopRequested = true;
167  reason = aReason;
168  } else {
170  "stopRequest", "requestStop", "no stop request instance");
171  }
172 }
175  if (stopRequested) {
176  if (! reason.empty()) {
177  throw std::runtime_error(reason);
178  } else {
179  throw std::runtime_error("SIGTERM received");
180  }
181  }
182  }
183 }
184 
185 void preservables::set(const std::string& opt) {
186  std::remove_reference<decltype(opt)>::type::size_type start = 0;
187  auto stop = opt.find(',', start);
188  while (true) {
189  auto keyword = opt.substr(start, stop - start);
190  if (keyword == "mode") {
191  mode = true;
192  } else if (keyword == "ownership") {
193  ownership = true;
194  } else if (keyword == "timestamps") {
195  timestamps = true;
196  } else if (keyword == "attr" || keyword == "xattr") {
197  attrs = true;
198  } else if (keyword == "acls") {
199  acls = true;
200  } else if (keyword == "all") {
201  mode = true;
202  ownership = true;
203  timestamps = true;
204  attrs = true;
205  acls = true;
206  } else {
207  throw std::invalid_argument("bad --preserve argument " + keyword);
208  }
209  if (stop == std::remove_reference<decltype(opt)>::type::npos) {
210  break;
211  }
212  start = stop + 1;
213  stop = opt.find(',', start);
214  }
215 }
216 
217 
218 
219 
220 
224 void delayRequest(copyRequest::timedQueue& delayedRequests,
225  copyRequest::simpleQueue& requests) {
226  while (auto request = delayedRequests.dequeue()) {
227  requests.enqueue(request);
228  }
229 }
230 
232  copyRequest::simpleQueue& requests,
233  std::ostream& hashStream,
234  std::ostream& logStream,
235  std::ostream& statStream) {
236  try {
237  bool timedOut = true;
238  auto lastQueueSize = requests.size();
239  auto lastQueueSizePrint = std::chrono::system_clock::now();
240 
241  while (timedOut == true) {
242  while (auto request = results.dequeue(std::chrono::seconds(1), timedOut)) {
243  request->printResults(hashStream, logStream);
244  // delete request;
245  }
246 
247  if (timedOut) {
248  logStream.flush();
249  }
250 
251  auto now = std::chrono::system_clock::now();
252 
253  if (statPrintRequested) {
254  statStream << statPrefix << "statistics values at " << std::fixed
255  << std::chrono::duration<double>(now.time_since_epoch()).count() << "\n";
256  statStream << statPrefix << "outstanding requests: " << requests.size() << "\n";
257  copyRequest::base::getStatPrinter()(statStream);
258  statStream.flush();
259  statPrintRequested = false;
260  } else {
261  auto queueSize = requests.size();
262 
263  if (lastQueueSize != queueSize &&
264  queueSize > 1 &&
265  now - lastQueueSizePrint > std::chrono::seconds(1)) {
266  statStream << statPrefix << "outstanding requests: " << requests.size() << "\n";
267  lastQueueSize = queueSize;
268  lastQueueSizePrint = now;
269  }
270 
271  }
272  if (statResetRequested) {
274  statResetRequested = false;
275  }
276  }
277 
278  copyRequest::base::getStatPrinter()(statStream);
279  } catch (const std::exception& e) {
281  "printThread","caught exception ",e.what());
282  stopRequest::RequestStop(e.what());
283  } catch (...) {
284  stopRequest::RequestStop("unknown exception");
285  }
286 }
287 
288 static options::single<bool> printErrorLocation('\0', "printErrorLocation",
289  "print error location (file,line,v fct) instead of prefix");
290 
291 void printErrors(std::ostream& errStream, messageQueue::queue* errQueue) {
292  try {
293  while (auto msg = errMsg::message::getQueue().dequeue()) {
294  timerInst(produceLogMsg);
295  if (printErrorLocation) {
296  errStream << msg->getLoc().getFile() << ":" << msg->getLoc().getLine()
297  << ": in " << msg->getLoc().getFunc() << "(): ";
298  } else {
299  errStream << msg->getLogLevel() << statPrefix << ": ";
300  }
301  errStream << msg->getObject() << " " << msg->getAction() << " " << msg->getMessage() << "\n";
302  if (errQueue) {
303 #ifdef WithJsonCpp
304  Json::Value root;
305  if (!prefixJsonName.empty()) {
306  root[prefixJsonName] = statPrefix;
307  }
308  for (auto& item : jsonExtraFields) {
309  root[item.first] = item.second;
310  }
311  root["level"] = logstream::namedLevel::getNameByLevel(msg->getLogLevel());
312  root["path"] = msg->getObject();
313  root["operation"] = msg->getAction();
314  root["message"] = msg->getMessage();
315  root["time"] = std::chrono::duration_cast<std::chrono::duration<double>>(msg->getTimeStamp().time_since_epoch()).count();
316  static Json::FastWriter jsonWriter;
317  errQueue->send(jsonWriter.write(root));
318 #else
319  errQueue->send(msg->getMessage());
320 #endif
321  }
322  }
323  } catch (const std::exception& e) {
324  stopRequest::RequestStop(e.what());
325  } catch (...) {
326  stopRequest::RequestStop("unknown exception");
327  }
328 }
329 
333  template <class T> void stopQueue(T& queue, const std::string& name) {
334  try {
335  queue.signalDone();
336  } catch (const std::logic_error& e) {
337  if (queue.size() > 1) {
338  errMsg::emit(errMsg::level::debug, errMsg::location(), name, "signalled", "done");
339  }
340  }
341  }
342  void joinThread(std::thread& t) {
343  if (t.joinable()) {
344  t.join();
345  }
346  }
347  public:
351  std::thread printer;
352  std::thread delayer;
353  std::forward_list<std::thread> workers;
354  queuesAndThreads() = default;
355  ~queuesAndThreads() noexcept(true) {
356  stopQueue(requests, "requests");
357  stopQueue(delayedRequests, "delayedRequests");
358  stopQueue(results, "results");
361  for (auto& worker : workers) {
362  joinThread(worker);
363  }
364  }
365 };
366 
367 
368 
373 int main(int argc, const char* argv[]) {
374  options::parser parser(EWMSCP_VERSION " https://stash.desy.de/projects/LSDMA/repos/ewmscp/commits/" EWMSCP_COMMIT, "", {});
376  "source files to be copied");
378  "destination file or directory");
379  options::single<std::string>target('\0', "target-directory",
380  "copy all source arguments into directory");
381  options::single<bool>parents('\0', "parents",
382  "use full source file name under DIRECTORY", false);
385  options::single<std::string> user('\0', "user",
386  "user of created files and directories", "");
387  user.fForbid(&uid);
388  uid.fForbid(&user);
391  options::single<std::string> group('\0', "group",
392  "group of created files and directories", "");
393  group.fForbid(&gid);
394  gid.fForbid(&group);
395  options::single<std::string> preserveOpt('\0', "preserve",
396  "preserve the given attribtes");
397  options::single<bool> preserveAll('p', "preserveAll",
398  "preserve mode,ownership,timestamps");
399 
400  options::single<std::string> hashListFile('\0', "hashList",
401  "file to write hash results into");
402  options::single<std::string> logFile('\0', "log",
403  "file to write log messages into");
404  options::single<std::string> statListFile('\0', "statList",
405  "file to write statistics into");
406  options::single<std::string> errorStream('\0', "errors",
407  "file to write errors to");
408 
409  options::single<std::string> startStopMsg('\0', "startStopMsg",
410  "message to show at start or stop");
411 
412  options::single<bool> allowCoreDumps('\0', "allowCoreDumps",
413  "set core dump limit to inf, 0 else", false);
414 
415 
417 
418  options::container<std::string> checksumCreatorNames('\0', "checksum",
419  "name of a checksum");
420  checksum::base::addAllowedNamesToOption(checksumCreatorNames);
421 
424 
425  #ifdef WithKafka
426  messageQueue::kafka kafkaOutQueue("kafkaOut");
427  messageQueue::kafka kafkaErrQueue("kafkaErr");
428  #endif
429  parser.fParse(argc, argv);
431 
432 
433  if (allowCoreDumps) {
434  struct rlimit new_limit = {RLIM_INFINITY, RLIM_INFINITY};
435  throwcall::good0(setrlimit(RLIMIT_CORE, &new_limit), "can't set coredump limit to infinity");
436  } else {
437  struct rlimit new_limit = {0, 0};
438  throwcall::good0(setrlimit(RLIMIT_CORE, &new_limit), "can't set coredump limit to 0");
439  }
440 
441  if (!pidFileName.empty()) {
442  std::ofstream pidFile(pidFileName);
443  pidFile << getpid() << "\n";
444  throwcall::good0(std::atexit(pidFileRemover), "can't register pidFileRemover");
445  }
446 
447  messageQueue::queue* errorOutput = nullptr;
448  #ifdef WithKafka
449  kafkaOutQueue.init();
450  if (kafkaOutQueue.isSenderConfigured()) {
451  resultOutput = &kafkaOutQueue;
452  }
453  kafkaErrQueue.init();
454  if (kafkaErrQueue.isSenderConfigured()) {
455  errorOutput = &kafkaErrQueue;
456  }
457  #endif
458 
459  logstream::provider errStreamBase(errorStream, std::cerr);
460  std::thread errPrinterThread(printErrors, std::ref(errStreamBase.getStream()),
461  errorOutput);
462 
463  logstream::provider hashStreamBase(hashListFile);
464  auto& hashStream = hashStreamBase.getStream();
465  logstream::provider logStreamBase(logFile);
466  auto& logStream = logStreamBase.getStream();
467  logstream::provider statStreamBase(statListFile);
468  auto& statStream = statStreamBase.getStream();
469 
470 
471  for (const auto& name : checksumCreatorNames) {
473  }
474 
475  if (!statPrefix.empty()) {
477  "ewmscp", "starting",
478  EWMSCP_VERSION, " pid: ", getpid());
479  }
480  if (!startStopMsg.empty()) {
481  logStream << logstream::level::info << startStopMsg << " ewmscp " << EWMSCP_VERSION << " starting.\n";
482  }
483 
484  try {
485 
486  if (group.fIsSet()) {
487  auto entry = getgrnam(group.c_str());
488 
489  if (entry) {
490  gid = entry->gr_gid;
491  } else {
492  throw std::runtime_error("groupname " + group + " not found.");
493  }
494  }
495 
496  if (user.fIsSet()) {
497  auto entry = getpwnam(user.c_str());
498 
499  if (entry) {
500  uid = entry->pw_uid;
501  } else {
502  throw std::runtime_error("username " + user + " not found.");
503  }
504  }
505 
506 
507  if (nThreads == 0) {
508  nThreads = sysconf(_SC_NPROCESSORS_ONLN);
510  nThreads = std::max(nThreads / (checksumCreators.empty() ? 2 : 3), 1u);
511  }
512  }
514 
515  if (preserveOpt.fIsSet()) {
516  preserve.set(preserveOpt);
517  }
518 
519  if (preserveAll) {
520  preserve.mode = true;
521  preserve.ownership = true;
522  preserve.timestamps = true;
523  }
524 
525 
526  if (workDir.fIsSet()) {
527  throwcall::good0(chdir(workDir.c_str()), "can't change work dir to ", workDir);
528  }
529 
530 
531 
532  if (target.fIsSet() || noCopy) { // the last positional arg is also a source file name
533  sources.push_back(destination);
534  std::string& dstRef(destination);
535  dstRef = target;
536  }
537 
538  queuesAndThreads qt;
539 
540 
542  qt.requests, qt.delayedRequests, parents,
545  );
546 
547  if ((nThreads > 1) || reqProv->isFollowMode()) {
548  for (unsigned int i = 0; i < nThreads; i++) {
549  qt.workers.emplace_front(std::thread(copyRequest::base::processQueue,
550  std::ref(qt.requests),
551  std::ref(qt.results),
552  std::ref(qt.delayedRequests)));
553  }
554 
555  qt.printer = std::thread(printResults, std::ref(qt.results), std::ref(qt.requests),
556  std::ref(hashStream), std::ref(logStream), std::ref(statStream));
557  std::signal(SIGUSR1, sigUsrHandler);
558  std::signal(SIGUSR2, sigUsrHandler);
559  stopRequest::instantiate(reqProv->isFollowMode() ?
562  }
563  if (reqProv->isFollowMode()) {
564  continueOnError = true;
565  }
566  qt.delayer = std::thread(delayRequest,
567  std::ref(qt.delayedRequests), std::ref(qt.requests));
568  reqProv->prepareMappings(sources, destination);
569  if (verbose) {
570  reqProv->printMappings(logStream);
571  }
572  reqProv->processSources(sources);
573  if (reqProv->isFollowMode()) {
575  } else {
576  if (qt.workers.empty()) {
577  copyRequest::perThreadData threadData;
579  auto request = qt.requests.dequeue();
580  request->process(threadData);
581  request->printResults(hashStream, logStream);
583  }
584  if (printStatAnyway) {
585  copyRequest::base::getStatPrinter()(statStream);
586  }
587  } else {
589  }
590  }
592  } catch (const std::exception& e) {
594  "ewmscp", "ended",
595  "badly: ", e.what(), ", pid: ", getpid());
596  errMsg::message::getQueue().signalDone();
597  errPrinterThread.join();
598  logStream << logstream::level::crit << startStopMsg << " ended badly: " << e.what() << std::endl;
599  return EXIT_FAILURE;
600  }
601  if (!statPrefix.empty()) {
603  "ewmscp", "ended", "sucessfully, pid: ", getpid());
604  }
605  if (!startStopMsg.empty()) {
606  logStream << logstream::level::info << startStopMsg << " ended sucessfully" << std::endl;
607  }
608  errMsg::message::getQueue().signalDone();
609  errPrinterThread.join();
610  return EXIT_SUCCESS;
611 }
inputHandler::base::newHandler
static base * newHandler(const std::string &name)
create an instance of an inputHandler, select by name
Definition: inputHandler.h:130
resultOutput
decltype(resultOutput) resultOutput(nullptr)
queuesAndThreads::delayedRequests
copyRequest::timedQueue delayedRequests
Definition: ewmscp.cpp:349
options::parser
class that contains the parser, i.e. does that option handling
Definition: Options.h:363
queuesAndThreads::results
copyRequest::simpleQueue results
Definition: ewmscp.cpp:350
printStatAnyway
decltype(printStatAnyway) printStatAnyway('\0', "printStatAnyway", "print stat values at the end anyway")
options::single< std::string >
pidFileName
static options::single< std::string > pidFileName('\0', "pidFile", "name of PID file")
errMsgQueue.h
stopRequest::instantiate
static void instantiate(handlerIdType id)
Definition: ewmscp.cpp:143
errMsg::level::warning
@ warning
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
queuesAndThreads::stopQueue
void stopQueue(T &queue, const std::string &name)
Definition: ewmscp.cpp:333
options::single< bool >
class specialisation for options of type bool
Definition: Options.h:595
dereference
decltype(dereference) dereference( 'L', "dereference", "always follow symbolic links in SOURCE", false)
waitQueues::timed< copyRequest::base >
copyRequest::base::perFileThreads
static options::single< bool > perFileThreads
Definition: copyRequest.h:125
readRateLimit
decltype(readRateLimit) readRateLimit("readRateLimit")
waitQueues::simple::size
decltype(queue.size()) size() const
Definition: waitQueues.h:99
queuesAndThreads::requests
copyRequest::simpleQueue requests
Definition: ewmscp.cpp:348
messageQueue::kafka
Definition: kafkaQueue.h:14
logstream::level::info
@ info
requestProviderName
decltype(requestProviderName) requestProviderName('\0', "requestProvider", "name of request provider", "cmdLine")
outputHandler::base::newHandler
static base * newHandler(const std::string &name)
Definition: outputHandler.h:80
requestProvider::newProvider
static requestProvider * newProvider(const std::string &choice, decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
Definition: requestProvider.cpp:16
errMsg::level::crit
@ crit
errMsg::level::info
@ info
errStream
std::ostream * errStream(nullptr)
copyRequest::base::waitForAllInstancesGone
static void waitForAllInstancesGone()
Definition: copyRequest.cpp:1421
delayRequest
void delayRequest(copyRequest::timedQueue &delayedRequests, copyRequest::simpleQueue &requests)
runs as it's own thread to move delayed requests from the delay queue to the requests queue as soon a...
Definition: ewmscp.cpp:224
quiet
decltype(quiet) quiet( 'q', "quiet", "be as quiet as possible", false)
preservables::mode
bool mode
Definition: ewmscp.h:78
workDir
decltype(workDir) workDir('\0', "workDir", "use DIRECTORY as work dir")
copyRequest::base::retry
static bool retry(std::unique_ptr< base > &request, timedQueue &delayedRequests)
Definition: copyRequest.cpp:1866
continueOnError
decltype(continueOnError) continueOnError('\0', "continueOnError", "don't stop on errors in non-follow mode", false)
copyRequest::base::processMultiplicities
static std::vector< std::atomic< unsigned int > > * processMultiplicities
Definition: copyRequest.h:321
queuesAndThreads::printer
std::thread printer
Definition: ewmscp.cpp:351
stopRequest::stopRequested
static volatile std::atomic< bool > stopRequested
Definition: ewmscp.h:129
scoped.h
errMsg::message::getQueue
static waitQueues::simple< message > & getQueue()
get reference to teh message queue
Definition: errMsgQueue.h:90
gid
decltype(gid) gid('\0', "gid", "gid of created files and directories", -1)
checksum::base::addAllowedNamesToOption
static void addAllowedNamesToOption(T &option)
Definition: checksumBase.h:75
sigUsrHandler
void sigUsrHandler(int sigNum)
Definition: ewmscp.cpp:129
logstream::provider
Definition: syslogstream.h:100
copyRequest::base::processQueue
static void processQueue(simpleQueue &queue, simpleQueue &resultQueue, timedQueue &delayedRequests)
Definition: copyRequest.cpp:1895
errMsg::level::debug
@ debug
stopRequest::handlerId
static handlerIdType handlerId
Definition: ewmscp.h:126
stopRequest::handlerIdType::processLoop
@ processLoop
queuesAndThreads::delayer
std::thread delayer
Definition: ewmscp.cpp:352
preservables::timestamps
bool timestamps
Definition: ewmscp.h:81
outputHandler::base::addAllowedNamesToOption
static void addAllowedNamesToOption(T &option)
Definition: outputHandler.h:87
stopRequest::sigHandler
static void sigHandler(int)
Definition: ewmscp.cpp:139
fileRateLimit
decltype(fileRateLimit) fileRateLimit("fileRateLimit")
throttle::start
static auto start
Definition: throttle.h:10
main
int main(int argc, const char *argv[])
Definition: optionExample.cpp:6
preservables::attrs
bool attrs
Definition: ewmscp.h:82
requestProvider::addAllowedNamesToOption
static void addAllowedNamesToOption(options::single< std::string > &option)
Definition: requestProvider.cpp:24
options::container< std::string >
copyRequest::base::getStatPrinter
static std::function< void(std::ostream &)> & getStatPrinter()
we can't have static functions virtual, so we explicitly use a function to give us the proper print f...
Definition: copyRequest.cpp:1816
timer.h
modeBits
decltype(modeBits) modeBits('\0', "mode", "access mode for copied files", 0644)
messageQueue::queue::send
virtual void send(const std::string &aMessage, const std::string &aTopic="")=0
kafkaQueue.h
preservables::ownership
bool ownership
Definition: ewmscp.h:80
throwcall.h
preservables::set
void set(const std::string &opt)
Definition: ewmscp.cpp:185
requestProvider.h
preservables::acls
bool acls
Definition: ewmscp.h:83
stopRequest::Requested
static bool Requested()
Definition: ewmscp.cpp:153
outputHandlerName
decltype(outputHandlerName) outputHandlerName('\0', "outputHandler", "name of outputHandler", "posixFile")
defineStaticNoArg
#define defineStaticNoArg(var)
defines a static variable that needs no arguments to it's constructor
Definition: ewmscp.h:44
logstream::level::crit
@ crit
statPrintRequested
static volatile std::atomic< bool > statPrintRequested(false)
logstream::namedLevel::getNameByLevel
static const std::string & getNameByLevel(level aLevel)
Definition: syslogstream.h:44
waitQueues::simple::enqueue
void enqueue(std::unique_ptr< T > &item)
Definition: waitQueues.h:37
inputHandlerName
decltype(inputHandlerName) inputHandlerName('\0', "inputHandler", "name of inputHandler", "posixFile")
printErrorLocation
static options::single< bool > printErrorLocation('\0', "printErrorLocation", "print error location (file,line,v fct) instead of prefix")
copyRequest.h
preserve
decltype(preserve) preserve
set of properties to preserve in the copy
Definition: ewmscp.cpp:111
statResetRequested
static volatile std::atomic< bool > statResetRequested(false)
copyRequest::perThreadData
class for thread-specific data
Definition: copyRequest.h:67
queuesAndThreads::joinThread
void joinThread(std::thread &t)
Definition: ewmscp.cpp:342
copyRequest::fileInWork::waitForAllInstancesGone
static void waitForAllInstancesGone()
Definition: fileInWork.cpp:32
stopRequest::handlerIdType::unhandled
@ unhandled
stopRequest::handlerIdType::requestProvider
@ requestProvider
noCopy
decltype(noCopy) noCopy('\0', "noCopy", "do not create a copy", false)
stopRequest::handlerIdType::undefined
@ undefined
checksumCreators
decltype(checksumCreators) checksumCreators
Definition: ewmscp.cpp:113
statPrefix
decltype(statPrefix) statPrefix('\0', "statPrefix", "prefix for statistics log lines", "")
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
verbose
decltype(verbose) verbose( 'v', "verbose", "explain what is being done", false)
stopRequest::reason
static std::string reason
Definition: ewmscp.h:127
timerInst
#define timerInst(subfunc)
Definition: timer.h:157
waitQueues::simple< copyRequest::base >
waitQueues::timed::dequeue
std::unique_ptr< T > dequeue()
Definition: waitQueues.h:240
options::positional
Definition: Options.h:876
stopRequest::ThrowUpReasonably
static void ThrowUpReasonably()
Definition: ewmscp.cpp:173
messageQueue::queue
Definition: messageQueue.h:8
pidFileRemover
static void pidFileRemover()
Definition: ewmscp.cpp:120
printErrors
void printErrors(std::ostream &errStream, messageQueue::queue *errQueue)
Definition: ewmscp.cpp:291
inputHandler::base::addAllowedNamesToOption
static void addAllowedNamesToOption(T &option)
Definition: inputHandler.h:137
checksum::base::newFactory
static factoryClass * newFactory(const std::string &aName)
Definition: checksumBase.h:68
stopRequest::handlerIdType
handlerIdType
Definition: ewmscp.h:119
queuesAndThreads::workers
std::forward_list< std::thread > workers
Definition: ewmscp.cpp:353
throwcall::good0
void good0(T call, const Args &... args)
template function to wrap system calls that return 0 on success
Definition: throwcall.h:40
queuesAndThreads::queuesAndThreads
queuesAndThreads()=default
waitQueues::simple::dequeue
std::unique_ptr< T > dequeue(bool mayCreateNew, Types ... args)
Definition: waitQueues.h:59
queuesAndThreads
class to make sure all threads are joined before thread object is deleted in case of an exception to ...
Definition: ewmscp.cpp:332
uid
decltype(uid) uid('\0', "uid", "uid of created files and directories", -1)
stopRequest::RequestStop
static void RequestStop(const std::string &aReason)
Definition: ewmscp.cpp:162
nThreads
decltype(nThreads) nThreads('\0', "nThreads", "number of simultaneous copys", 1)
copyRequest::base::checkForInstances
static bool checkForInstances()
Definition: copyRequest.cpp:1427
syslogstream.h
copyRequest::base::resetStats
static void resetStats()
Definition: copyRequest.cpp:1852
ewmscp.h
queuesAndThreads::~queuesAndThreads
~queuesAndThreads() noexcept(true)
Definition: ewmscp.cpp:355
defineStatic
#define defineStatic(var,...)
defines a static variable and instatitates the constructor with the variable number of arguments.
Definition: ewmscp.h:42
printResults
void printResults(copyRequest::simpleQueue &results, copyRequest::simpleQueue &requests, std::ostream &hashStream, std::ostream &logStream, std::ostream &statStream)
Definition: ewmscp.cpp:231
writeRateLimit
decltype(writeRateLimit) writeRateLimit("writeRateLimit")