33 #include <json/json.h>
37 const copyRequest::clock_type::time_point& t) {
38 stream << std::chrono::duration_cast<std::chrono::duration<double>>(t.time_since_epoch()).count();
47 "xattrs to set on source file on success");
49 "xattrs to set on source file on failure");
51 "xattrs to set on source file during copy, removed ad end of copy");
53 "xattrs to set on source file on start of copy");
55 "xattrs to check on source file after copy (or hashing)");
57 "xattrs to set on destination file on success");
61 '\0',
"prohibitive-attrs",
62 "xattrs that prohibit copying when present on source");
65 "xattrs that trigger append mode when present on source");
68 '\0',
"appendable-files",
69 "regexp for files that may be appended to");
72 '\0',
"ignoreExisting",
73 "ignore files where destination exists and has same stat values");
75 '\0',
"changeAttrsOnIgnoreExisting",
76 "update preserved attrs on files ignored by ignoreExisting");
79 "base dir that gets removed from absolute links");
81 "control creation of sparse files.",
"auto", {
"auto",
"never",
"always"});
85 backupMode = getBackupModeNameMap().at(me);
87 '\0',
"backupMode",
"backup mode",
"none", [] {std::vector<std::string> v;
for (
const auto& i : getBackupModeNameMap()) {
97 "sync new files after write",
false);
99 "set attributes after close() (needed for dCache)");
102 "extra write/checksum thread per file",
false);
104 "number of threads for parallelizable checksums per file",
107 "number of threads for parallelizable reads per file",
110 "do parallel read even on loaded system");
112 "number of threads for parallelizable writes per file",
115 "read block size, 0: from fs",
118 "write block size, 0: from fs",
123 "max block size of memory blocks",
126 "min block size of memory blocks",
130 "field in the log lines");
132 "regexp to determine log prefix from source path");
134 "regexp to determine log prefix from destination path");
137 defineStatic(copyRequest::base::doMagic,
'\0',
"doMagic",
138 "determine file type via libmagic");
162 "max seconds to delay files", 10);
177 magicCookie =
throwcall::badval(magic_open(MAGIC_NONE),
nullptr,
"can't init libmagic");
178 if (magic_load(magicCookie,
nullptr) != 0) {
179 throw std::runtime_error(magic_error(magicCookie));
186 std::unique_lock<decltype(listMutex)> lock(listMutex);
194 std::unique_lock<decltype(listMutex)> lock(listMutex);
195 return std::distance(list.cbegin(), list.cend());
198 std::unique_lock<decltype(listMutex)> lock(listMutex);
236 static std::map<std::string, backupModeType> backupModeNameMap = {
243 return backupModeNameMap;
247 static std::map<std::string, void (
copyRequest::base::*)(std::string&)
const> map;
261 #define defKeyword(kw) static copyRequest::base::registerme kw_##kw("%" #kw,©Request::base::kw_##kw); \
262 void copyRequest::base::kw_##kw(std::string& value) const
267 if (readInitialStat ==
nullptr) {
271 value = std::to_string(readInitialStat->size);
276 if (readInitialStat ==
nullptr) {
280 readInitialStat->getMtime(value);
284 value = std::to_string(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count());
289 value = std::to_string(std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::system_clock::now().time_since_epoch()).count());
294 value = EWMSCP_VERSION;
299 value = EWMSCP_COMMIT;
313 if (errorMessage.empty()) {
314 value =
"no error message found";
316 value = errorMessage;
335 bool needComma =
false;
336 for (
const auto& bitName : bitNames) {
337 if (state & bitName.bit) {
342 value += bitName.name;
351 sprintf(buffer,
"%.6f", std::chrono::duration_cast<std::chrono::duration<double>>(tWorkDone - tInotify).count());
366 auto it = keywordMap().find(value);
367 if (it != keywordMap().end()) {
373 if (value.size() > 1 && value.at(0) ==
'%') {
374 auto keyword = value.substr(1);
375 for (
const auto& cksum : checkSums) {
376 if (cksum->getName() == keyword) {
377 value = cksum->getResult();
447 const std::map<std::string, std::string>& attrs) {
448 for (
const auto& attr : attrs) {
449 auto name = attr.first;
450 auto value = attr.second;
452 if (!expandAttrValue(value)) {
453 if (value ==
"%count") {
454 unsigned int count = 0;
455 auto retval = writeHandle.
getXattr(name);
456 if (!retval.empty()) {
458 count = std::stoul(retval);
461 source,
"xattr count",
462 "strange value '", retval,
"'");
466 value = std::to_string(count + 1);
467 }
else if (value ==
"%sums") {
468 for (
const auto& item : checkSums) {
469 std::string fullName(name);
470 fullName += item->getName();
471 writeHandle.
setXattr(fullName, item->getResult());
482 for (
const auto& attr : attrs) {
487 for (
const auto& attr : attrs) {
488 auto name = attr.first;
489 auto value = attr.second;
490 expandAttrValue(value);
491 auto fileValue = handle.
getXattr(name);
493 if (fileValue.empty()) {
495 errorMessage +=
"attr " + name +
" missing ";
499 if (value != fileValue) {
501 errorMessage +=
"attr " + name +
" mismatch " + value +
" i.o. " + fileValue;
506 throw std::runtime_error(errorMessage);
516 while (
auto request = blockRequests.
dequeue()) {
517 auto b = freeBlocks.
dequeue(readBlocks.
size() < 3, memoryBlockSize);
519 throw std::runtime_error(
"got no free block in readWorker");
521 input.
readBlockP(*b, request->size, request->offset);
525 exceptions.
add(std::current_exception());
536 unsigned nBlocks = readInitialStat->size / memoryBlockSize;
539 && (nConcurrentProcesses <
nThreads / nReadThreads + 1
540 || forceParallelRead)
542 && nBlocks >= nReadThreads) {
544 for (
size_t o = 0; o < readInitialStat->size; o += memoryBlockSize) {
545 auto s = readInitialStat->
size - o;
546 s = std::min(s, memoryBlockSize);
550 std::vector<std::thread> workers;
553 workers.resize(nReadThreads);
554 for (
auto& worker : workers) {
557 std::ref(readRequests),
558 std::ref(freeBlocks),
559 std::ref(readBlocks),
560 std::ref(exceptions));
563 for (
auto& worker : workers) {
568 auto b = freeBlocks.
dequeue(readBlocks.
size() < 3, memoryBlockSize);
570 throw std::runtime_error(
"got no free block in reader");
586 exceptions.
add(std::current_exception());
597 size_t lastOffset = 0;
598 while (
auto b = blocksToWrite.
dequeue()) {
599 if (b->offset() < lastOffset) {
600 throw std::logic_error(
"blocks not in order");
603 lastOffset = b->offset();
607 exceptions.
add(std::current_exception());
615 while (
auto b = blocksToWrite.
dequeue()) {
620 exceptions.
add(std::current_exception());
631 outputQueue(writtenBlocks) {
644 std::ref(writeHandle),
645 std::ref(blocksToWrite),
646 std::ref(writtenBlocks),
647 std::ref(exceptions));
651 std::ref(writeHandle),
652 std::ref(blocksToWrite),
653 std::ref(writtenBlocks),
654 std::ref(exceptions)));
658 if (workers.empty()) {
661 for (
auto& worker : workers) {
664 outputQueue.signalDone();
667 return !workers.empty();
674 outputQueue(hashedBlocks) {
680 && request->
checkSums.front()->parallelizable()
689 std::ref(blocksToHash),
691 std::ref(exceptions));
695 std::ref(blocksToHash),
697 std::ref(exceptions)));
702 if (workers.empty()) {
705 for (
auto& worker : workers) {
708 if (workers.size() > 1) {
709 parallelSum->parallelFinish();
711 outputQueue.signalDone();
714 return ! workers.empty();
717 return workers.size() != 1;
725 while (
auto b = blocksToHash.
dequeue()) {
728 sum->
update(b->size(), b->offset());
730 sum->
update(b->bufferAt(0), b->size(), b->offset());
735 exceptions.
add(std::current_exception());
742 while (
auto b = blocksToHash.
dequeue()) {
746 sum->update(b->size());
750 sum->update(b->bufferAt(0), b->size());
760 exceptions.
add(std::current_exception());
765 void copyRequest::base::innerMagic(
const block& b) {
769 if (result ==
nullptr) {
770 result = magic_error(
threadData->magicCookie);
775 fileMagic =
"unknown";
779 void copyRequest::base::magician(
blockQueue& inputQueue,
781 exceptionList& exceptions) {
783 while (
auto b = inputQueue.dequeue()) {
785 outputQueue.enqueue(b);
788 exceptions.add(std::current_exception());
791 copyRequest::base::magicCalculator::magicCalculator(
blockQueue& inputQueue,
794 exceptionList& exceptions):
795 outputQueue(aOutputQueue) {
797 worker = std::thread(©Request::base::magician, request,
798 std::ref(inputQueue),
799 std::ref(outputQueue),
800 std::ref(exceptions));
803 copyRequest::base::magicCalculator::~magicCalculator() noexcept (false) {
804 if (worker.joinable()) {
806 outputQueue.signalDone();
809 bool copyRequest::base::magicCalculator::joinable()
const {
810 return worker.joinable();
814 std::unique_ptr<outputHandler::base::writer>& writeHandle) {
822 scoped::generic < decltype(threadData->freeBlocks) > freeBlockReseter(threadData->freeBlocks,
823 [](decltype(threadData->freeBlocks)& freeBlocks) {
824 freeBlocks.resetDone();
828 magicCalculator magicThread(blocksForMagic, threadData->freeBlocks,
this, exceptions);
829 auto& lastQueue = magicThread.joinable() ?
831 threadData->freeBlocks;
833 auto& lastQueue = threadData->freeBlocks;
837 auto& writerOutputQueue = hashThread.
joinable() ?
846 auto& readerOutputQueue = writeThread.
joinable() ?
854 reader(input, threadData->freeBlocks, readerOutputQueue,
858 if (!exceptions.
empty()) {
859 auto n = exceptions.
size();
862 source,
"threaded copy",
"we have ", n,
" exceptions at once");
864 std::rethrow_exception(exceptions.
front());
869 auto& freeBlocks = threadData->freeBlocks;
870 auto b = freeBlocks.dequeue(freeBlocks.empty(), memoryBlockSize);
872 throw std::runtime_error(
"got no block for unthreaded copy");
880 for (
auto sum : checkSums) {
881 sum->update(b->
size());
884 for (
auto sum : checkSums) {
903 for (
auto sum : checkSums) {
907 freeBlocks.enqueue(b);
910 source,
"unthreaded copy",
"block ptr is nullptr after copy");
917 errorMessage =
"source " + source +
" still exists, not deleting " + destination;
920 OutputHandler->
remove(destination, state);
922 errorMessage =
"vanished before processing";
924 errorMessage =
"directory not empty";
933 if (readInitialStat ==
nullptr) {
934 throw std::runtime_error(
"makeSymlink: no readInitialStat for " + source);
936 std::vector<char> link(readInitialStat->size + 1);
972 for (
const auto& item : linkBaseMap) {
973 if ((link.size() - 1 > item.first.size()) &&
974 (item.first.compare(0, item.first.size(), link.data(), item.first.size()) == 0)) {
975 link.erase(link.begin(), link.begin() + item.first.size());
976 while (link.front() ==
'/') {
977 link.erase(link.begin());
979 auto searchStart(item.second.empty() ? mapEntry.second.size() + 1 : item.second.size() + 1);
984 for (
auto slashPosition = source.find_first_of(
'/', searchStart);
985 slashPosition != std::remove_reference<decltype(source)>::type::npos;
986 slashPosition = source.find_first_of(
'/', slashPosition + 1)) {
987 if (slashPosition > 0 &&
988 source[slashPosition - 1] ==
'.' &&
989 (slashPosition == 1 || source[slashPosition - 2] !=
'.')) {
993 link.insert(link.begin(), {
'.',
'.',
'/'});
1012 const std::string& suffix) {
1013 if (suffix.empty() || dt < clock_type::duration::zero()) {
1016 if (std::chrono::duration_cast<std::chrono::duration<double>>(dt).count() > maxDelayTime) {
1017 dt = std::chrono::duration_cast<decltype(dt)>(std::chrono::duration<double>(maxDelayTime));
1019 bool delayChanged =
false;
1021 std::unique_lock<decltype(advisoryWaitMapMutex)> lock(advisoryWaitMapMutex);
1022 auto result = advisoryWaitMap.emplace(suffix, dt);
1023 if (result.second ==
false) {
1024 auto it = result.first;
1025 if (it->second < dt) {
1027 delayChanged =
true;
1030 delayChanged =
true;
1035 "-- no path --",
"delay change",
1036 "set wait time for '", suffix,
"' files to ",
1037 std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(dt).count(),
"s");
1039 return delayChanged;
1044 if (dt == decltype(dt)::zero()) {
1045 dt = clock_type::now() - tInotify;
1048 if (adviseDelay(dt, getSuffix())) {
1052 filesInWorkIterator->second.setWaitTime(dt);
1060 auto it = advisoryWaitMap.find(getSuffix());
1061 if (it == advisoryWaitMap.end()) {
1062 return clock_type::duration::zero();
1078 if (backupSuffix.empty()) {
1079 throw std::logic_error(
"backup suffix must not be empty");
1081 std::string bkgSuffix;
1082 decltype(backupSuffix.find(
"bla")) index = 0;
1083 while (index != decltype(backupSuffix)::npos) {
1084 auto newIndex = backupSuffix.find_first_of(
'%', index);
1085 bkgSuffix += backupSuffix.substr(index, newIndex - index);
1086 if (newIndex != decltype(backupSuffix)::npos) {
1088 if (newIndex < backupSuffix.size()) {
1089 if (backupSuffix.at(newIndex) ==
'{') {
1091 auto endmarker = backupSuffix.find_first_of(
'}', newIndex);
1092 auto part =
"%" + backupSuffix.substr(newIndex, endmarker - newIndex);
1093 expandAttrValue(part);
1095 newIndex = endmarker + 1;
1097 auto part = backupSuffix.substr(newIndex - 1);
1098 expandAttrValue(part);
1100 newIndex = decltype(backupSuffix)::npos;
1110 processMultiplicities->at(++nConcurrentProcesses)++;
1112 [](decltype(*
this)& ) {
1116 tWorkStart = clock_type::now();
1118 [](decltype(tWorkDone)& workDone) {
1119 workDone = clock_type::now();
1124 [](decltype(state)& State) {
1128 threadData = &aThreadData;
1130 [](decltype(threadData)& ThreadData) {
1131 ThreadData =
nullptr;
1134 errorMessage.clear();
1139 removeFileOrDirectory(threadData->InputHandler, threadData->OutputHandler);
1145 errorMessage =
"ignored";
1147 errorMessage =
"vanished before action";
1149 errorMessage =
"oops";
1156 threadData->OutputHandler->ensureParentDirs(destination, source, threadData->InputHandler);
1159 if (makeSymLink(threadData->InputHandler, threadData->OutputHandler)) {
1161 errorMessage =
"vanished before action";
1171 auto result = threadData->OutputHandler->rename(moveSource, readInitialStat, destination, state);
1175 errorMessage =
"directory move ignored";
1179 if (threadData->InputHandler->getStat(origSource) ==
nullptr) {
1182 moveSource,
"remove obsolete mv source",
1183 "can't find source '", origSource,
"'");
1184 threadData->OutputHandler->remove(moveSource, dummyState);
1187 moveSource,
"rename",
1188 "littering with obsolete mv source");
1192 __attribute__ ((fallthrough));
1199 throw std::runtime_error(
"impossible rename retval");
1204 }
catch (
const std::exception& e) {
1205 errorMessage = e.what();
1215 if (readInitialStat ==
nullptr ||
1217 clock_type::now() - tEnqueue > std::chrono::seconds(1) ||
1218 (! readInitialStat->isRegularFile())) {
1219 readInitialStat = threadData->InputHandler->getStat(source);
1220 if (readInitialStat ==
nullptr) {
1222 errorMessage =
"vanished in action";
1226 if (! readInitialStat->isRegularFile()) {
1228 errorMessage =
"ignored non regular file";
1232 if (ignoreExisting) {
1233 auto destStat = threadData->OutputHandler->getStat(destination);
1235 if (readInitialStat->size == destStat->size &&
1236 readInitialStat->isSameMtimeAs(*destStat)) {
1238 errorMessage =
"ignored due to existing copy with same stat";
1239 if (changeAttrsOnIgnoreExisting) {
1240 if ((
preserve.mode && readInitialStat->mode == destStat->mode)
1241 || (
preserve.ownership && readInitialStat->ownerUid == destStat->ownerUid
1242 && readInitialStat->ownerGid == destStat->ownerGid)) {
1243 errorMessage +=
", preserved attrs are already fine";
1245 threadData->OutputHandler->doAttributePreservations(destination, *readInitialStat);
1246 errorMessage +=
", attributes updated";
1254 auto input = threadData->InputHandler->newReader(source, state, *readInitialStat);
1255 std::unique_ptr<ioHandle::attrDataType> attrData;
1257 attrData = input->getAttrData(threadData->OutputHandler);
1259 std::unique_ptr<acl::list> aclData;
1261 aclData = input->getAclData();
1264 attrset(*input, process_source_attrs);
1265 attrset(*input, start_source_attrs);
1267 std::string destNameDuringWrite(destination);
1268 std::string backupFile;
1269 switch (backupMode) {
1274 backupFile = destination + getBackupSuffix();
1275 threadData->OutputHandler->renameSimple(destination, backupFile);
1279 if (!backupSuffix.empty()) {
1280 destNameDuringWrite += getBackupSuffix();
1286 threadData->OutputHandler->remove(destNameDuringWrite, dummy);
1290 throw std::runtime_error(
"illegal backupModeType");
1294 std::unique_ptr<outputHandler::base::writer> writeHandle;
1296 writeHandle = threadData->OutputHandler->newTmpWriter(destNameDuringWrite,
1297 readInitialStat->size,
1299 std::move(attrData),
1300 std::move(aclData));
1302 writeHandle = threadData->OutputHandler->newWriter(destNameDuringWrite,
1303 (appendableFiles.fIsSet() &&
1304 regex_match(destination, appendableFiles)) ||
1305 std::any_of(append_attrs.cbegin(), append_attrs.cend(),
1306 [
this](decltype(append_attrs)::value_type attr) {
1307 return ! threadData->InputHandler->getXattr(source, attr).empty();
1309 readInitialStat->size,
1310 input->getBlockSize(),
1313 std::move(attrData),
1314 std::move(aclData));
1316 doBlockSizeSetup(*input, *writeHandle);
1318 input->setupSparseRegions(sparse);
1321 auto startPosition = writeHandle->
getSize();
1323 startPosition = (startPosition / input->getBlockSize()) * input->getBlockSize();
1324 writeHandle->
seek(startPosition);
1325 input->seek(startPosition);
1329 && readInitialStat->size / memoryBlockSize > 1
1331 doThreadedCopy(*input, writeHandle);
1333 doUnthreadedCopy(*input, writeHandle);
1336 input->checkUnchangedness();
1338 checkAttributes(*input, check_source_attrs);
1341 if (syncWrittenFiles) {
1342 writeHandle->
sync();
1344 if (!setAttributesAfterClose) {
1347 attrset(*writeHandle, success_dest_attrs);
1353 if (setAttributesAfterClose && !
noCopy) {
1354 threadData->OutputHandler->doAttributePreservations(destNameDuringWrite, *readInitialStat);
1356 switch (backupMode) {
1359 threadData->OutputHandler->remove(backupFile, dummy);
1363 threadData->OutputHandler->renameSimple(destNameDuringWrite, destination);
1369 errorMessage = e.what();
1370 attrset(*input, failure_source_attrs);
1372 }
catch (
const std::exception& e) {
1373 errorMessage = e.what();
1374 attrset(*input, failure_source_attrs);
1377 attrdel(*input, process_source_attrs);
1378 attrdel(*input, failure_source_attrs);
1379 attrset(*input, success_source_attrs);
1382 }
catch (
const std::exception& e) {
1383 errorMessage = e.what();
1392 if (tWorkStart < filesInWorkIterator->second.getEarliestprocessTime()) {
1394 errorMessage =
"changed earliest process time, ";
1395 auto deltaT = filesInWorkIterator->second.getEarliestprocessTime() - tWorkStart;
1396 errorMessage += std::to_string(std::chrono::duration_cast<std::chrono::duration<double>>(deltaT).count());
1422 std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1423 while (objectCount > 0) {
1424 objectCountCondVar.wait(lock);
1428 std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1429 return (objectCount > 0);
1433 std::string& suffix) {
1434 auto fileNameStart = path.find_last_of(
'/');
1435 if (fileNameStart == std::string::npos) {
1438 auto const fileName = path.substr(fileNameStart);
1439 auto const suffixStart = fileName.find_last_of(
'.');
1440 if (suffixStart != decltype(fileName)::npos) {
1441 suffix = fileName.substr(suffixStart);
1446 threadData =
nullptr;
1447 tEnqueue = clock_type::now();
1449 std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1452 getSuffix(source, suffix);
1454 for (
const auto& item : std::vector<std::pair<
const std::string&,
1455 const decltype(sourcePrefixRegex)&>>({
1456 {source, sourcePrefixRegex},
1457 {destination, destinationPrefixRegex}
1459 if (item.second.fIsSet()) {
1461 if (std::regex_match(item.first, match, item.second)) {
1462 if (match.size() == 2) {
1463 prefix = match[1].str();
1470 checkSums.push_front(checksumCreator->create());
1473 if (std::any_of(prohibitive_attrs.cbegin(), prohibitive_attrs.cend(),
1474 [
this, InputHandler](decltype(prohibitive_attrs)::value_type attr) {
1475 return ! InputHandler->
getXattr(source, attr).empty();
1481 if (readInitialStat ==
nullptr) {
1482 readInitialStat = InputHandler->
getStat(source,
false);
1484 if (readInitialStat ==
nullptr) {
1490 if (readInitialStat->isLink()) {
1492 readInitialStat = InputHandler->
getStat(source,
true);
1493 if (readInitialStat ==
nullptr) {
1494 throw std::runtime_error(
"can't dereference " + source);
1505 if (readInitialStat->isDir()) {
1512 "is a directory ", destination,
" inotified at ", std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(tInotify.time_since_epoch()).count());
1518 const std::string& aSource,
1519 const std::string& aDestination,
1520 std::unique_ptr<const genericStat>& aStat,
1523 clock_type::time_point timestamp) :
1524 readInitialStat(std::move(aStat)),
1527 destination(aDestination),
1528 mapEntry(aMapEntry),
1529 tInotify(timestamp),
1534 const std::string& aSource,
1535 const std::string& aDestination,
1538 clock_type::time_point timestamp) :
1541 destination(aDestination),
1542 mapEntry(aMapEntry),
1543 tInotify(timestamp),
1548 const std::string& aSource,
1549 const std::string& aDestination,
1550 const std::string& aMoveSource,
1551 const std::string& aOrigSource,
1553 clock_type::time_point timestamp) :
1556 destination(aDestination),
1557 moveSource(aMoveSource),
1558 origSource(aOrigSource),
1559 mapEntry(aMapEntry),
1560 tInotify(timestamp),
1567 std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1569 if (objectCount == 0) {
1570 objectCountCondVar.notify_all();
1578 tTotalStat.addValue(tWorkDone - tInotify);
1579 tPipeStat.addValue(tEnqueue - tInotify);
1580 tWaitStat.addValue(tWorkStart - tEnqueue);
1583 auto dtCopy = tWorkDone - tWorkStart;
1584 auto tModification = readInitialStat->getMtime();
1585 auto d = tInotify - tModification;
1586 tTotal2Stat.addValue(tWorkDone - tModification);
1587 tInotifyStat.addValue(d);
1588 tCopyStat.addValue(dtCopy);
1589 if (readInitialStat) {
1590 bytesStat.addValue(readInitialStat->size);
1592 auto dtCopyReal = std::chrono::duration_cast<std::chrono::duration<double>>(dtCopy).count();
1594 if (dtCopyReal > 0) {
1595 speedStat.addValue(readInitialStat->size / (1024. * 1024) / dtCopyReal);
1602 const std::string& expectedValue) {
1603 for (
auto& cksum : checkSums) {
1604 if (cksum->getName() == checkSumType) {
1605 cksum->setExpectedResult(expectedValue);
1611 auto cksum = checksumCreator->create();
1612 cksum->setExpectedResult(expectedValue);
1613 checkSums.push_front(cksum);
1619 std::ostream& logStream) {
1626 for (
const auto& sum : checkSums) {
1628 escaper->escape(
noCopy ? source : destination, name);
1629 if (checkSums.size() > 1) {
1630 hashStream << sum->getName() <<
": ";
1632 hashStream << sum->getResult() <<
" " << name <<
"\n";
1636 bool needSpace =
false;
1641 std::string operation;
1647 operation =
"remove";
1654 static std::atomic<Json::UInt64> messageCounter(0);
1656 root[
"msgnr"] = messageCounter++;
1657 root[
"operation"] = operation;
1658 root[
"path"] = destination;
1659 root[
"source"] = source;
1660 if (readInitialStat !=
nullptr) {
1661 root[
"size"] =
static_cast<Json::Int64
>(readInitialStat->size);
1663 for (
const auto& sum : checkSums) {
1664 root[sum->getName()] = sum->getResult();
1668 root[
"fileType"] = fileMagic;
1671 root[
"finishTime"] = std::chrono::duration_cast<std::chrono::duration<double>>(tWorkDone.time_since_epoch()).count();
1672 root[
"inotifyTime"] = std::chrono::duration_cast<std::chrono::duration<double>>(tInotify.time_since_epoch()).count();
1673 root[
"retries"] = retries;
1674 if (!prefixJsonName.empty()) {
1675 root[prefixJsonName] = prefix.empty() ?
statPrefix : prefix;
1677 for (
auto& item : jsonExtraFields) {
1678 root[item.first] = item.second;
1680 static Json::FastWriter jsonWriter;
1689 source, operation, errorMessage);
1692 for (
const auto& field : logFields) {
1693 std::string value(field);
1695 if (!expandAttrValue(value)) {
1696 if (value ==
"%source") {
1701 value += moveSource;
1706 value.push_back(
'\'');
1707 }
else if (value ==
"%destination") {
1709 value += destination;
1710 value.push_back(
'\'');
1711 }
else if (value ==
"%urlsource") {
1716 escaper->escape(moveSource, value);
1718 escaper->escape(source, value);
1720 }
else if (value ==
"%urldestination") {
1722 escaper->escape(destination, value);
1723 }
else if (value ==
"%retries") {
1724 value = std::to_string(retries);
1725 }
else if (value ==
"%error") {
1726 value = errorMessage;
1730 value +=
" vanished before action started";
1732 }
else if (value ==
"%op") {
1759 logStream <<
"'" << source <<
"' ";
1762 logStream <<
" -> ";
1766 logStream <<
" link ";
1770 logStream <<
" rm ";
1774 logStream <<
" mv ";
1778 logStream <<
" vanished before action started";
1782 logStream <<
" was appended to";
1786 logStream <<
" '" << destination <<
"' after " << retries <<
" attempts";
1790 logStream <<
" is ignored";
1794 logStream <<
" due to truncation";
1798 logStream <<
" failed due to " << errorMessage;
1802 logStream <<
" inWork not cleared";
1806 logStream <<
" attribute mismatch: " << errorMessage;
1825 stream <<
statPrefix << tInotifyStat <<
"\n";
1829 stream <<
statPrefix << tEnqueueStat <<
"\n";
1832 stream <<
statPrefix << bytesStat.getN() <<
" files with " << bytesStat.sum <<
"B transferred\n";
1834 auto tElapsed = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - tProgramStart).count();
1835 stream <<
statPrefix <<
"avg rate: " << bytesStat.sum / (tElapsed * 1024 * 1024) <<
" MiB/s in " << tElapsed <<
" s\n";
1836 for (
unsigned int i = 0; i < processMultiplicities->size(); i++) {
1837 stream <<
statPrefix << i <<
" processes at once: "
1838 << (*processMultiplicities)[i] <<
" times\n";
1841 std::multimap<clock_type::duration, std::string> waitMap;
1842 for (
const auto& item : advisoryWaitMap) {
1843 waitMap.emplace(item.second, item.first);
1845 for (
const auto& item : waitMap) {
1846 stream <<
statPrefix <<
" wait time for " << item.second <<
" files: "
1847 << std::chrono::duration_cast<std::chrono::duration<double>>(item.first).count() <<
"s\n";
1853 tTotal2Stat.reset();
1855 tInotifyStat.reset();
1859 tEnqueueStat.reset();
1870 (request->
retries < maxRetries)) {
1871 static ssize_t lastSize;
1874 std::ifstream statm(
"/proc/self/statm");
1882 request->
source,
"re-sched",
1884 ", delaying by ", std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(waitTime).count(),
1885 "s vmsize: ", size,
"(", lastSize,
", ", size - lastSize,
"), retries: ", request->
retries);
1886 delayedRequests.
enqueue(request, waitTime);
1887 if (request !=
nullptr) {
1888 throw std::runtime_error(
"requesy not null");
1901 while (
auto request = queue.
dequeue()) {
1906 auto dt = request->filesInWorkIterator->second.getEarliestprocessTime()
1907 - clock_type::now();
1908 if (dt > decltype(dt)::zero()) {
1910 request->source,
"delay",
1911 "scheduled too early by ", std::chrono::duration_cast<std::chrono::duration<double>>(dt).count(),
"s");
1912 auto newDelay = request->filesInWorkIterator->second.getEarliestprocessTime() - request->tInotify;
1913 request->adviseDelay(newDelay);
1914 delayedRequests.
enqueue(request, dt);
1915 if (request !=
nullptr) {
1916 throw std::runtime_error(
"request not nullptr");
1923 request->process(threadData);
1924 if (! retry(request, delayedRequests)) {
1927 if (request !=
nullptr) {
1928 throw std::runtime_error(
"request not nullptr");
1934 }
catch (
const std::exception& e) {
1937 "--",
"process",
"stop requested due to ", e.what());
1941 "--",
"process",
"stop requested due to unknown exception");
1947 const copyRequest::clock_type::time_point& t) {
1948 stream << std::chrono::duration_cast<std::chrono::duration<double>>(t.time_since_epoch()).count();
1954 out <<
" request " <<
static_cast<const void*
>(&request);
1956 out <<
" state " << std::hex << static_cast<unsigned>(request.
state) << std::dec;
1957 out <<
" tInotify " << request.
tInotify;
1959 out <<
" now: " << clock_type::now() <<
"\n";