ewmscp  ..
copyRequest.cpp
Go to the documentation of this file.
1 /*
2  ewmscp: copy program with extended functionality
3  Copyright (C) 2018 Juergen Hannappel
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
21 #include "copyRequest.h"
22 #include "inputHandler.h"
23 #include "md5sum.h"
24 #include "scoped.h"
25 #include "syslogstream.h"
26 #include "throwcall.h"
27 #include "escapism.h"
28 #include "git-rev.h"
29 #include "errMsgQueue.h"
30 #include <string.h>
31 #include "timer.h"
32 #ifdef WithJsonCpp
33 #include <json/json.h>
34 #endif
35 
36 std::ostream& operator<< (std::ostream& stream,
37  const copyRequest::clock_type::time_point& t) {
38  stream << std::chrono::duration_cast<std::chrono::duration<double>>(t.time_since_epoch()).count();
39  return stream;
40 }
41 
42 
43 
46 defineStatic(copyRequest::base::success_source_attrs, '\0', "success-source-attrs",
47  "xattrs to set on source file on success");
48 defineStatic(copyRequest::base::failure_source_attrs, '\0', "failure-source-attrs",
49  "xattrs to set on source file on failure");
50 defineStatic(copyRequest::base::process_source_attrs, '\0', "process-source-attrs",
51  "xattrs to set on source file during copy, removed ad end of copy");
52 defineStatic(copyRequest::base::start_source_attrs, '\0', "start-source-attrs",
53  "xattrs to set on source file on start of copy");
54 defineStatic(copyRequest::base::check_source_attrs, '\0', "check-source-attrs",
55  "xattrs to check on source file after copy (or hashing)");
56 defineStatic(copyRequest::base::success_dest_attrs, '\0', "success-dest-attrs",
57  "xattrs to set on destination file on success");
58 defineStatic(copyRequest::base::maxRetries, '\0', "retries", "max number of retries", 5);
59 
61  '\0', "prohibitive-attrs",
62  "xattrs that prohibit copying when present on source");
64  '\0', "append-attrs",
65  "xattrs that trigger append mode when present on source");
66 
68  '\0', "appendable-files",
69  "regexp for files that may be appended to");
70 
72  '\0', "ignoreExisting",
73  "ignore files where destination exists and has same stat values");
75  '\0', "changeAttrsOnIgnoreExisting",
76  "update preserved attrs on files ignored by ignoreExisting");
77 
78 defineStatic(copyRequest::base::linkBaseMap, '\0', "linkBaseMap",
79  "base dir that gets removed from absolute links");
81  "control creation of sparse files.", "auto", {"auto", "never", "always"});
82 
85  backupMode = getBackupModeNameMap().at(me);
86 },
87 '\0', "backupMode", "backup mode", "none", [] {std::vector<std::string> v; for (const auto& i : getBackupModeNameMap()) {
88 v.push_back(i.first);
89 }
90 return v;
91  }());
92 
93 defineStatic(copyRequest::base::backupMode, backupModeType::none);
94 
95 defineStatic(copyRequest::base::backupSuffix, '\0', "backupSuffix", "suffix to append to backup files");
97  "sync new files after write", false);
98 defineStatic(copyRequest::base::setAttributesAfterClose, '\0', "setAttributesAfterClose",
99  "set attributes after close() (needed for dCache)");
100 
101 defineStatic(copyRequest::base::perFileThreads, '\0', "perFileThreads",
102  "extra write/checksum thread per file", false);
103 defineStatic(copyRequest::base::nSumThreads, '\0', "nSumThreads",
104  "number of threads for parallelizable checksums per file",
105  3);
106 defineStatic(copyRequest::base::nReadThreads, '\0', "nReadThreads",
107  "number of threads for parallelizable reads per file",
108  3);
109 defineStatic(copyRequest::base::forceParallelRead, '\0', "forceParallelRead",
110  "do parallel read even on loaded system");
111 defineStatic(copyRequest::base::nWriteThreads, '\0', "nWriteThreads",
112  "number of threads for parallelizable writes per file",
113  3);
115  "read block size, 0: from fs",
116  0);
118  "write block size, 0: from fs",
119  0);
120 
121 
122 defineStatic(copyRequest::base::maxMemoryBlockSize, '\0', "maxMemoryBlockSize",
123  "max block size of memory blocks",
124  16 * 1024 * 1024);
125 defineStatic(copyRequest::base::minMemoryBlockSize, '\0', "minMemoryBlockSize",
126  "min block size of memory blocks",
127  1 * 1024 * 1024);
128 
130  "field in the log lines");
131 defineStatic(copyRequest::base::sourcePrefixRegex, '\0', "sourcePrefixRegex",
132  "regexp to determine log prefix from source path");
133 defineStatic(copyRequest::base::destinationPrefixRegex, '\0', "destinationPrefixRegex",
134  "regexp to determine log prefix from destination path");
135 
136 #ifdef WithMagic
137 defineStatic(copyRequest::base::doMagic, '\0', "doMagic",
138  "determine file type via libmagic");
139 #endif
140 
142 
147 defineStatic(copyRequest::base::tTotal2Stat, "time modify to finish");
153 defineStatic(copyRequest::base::bytesStat, "file size", "B");
154 defineStatic(copyRequest::base::speedStat, "copy speed", "MiB/s");
156 
161 defineStatic(copyRequest::base::maxDelayTime, '\0', "maxDelayTime",
162  "max seconds to delay files", 10);
165 
167 
168 
172 
176  #ifdef WithMagic
177  magicCookie = throwcall::badval(magic_open(MAGIC_NONE), nullptr, "can't init libmagic");
178  if (magic_load(magicCookie, nullptr) != 0) {
179  throw std::runtime_error(magic_error(magicCookie));
180  }
181  #endif
182 }
183 
184 
185 void copyRequest::exceptionList::add(std::exception_ptr e) {
186  std::unique_lock<decltype(listMutex)> lock(listMutex);
187  list.push_front(e);
188 }
190  // for test on emptyness lock is not needed
191  return list.empty();
192 }
194  std::unique_lock<decltype(listMutex)> lock(listMutex);
195  return std::distance(list.cbegin(), list.cend());
196 }
197 std::exception_ptr copyRequest::exceptionList::front() {
198  std::unique_lock<decltype(listMutex)> lock(listMutex);
199  return list.front();
200 }
201 
202 
235 const std::map<std::string, copyRequest::backupModeType>& copyRequest::base::getBackupModeNameMap() {
236  static std::map<std::string, backupModeType> backupModeNameMap = {
242  };
243  return backupModeNameMap;
244 }
245 
246 std::map<std::string, void (copyRequest::base::*)(std::string&) const>& copyRequest::base::keywordMap() {
247  static std::map<std::string, void (copyRequest::base::*)(std::string&) const> map;
248  return map;
249 }
260 // /}
261 #define defKeyword(kw) static copyRequest::base::registerme kw_##kw("%" #kw,&copyRequest::base::kw_##kw); \
262  void copyRequest::base::kw_##kw(std::string& value) const
263 defKeyword(size) {
267  if (readInitialStat == nullptr) {
268  value = "NA";
269  return;
270  }
271  value = std::to_string(readInitialStat->size);
272 }
273 
275 defKeyword(mtime) {
276  if (readInitialStat == nullptr) {
277  value = "NA";
278  return;
279  }
280  readInitialStat->getMtime(value);
281 }
283 defKeyword(inow) {
284  value = std::to_string(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count());
285 }
286 
289  value = std::to_string(std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::system_clock::now().time_since_epoch()).count());
290 }
291 
293 defKeyword(version) {
294  value = EWMSCP_VERSION;
295 }
296 
298 defKeyword(commit) {
299  value = EWMSCP_COMMIT;
300 }
301 
303 defKeyword(prefix) {
304  value = prefix;
305 }
307 
308 
312 defKeyword(reason) {
313  if (errorMessage.empty()) {
314  value = "no error message found";
315  } else {
316  value = errorMessage;
317  }
318 }
319 
321 defKeyword(state) {
322  value = "";
323  static struct {
324  stateBitType bit;
325  const char *name;
326  } bitNames[] = {{stateBitType::done, "done"},
327  {stateBitType::append, "append"},
328  {stateBitType::linkToBeMade, "link"},
329  {stateBitType::failed, "failed"},
330  {stateBitType::ignore, "ignored"},
331  {stateBitType::trucated, "trucated"},
332  {stateBitType::inWork, "inWork"}
333  };
334 
335  bool needComma = false;
336  for (const auto& bitName : bitNames) {
337  if (state & bitName.bit) {
338  if (needComma) {
339  value += ", ";
340  }
341 
342  value += bitName.name;
343  needComma = true;
344  }
345  }
346 }
347 
349 defKeyword(finishtime) {
350  char buffer[64];
351  sprintf(buffer, "%.6f", std::chrono::duration_cast<std::chrono::duration<double>>(tWorkDone - tInotify).count());
352  value = buffer;
353 }
354 
355 #ifdef WithMagic
356 defKeyword(magic) {
358  value = fileMagic;
359 }
360 #endif
361 
365 bool copyRequest::base::expandAttrValue(std::string& value) {
366  auto it = keywordMap().find(value);
367  if (it != keywordMap().end()) {
368  auto f = it->second;
369  (this ->* f)(value);
370  return true;
371  }
372 
373  if (value.size() > 1 && value.at(0) == '%') {
374  auto keyword = value.substr(1);
375  for (const auto& cksum : checkSums) {
376  if (cksum->getName() == keyword) {
377  value = cksum->getResult();
378  return true;
379  }
380  }
381  }
382 
383  return false; // no change...
384 }
447  const std::map<std::string, std::string>& attrs) {
448  for (const auto& attr : attrs) {
449  auto name = attr.first;
450  auto value = attr.second;
451 
452  if (!expandAttrValue(value)) {
453  if (value == "%count") { // special expansion, only useful here
454  unsigned int count = 0;
455  auto retval = writeHandle.getXattr(name);
456  if (!retval.empty()) {
457  try {
458  count = std::stoul(retval);
459  } catch (...) {
461  source, "xattr count",
462  "strange value '", retval, "'");
463  count = 0;
464  }
465  }
466  value = std::to_string(count + 1);
467  } else if (value == "%sums") { // special expansion only useful here
468  for (const auto& item : checkSums) {
469  std::string fullName(name);
470  fullName += item->getName();
471  writeHandle.setXattr(fullName, item->getResult());
472  }
473  continue;
474  }
475  }
476  writeHandle.setXattr(name, value);
477  }
478 }
479 
480 
481 void copyRequest::base::attrdel(ioHandle& handle, const std::map<std::string, std::string>& attrs) {
482  for (const auto& attr : attrs) {
483  handle.removeXattr(attr.first);
484  }
485 }
486 void copyRequest::base::checkAttributes(ioHandle& handle, const std::map<std::string, std::string>& attrs) {
487  for (const auto& attr : attrs) {
488  auto name = attr.first;
489  auto value = attr.second;
490  expandAttrValue(value);
491  auto fileValue = handle.getXattr(name);
492 
493  if (fileValue.empty()) {
495  errorMessage += "attr " + name + " missing ";
496  continue;
497  }
498 
499  if (value != fileValue) {
501  errorMessage += "attr " + name + " mismatch " + value + " i.o. " + fileValue;
502  }
503  }
504 
505  if (state & stateBitType::attributeMismatch) {
506  throw std::runtime_error(errorMessage);
507  }
508 }
509 
511  blockReadRequestQueue& blockRequests,
512  blockQueue& freeBlocks,
513  blockQueue& readBlocks,
514  exceptionList& exceptions) {
515  try {
516  while (auto request = blockRequests.dequeue()) {
517  auto b = freeBlocks.dequeue(readBlocks.size() < 3, memoryBlockSize);
518  if (b == nullptr) {
519  throw std::runtime_error("got no free block in readWorker");
520  }
521  input.readBlockP(*b, request->size, request->offset);
522  readBlocks.enqueue(b);
523  }
524  } catch (...) {
525  exceptions.add(std::current_exception());
526  }
527 }
528 
529 
531  blockQueue& freeBlocks,
532  blockQueue& readBlocks,
533  bool mayParallelize,
534  exceptionList& exceptions) {
535  try {
536  unsigned nBlocks = readInitialStat->size / memoryBlockSize;
537  if (mayParallelize
538  && input.parallelizable()
539  && (nConcurrentProcesses < nThreads / nReadThreads + 1 // don't load loaded systems
540  || forceParallelRead)
541  && nReadThreads > 1
542  && nBlocks >= nReadThreads) {
543  blockReadRequestQueue readRequests;
544  for (size_t o = 0; o < readInitialStat->size; o += memoryBlockSize) {
545  auto s = readInitialStat->size - o;
546  s = std::min(s, memoryBlockSize);
547  readRequests.emplace(s, o);
548  }
549  readRequests.signalDone();
550  std::vector<std::thread> workers;
551  {
552  timerInst(readThreadSetup);
553  workers.resize(nReadThreads);
554  for (auto& worker : workers) {
555  worker = std::thread(&copyRequest::base::readWorker, this,
556  std::ref(input),
557  std::ref(readRequests),
558  std::ref(freeBlocks),
559  std::ref(readBlocks),
560  std::ref(exceptions));
561  }
562  }
563  for (auto& worker : workers) {
564  worker.join();
565  }
566  } else {
567  while (true) {
568  auto b = freeBlocks.dequeue(readBlocks.size() < 3, memoryBlockSize);
569  if (b == nullptr) {
570  throw std::runtime_error("got no free block in reader");
571  }
572  auto lastblock = input.readBlock(*b);
573 
574  if (b->empty()) { // we are done, recycle the block in the free block list
575  freeBlocks.enqueue(b);
576  } else {
577  readBlocks.enqueue(b);
578  }
579 
580  if (lastblock) {
581  break;
582  }
583  }
584  }
585  } catch (...) {
586  exceptions.add(std::current_exception());
587  }
588  readBlocks.signalDone();
589 }
590 
591 
592 void copyRequest::base::writer(std::unique_ptr<outputHandler::base::writer>& writeHandle,
593  blockQueue& blocksToWrite,
594  blockQueue& writtenBlocks,
595  exceptionList& exceptions) {
596  try {
597  size_t lastOffset = 0;
598  while (auto b = blocksToWrite.dequeue()) {
599  if (b->offset() < lastOffset) {
600  throw std::logic_error("blocks not in order");
601  }
602  writeHandle->writeBlock(*b);
603  lastOffset = b->offset();
604  writtenBlocks.enqueue(b);
605  }
606  } catch (...) {
607  exceptions.add(std::current_exception());
608  }
609 }
610 void copyRequest::base::writeWorker(std::unique_ptr<outputHandler::base::writer>& writeHandle,
611  blockQueue& blocksToWrite,
612  blockQueue& writtenBlocks,
613  exceptionList& exceptions) {
614  try {
615  while (auto b = blocksToWrite.dequeue()) {
616  writeHandle->writeBlockP(*b);
617  writtenBlocks.enqueue(b);
618  }
619  } catch (...) {
620  exceptions.add(std::current_exception());
621  }
622 }
623 
624 
625 copyRequest::base::writeActor::writeActor(std::unique_ptr<outputHandler::base::writer>& writeHandle,
626  blockQueue& blocksToWrite,
627  blockQueue& writtenBlocks,
628  bool mayParallelize,
629  base* request,
630  exceptionList& exceptions):
631  outputQueue(writtenBlocks) {
632  if (noCopy) { // do nothing
633  return;
634  }
635  unsigned nBlocks = request->readInitialStat->size / request->memoryBlockSize;
636  if (mayParallelize
637  && request->nConcurrentProcesses < nThreads / nWriteThreads + 1
638  && writeHandle->parallelizable()
639  && nWriteThreads > 1
640  && nBlocks >= nWriteThreads) {
641  workers.resize(nWriteThreads);
642  for (auto& worker : workers) {
643  worker = std::thread(&copyRequest::base::writeWorker, request,
644  std::ref(writeHandle),
645  std::ref(blocksToWrite),
646  std::ref(writtenBlocks),
647  std::ref(exceptions));
648  }
649  } else {
650  workers.push_back(std::thread(&copyRequest::base::writer, request,
651  std::ref(writeHandle),
652  std::ref(blocksToWrite),
653  std::ref(writtenBlocks),
654  std::ref(exceptions)));
655  }
656 }
658  if (workers.empty()) {
659  return;
660  }
661  for (auto& worker : workers) {
662  worker.join();
663  }
664  outputQueue.signalDone();
665 }
667  return !workers.empty();
668 }
670  blockQueue& hashedBlocks,
671  base* request,
672  bool mayParallelize,
673  exceptionList& exceptions):
674  outputQueue(hashedBlocks) {
675  if (request->checkSums.empty() || (request->state & stateBitType::append)) {
676  return;
677  }
678  unsigned nBlocks = request->readInitialStat->size / request->memoryBlockSize;
679  if (request->checkSums.size() == 1
680  && request->checkSums.front()->parallelizable()
681  && nSumThreads > 1
682  && mayParallelize
683  && nBlocks >= nSumThreads ) {
684  workers.resize(std::min(nSumThreads.fGetValue(), nBlocks));
685  parallelSum = dynamic_cast<checksum::parallel*>(request->checkSums.front());
686  for (auto& worker : workers) {
687  worker = std::thread(&copyRequest::base::hash_worker, request,
688  parallelSum,
689  std::ref(blocksToHash),
690  std::ref(outputQueue),
691  std::ref(exceptions));
692  }
693  } else { // use one thread to calculate all checksums
694  workers.emplace_back(std::thread(&copyRequest::base::hasher, request,
695  std::ref(blocksToHash),
696  std::ref(outputQueue),
697  std::ref(exceptions)));
698  }
699 }
700 
702  if (workers.empty()) {
703  return;
704  }
705  for (auto& worker : workers) {
706  worker.join();
707  }
708  if (workers.size() > 1) {
709  parallelSum->parallelFinish();
710  }
711  outputQueue.signalDone();
712 }
714  return ! workers.empty();
715 }
717  return workers.size() != 1; // we also do nothing in parallel
718 }
719 
721  blockQueue& blocksToHash,
722  blockQueue& hashedBlocks,
723  exceptionList& exceptions) {
724  try {
725  while (auto b = blocksToHash.dequeue()) {
726  timerInst(hash);
727  if (b->isHole()) {
728  sum->update(b->size(), b->offset());
729  } else {
730  sum->update(b->bufferAt(0), b->size(), b->offset());
731  }
732  hashedBlocks.enqueue(b);
733  }
734  } catch (...) {
735  exceptions.add(std::current_exception());
736  }
737 }
739  blockQueue& hashedBlocks,
740  exceptionList& exceptions) {
741  try {
742  while (auto b = blocksToHash.dequeue()) {
743  timerInst(hash);
744  if (b->isHole()) {
745  for (auto sum : checkSums) {
746  sum->update(b->size());
747  }
748  } else {
749  for (auto sum : checkSums) {
750  sum->update(b->bufferAt(0), b->size());
751  }
752  }
753  hashedBlocks.enqueue(b);
754  }
755 
756  for (auto sum : checkSums) {
757  sum->finish();
758  }
759  } catch (...) {
760  exceptions.add(std::current_exception());
761  }
762 }
763 
764 #ifdef WithMagic
765 void copyRequest::base::innerMagic(const block& b) {
766  if (b.offset() == 0) { // do magic only for the first block
767  timerInst(magic);
768  auto result = magic_buffer(threadData->magicCookie, b.bufferAt(0), b.size());
769  if (result == nullptr) {
770  result = magic_error(threadData->magicCookie);
771  }
772  if (result) {
773  fileMagic = result;
774  } else {
775  fileMagic = "unknown";
776  }
777  }
778 }
779 void copyRequest::base::magician(blockQueue& inputQueue,
780  blockQueue& outputQueue,
781  exceptionList& exceptions) {
782  try {
783  while (auto b = inputQueue.dequeue()) {
784  innerMagic(*b);
785  outputQueue.enqueue(b);
786  }
787  } catch (...) {
788  exceptions.add(std::current_exception());
789  }
790 }
791 copyRequest::base::magicCalculator::magicCalculator(blockQueue& inputQueue,
792  blockQueue& aOutputQueue,
793  base* request,
794  exceptionList& exceptions):
795  outputQueue(aOutputQueue) {
796  if (doMagic) {
797  worker = std::thread(&copyRequest::base::magician, request,
798  std::ref(inputQueue),
799  std::ref(outputQueue),
800  std::ref(exceptions));
801  }
802 }
803 copyRequest::base::magicCalculator::~magicCalculator() noexcept (false) {
804  if (worker.joinable()) {
805  worker.join();
806  outputQueue.signalDone();
807  }
808 }
809 bool copyRequest::base::magicCalculator::joinable() const {
810  return worker.joinable();
811 }
812 #endif
814  std::unique_ptr<outputHandler::base::writer>& writeHandle) {
815  blockQueue blocksToWrite;
816  blockQueue blocksToHash;
817  #ifdef WithMagic
818  blockQueue blocksForMagic;
819  #endif
820  exceptionList exceptions;
821 
822  scoped::generic < decltype(threadData->freeBlocks) > freeBlockReseter(threadData->freeBlocks,
823  [](decltype(threadData->freeBlocks)& freeBlocks) {
824  freeBlocks.resetDone();
825  });
826  {
827  #ifdef WithMagic
828  magicCalculator magicThread(blocksForMagic, threadData->freeBlocks, this, exceptions);
829  auto& lastQueue = magicThread.joinable() ?
830  blocksForMagic : // with magic thread
831  threadData->freeBlocks; // without magic thread
832  #else
833  auto& lastQueue = threadData->freeBlocks;
834  #endif
835  hashCalculator hashThread(blocksToHash, lastQueue, this,
836  (writeHandle->parallelizable() && nWriteThreads > 1) || noCopy, exceptions);
837  auto& writerOutputQueue = hashThread.joinable() ?
838  blocksToHash : // with hash thread
839  lastQueue; // without hash thread
840  writeActor writeThread(writeHandle,
841  blocksToWrite,
842  writerOutputQueue,
843  hashThread.parallelized(),
844  this,
845  exceptions);
846  auto& readerOutputQueue = writeThread.joinable() ? // with write thread
847  blocksToWrite :
848  ( // without write thread
849  hashThread.joinable() ? // with hash thred
850  blocksToHash : // we read and hash
851  lastQueue // apparently we just read and discard the result
852  );
853 
854  reader(input, threadData->freeBlocks, readerOutputQueue,
855  hashThread.parallelized() && (writeHandle->parallelizable() || noCopy), exceptions);
856  } // hashThread and writeThread go out of scope and are joined
857 
858  if (!exceptions.empty()) {
859  auto n = exceptions.size();
860  if (n > 1) {
862  source, "threaded copy", "we have ", n, " exceptions at once");
863  }
864  std::rethrow_exception(exceptions.front());
865  }
866 }
867 
868 void copyRequest::base::doUnthreadedCopy(inputHandler::base::reader& input, std::unique_ptr<outputHandler::base::writer>& writeHandle) {
869  auto& freeBlocks = threadData->freeBlocks;
870  auto b = freeBlocks.dequeue(freeBlocks.empty(), memoryBlockSize);
871  if (b == nullptr) {
872  throw std::runtime_error("got no block for unthreaded copy");
873  }
874  for (;;) {
875  auto lastblock = input.readBlock(*b);
876 
877  if (!checkSums.empty() && !(state & stateBitType::append)) {
879  if (b->isHole()) {
880  for (auto sum : checkSums) {
881  sum->update(b->size());
882  }
883  } else {
884  for (auto sum : checkSums) {
885  sum->update(b->bufferAt(0), b->size());
886  }
887  }
888  }
889 
890  if (!noCopy) {
891  writeHandle->writeBlock(*b);
892  }
893  #ifdef WithMagic
894  if (doMagic) {
895  innerMagic(*b);
896  }
897  #endif
898  if (lastblock) {
899  break;
900  }
901  }
902 
903  for (auto sum : checkSums) {
904  sum->finish();
905  }
906  if (b != nullptr) {
907  freeBlocks.enqueue(b);
908  } else {
910  source, "unthreaded copy", "block ptr is nullptr after copy");
911  }
912 }
913 
915  outputHandler::base* OutputHandler) {
916  if (InputHandler->pathExists(source)) {// source really vanished
917  errorMessage = "source " + source + " still exists, not deleting " + destination;
918  state |= stateBitType::failed;
919  } else {
920  OutputHandler->remove(destination, state);
921  if (state & stateBitType::vanished) {
922  errorMessage = "vanished before processing";
923  } else if (state & stateBitType::failed) {
924  errorMessage = "directory not empty";
925  }
926  state |= stateBitType::done;
927  }
928 }
929 
930 
932  outputHandler::base* OutputHandler) {
933  if (readInitialStat == nullptr) {
934  throw std::runtime_error("makeSymlink: no readInitialStat for " + source);
935  }
936  std::vector<char> link(readInitialStat->size + 1);
937  auto linkExists = InputHandler->readLinkTarget(source, link);
938 
939  if (! linkExists) {
941  return true;
942  }
943 
972  for (const auto& item : linkBaseMap) {
973  if ((link.size() - 1 > item.first.size()) &&
974  (item.first.compare(0, item.first.size(), link.data(), item.first.size()) == 0)) { // link is relative to (source) work dir
975  link.erase(link.begin(), link.begin() + item.first.size());
976  while (link.front() == '/') {
977  link.erase(link.begin());
978  }
979  auto searchStart(item.second.empty() ? mapEntry.second.size() + 1 : item.second.size() + 1);
980 
981  // source, i.e. the link itself is relative to the workDIr of the inotify watch
982  // prepend as many ../ to the link dest as needed to make it the rest start at
983  // that directory
984  for (auto slashPosition = source.find_first_of('/', searchStart);
985  slashPosition != std::remove_reference<decltype(source)>::type::npos;
986  slashPosition = source.find_first_of('/', slashPosition + 1)) {
987  if (slashPosition > 0 &&
988  source[slashPosition - 1] == '.' &&
989  (slashPosition == 1 || source[slashPosition - 2] != '.')) { // skip ./
990  continue;
991  }
992 
993  link.insert(link.begin(), {'.', '.', '/'});
994  }
995  break; // we consider only the first match
996  }
997  }
998  OutputHandler->createSymlink(link, destination, uid, gid);
999 
1000 
1001  return true;
1002 }
1003 
1004 
1011 bool copyRequest::base::adviseDelay(clock_type::duration dt,
1012  const std::string& suffix) {
1013  if (suffix.empty() || dt < clock_type::duration::zero()) {
1014  return false;
1015  }
1016  if (std::chrono::duration_cast<std::chrono::duration<double>>(dt).count() > maxDelayTime) {
1017  dt = std::chrono::duration_cast<decltype(dt)>(std::chrono::duration<double>(maxDelayTime));
1018  }
1019  bool delayChanged = false;
1020  {
1021  std::unique_lock<decltype(advisoryWaitMapMutex)> lock(advisoryWaitMapMutex);
1022  auto result = advisoryWaitMap.emplace(suffix, dt);
1023  if (result.second == false) { // we already have an entry
1024  auto it = result.first;
1025  if (it->second < dt) { // apparently we must wait longer
1026  it->second = dt;
1027  delayChanged = true;
1028  }
1029  } else {
1030  delayChanged = true;
1031  }
1032  }
1033  if (delayChanged) {
1035  "-- no path --", "delay change",
1036  "set wait time for '", suffix, "' files to ",
1037  std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(dt).count(), "s");
1038  }
1039  return delayChanged;
1040 }
1041 
1042 
1043 void copyRequest::base::adviseDelay(clock_type::duration dt) {
1044  if (dt == decltype(dt)::zero()) {
1045  dt = clock_type::now() - tInotify;
1046  }
1047 
1048  if (adviseDelay(dt, getSuffix())) {
1049  if (filesInWorkIterator == fileInWork::filesInWork.end()) {
1050  return;
1051  }
1052  filesInWorkIterator->second.setWaitTime(dt);
1053  }
1054 }
1059 copyRequest::clock_type::duration copyRequest::base::getAdvisedDelay() const {
1060  auto it = advisoryWaitMap.find(getSuffix());
1061  if (it == advisoryWaitMap.end()) {
1062  return clock_type::duration::zero();
1063  }
1064  return it->second;
1065 }
1066 
1067 
1068 
1069 
1070 
1071 
1073  fileInWork::typeChanger changer(*this, newType);
1074 }
1075 
1076 
1078  if (backupSuffix.empty()) {
1079  throw std::logic_error("backup suffix must not be empty");
1080  }
1081  std::string bkgSuffix;
1082  decltype(backupSuffix.find("bla")) index = 0;
1083  while (index != decltype(backupSuffix)::npos) {
1084  auto newIndex = backupSuffix.find_first_of('%', index);
1085  bkgSuffix += backupSuffix.substr(index, newIndex - index);
1086  if (newIndex != decltype(backupSuffix)::npos) {
1087  newIndex++;
1088  if (newIndex < backupSuffix.size()) {
1089  if (backupSuffix.at(newIndex) == '{') {
1090  newIndex++;
1091  auto endmarker = backupSuffix.find_first_of('}', newIndex);
1092  auto part = "%" + backupSuffix.substr(newIndex, endmarker - newIndex);
1093  expandAttrValue(part);
1094  bkgSuffix += part;
1095  newIndex = endmarker + 1;
1096  } else {
1097  auto part = backupSuffix.substr(newIndex - 1);
1098  expandAttrValue(part);
1099  bkgSuffix += part;
1100  newIndex = decltype(backupSuffix)::npos;
1101  }
1102  }
1103  }
1104  index = newIndex;
1105  }
1106  return bkgSuffix;
1107 }
1108 
1110  processMultiplicities->at(++nConcurrentProcesses)++;
1111  scoped::generic<decltype(*this)> concurrencyCount(*this,
1112  [](decltype(*this)& ) {
1114  });
1115 
1116  tWorkStart = clock_type::now();
1117  scoped::generic<decltype(tWorkDone)> timer(tWorkDone,
1118  [](decltype(tWorkDone)& workDone) {
1119  workDone = clock_type::now();
1120  });
1121 
1122  state |= stateBitType::inWork;
1123  scoped::generic<decltype(state)> stateReset(state,
1124  [](decltype(state)& State) {
1125  State.clear(stateBitType::inWork);
1126  });
1127 
1128  threadData = &aThreadData;
1129  scoped::generic<decltype(threadData)> unsetThreadData(threadData,
1130  [](decltype(threadData)& ThreadData) {
1131  ThreadData = nullptr;
1132  });
1133 
1134  errorMessage.clear(); // forget messages from previous attempts
1135  retries++;
1136 
1137  // do this before check for noAction needed to not be fooled by vanished bit
1138  if (state & stateBitType::fileToBeRemoved) {
1139  removeFileOrDirectory(threadData->InputHandler, threadData->OutputHandler);
1140  return; // no more things to do for stuff to remove
1141  }
1142 
1143  if (state & stateBitType::noActionNeeded) {
1144  if (state & stateBitType::ignore) {
1145  errorMessage = "ignored";
1146  } else if (state & stateBitType::vanished) {
1147  errorMessage = "vanished before action";
1148  } else {
1149  errorMessage = "oops";
1150  }
1151  return;
1152  }
1153 
1154  try {
1155  if (!noCopy) {
1156  threadData->OutputHandler->ensureParentDirs(destination, source, threadData->InputHandler);
1157  }
1158  if (state & stateBitType::linkToBeMade && !noCopy) {
1159  if (makeSymLink(threadData->InputHandler, threadData->OutputHandler)) {
1160  if (state & stateBitType::vanished) {
1161  errorMessage = "vanished before action";
1162  } else {
1163  state |= stateBitType::done;
1164  }
1165  return;
1166  }
1167  changeRequestType(stateBitType::fileToBeCopied);
1168  }
1169 
1170  if (state & stateBitType::fileToBeRenamed) {
1171  auto result = threadData->OutputHandler->rename(moveSource, readInitialStat, destination, state);
1172  switch (result) {
1174  if (state & stateBitType::ignore) {
1175  errorMessage = "directory move ignored";
1176  }
1177  return;
1179  if (threadData->InputHandler->getStat(origSource) == nullptr) {
1180  stateType dummyState;
1182  moveSource, "remove obsolete mv source",
1183  "can't find source '", origSource, "'");
1184  threadData->OutputHandler->remove(moveSource, dummyState);
1185  } else {
1187  moveSource, "rename",
1188  "littering with obsolete mv source");
1189  }
1190  }
1191 #if __GNUC__ >= 7
1192  __attribute__ ((fallthrough));
1193 #endif
1195  // file was probably not copied yet, try to copy it from the move target
1196  changeRequestType(stateBitType::fileToBeCopied);
1197  break;
1198  default:
1199  throw std::runtime_error("impossible rename retval");
1200  }
1201  }
1202  } catch (const fatalException&) { // always throw, never return
1203  throw;
1204  } catch (const std::exception& e) {
1205  errorMessage = e.what();
1206  state |= stateBitType::failed;
1207  if (continueOnError) {
1208  return;
1209  } else {
1210  throw;
1211  }
1212  }
1213 
1214  try { // try scope for input file setup/cleanup
1215  if (readInitialStat == nullptr ||// may happen with failed rename events
1216  retries > 1 || // for second or worse attempts always refresh stat
1217  clock_type::now() - tEnqueue > std::chrono::seconds(1) ||
1218  (! readInitialStat->isRegularFile())) {
1219  readInitialStat = threadData->InputHandler->getStat(source);
1220  if (readInitialStat == nullptr) {
1221  state |= stateBitType::vanished;
1222  errorMessage = "vanished in action";
1223  return;
1224  }
1225  }
1226  if (! readInitialStat->isRegularFile()) {
1227  state |= stateBitType::ignore;
1228  errorMessage = "ignored non regular file";
1229  return;
1230  }
1231 
1232  if (ignoreExisting) { // check if copy exists and has same stat values
1233  auto destStat = threadData->OutputHandler->getStat(destination);
1234  if (destStat) { // destination exists, check stat values
1235  if (readInitialStat->size == destStat->size &&
1236  readInitialStat->isSameMtimeAs(*destStat)) {
1237  state |= stateBitType::ignore;
1238  errorMessage = "ignored due to existing copy with same stat";
1239  if (changeAttrsOnIgnoreExisting) {
1240  if ((preserve.mode && readInitialStat->mode == destStat->mode)
1241  || (preserve.ownership && readInitialStat->ownerUid == destStat->ownerUid
1242  && readInitialStat->ownerGid == destStat->ownerGid)) {
1243  errorMessage += ", preserved attrs are already fine";
1244  } else {
1245  threadData->OutputHandler->doAttributePreservations(destination, *readInitialStat);
1246  errorMessage += ", attributes updated";
1247  }
1248  }
1249  return;
1250  }
1251  }
1252  }
1253 
1254  auto input = threadData->InputHandler->newReader(source, state, *readInitialStat);
1255  std::unique_ptr<ioHandle::attrDataType> attrData;
1256  if (preserve.attrs) {
1257  attrData = input->getAttrData(threadData->OutputHandler);
1258  }
1259  std::unique_ptr<acl::list> aclData;
1260  if (preserve.acls) {
1261  aclData = input->getAclData();
1262  }
1263 
1264  attrset(*input, process_source_attrs);
1265  attrset(*input, start_source_attrs);
1266  try {
1267  std::string destNameDuringWrite(destination);
1268  std::string backupFile;
1269  switch (backupMode) {
1270  case backupModeType::none:
1271  break;
1273  case backupModeType::during: {
1274  backupFile = destination + getBackupSuffix();
1275  threadData->OutputHandler->renameSimple(destination, backupFile);
1276  break;
1277  }
1278  case backupModeType::after: {
1279  if (!backupSuffix.empty()) {
1280  destNameDuringWrite += getBackupSuffix();
1281  }
1282  break;
1283  }
1284  case backupModeType::remove: {
1285  stateType dummy;
1286  threadData->OutputHandler->remove(destNameDuringWrite, dummy);
1287  break;
1288  }
1289  default:
1290  throw std::runtime_error("illegal backupModeType");
1291  }
1292  {
1293  // scope for writeHandle
1294  std::unique_ptr<outputHandler::base::writer> writeHandle;
1295  if (backupSuffix.empty() && backupMode == backupModeType::after) {
1296  writeHandle = threadData->OutputHandler->newTmpWriter(destNameDuringWrite,
1297  readInitialStat->size,
1298  noCopy,
1299  std::move(attrData),
1300  std::move(aclData));
1301  } else {
1302  writeHandle = threadData->OutputHandler->newWriter(destNameDuringWrite,
1303  (appendableFiles.fIsSet() &&
1304  regex_match(destination, appendableFiles)) ||
1305  std::any_of(append_attrs.cbegin(), append_attrs.cend(),
1306  [this](decltype(append_attrs)::value_type attr) {
1307  return ! threadData->InputHandler->getXattr(source, attr).empty();
1308  }),
1309  readInitialStat->size,
1310  input->getBlockSize(),
1311  state,
1312  noCopy,
1313  std::move(attrData),
1314  std::move(aclData));
1315  }
1316  doBlockSizeSetup(*input, *writeHandle);
1317 
1318  input->setupSparseRegions(sparse);
1319 
1320  if (state & stateBitType::append) { // we want to append
1321  auto startPosition = writeHandle->getSize();
1322  // align start position to a disk block
1323  startPosition = (startPosition / input->getBlockSize()) * input->getBlockSize();
1324  writeHandle->seek(startPosition);
1325  input->seek(startPosition);
1326  }
1327 
1328  if (perFileThreads
1329  && readInitialStat->size / memoryBlockSize > 1 // no point in threading small files
1330  ) {
1331  doThreadedCopy(*input, writeHandle);
1332  } else {
1333  doUnthreadedCopy(*input, writeHandle);
1334  }
1335 
1336  input->checkUnchangedness();
1337 
1338  checkAttributes(*input, check_source_attrs);
1339 
1340  if (!noCopy) {
1341  if (syncWrittenFiles) {
1342  writeHandle->sync();
1343  }
1344  if (!setAttributesAfterClose) {
1345  writeHandle->doAttributePreservations(*readInitialStat);
1346  }
1347  attrset(*writeHandle, success_dest_attrs);
1348  }
1349  // writeHandle is deleted here when the unique_ptr goes out of scope
1350  } // scope for writeHandle
1351 
1352  // special workaround for dCache that sets mtime on close()
1353  if (setAttributesAfterClose && !noCopy) {
1354  threadData->OutputHandler->doAttributePreservations(destNameDuringWrite, *readInitialStat);
1355  }
1356  switch (backupMode) {
1357  case backupModeType::during: {
1358  stateType dummy;
1359  threadData->OutputHandler->remove(backupFile, dummy);
1360  break;
1361  }
1362  case backupModeType::after:
1363  threadData->OutputHandler->renameSimple(destNameDuringWrite, destination);
1364  default:
1365  break;
1366  }
1367  } catch (const delayAdvisingError& e) {
1368  adviseDelay();
1369  errorMessage = e.what();
1370  attrset(*input, failure_source_attrs); // mark on input file
1371  throw; // continue in outer try/catch
1372  } catch (const std::exception& e) {
1373  errorMessage = e.what();
1374  attrset(*input, failure_source_attrs); // mark on input file
1375  throw; // continue in outer try/catch
1376  }
1377  attrdel(*input, process_source_attrs);
1378  attrdel(*input, failure_source_attrs); // we succeeded, remove old failure marks
1379  attrset(*input, success_source_attrs);
1380  } catch (const fatalException&) { // always throw, never return
1381  throw;
1382  } catch (const std::exception& e) { // no attribute massage for input, we don't have that
1383  errorMessage = e.what();
1384  state |= stateBitType::failed;
1385  if (continueOnError) {
1386  return;
1387  } else {
1388  throw;
1389  }
1390  }
1391  // check if earliest process time changed
1392  if (tWorkStart < filesInWorkIterator->second.getEarliestprocessTime()) {
1393  adviseDelay(); // try to avoid this next time
1394  errorMessage = "changed earliest process time, ";
1395  auto deltaT = filesInWorkIterator->second.getEarliestprocessTime() - tWorkStart;
1396  errorMessage += std::to_string(std::chrono::duration_cast<std::chrono::duration<double>>(deltaT).count());
1397  state |= stateBitType::failed;
1398  return;
1399  }
1400  state |= stateBitType::done;
1401 }
1402 
1403 
// Tail of the block-size setup for one input/output handle pair.
// NOTE(review): the signature line and the two conditions guarding the
// setBlockSize() calls below are missing from this listing.
1405  ioHandle &output) {
1407  input.setBlockSize(userReadBlockSize);
1408  }
1410  output.setBlockSize(userWriteBlockSize);
1411  }
1412 
// Start from the product of both handles' block sizes, then clamp it.
// NOTE(review): the multiplication is unchecked - confirm it cannot overflow.
1413  memoryBlockSize = input.getBlockSize() * output.getBlockSize();
1414  // sanitize block sizes, ensure they at least fit
// The upper bound is applied first and the lower bound last, so
// minMemoryBlockSize wins if it is larger than maxMemoryBlockSize.
1415  memoryBlockSize = std::min(memoryBlockSize, copyRequest::base::maxMemoryBlockSize.fGetValue());
1416  memoryBlockSize = std::max(memoryBlockSize, copyRequest::base::minMemoryBlockSize.fGetValue());
// Neither handle may use a block larger than the memory block.
1417  input.setBlockSize(std::min(input.getBlockSize(), memoryBlockSize));
1418  output.setBlockSize(std::min(output.getBlockSize(), memoryBlockSize));
1419 }
1420 
// Body of a wait routine (its signature line is missing from this listing):
// blocks the caller on objectCountCondVar until objectCount is no longer > 0.
// The loop re-checks the counter after every wake-up.
1422  std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1423  while (objectCount > 0) {
1424  objectCountCondVar.wait(lock);
1425  }
1426 }
// Body of a query routine (its signature line is missing from this listing):
// returns true while objectCount is greater than zero; the counter is read
// while holding objectCountMutex.
1428  std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1429  return (objectCount > 0);
1430 }
1431 
1432 void copyRequest::base::getSuffix(const std::string& path,
1433  std::string& suffix) {
1434  auto fileNameStart = path.find_last_of('/');
1435  if (fileNameStart == std::string::npos) {
1436  fileNameStart = 0;
1437  }
1438  auto const fileName = path.substr(fileNameStart);
1439  auto const suffixStart = fileName.find_last_of('.');
1440  if (suffixStart != decltype(fileName)::npos) {
1441  suffix = fileName.substr(suffixStart);
1442  }
1443 }
1444 
// Body of the common initialisation routine called by the constructors.
// NOTE(review): the signature line is missing from this listing; the body
// uses a parameter named InputHandler.
1446  threadData = nullptr;
1447  tEnqueue = clock_type::now();
// Count this object; the counter is protected by objectCountMutex.
1448  {
1449  std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1450  ++objectCount;
1451  }
1452  getSuffix(source, suffix);
1453 
// Try the source regex against source and then the destination regex against
// destination; a full match with exactly one sub-match sets prefix, so a
// destination match overrides a source match.
1454  for (const auto& item : std::vector<std::pair<const std::string&,
1455  const decltype(sourcePrefixRegex)&>>({
1456  {source, sourcePrefixRegex},
1457  {destination, destinationPrefixRegex}
1458 })) {
1459  if (item.second.fIsSet()) {
1460  std::smatch match;
1461  if (std::regex_match(item.first, match, item.second)) {
1462  if (match.size() == 2) { // we have a sub match as required
1463  prefix = match[1].str();
1464  }
1465  }
1466  }
1467  }
1468  // set this up before any return can happen, otherwise we can get empty lists..
1469  for (auto checksumCreator : checksumCreators) {
1470  checkSums.push_front(checksumCreator->create());
1471  }
1472 
// A source carrying a non-empty value for any of the prohibitive attributes
// is flagged trucated + ignore and not examined further.
1473  if (std::any_of(prohibitive_attrs.cbegin(), prohibitive_attrs.cend(),
1474  [this, InputHandler](decltype(prohibitive_attrs)::value_type attr) {
1475  return ! InputHandler->getXattr(source, attr).empty();
1476  })) {
1477  state.set(stateBitType::trucated);
1478  state.set(stateBitType::ignore);
1479  return;
1480  }
// Fetch the stat only if the constructor did not already supply one.
1481  if (readInitialStat == nullptr) {
1482  readInitialStat = InputHandler->getStat(source, false);
1483  }
// No stat available: mark as vanished, except for rename requests.
1484  if (readInitialStat == nullptr) {
1485  if (!(state & stateBitType::fileToBeRenamed)) {
1486  state.set(stateBitType::vanished);
1487  }
1488  return;
1489  }
1490  if (readInitialStat->isLink()) {
1491  if (dereference) {
// Stat again with the second argument set to true; failure is an error here.
1492  readInitialStat = InputHandler->getStat(source, true);
1493  if (readInitialStat == nullptr) {
1494  throw std::runtime_error("can't dereference " + source);
1495  }
1496  } else if (state & stateBitType::fileToBeCopied) {
// NOTE(review): the statement of this branch is missing from this listing.
1498  }
1499  }
1500 
1501  if (state & stateBitType::fileToBeRenamed) {
1502  return; // no directory test needed if we move
1503  }
1504 
// Directories are ignored in both branches; the non-remove branch additionally
// passes a message to a call whose first line is missing from this listing.
1505  if (readInitialStat->isDir()) {
1506  if (state & stateBitType::fileToBeRemoved) {
1507  state |= stateBitType::ignore;
1508  } else {
1509  state |= stateBitType::ignore;
1511  source, "init",
1512  "is a directory ", destination, " inotified at ", std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(tInotify.time_since_epoch()).count());
1513  //throw std::runtime_error(aSource + " is a directory, not expected here");
1514  }
1515  }
1516 }
// Constructor variant that receives an already obtained stat object.
// aStat is moved into readInitialStat, so the caller's unique_ptr is empty
// afterwards. All remaining setup is delegated to init().
// NOTE(review): the first signature line and one member initialiser are
// missing from this listing; `remove` is presumably consumed there - confirm.
1518  const std::string& aSource,
1519  const std::string& aDestination,
1520  std::unique_ptr<const genericStat>& aStat,
1521  const singleMap& aMapEntry,
1522  bool remove,
1523  clock_type::time_point timestamp) :
1524  readInitialStat(std::move(aStat)),
1526  source(aSource),
1527  destination(aDestination),
1528  mapEntry(aMapEntry),
1529  tInotify(timestamp),
1530  retries(0) {
1531  init(InputHandler);
1532 }
// Constructor variant without a pre-fetched stat object; readInitialStat is
// not initialised here, init() requests the stat when it is still null.
// NOTE(review): the first signature line and one member initialiser are
// missing from this listing; `remove` is presumably consumed there - confirm.
1534  const std::string& aSource,
1535  const std::string& aDestination,
1536  const singleMap& aMapEntry,
1537  bool remove,
1538  clock_type::time_point timestamp) :
1540  source(aSource),
1541  destination(aDestination),
1542  mapEntry(aMapEntry),
1543  tInotify(timestamp),
1544  retries(0) {
1545  init(InputHandler);
1546 }
// Constructor variant for rename requests: state starts as fileToBeRenamed
// and the move source plus the original source path are stored alongside
// the usual source/destination pair.
// NOTE(review): the first signature line is missing from this listing.
1548  const std::string& aSource,
1549  const std::string& aDestination,
1550  const std::string& aMoveSource,
1551  const std::string& aOrigSource,
1552  const singleMap& aMapEntry,
1553  clock_type::time_point timestamp) :
1554  state(stateBitType::fileToBeRenamed),
1555  source(aSource),
1556  destination(aDestination),
1557  moveSource(aMoveSource),
1558  origSource(aOrigSource),
1559  mapEntry(aMapEntry),
1560  tInotify(timestamp),
1561  retries(0) {
1562  init(InputHandler);
1563 }
1564 
1565 copyRequest::base::~base() noexcept (false) {
1566  {
1567  std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1568  --objectCount;
1569  if (objectCount == 0) {
1570  objectCountCondVar.notify_all();
1571  }
1572  }
1573 
1574  if (state & stateBitType::ignore) {
1575  return;
1576  }
1577 
1578  tTotalStat.addValue(tWorkDone - tInotify);
1579  tPipeStat.addValue(tEnqueue - tInotify);
1580  tWaitStat.addValue(tWorkStart - tEnqueue);
1581 
1582  if (state & stateBitType::done && state & stateBitType::fileToBeCopied) {
1583  auto dtCopy = tWorkDone - tWorkStart;
1584  auto tModification = readInitialStat->getMtime();
1585  auto d = tInotify - tModification;
1586  tTotal2Stat.addValue(tWorkDone - tModification);
1587  tInotifyStat.addValue(d);
1588  tCopyStat.addValue(dtCopy);
1589  if (readInitialStat) {
1590  bytesStat.addValue(readInitialStat->size);
1591  }
1592  auto dtCopyReal = std::chrono::duration_cast<std::chrono::duration<double>>(dtCopy).count();
1593 
1594  if (dtCopyReal > 0) {
1595  speedStat.addValue(readInitialStat->size / (1024. * 1024) / dtCopyReal);
1596  }
1597  }
1598 }
1599 
1600 
1601 void copyRequest::base::addExpectedChecksumResult(const std::string& checkSumType,
1602  const std::string& expectedValue) {
1603  for (auto& cksum : checkSums) {
1604  if (cksum->getName() == checkSumType) {
1605  cksum->setExpectedResult(expectedValue);
1606  return;
1607  }
1608  }
1609  auto checksumCreator = checksum::base::newFactory(checkSumType);
1610  checksumCreators.push_front(checksumCreator);
1611  auto cksum = checksumCreator->create();
1612  cksum->setExpectedResult(expectedValue);
1613  checkSums.push_front(cksum);
1614 }
1615 
1616 
1617 
// Report the outcome of this request: checksum lines go to hashStream,
// the configured log fields and the verbose summary go to logStream, and a
// result message is sent through resultOutput when that is set.
// NOTE(review): two lines of this function are missing from this listing
// (one after timerInst(all), one opening the call after timerInst(errmsgEmit)).
1618 void copyRequest::base::printResults(std::ostream& hashStream,
1619  std::ostream& logStream) {
1620  timerInst(all);
// Checksum lines are written only for finished, non-appended requests and
// only when continueOnError is not set.
1622  if (!(state & stateBitType::append)
1623  && (state & stateBitType::done)
1624  && !continueOnError) {
1625  static auto escaper = escapism::newEscaper("C");
1626  for (const auto& sum : checkSums) {
1627  std::string name;
// With noCopy the source name is reported instead of the destination.
1628  escaper->escape(noCopy ? source : destination, name);
// The checksum name is only prefixed when more than one checksum is in use.
1629  if (checkSums.size() > 1) {
1630  hashStream << sum->getName() << ": ";
1631  }
1632  hashStream << sum->getResult() << " " << name << "\n";
1633  }
1634  }
1635 
1636  bool needSpace = false;
1637 
// Requests that did not reach `done` are logged at warning level.
1638  if (!logFields.empty() && !(state & stateBitType::done)) {
1639  logStream << logstream::level::warning;
1640  }
// Operation name; stays empty if none of the four request-type bits is set.
1641  std::string operation;
1642  if (state & stateBitType::fileToBeCopied) {
1643  operation = "copy";
1644  } else if (state & stateBitType::linkToBeMade) {
1645  operation = "link";
1646  } else if (state & stateBitType::fileToBeRemoved) {
1647  operation = "remove";
1648  } else if (state & stateBitType::fileToBeRenamed) {
1649  operation = "move";
1650  }
// Result message: a JSON document when built WithJsonCpp, otherwise just
// the destination path.
1651  if (resultOutput && (state & stateBitType::done)) {
1652  timerInst(sendMsg);
1653  #ifdef WithJsonCpp
1654  static std::atomic<Json::UInt64> messageCounter(0);
1655  Json::Value root;
1656  root["msgnr"] = messageCounter++;
1657  root["operation"] = operation;
1658  root["path"] = destination;
1659  root["source"] = source;
1660  if (readInitialStat != nullptr) {
1661  root["size"] = static_cast<Json::Int64>(readInitialStat->size);
1662  }
1663  for (const auto& sum : checkSums) {
1664  root[sum->getName()] = sum->getResult();
1665  }
1666  #ifdef WithMagic
1667  if (doMagic) {
1668  root["fileType"] = fileMagic;
1669  }
1670  #endif
1671  root["finishTime"] = std::chrono::duration_cast<std::chrono::duration<double>>(tWorkDone.time_since_epoch()).count();
1672  root["inotifyTime"] = std::chrono::duration_cast<std::chrono::duration<double>>(tInotify.time_since_epoch()).count();
1673  root["retries"] = retries;
// The prefix field falls back to statPrefix when no prefix was matched.
1674  if (!prefixJsonName.empty()) {
1675  root[prefixJsonName] = prefix.empty() ? statPrefix : prefix;
1676  }
1677  for (auto& item : jsonExtraFields) {
1678  root[item.first] = item.second;
1679  }
// NOTE(review): the writer is a function-local static shared by all callers -
// confirm this function is never run concurrently or that the writer is safe.
1680  static Json::FastWriter jsonWriter;
1681  resultOutput->send(jsonWriter.write(root), prefix);
1682  #else
1683  resultOutput->send(destination, prefix);
1684  #endif
1685  }
1686  if (!(state & stateBitType::done) || (state & stateBitType::failed)) {
1687  timerInst(errmsgEmit);
1689  source, operation, errorMessage);
1690  }
1691  timerInst(toLogStream);
// Emit the configured log fields separated by single spaces. A field is
// first offered to expandAttrValue(); if that does not expand it, the
// special %-keywords below are substituted; anything else is printed as is.
1692  for (const auto& field : logFields) {
1693  std::string value(field);
1694  static auto escaper = escapism::newEscaper("Url");
1695  if (!expandAttrValue(value)) { // not expanded, try special ones
1696  if (value == "%source") {
1697  value = "'";
1698 
// For a pure rename (not turned into a copy) the move source is reported.
1699  if ((state & stateBitType::fileToBeRenamed)
1700  && !(state & stateBitType::fileToBeCopied)) {
1701  value += moveSource;
1702  } else {
1703  value += source;
1704  }
1705 
1706  value.push_back('\'');
1707  } else if (value == "%destination") {
1708  value = "'";
1709  value += destination;
1710  value.push_back('\'');
1711  } else if (value == "%urlsource") {
1712  value.clear();
1713 
1714  if ((state & stateBitType::fileToBeRenamed)
1715  && !(state & stateBitType::fileToBeCopied)) {
1716  escaper->escape(moveSource, value);
1717  } else {
1718  escaper->escape(source, value);
1719  }
1720  } else if (value == "%urldestination") {
1721  value.clear();
1722  escaper->escape(destination, value);
1723  } else if (value == "%retries") {
1724  value = std::to_string(retries);
1725  } else if (value == "%error") {
1726  value = errorMessage;
1727 
1728  if (state & stateBitType::vanished
1729  && !(state & stateBitType::fileToBeRemoved)) {
1730  value += " vanished before action started";
1731  }
1732  } else if (value == "%op") {
1733  if (state & stateBitType::fileToBeCopied) {
1734  value = "->";
1735  } else if (state & stateBitType::linkToBeMade) {
1736  value = "ln";
1737  } else if (state & stateBitType::fileToBeRemoved) {
1738  value = "rm";
1739  } else if (state & stateBitType::fileToBeRenamed) {
1740  value = "mv";
1741  }
1742  }
1743  }
1744 
1745  if (needSpace) {
1746  logStream << " ";
1747  }
1748 
1749  logStream << value;
1750  needSpace = true;
1751  }
1752 
// Terminate the line only if at least one field was written.
1753  if (needSpace) {
1754  logStream << "\n";
1755  }
1756 
1757 
// Verbose mode: one human readable line assembled from all state bits.
1758  if (verbose) {
1759  logStream << "'" << source << "' ";
1760 
1761  if (state & stateBitType::fileToBeCopied) {
1762  logStream << " -> ";
1763  }
1764 
1765  if (state & stateBitType::linkToBeMade) {
1766  logStream << " link ";
1767  }
1768 
1769  if (state & stateBitType::fileToBeRemoved) {
1770  logStream << " rm ";
1771  }
1772 
1773  if (state & stateBitType::fileToBeRenamed) {
1774  logStream << " mv ";
1775  }
1776 
1777  if (state & stateBitType::vanished) {
1778  logStream << " vanished before action started";
1779  }
1780 
1781  if (state & stateBitType::append) {
1782  logStream << " was appended to";
1783  }
1784 
1785  if (state & stateBitType::done) {
1786  logStream << " '" << destination << "' after " << retries << " attempts";
1787  }
1788 
1789  if (state & stateBitType::ignore) {
1790  logStream << " is ignored";
1791  }
1792 
1793  if (state & stateBitType::trucated) {
1794  logStream << " due to truncation";
1795  }
1796 
1797  if (state & stateBitType::failed) {
1798  logStream << " failed due to " << errorMessage;
1799  }
1800 
1801  if (state & stateBitType::inWork) {
1802  logStream << " inWork not cleared";
1803  }
1804 
1805  if (state & stateBitType::attributeMismatch) {
1806  logStream << " attribute mismatch: " << errorMessage;
1807  }
1808 
1809  logStream << "\n";
1810  }
1811 }
1812 
1816 std::function<void(std::ostream&)>& copyRequest::base::getStatPrinter() {
1817  return statPrinter;
1818 }
1819 void copyRequest::base::setStatPrinter(void(*f)(std::ostream&)) {
1820  statPrinter = f;
1821 }
// Write the collected statistics to `stream`, every line prefixed with
// statPrefix.
// NOTE(review): one line near the end of this function is missing from this
// listing.
1822 void copyRequest::base::printStats(std::ostream& stream) {
1823  stream << statPrefix << tTotal2Stat << "\n";
1824  stream << statPrefix << tTotalStat << "\n";
1825  stream << statPrefix << tInotifyStat << "\n";
1826  stream << statPrefix << tPipeStat << "\n";
1827  stream << statPrefix << tWaitStat << "\n";
1828  stream << statPrefix << tCopyStat << "\n";
1829  stream << statPrefix << tEnqueueStat << "\n";
1830  stream << statPrefix << bytesStat << "\n";
1831  stream << statPrefix << speedStat << "\n";
1832  stream << statPrefix << bytesStat.getN() << " files with " << bytesStat.sum << "B transferred\n";
1833 
// Average rate over the whole run: bytes transferred divided by the time
// elapsed since tProgramStart, expressed in MiB/s.
1834  auto tElapsed = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - tProgramStart).count();
1835  stream << statPrefix << "avg rate: " << bytesStat.sum / (tElapsed * 1024 * 1024) << " MiB/s in " << tElapsed << " s\n";
// One line per entry of processMultiplicities, labelled with its index.
1836  for (unsigned int i = 0; i < processMultiplicities->size(); i++) {
1837  stream << statPrefix << i << " processes at once: "
1838  << (*processMultiplicities)[i] << " times\n";
1839  }
1840 
// Re-key the advisory wait map by duration so the entries are printed in
// ascending order of wait time.
1841  std::multimap<clock_type::duration, std::string> waitMap;
1842  for (const auto& item : advisoryWaitMap) {
1843  waitMap.emplace(item.second, item.first);
1844  }
1845  for (const auto& item : waitMap) {
1846  stream << statPrefix << " wait time for " << item.second << " files: "
1847  << std::chrono::duration_cast<std::chrono::duration<double>>(item.first).count() << "s\n";
1848  }
// NOTE(review): the semicolon after the closing brace is redundant.
1850 };
1851 
// Body of a routine that resets the statistics collectors.
// NOTE(review): the signature line and the last statement are missing from
// this listing, so the function name is not visible here.
1853  tTotal2Stat.reset();
1854  tTotalStat.reset();
1855  tInotifyStat.reset();
1856  tPipeStat.reset();
1857  tWaitStat.reset();
1858  tCopyStat.reset();
1859  tEnqueueStat.reset();
1860  bytesStat.reset();
1861  speedStat.reset();
// NOTE(review): bytesStat was already reset two lines above; this second
// call looks redundant.
1862  bytesStat.reset();
1864 }
1865 
// Decide whether a failed request gets another attempt and, if so, hand it
// to the delayed queue.
//
// A request is retried when it is flagged `failed`, is not a removal and has
// used fewer than maxRetries retries. Returns true when the request was
// re-queued, false when the caller keeps it.
// Throws std::runtime_error if the request pointer is still set after
// enqueue().
// NOTE(review): the line opening the message call in the middle of this
// function is missing from this listing.
1866 bool copyRequest::base::retry(std::unique_ptr<base>& request,
1867  timedQueue& delayedRequests) {
1868  if ((request->state & stateBitType::failed) &&
1869  !(request->state & stateBitType::fileToBeRemoved) &&
1870  (request->retries < maxRetries)) {
// First number of /proc/self/statm, reported as "vmsize" in the message below.
// NOTE(review): `size` stays uninitialised if the read fails.
1871  static ssize_t lastSize;
1872  ssize_t size;
1873  {
1874  std::ifstream statm("/proc/self/statm");
1875  statm >> size;
1876  }
// NOTE(review): lastSize is overwritten before it is reported, so the
// message below always shows lastSize == size and a difference of 0.
// NOTE(review): lastSize is a function-local static written without a lock -
// confirm whether retry() can run in several threads at once.
1877  lastSize = size;
1878  request->state.clear(stateBitType::failed);
// Delay is (duration of the last attempt + 1 s) multiplied by the retry count.
1879  auto waitTime = request->tWorkDone - request->tWorkStart + std::chrono::seconds(1);
1880  waitTime *= request->retries;
1882  request->source, "re-sched",
1883  request->errorMessage,
1884  ", delaying by ", std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(waitTime).count(),
1885  "s vmsize: ", size, "(", lastSize, ", ", size - lastSize, "), retries: ", request->retries);
// The request pointer is checked right after enqueue(): anything but null is
// treated as an error.
1886  delayedRequests.enqueue(request, waitTime);
1887  if (request != nullptr) {
1888  throw std::runtime_error("requesy not null");
1889  }
1890  return true;
1891  }
1892  return false;
1893 }
1894 
// Worker loop: takes requests from `queue`, processes them and passes them
// on to resultQueue or, for early or failed requests, to delayedRequests.
// Any exception leaving the loop is turned into a stop request.
// NOTE(review): the first signature line and several interior lines (a
// condition before the first `break`, the openers of the message calls and
// one line before process()) are missing from this listing.
1896  simpleQueue& resultQueue,
1897  timedQueue& delayedRequests) {
1898  try {
1899  perThreadData threadData;
1900 
// Runs until dequeue() yields a request that converts to false.
1901  while (auto request = queue.dequeue()) {
1903  break;
1904  }
// The inner loop also handles follow-up requests returned by the eraser.
1905  while (request) {
// Requests whose earliest process time lies in the future are pushed back
// to the delayed queue for the remaining time.
1906  auto dt = request->filesInWorkIterator->second.getEarliestprocessTime()
1907  - clock_type::now();
1908  if (dt > decltype(dt)::zero()) {
1910  request->source, "delay",
1911  "scheduled too early by ", std::chrono::duration_cast<std::chrono::duration<double>>(dt).count(), "s");
1912  auto newDelay = request->filesInWorkIterator->second.getEarliestprocessTime() - request->tInotify;
1913  request->adviseDelay(newDelay);
1914  delayedRequests.enqueue(request, dt);
// After enqueue() the request pointer must be null, otherwise it is an error.
1915  if (request != nullptr) {
1916  throw std::runtime_error("request not nullptr");
1917  }
1918  break;
1919  }
1920 
1921 
1923  request->process(threadData);
// Not re-queued for a retry: pass the request to the result queue and
// continue with whatever the eraser returns from getNext().
1924  if (! retry(request, delayedRequests)) {
1925  copyRequest::fileInWork::eraser eraser(*request);
1926  resultQueue.enqueue(request);
1927  if (request != nullptr) {
1928  throw std::runtime_error("request not nullptr");
1929  }
1930  request = eraser.getNext();
1931  }
1932  }
1933  }
// Nothing may escape this function: request a stop with the exception text.
1934  } catch (const std::exception& e) {
1935  stopRequest::RequestStop(e.what());
1937  "--", "process", "stop requested due to ", e.what());
1938  } catch (...) {
1939  stopRequest::RequestStop("unknown exception");
1941  "--", "process", "stop requested due to unknown exception");
1942  }
1943 }
1944 
1945 namespace copyRequest {
1946  std::ostream& operator<< (std::ostream& stream,
1947  const copyRequest::clock_type::time_point& t) {
1948  stream << std::chrono::duration_cast<std::chrono::duration<double>>(t.time_since_epoch()).count();
1949  return stream;
1950  }
1951 
1952  std::ostream& operator<<(std::ostream& out, const base& request) {
1953  out << std::fixed;
1954  out << " request " << static_cast<const void*>(&request);
1955  out << " for '" << request.source << "' to '" << request.destination << "'";
1956  out << " state " << std::hex << static_cast<unsigned>(request.state) << std::dec;
1957  out << " tInotify " << request.tInotify;
1958  out << " tEarliest " << request.filesInWorkIterator->second.getEarliestprocessTime();
1959  out << " now: " << clock_type::now() << "\n";
1960  return out;
1961  }
1962 }
inputHandler::base::newHandler
static base * newHandler(const std::string &name)
create an instance of an inputHandler, select by name
Definition: inputHandler.h:130
copyRequest::base::userWriteBlockSize
static options::single< size_t > userWriteBlockSize
Definition: copyRequest.h:294
copyRequest::fileInWork::filesInWork
static std::map< std::string, fileInWork > filesInWork
maps file names to filesInWork objects
Definition: fileInWork.h:30
copyRequest::base::tWaitStat
static statCollector::typed< decltype(tWorkStart - tEnqueue) > tWaitStat
Definition: copyRequest.h:312
copyRequest::backupModeType::during
@ during
verbose
options::single< bool > verbose
dereference
options::single< bool > dereference
delayAdvisingError
class for exceptions that advise for delays Exceptions of this kind are to be thrown when circumstanc...
Definition: inputHandler.h:22
options::single< std::string >
copyRequest::exceptionList
Definition: copyRequest.h:79
ioHandle::setXattr
virtual void setXattr(const std::string &, const std::string &)
Definition: ioHandle.h:43
copyRequest::stateBitType
stateBitType
Definition: copyRequestTypes.h:50
copyRequest::perThreadData::InputHandler
inputHandler::base * InputHandler
Definition: copyRequest.h:69
copyRequest::base::nReadThreads
static options::single< unsigned > nReadThreads
Definition: copyRequest.h:128
copyRequest::backupModeType::before
@ before
errMsgQueue.h
copyRequest::backupModeType::after
@ after
errMsg::level::warning
@ warning
noCopy
options::single< bool > noCopy
copyRequest::stateBitType::attributeMismatch
@ attributeMismatch
block::offset
size_t offset() const
Definition: block.h:25
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
copyRequest::base::errorMessage
std::string errorMessage
Definition: copyRequest.h:283
copyRequest::stateBitType::fileToBeCopied
@ fileToBeCopied
ioHandle::removeXattr
virtual void removeXattr(const std::string &)
Definition: ioHandle.h:55
throwcall::badval
T badval(T call, t badvalue, const Args &... args)
template function to wrap system calls that return a special bad value on failure
Definition: throwcall.h:54
copyRequest::blockQueue
waitQueues::simple< block > blockQueue
Definition: copyRequest.h:63
copyRequest::base::speedStat
static statCollector::typed< double > speedStat
Definition: copyRequest.h:315
copyRequest::base::tEnqueueStat
static statCollector::typed< copyRequest::clock_type::duration > tEnqueueStat
Definition: copyRequest.h:320
waitQueues::timed< copyRequest::base >
copyRequest::base::perFileThreads
static options::single< bool > perFileThreads
Definition: copyRequest.h:125
statPrefix
options::single< std::string > statPrefix
copyRequest::base::tTotalStat
static statCollector::typed< decltype(tWorkDone - tInotify) > tTotalStat
Definition: copyRequest.h:308
waitQueues::simple::size
decltype(queue.size()) size() const
Definition: waitQueues.h:99
copyRequest::base::sourcePrefixRegex
static options::single< std::regex > sourcePrefixRegex
Definition: copyRequest.h:132
inputHandler::base::reader::readBlockP
virtual void readBlockP(block &, size_t, off_t)
read one block from the file, starting at offset.
Definition: inputHandler.h:100
copyRequest::base::writeActor::joinable
bool joinable() const
Definition: copyRequest.cpp:666
fatalException
class for exceptions that should result in program termination
Definition: copyRequestTypes.h:11
copyRequest::base::success_dest_attrs
static options::map< std::string > success_dest_attrs
Definition: copyRequest.h:110
md5sum.h
outputHandler::base::newHandler
static base * newHandler(const std::string &name)
Definition: outputHandler.h:80
copyRequest::base::getSuffix
const std::string & getSuffix() const
Definition: copyRequest.h:421
copyRequest::base::maxMemoryBlockSize
static options::single< size_t > maxMemoryBlockSize
Definition: copyRequest.h:297
copyRequest::stateBitType::inWork
@ inWork
outputHandler::base::writer::writeBlockP
virtual void writeBlockP(const block &)
Definition: outputHandler.h:60
copyRequest::base::backupMode
static backupModeType backupMode
Definition: copyRequest.h:120
copyRequest::stateBitType::ignore
@ ignore
errMsg::level::crit
@ crit
errMsg::level::info
@ info
operator<<
std::ostream & operator<<(std::ostream &stream, const copyRequest::clock_type::time_point &t)
Definition: copyRequest.cpp:36
copyRequest::base::hasher
void hasher(blockQueue &blocksToHash, blockQueue &hashedBlocks, exceptionList &exceptions)
Definition: copyRequest.cpp:738
copyRequest::base::tInotifyStat
static statCollector::typed< decltype(tWorkDone - tInotify) > tInotifyStat
Definition: copyRequest.h:310
copyRequest::base::makeSymLink
bool makeSymLink(inputHandler::base *InputHandler, outputHandler::base *OutputHandler)
copyRequest::base::advisoryWaitMapMutex
static std::mutex advisoryWaitMapMutex
Definition: copyRequest.h:337
copyRequest::base::waitForAllInstancesGone
static void waitForAllInstancesGone()
Definition: copyRequest.cpp:1421
outputHandlerName
options::single< std::string > outputHandlerName
ioHandle::getXattr
virtual std::string getXattr(const std::string &)
get one extended attribute value
Definition: ioHandle.h:51
copyRequest::base
class for copy requests.
Definition: copyRequest.h:99
copyRequest::exceptionList::add
void add(std::exception_ptr e)
Definition: copyRequest.cpp:185
copyRequest::base::bytesStat
static statCollector::typed< decltype(readInitialStat->size) > bytesStat
Definition: copyRequest.h:314
copyRequest::base::filesInWorkIterator
decltype(fileInWork::filesInWork.begin()) filesInWorkIterator
Definition: copyRequest.h:341
copyRequest::base::hashCalculator::parallelized
bool parallelized() const
Definition: copyRequest.cpp:716
copyRequest::stateBitType::done
@ done
copyRequest::base::retry
static bool retry(std::unique_ptr< base > &request, timedQueue &delayedRequests)
Definition: copyRequest.cpp:1866
copyRequest::base::tWorkStart
clock_type::time_point tWorkStart
Definition: copyRequest.h:305
scoped::generic
Definition: scoped.h:34
copyRequest::base::changeRequestType
void changeRequestType(stateBitType newType)
Definition: copyRequest.cpp:1072
timer
Definition: timer.cpp:3
copyRequest::base::check_source_attrs
static options::map< std::string > check_source_attrs
Definition: copyRequest.h:109
copyRequest::base::hashCalculator
Definition: copyRequest.h:186
outputHandler::base::renameRetvalType::fileChanged
@ fileChanged
inputHandler::base::readLinkTarget
virtual bool readLinkTarget(const std::string &, std::vector< char > &)
read link target from a symlink
Definition: inputHandler.h:149
copyRequest::stateBitType::failed
@ failed
copyRequest::base::failure_source_attrs
static options::map< std::string > failure_source_attrs
Definition: copyRequest.h:106
copyRequest::base::processMultiplicities
static std::vector< std::atomic< unsigned int > > * processMultiplicities
Definition: copyRequest.h:321
copyRequest::backupModeType::none
@ none
logstream::level::warning
@ warning
copyRequest::base::writeActor::writeActor
writeActor(std::unique_ptr< outputHandler::base::writer > &writeHandle, blockQueue &blocksToWrite, blockQueue &writtenBlocks, bool mayParallelize, base *request, exceptionList &exceptions)
Definition: copyRequest.cpp:625
copyRequest::base::doBlockSizeSetup
void doBlockSizeSetup(ioHandle &input, ioHandle &output)
Definition: copyRequest.cpp:1404
scoped.h
gid
options::single< int > gid
copyRequest::base::tInotify
clock_type::time_point tInotify
Definition: copyRequest.h:303
copyRequest::base::attrset
void attrset(ioHandle &writeHandle, const std::map< std::string, std::string > &attrs)
Definition: copyRequest.cpp:446
copyRequest::base::expandAttrValue
bool expandAttrValue(std::string &value)
expand the keywords in value, see Expansion of key words in attributes
Definition: copyRequest.cpp:365
copyRequest::stateBitType::vanished
@ vanished
copyRequest::base::hash_worker
void hash_worker(checksum::parallel *sum, blockQueue &blocksToHash, blockQueue &hashedBlocks, exceptionList &exceptions)
Definition: copyRequest.cpp:720
copyRequest::stateType
Definition: copyRequestTypes.h:66
copyRequest::base::reader
void reader(inputHandler::base::reader &input, blockQueue &freeBlocks, blockQueue &readBlocks, bool mayParallelize, exceptionList &exceptions)
Definition: copyRequest.cpp:530
ioHandle::getBlockSize
virtual size_t getBlockSize() const
Definition: ioHandle.h:58
copyRequest::base::hashCalculator::parallelSum
checksum::parallel * parallelSum
Definition: copyRequest.h:188
block::isHole
bool isHole() const
Definition: block.h:36
copyRequest::base::processQueue
static void processQueue(simpleQueue &queue, simpleQueue &resultQueue, timedQueue &delayedRequests)
Definition: copyRequest.cpp:1895
copyRequest::stateBitType::fileToBeRemoved
@ fileToBeRemoved
errMsg::level::debug
@ debug
copyRequest::base::doUnthreadedCopy
void doUnthreadedCopy(inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle)
Definition: copyRequest.cpp:868
copyRequest::base::success_source_attrs
static options::map< std::string > success_source_attrs
Definition: copyRequest.h:105
stopRequest::handlerIdType::processLoop
@ processLoop
copyRequest::base::process_source_attrs
static options::map< std::string > process_source_attrs
Definition: copyRequest.h:107
copyRequest::base::setStatPrinter
static void setStatPrinter(void(*f)(std::ostream &))
Definition: copyRequest.cpp:1819
copyRequest::stateBitType::trucated
@ trucated
copyRequest::base::getBackupSuffix
std::string getBackupSuffix()
Definition: copyRequest.cpp:1077
copyRequest::base::logFields
static options::container< std::string > logFields
Definition: copyRequest.h:131
timer::anchor::print
static void print(std::ostream &out, const std::string &prefix)
Definition: timer.cpp:5
copyRequest::operator<<
std::ostream & operator<<(std::ostream &stream, const copyRequest::clock_type::time_point &t)
Definition: copyRequest.cpp:1946
nThreads
static options::single< unsigned > nThreads('n', "nThreads", "number of threads", 0)
copyRequest::stateBitType::fileToBeRenamed
@ fileToBeRenamed
copyRequest::exceptionList::size
size_t size() const
Definition: copyRequest.cpp:193
throttle::action
Definition: throttle.h:71
copyRequest::base::maxDelayTime
static options::single< double > maxDelayTime
Definition: copyRequest.h:328
copyRequest::base::setAttributesAfterClose
static options::single< bool > setAttributesAfterClose
Definition: copyRequest.h:124
copyRequest::base::tCopyStat
static statCollector::typed< decltype(tWorkDone - tWorkStart) > tCopyStat
Definition: copyRequest.h:313
outputHandler::base::renameRetvalType::fileVanished
@ fileVanished
copyRequest::exceptionList::front
std::exception_ptr front()
Definition: copyRequest.cpp:197
copyRequest::base::attrdel
static void attrdel(ioHandle &, const std::map< std::string, std::string > &attrs)
Definition: copyRequest.cpp:481
escapism.h
copyRequest::base::append_attrs
static options::container< std::string > append_attrs
Definition: copyRequest.h:112
outputHandler::base
Definition: outputHandler.h:17
defKeyword
#define defKeyword(kw)
Definition: copyRequest.cpp:261
block::bufferAt
void * bufferAt(size_t offset)
only way to access the data in the block
Definition: block.cpp:28
copyRequest::base::getStatPrinter
static std::function< void(std::ostream &)> & getStatPrinter()
we can't have static functions virtual, so we explicitly use a function to give us the proper print f...
Definition: copyRequest.cpp:1816
outputHandler::base::renameRetvalType::ok
@ ok
timer.h
copyRequest::base::writeActor::~writeActor
~writeActor() noexcept(false)
Definition: copyRequest.cpp:657
waitQueues::simple::emplace
void emplace(Types ... args)
Definition: waitQueues.h:45
fileRateLimit
throttle::watch fileRateLimit
copyRequest::base::init
void init(inputHandler::base *InputHandler)
Definition: copyRequest.cpp:1445
messageQueue::queue::send
virtual void send(const std::string &aMessage, const std::string &aTopic="")=0
throwcall.h
copyRequest::base::prohibitive_attrs
static options::container< std::string > prohibitive_attrs
Definition: copyRequest.h:111
block
data block, used to hold the data that are being copied (or checksummed).
Definition: block.h:7
copyRequest::fileInWork::typeChanger
Definition: fileInWork.h:109
timer::anchor::reset
static void reset()
Definition: timer.h:86
copyRequest::base::writeActor::workers
std::vector< std::thread > workers
Definition: copyRequest.h:245
copyRequest::base::readInitialStat
std::unique_ptr< const genericStat > readInitialStat
Definition: copyRequest.h:271
copyRequest::base::maxRetries
static options::single< unsigned > maxRetries
Definition: copyRequest.h:116
stopRequest::Requested
static bool Requested()
Definition: ewmscp.cpp:153
pathHandler::pathExists
virtual bool pathExists(const std::string &path)=0
outputHandler::base::remove
virtual void remove(const std::string &path, copyRequest::stateType &state)=0
options::single::fGetValue
const T & fGetValue() const
Definition: Options.h:589
copyRequest::base::tPipeStat
static statCollector::typed< decltype(tEnqueue - tInotify) > tPipeStat
Definition: copyRequest.h:311
copyRequest::fileInWork::eraser::getNext
std::unique_ptr< copyRequest::base > getNext()
Definition: fileInWork.cpp:66
copyRequest::base::start_source_attrs
static options::map< std::string > start_source_attrs
Definition: copyRequest.h:108
defineStaticNoArg
#define defineStaticNoArg(var)
defines a static variable that needs no arguments to its constructor
Definition: ewmscp.h:44
copyRequest::base::removeFileOrDirectory
void removeFileOrDirectory(inputHandler::base *InputHandler, outputHandler::base *OutputHandler)
Definition: copyRequest.cpp:914
copyRequest::base::writer
void writer(std::unique_ptr< outputHandler::base::writer > &writeHandle, blockQueue &blocksToWrite, blockQueue &writtenBlocks, exceptionList &exceptions)
Definition: copyRequest.cpp:592
copyRequest::base::syncWrittenFiles
static options::single< bool > syncWrittenFiles
Definition: copyRequest.h:122
copyRequest::base::printResults
virtual void printResults(std::ostream &hashStream, std::ostream &logStream)
Definition: copyRequest.cpp:1618
copyRequest::stateBitType::action
@ action
copyRequest::base::hashCalculator::hashCalculator
hashCalculator(blockQueue &blocksToHash, blockQueue &hashedBlocks, base *request, bool mayParallelize, exceptionList &exceptions)
Definition: copyRequest.cpp:669
copyRequest::base::backupSuffix
static options::single< std::string > backupSuffix
Definition: copyRequest.h:121
copyRequest::base::nWriteThreads
static options::single< unsigned > nWriteThreads
Definition: copyRequest.h:130
waitQueues::simple::enqueue
void enqueue(std::unique_ptr< T > &item)
Definition: waitQueues.h:37
copyRequest::base::userReadBlockSize
static options::single< size_t > userReadBlockSize
Definition: copyRequest.h:296
copyRequest::base::tTotal2Stat
static statCollector::typed< decltype(tWorkDone - tInotify) > tTotal2Stat
Definition: copyRequest.h:309
copyRequest::base::retries
unsigned retries
Definition: copyRequest.h:323
copyRequest::base::objectCountCondVar
static std::condition_variable objectCountCondVar
Definition: copyRequest.h:372
copyRequest.h
preserve
decltype(preserve) preserve
set of properties to preserve in the copy
Definition: ewmscp.cpp:111
copyRequest::base::threadData
perThreadData * threadData
Definition: copyRequest.h:281
waitQueues::simple::signalDone
void signalDone()
Definition: waitQueues.h:49
copyRequest::base::printStats
static void printStats(std::ostream &stream)
Definition: copyRequest.cpp:1822
inputHandlerName
options::single< std::string > inputHandlerName
outputHandler::base::writer::seek
virtual void seek(size_t position)
Definition: outputHandler.cpp:27
copyRequest::perThreadData
class for thread-specific data
Definition: copyRequest.h:67
copyRequest::base::state
stateType state
Definition: copyRequest.h:282
copyRequest::base::getBackupModeNameMap
static const std::map< std::string, backupModeType > & getBackupModeNameMap()
Definition: copyRequest.cpp:235
copyRequest::base::checkAttributes
void checkAttributes(ioHandle &, const std::map< std::string, std::string > &check_source_attr)
Definition: copyRequest.cpp:486
checksum::parallel
Definition: checksumBase.h:82
copyRequest::exceptionList::empty
bool empty() const
Definition: copyRequest.cpp:189
waitQueues::timed::enqueue
void enqueue(std::unique_ptr< T > &item, typename clock_type::time_point when)
Definition: waitQueues.h:216
pathHandler::getXattr
virtual std::string getXattr(const std::string &, const std::string &)
Definition: pathHandler.h:12
uid
options::single< int > uid
escapism::newEscaper
static const escapism * newEscaper(const std::string &name)
Definition: escapism.cpp:25
inputHandler::base
class for handling input. This is the (abstract) base class for handling input, both reading a file vi...
Definition: inputHandler.h:35
copyRequest::stateBitType::noActionNeeded
@ noActionNeeded
copyRequest::base::forceParallelRead
static options::single< bool > forceParallelRead
Definition: copyRequest.h:129
copyRequest::base::getAdvisedDelay
clock_type::duration getAdvisedDelay() const
returns the advised delay
Definition: copyRequest.cpp:1059
pathHandler::getStat
virtual std::unique_ptr< const genericStat > getStat(const std::string &path, bool followLink=true)=0
copyRequest::base::nConcurrentProcesses
static std::atomic< unsigned int > nConcurrentProcesses
Definition: copyRequest.h:318
checksumCreators
decltype(checksumCreators) checksumCreators
Definition: ewmscp.cpp:113
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
copyRequest::stateBitType::linkToBeMade
@ linkToBeMade
copyRequest::perThreadData::perThreadData
perThreadData()
Definition: copyRequest.cpp:173
timerInst
#define timerInst(subfunc)
Definition: timer.h:157
copyRequest::base::process
virtual void process(perThreadData &threadData)
Definition: copyRequest.cpp:1109
waitQueues::simple
Definition: waitQueues.h:31
singleMap
std::pair< std::string, std::string > singleMap
Definition: copyRequest.h:52
copyRequest::base::source
const std::string source
Definition: copyRequest.h:284
block::size
size_t size() const
Definition: block.h:16
outputHandler::base::writer::doAttributePreservations
virtual void doAttributePreservations(const genericStat &readInitialStat)=0
copyRequest::perThreadData::OutputHandler
outputHandler::base * OutputHandler
Definition: copyRequest.h:70
copyRequest::base::tWorkDone
clock_type::time_point tWorkDone
Definition: copyRequest.h:306
continueOnError
options::single< bool > continueOnError
copyRequest::base::objectCount
static unsigned objectCount
Definition: copyRequest.h:371
copyRequest::base::hashCalculator::~hashCalculator
~hashCalculator() noexcept(false)
Definition: copyRequest.cpp:701
copyRequest::base::destinationPrefixRegex
static options::single< std::regex > destinationPrefixRegex
Definition: copyRequest.h:133
copyRequest::base::writeActor
Definition: copyRequest.h:244
copyRequest::base::advisoryWaitMap
static std::map< std::string, clock_type::duration > advisoryWaitMap
Definition: copyRequest.h:336
copyRequest::base::memoryBlockSize
size_t memoryBlockSize
Definition: copyRequest.h:300
copyRequest::base::~base
virtual ~base() noexcept(false)
Definition: copyRequest.cpp:1565
outputHandler::base::writer::getSize
virtual size_t getSize() const
Definition: outputHandler.cpp:30
outputHandler::base::writer::sync
virtual void sync()=0
checksum::base::newFactory
static factoryClass * newFactory(const std::string &aName)
Definition: checksumBase.h:68
ioHandle
class as base for inputHandler::base::reader and outputHandler::base::writer containing the common pa...
Definition: ioHandle.h:15
copyRequest::base::backupModeName
static options::withAction< options::single< std::string > > backupModeName
Definition: copyRequest.h:119
copyRequest::base::appendableFiles
static options::single< std::regex > appendableFiles
Definition: copyRequest.h:113
copyRequest::base::hashCalculator::workers
std::vector< std::thread > workers
Definition: copyRequest.h:187
copyRequest::base::statPrinter
static std::function< void(std::ostream &)> statPrinter
Definition: copyRequest.h:330
copyRequest::base::hashCalculator::joinable
bool joinable() const
Definition: copyRequest.cpp:713
copyRequest::base::adviseDelay
static bool adviseDelay(clock_type::duration dt, const std::string &suffix)
update the advised delay in the map.
Definition: copyRequest.cpp:1011
waitQueues::simple::dequeue
std::unique_ptr< T > dequeue(bool mayCreateNew, Types ... args)
Definition: waitQueues.h:59
f
int f(int a, int line)
Definition: cctest.cpp:4
outputHandler::base::writer::writeBlock
virtual void writeBlock(const block &b)=0
copyRequest::base::objectCountMutex
static std::mutex objectCountMutex
Definition: copyRequest.h:370
copyRequest::base::writeWorker
void writeWorker(std::unique_ptr< outputHandler::base::writer > &writeHandle, blockQueue &blocksToWrite, blockQueue &writtenBlocks, exceptionList &exceptions)
Definition: copyRequest.cpp:610
errMsg::level::notice
@ notice
copyRequest::base::readWorker
void readWorker(inputHandler::base::reader &input, blockReadRequestQueue &blockRequests, blockQueue &freeBlocks, blockQueue &readBlocks, exceptionList &exceptions)
Definition: copyRequest.cpp:510
resultOutput
messageQueue::queue * resultOutput
copyRequest::base::addExpectedChecksumResult
void addExpectedChecksumResult(const std::string &checkSumType, const std::string &expectedValue)
Definition: copyRequest.cpp:1601
enumAsBitmask::clear
void clear(const T aBits)
Definition: enumAsBitmask.h:31
copyRequest::base::ignoreExisting
static options::single< bool > ignoreExisting
Definition: copyRequest.h:114
stopRequest::RequestStop
static void RequestStop(const std::string &aReason)
Definition: ewmscp.cpp:162
copyRequest
Definition: checksumTestRequestProvider.cpp:25
outputHandler::base::createSymlink
virtual void createSymlink(const std::vector< char > &, const std::string &, uid_t, gid_t)
Definition: outputHandler.h:100
inputHandler::base::reader::readBlock
virtual bool readBlock(block &b)=0
read one block from the file
checksum
Definition: adler32.cpp:9
copyRequest::base::checkForInstances
static bool checkForInstances()
Definition: copyRequest.cpp:1427
copyRequest::base::destination
const std::string destination
Definition: copyRequest.h:285
copyRequest::base::tProgramStart
static clock_type::time_point tProgramStart
Definition: copyRequest.h:316
copyRequest::base::base
base(inputHandler::base *InputHandler, const std::string &aSource, const std::string &aDestination, std::unique_ptr< const genericStat > &aStat, const singleMap &aMapEntry, bool remove, clock_type::time_point timestamp)
constructor for copy,link and remove requests.
Definition: copyRequest.cpp:1517
copyRequest::fileInWork::eraser
class to get copyRequest out of the fileInWork list.
Definition: fileInWork.h:103
inputHandler::base::reader
(abstract) class for reading a file. An instance of this class is used to read data from a file via th...
Definition: inputHandler.h:72
copyRequest::base::hashCalculator::outputQueue
blockQueue & outputQueue
Definition: copyRequest.h:189
syslogstream.h
copyRequest::base::resetStats
static void resetStats()
Definition: copyRequest.cpp:1852
copyRequest::base::sparse
static options::single< std::string > sparse
Definition: copyRequest.h:118
copyRequest::stateBitType::append
@ append
copyRequest::base::checkSums
std::list< checksum::base * > checkSums
Definition: copyRequest.h:277
ioHandle::setBlockSize
virtual void setBlockSize(size_t newSize)
Definition: ioHandle.h:61
copyRequest::base::nSumThreads
static options::single< unsigned > nSumThreads
Definition: copyRequest.h:127
inputHandler.h
defineStatic
#define defineStatic(var,...)
defines a static variable and instantiates the constructor with a variable number of arguments.
Definition: ewmscp.h:42
checksum::parallel::update
virtual void update(void *data, size_t size, size_t offset)=0
copyRequest::base::doThreadedCopy
void doThreadedCopy(inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle)
Definition: copyRequest.cpp:813
copyRequest::base::linkBaseMap
static options::map< std::string > linkBaseMap
Definition: copyRequest.h:117
copyRequest::base::keywordMap
static std::map< std::string, void(base::*)(std::string &) const > & keywordMap()
Definition: copyRequest.cpp:246
copyRequest::base::changeAttrsOnIgnoreExisting
static options::single< bool > changeAttrsOnIgnoreExisting
Definition: copyRequest.h:115
copyRequest::backupModeType::remove
@ remove
copyRequest::base::minMemoryBlockSize
static options::single< size_t > minMemoryBlockSize
Definition: copyRequest.h:298
ioHandle::parallelizable
virtual bool parallelizable() const
Tells if this handler is capable of parallel IO. Usually not the case.
Definition: ioHandle.h:38