 |
ewmscp
..
|
class for copy requests.
More...
#include <copyRequest.h>
|
const genericStat & | getInitialStat () const |
|
const std::string & | getSource () const |
|
clock_type::duration | getAdvisedDelay () const |
| returns the advised delay More...
|
|
void | registerIterator (decltype(filesInWorkIterator) iter) |
|
decltype(filesInWorkIterator) | getFilesInWorkIteraror () const |
|
fileInWork::slotTypes | getSlotType () const |
|
void | changeRequestType (stateBitType newType) |
|
| base (inputHandler::base *InputHandler, const std::string &aSource, const std::string &aDestination, std::unique_ptr< const genericStat > &aStat, const singleMap &aMapEntry, bool remove, clock_type::time_point timestamp) |
| constructor for copy, link and remove requests. More...
|
|
| base (inputHandler::base *InputHandler, const std::string &aSource, const std::string &aDestination, const singleMap &aMapEntry, bool remove, clock_type::time_point timestamp) |
| constructor for copy, link and remove requests. More...
|
|
| base (inputHandler::base *InputHandler, const std::string &aSource, const std::string &aDestination, const std::string &aMoveSource, const std::string &aOrigSource, const singleMap &aMapEntry, clock_type::time_point timestamp) |
| constructor for rename requests only. More...
|
|
virtual | ~base () noexcept(false) |
|
virtual void | process (perThreadData &threadData) |
|
virtual void | printResults (std::ostream &hashStream, std::ostream &logStream) |
|
const std::string & | getSuffix () const |
|
void | addExpectedChecksumResult (const std::string &checkSumType, const std::string &expectedValue) |
|
|
void | kw_size (std::string &value) const |
|
void | kw_mtime (std::string &value) const |
|
void | kw_inow (std::string &value) const |
|
void | kw_now (std::string &value) const |
|
void | kw_version (std::string &value) const |
|
void | kw_commit (std::string &value) const |
|
void | kw_prefix (std::string &value) const |
|
|
void | kw_reason (std::string &value) const |
|
void | kw_state (std::string &value) const |
|
void | kw_finishtime (std::string &value) const |
|
|
void | adviseDelay (clock_type::duration dt=clock_type::duration::zero()) |
|
|
bool | expandAttrValue (std::string &value) |
| expand the keywords in value, see Expansion of key words in attributes More...
|
|
void | attrset (ioHandle &writeHandle, const std::map< std::string, std::string > &attrs) |
|
void | checkAttributes (ioHandle &, const std::map< std::string, std::string > &check_source_attr) |
|
void | hash_worker (checksum::parallel *sum, blockQueue &blocksToHash, blockQueue &hashedBlocks, exceptionList &exceptions) |
|
void | hasher (blockQueue &blocksToHash, blockQueue &hashedBlocks, exceptionList &exceptions) |
|
void | readWorker (inputHandler::base::reader &input, blockReadRequestQueue &blockRequests, blockQueue &freeBlocks, blockQueue &readBlocks, exceptionList &exceptions) |
|
void | reader (inputHandler::base::reader &input, blockQueue &freeBlocks, blockQueue &readBlocks, bool mayParallelize, exceptionList &exceptions) |
|
void | writer (std::unique_ptr< outputHandler::base::writer > &writeHandle, blockQueue &blocksToWrite, blockQueue &writtenBlocks, exceptionList &exceptions) |
|
void | writeWorker (std::unique_ptr< outputHandler::base::writer > &writeHandle, blockQueue &blocksToWrite, blockQueue &writtenBlocks, exceptionList &exceptions) |
|
void | doThreadedCopy (inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle) |
|
void | doUnthreadedCopy (inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle) |
|
void | doBlockSizeSetup (ioHandle &input, ioHandle &output) |
|
bool | makeSymLink (inputHandler::base *InputHandler, outputHandler::base *OutputHandler) |
|
void | removeFileOrDirectory (inputHandler::base *InputHandler, outputHandler::base *OutputHandler) |
|
std::string | getBackupSuffix () |
|
void | init (inputHandler::base *InputHandler) |
|
virtual bool | compare (const base &rhs) const |
|
class for copy requests.
Well, in fact not only copy but also rename, link and delete requests. Objects of this class are created either from the command line or, in the follow-mode, from a command stream, kept in queues and processed as resources become available. The instances hold all data needed to perform the required operation and store performance data and a result.
Definition at line 99 of file copyRequest.h.
◆ blockReadRequestQueue
◆ base() [1/3]
copyRequest::base::base |
( |
inputHandler::base * |
InputHandler, |
|
|
const std::string & |
aSource, |
|
|
const std::string & |
aDestination, |
|
|
std::unique_ptr< const genericStat > & |
aStat, |
|
|
const singleMap & |
aMapEntry, |
|
|
bool |
remove, |
|
|
clock_type::time_point |
timestamp |
|
) |
| |
◆ base() [2/3]
copyRequest::base::base |
( |
inputHandler::base * |
InputHandler, |
|
|
const std::string & |
aSource, |
|
|
const std::string & |
aDestination, |
|
|
const singleMap & |
aMapEntry, |
|
|
bool |
remove, |
|
|
clock_type::time_point |
timestamp |
|
) |
| |
◆ base() [3/3]
copyRequest::base::base |
( |
inputHandler::base * |
InputHandler, |
|
|
const std::string & |
aSource, |
|
|
const std::string & |
aDestination, |
|
|
const std::string & |
aMoveSource, |
|
|
const std::string & |
aOrigSource, |
|
|
const singleMap & |
aMapEntry, |
|
|
clock_type::time_point |
timestamp |
|
) |
| |
◆ ~base()
copyRequest::base::~base |
( |
| ) |
|
|
virtualnoexcept |
◆ addExpectedChecksumResult()
void copyRequest::base::addExpectedChecksumResult |
( |
const std::string & |
checkSumType, |
|
|
const std::string & |
expectedValue |
|
) |
| |
◆ adviseDelay() [1/2]
bool copyRequest::base::adviseDelay |
( |
clock_type::duration |
dt, |
|
|
const std::string & |
suffix |
|
) |
| |
|
static |
update the advised delay in the map.
Creates entries in the advisoryWaitMap or updates them. It is to be called when a copy request failed due to changes to the source file; the advised delay will be the time difference between the notification by the inotify_watch command and the current time.
Definition at line 1011 of file copyRequest.cpp.
1013 if (
suffix.empty() || dt < clock_type::duration::zero()) {
1016 if (std::chrono::duration_cast<std::chrono::duration<double>>(dt).count() >
maxDelayTime) {
1017 dt = std::chrono::duration_cast<decltype(dt)>(std::chrono::duration<double>(
maxDelayTime));
1019 bool delayChanged =
false;
1023 if (result.second ==
false) {
1024 auto it = result.first;
1025 if (it->second < dt) {
1027 delayChanged =
true;
1030 delayChanged =
true;
1035 "-- no path --",
"delay change",
1036 "set wait time for '",
suffix,
"' files to ",
1037 std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(dt).count(),
"s");
1039 return delayChanged;
References errMsg::emit(), and errMsg::notice.
Referenced by followRequestProvider::enqueueOrAppend().
◆ adviseDelay() [2/2]
void copyRequest::base::adviseDelay |
( |
clock_type::duration |
dt = clock_type::duration::zero() | ) |
|
|
protected |
◆ attrdel()
void copyRequest::base::attrdel |
( |
ioHandle & |
handle, |
|
|
const std::map< std::string, std::string > & |
attrs |
|
) |
| |
|
staticprivate |
◆ attrset()
void copyRequest::base::attrset |
( |
ioHandle & |
writeHandle, |
|
|
const std::map< std::string, std::string > & |
attrs |
|
) |
| |
|
private |
Definition at line 446 of file copyRequest.cpp.
448 for (
const auto& attr : attrs) {
449 auto name = attr.first;
450 auto value = attr.second;
453 if (value ==
"%count") {
454 unsigned int count = 0;
455 auto retval = writeHandle.
getXattr(name);
456 if (!retval.empty()) {
458 count = std::stoul(retval);
462 "strange value '", retval,
"'");
466 value = std::to_string(count + 1);
467 }
else if (value ==
"%sums") {
469 std::string fullName(name);
470 fullName += item->getName();
471 writeHandle.
setXattr(fullName, item->getResult());
References errMsg::debug, errMsg::emit(), ioHandle::getXattr(), and ioHandle::setXattr().
◆ changeRequestType()
void copyRequest::base::changeRequestType |
( |
stateBitType |
newType | ) |
|
Definition at line 1072 of file copyRequest.cpp.
1073 fileInWork::typeChanger changer(*
this, newType);
◆ checkAttributes()
void copyRequest::base::checkAttributes |
( |
ioHandle & |
handle, |
|
|
const std::map< std::string, std::string > & |
check_source_attr |
|
) |
| |
|
private |
◆ checkForInstances()
bool copyRequest::base::checkForInstances |
( |
| ) |
|
|
static |
◆ compare()
virtual bool copyRequest::base::compare |
( |
const base & |
rhs | ) |
const |
|
inlineprivatevirtual |
◆ doBlockSizeSetup()
void copyRequest::base::doBlockSizeSetup |
( |
ioHandle & |
input, |
|
|
ioHandle & |
output |
|
) |
| |
|
private |
◆ doThreadedCopy()
Definition at line 813 of file copyRequest.cpp.
820 exceptionList exceptions;
824 freeBlocks.resetDone();
829 auto& lastQueue = magicThread.joinable() ?
835 hashCalculator hashThread(blocksToHash, lastQueue,
this,
837 auto& writerOutputQueue = hashThread.joinable() ?
840 writeActor writeThread(writeHandle,
843 hashThread.parallelized(),
846 auto& readerOutputQueue = writeThread.joinable() ?
849 hashThread.joinable() ?
858 if (!exceptions.empty()) {
859 auto n = exceptions.size();
862 source,
"threaded copy",
"we have ", n,
" exceptions at once");
864 std::rethrow_exception(exceptions.front());
References errMsg::emit(), copyRequest::exceptionList::empty(), copyRequest::exceptionList::front(), copyRequest::base::hashCalculator::joinable(), copyRequest::base::writeActor::joinable(), noCopy, ioHandle::parallelizable(), copyRequest::base::hashCalculator::parallelized(), copyRequest::exceptionList::size(), and errMsg::warning.
◆ doUnthreadedCopy()
Definition at line 868 of file copyRequest.cpp.
872 throw std::runtime_error(
"got no block for unthreaded copy");
881 sum->update(b->size());
885 sum->update(b->bufferAt(0), b->size());
907 freeBlocks.enqueue(b);
910 source,
"unthreaded copy",
"block ptr is nullptr after copy");
References copyRequest::append, block::bufferAt(), errMsg::crit, errMsg::emit(), block::isHole(), noCopy, inputHandler::base::reader::readBlock(), block::size(), timerInst, and outputHandler::base::writer::writeBlock().
◆ expandAttrValue()
bool copyRequest::base::expandAttrValue |
( |
std::string & |
value | ) |
|
|
private |
expand the keywords in value, see Expansion of key words in attributes
- Returns
- true if a keyword was found, false if not
Definition at line 365 of file copyRequest.cpp.
373 if (value.size() > 1 && value.at(0) ==
'%') {
374 auto keyword = value.substr(1);
376 if (cksum->getName() == keyword) {
377 value = cksum->getResult();
References f().
◆ getAdvisedDelay()
copyRequest::clock_type::duration copyRequest::base::getAdvisedDelay |
( |
| ) |
const |
returns the advised delay
Returns the special duration value zero if no delay is advised.
Locking the map is not needed here because read access to containers where no elements are removed (which does not happen here) is thread safe.
Definition at line 1059 of file copyRequest.cpp.
1062 return clock_type::duration::zero();
Referenced by followRequestProvider::enqueueOrAppend().
◆ getBackupModeNameMap()
◆ getBackupSuffix()
std::string copyRequest::base::getBackupSuffix |
( |
| ) |
|
|
private |
Definition at line 1077 of file copyRequest.cpp.
1079 throw std::logic_error(
"backup suffix must not be empty");
1081 std::string bkgSuffix;
1084 auto newIndex =
backupSuffix.find_first_of(
'%', index);
1085 bkgSuffix +=
backupSuffix.substr(index, newIndex - index);
1091 auto endmarker =
backupSuffix.find_first_of(
'}', newIndex);
1092 auto part =
"%" +
backupSuffix.substr(newIndex, endmarker - newIndex);
1095 newIndex = endmarker + 1;
◆ getFilesInWorkIteraror()
◆ getInitialStat()
const genericStat& copyRequest::base::getInitialStat |
( |
| ) |
const |
|
inline |
◆ getSlotType()
◆ getSource()
const std::string& copyRequest::base::getSource |
( |
| ) |
const |
|
inline |
◆ getStatPrinter()
std::function< void(std::ostream &)> & copyRequest::base::getStatPrinter |
( |
| ) |
|
|
static |
We can't have virtual static functions, so we explicitly use a function that gives us the proper print function, which can be changed in derived classes.
Definition at line 1816 of file copyRequest.cpp.
Referenced by printResults().
◆ getSuffix() [1/2]
const std::string& copyRequest::base::getSuffix |
( |
| ) |
const |
|
inline |
◆ getSuffix() [2/2]
void copyRequest::base::getSuffix |
( |
const std::string & |
path, |
|
|
std::string & |
suffix |
|
) |
| |
|
static |
Definition at line 1432 of file copyRequest.cpp.
1434 auto fileNameStart = path.find_last_of(
'/');
1435 if (fileNameStart == std::string::npos) {
1438 auto const fileName = path.substr(fileNameStart);
1439 auto const suffixStart = fileName.find_last_of(
'.');
1440 if (suffixStart != decltype(fileName)::npos) {
1441 suffix = fileName.substr(suffixStart);
◆ hash_worker()
◆ hasher()
◆ init()
Definition at line 1445 of file copyRequest.cpp.
1454 for (
const auto& item : std::vector<std::pair<
const std::string&,
1459 if (item.second.fIsSet()) {
1461 if (std::regex_match(item.first, match, item.second)) {
1462 if (match.size() == 2) {
1470 checkSums.push_front(checksumCreator->create());
1494 throw std::runtime_error(
"can't dereference " +
source);
1512 "is a directory ",
destination,
" inotified at ", std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(
tInotify.time_since_epoch()).count());
References checksumCreators, dereference, errMsg::emit(), copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, copyRequest::fileToBeRenamed, pathHandler::getStat(), pathHandler::getXattr(), copyRequest::ignore, copyRequest::linkToBeMade, errMsg::notice, copyRequest::trucated, and copyRequest::vanished.
Referenced by base().
◆ keywordMap()
std::map< std::string, void(copyRequest::base::*)(std::string &) const > & copyRequest::base::keywordMap |
( |
| ) |
|
|
static |
◆ makeSymLink()
◆ printResults()
void copyRequest::base::printResults |
( |
std::ostream & |
hashStream, |
|
|
std::ostream & |
logStream |
|
) |
| |
|
virtual |
- Bug:
- too little structure, unreadable code
- Bug:
- was follow before, ugly condition
Reimplemented in copyRequest::listingRequest, and copyRequest::checksumTest.
Definition at line 1618 of file copyRequest.cpp.
1630 hashStream << sum->getName() <<
": ";
1632 hashStream << sum->getResult() <<
" " << name <<
"\n";
1636 bool needSpace =
false;
1641 std::string operation;
1647 operation =
"remove";
1654 static std::atomic<Json::UInt64> messageCounter(0);
1656 root[
"msgnr"] = messageCounter++;
1657 root[
"operation"] = operation;
1664 root[sum->getName()] = sum->getResult();
1668 root[
"fileType"] = fileMagic;
1671 root[
"finishTime"] = std::chrono::duration_cast<std::chrono::duration<double>>(
tWorkDone.time_since_epoch()).count();
1672 root[
"inotifyTime"] = std::chrono::duration_cast<std::chrono::duration<double>>(
tInotify.time_since_epoch()).count();
1674 if (!prefixJsonName.empty()) {
1677 for (
auto& item : jsonExtraFields) {
1678 root[item.first] = item.second;
1680 static Json::FastWriter jsonWriter;
1693 std::string value(field);
1696 if (value ==
"%source") {
1706 value.push_back(
'\'');
1707 }
else if (value ==
"%destination") {
1710 value.push_back(
'\'');
1711 }
else if (value ==
"%urlsource") {
1718 escaper->escape(
source, value);
1720 }
else if (value ==
"%urldestination") {
1723 }
else if (value ==
"%retries") {
1724 value = std::to_string(
retries);
1725 }
else if (value ==
"%error") {
1730 value +=
" vanished before action started";
1732 }
else if (value ==
"%op") {
1759 logStream <<
"'" <<
source <<
"' ";
1762 logStream <<
" -> ";
1766 logStream <<
" link ";
1770 logStream <<
" rm ";
1774 logStream <<
" mv ";
1778 logStream <<
" vanished before action started";
1782 logStream <<
" was appended to";
1790 logStream <<
" is ignored";
1794 logStream <<
" due to truncation";
1802 logStream <<
" inWork not cleared";
References copyRequest::append, copyRequest::attributeMismatch, continueOnError, copyRequest::done, errMsg::emit(), copyRequest::failed, copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, copyRequest::fileToBeRenamed, copyRequest::ignore, errMsg::info, copyRequest::inWork, copyRequest::linkToBeMade, escapism::newEscaper(), noCopy, resultOutput, messageQueue::queue::send(), statPrefix, timerInst, copyRequest::trucated, copyRequest::vanished, verbose, and logstream::warning.
◆ printStats()
void copyRequest::base::printStats |
( |
std::ostream & |
stream | ) |
|
|
static |
Definition at line 1822 of file copyRequest.cpp.
1834 auto tElapsed = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() -
tProgramStart).count();
1835 stream <<
statPrefix <<
"avg rate: " <<
bytesStat.sum / (tElapsed * 1024 * 1024) <<
" MiB/s in " << tElapsed <<
" s\n";
1837 stream <<
statPrefix << i <<
" processes at once: "
1838 << (*processMultiplicities)[i] <<
" times\n";
1841 std::multimap<clock_type::duration, std::string> waitMap;
1843 waitMap.emplace(item.second, item.first);
1845 for (
const auto& item : waitMap) {
1846 stream <<
statPrefix <<
" wait time for " << item.second <<
" files: "
1847 << std::chrono::duration_cast<std::chrono::duration<double>>(item.first).count() <<
"s\n";
References timer::anchor::print(), and statPrefix.
Referenced by quietStatPrinter().
◆ process()
Reimplemented in copyRequest::listingRequest.
Definition at line 1109 of file copyRequest.cpp.
1112 [](decltype(*
this)& ) {
1119 workDone = clock_type::now();
1124 [](decltype(
state)& State) {
1131 ThreadData =
nullptr;
1180 stateType dummyState;
1188 "littering with obsolete mv source");
1192 __attribute__ ((fallthrough));
1199 throw std::runtime_error(
"impossible rename retval");
1204 }
catch (
const std::exception& e) {
1217 clock_type::now() -
tEnqueue > std::chrono::seconds(1) ||
1238 errorMessage =
"ignored due to existing copy with same stat";
1255 std::unique_ptr<ioHandle::attrDataType> attrData;
1259 std::unique_ptr<acl::list> aclData;
1261 aclData = input->getAclData();
1268 std::string backupFile;
1290 throw std::runtime_error(
"illegal backupModeType");
1294 std::unique_ptr<outputHandler::base::writer> writeHandle;
1299 std::move(attrData),
1300 std::move(aclData));
1310 input->getBlockSize(),
1313 std::move(attrData),
1314 std::move(aclData));
1318 input->setupSparseRegions(
sparse);
1321 auto startPosition = writeHandle->
getSize();
1323 startPosition = (startPosition / input->getBlockSize()) * input->getBlockSize();
1324 writeHandle->
seek(startPosition);
1325 input->seek(startPosition);
1336 input->checkUnchangedness();
1342 writeHandle->
sync();
1372 }
catch (
const std::exception& e) {
1382 }
catch (
const std::exception& e) {
1392 if (tWorkStart < filesInWorkIterator->second.getEarliestprocessTime()) {
1396 errorMessage += std::to_string(std::chrono::duration_cast<std::chrono::duration<double>>(deltaT).count());
References copyRequest::after, copyRequest::append, copyRequest::before, continueOnError, outputHandler::base::writer::doAttributePreservations(), copyRequest::done, copyRequest::during, errMsg::emit(), copyRequest::failed, outputHandler::base::fileChanged, copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, copyRequest::fileToBeRenamed, outputHandler::base::fileVanished, outputHandler::base::writer::getSize(), copyRequest::ignore, copyRequest::inWork, copyRequest::linkToBeMade, nConcurrentProcesses, copyRequest::noActionNeeded, noCopy, copyRequest::none, errMsg::notice, outputHandler::base::ok, preserve, copyRequest::remove, outputHandler::base::writer::seek(), outputHandler::base::writer::sync(), and copyRequest::vanished.
◆ processQueue()
Definition at line 1895 of file copyRequest.cpp.
1901 while (
auto request = queue.dequeue()) {
1906 auto dt = request->filesInWorkIterator->second.getEarliestprocessTime()
1907 - clock_type::now();
1908 if (dt > decltype(dt)::zero()) {
1910 request->source,
"delay",
1911 "scheduled too early by ", std::chrono::duration_cast<std::chrono::duration<double>>(dt).count(),
"s");
1912 auto newDelay = request->filesInWorkIterator->second.getEarliestprocessTime() - request->tInotify;
1913 request->adviseDelay(newDelay);
1914 delayedRequests.enqueue(request, dt);
1915 if (request !=
nullptr) {
1916 throw std::runtime_error(
"request not nullptr");
1924 if (!
retry(request, delayedRequests)) {
1926 resultQueue.enqueue(request);
1927 if (request !=
nullptr) {
1928 throw std::runtime_error(
"request not nullptr");
1930 request = eraser.getNext();
1934 }
catch (
const std::exception& e) {
1937 "--",
"process",
"stop requested due to ", e.what());
1941 "--",
"process",
"stop requested due to unknown exception");
References copyRequest::action, errMsg::debug, waitQueues::simple< T >::dequeue(), errMsg::emit(), waitQueues::simple< T >::enqueue(), waitQueues::timed< T, clock_type >::enqueue(), fileRateLimit, copyRequest::fileInWork::eraser::getNext(), stopRequest::processLoop, stopRequest::Requested(), and stopRequest::RequestStop().
◆ reader()
Definition at line 530 of file copyRequest.cpp.
547 readRequests.emplace(s, o);
549 readRequests.signalDone();
550 std::vector<std::thread> workers;
554 for (
auto& worker : workers) {
557 std::ref(readRequests),
558 std::ref(freeBlocks),
559 std::ref(readBlocks),
560 std::ref(exceptions));
563 for (
auto& worker : workers) {
570 throw std::runtime_error(
"got no free block in reader");
575 freeBlocks.enqueue(b);
577 readBlocks.enqueue(b);
586 exceptions.add(std::current_exception());
588 readBlocks.signalDone();
References copyRequest::exceptionList::add(), waitQueues::simple< T >::dequeue(), waitQueues::simple< T >::emplace(), waitQueues::simple< T >::enqueue(), nThreads, ioHandle::parallelizable(), inputHandler::base::reader::readBlock(), readWorker(), waitQueues::simple< T >::signalDone(), waitQueues::simple< T >::size(), and timerInst.
◆ readWorker()
◆ registerIterator()
◆ removeFileOrDirectory()
◆ resetStats()
void copyRequest::base::resetStats |
( |
| ) |
|
|
static |
◆ retry()
bool copyRequest::base::retry |
( |
std::unique_ptr< base > & |
request, |
|
|
timedQueue & |
delayedRequests |
|
) |
| |
|
static |
Definition at line 1866 of file copyRequest.cpp.
1871 static ssize_t lastSize;
1874 std::ifstream statm(
"/proc/self/statm");
1879 auto waitTime = request->tWorkDone - request->tWorkStart + std::chrono::seconds(1);
1880 waitTime *= request->retries;
1882 request->source,
"re-sched",
1883 request->errorMessage,
1884 ", delaying by ", std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(waitTime).count(),
1885 "s vmsize: ", size,
"(", lastSize,
", ", size - lastSize,
"), retries: ", request->retries);
1886 delayedRequests.enqueue(request, waitTime);
1887 if (request !=
nullptr) {
1888 throw std::runtime_error(
"requesy not null");
References enumAsBitmask< T >::clear(), errMsg::debug, errMsg::emit(), waitQueues::timed< T, clock_type >::enqueue(), errorMessage, copyRequest::failed, copyRequest::fileToBeRemoved, retries, source, state, tWorkDone, and tWorkStart.
◆ setStatPrinter()
void copyRequest::base::setStatPrinter |
( |
void(*)(std::ostream &) |
f | ) |
|
|
static |
◆ waitForAllInstancesGone()
void copyRequest::base::waitForAllInstancesGone |
( |
| ) |
|
|
static |
◆ writer()
◆ writeWorker()
◆ fileInWork
◆ operator<<
std::ostream& operator<< |
( |
std::ostream & |
out, |
|
|
const base & |
request |
|
) |
| |
|
friend |
Definition at line 1952 of file copyRequest.cpp.
1954 out <<
" request " <<
static_cast<const void*
>(&request);
1955 out <<
" for '" << request.source <<
"' to '" << request.destination <<
"'";
1956 out <<
" state " << std::hex << static_cast<unsigned>(request.state) << std::dec;
1957 out <<
" tInotify " << request.tInotify;
1958 out <<
" tEarliest " << request.filesInWorkIterator->second.getEarliestprocessTime();
1959 out <<
" now: " << clock_type::now() <<
"\n";
◆ advisoryWaitMap
std::map<std::string, clock_type::duration> copyRequest::base::advisoryWaitMap |
|
staticprivate |
◆ advisoryWaitMapMutex
std::mutex copyRequest::base::advisoryWaitMapMutex |
|
staticprivate |
◆ append_attrs
◆ appendableFiles
◆ backupMode
◆ backupModeName
◆ backupSuffix
◆ bytesStat
◆ changeAttrsOnIgnoreExisting
◆ check_source_attrs
options::map<std::string> copyRequest::base::check_source_attrs |
|
staticprivate |
◆ checkSums
◆ destination
const std::string copyRequest::base::destination |
|
protected |
◆ destinationPrefixRegex
◆ errorMessage
std::string copyRequest::base::errorMessage |
|
protected |
◆ failure_source_attrs
options::map<std::string> copyRequest::base::failure_source_attrs |
|
staticprivate |
◆ filesInWorkIterator
decltype(fileInWork::filesInWork.begin()) copyRequest::base::filesInWorkIterator = fileInWork::filesInWork.end() |
◆ forceParallelRead
◆ ignoreExisting
◆ linkBaseMap
◆ logFields
◆ mapEntry
◆ maxDelayTime
◆ maxMemoryBlockSize
◆ maxRetries
◆ memoryBlockSize
size_t copyRequest::base::memoryBlockSize |
|
protected |
◆ minMemoryBlockSize
◆ moveSource
const std::string copyRequest::base::moveSource |
|
protected |
◆ nConcurrentProcesses
std::atomic<unsigned int> copyRequest::base::nConcurrentProcesses |
|
staticprotected |
◆ nReadThreads
◆ nSumThreads
◆ nWriteThreads
◆ objectCount
unsigned copyRequest::base::objectCount |
|
staticprivate |
◆ objectCountCondVar
std::condition_variable copyRequest::base::objectCountCondVar |
|
staticprivate |
◆ objectCountMutex
std::mutex copyRequest::base::objectCountMutex |
|
staticprivate |
◆ origSource
const std::string copyRequest::base::origSource |
|
protected |
◆ perFileThreads
◆ prefix
std::string copyRequest::base::prefix |
|
protected |
◆ process_source_attrs
options::map<std::string> copyRequest::base::process_source_attrs |
|
staticprivate |
◆ processMultiplicities
std::vector<std::atomic<unsigned int> >* copyRequest::base::processMultiplicities |
|
static |
◆ prohibitive_attrs
◆ readInitialStat
std::unique_ptr<const genericStat> copyRequest::base::readInitialStat |
|
protected |
◆ retries
unsigned copyRequest::base::retries |
|
protected |
◆ setAttributesAfterClose
◆ source
const std::string copyRequest::base::source |
|
protected |
◆ sourcePrefixRegex
◆ sparse
◆ speedStat
◆ start_source_attrs
options::map<std::string> copyRequest::base::start_source_attrs |
|
staticprivate |
◆ state
◆ statPrinter
std::function<void(std::ostream&)> copyRequest::base::statPrinter |
|
staticprotected |
◆ success_dest_attrs
options::map<std::string> copyRequest::base::success_dest_attrs |
|
staticprivate |
◆ success_source_attrs
options::map<std::string> copyRequest::base::success_source_attrs |
|
staticprivate |
◆ suffix
std::string copyRequest::base::suffix |
|
protected |
◆ syncWrittenFiles
◆ tCopyStat
◆ tEnqueue
clock_type::time_point copyRequest::base::tEnqueue |
|
protected |
◆ tEnqueueStat
◆ threadData
◆ tInotify
clock_type::time_point copyRequest::base::tInotify |
|
protected |
◆ tInotifyStat
◆ tPipeStat
◆ tProgramStart
clock_type::time_point copyRequest::base::tProgramStart |
|
staticprotected |
◆ tTotal2Stat
◆ tTotalStat
◆ tWaitStat
◆ tWorkDone
clock_type::time_point copyRequest::base::tWorkDone |
|
protected |
◆ tWorkStart
clock_type::time_point copyRequest::base::tWorkStart |
|
protected |
◆ userReadBlockSize
◆ userWriteBlockSize
The documentation for this class was generated from the following files:
static options::single< size_t > userWriteBlockSize
static std::map< std::string, fileInWork > filesInWork
maps file names to filesInWork objects
static statCollector::typed< decltype(tWorkStart - tEnqueue) > tWaitStat
const std::string moveSource
options::single< bool > verbose
options::single< bool > dereference
class for exceptions that advise for delays. Exceptions of this kind are to be thrown when circumstanc...
virtual void setXattr(const std::string &, const std::string &)
inputHandler::base * InputHandler
static options::single< unsigned > nReadThreads
virtual void ensureParentDirs(const std::string &path, const std::string &srcPath, inputHandler::base *InputHandler)=0
options::single< bool > noCopy
virtual renameRetvalType rename(const std::string &fromPath, const std::unique_ptr< const genericStat > &readInitialStat, const std::string &toPath, copyRequest::stateType &state)=0
class for defining the location of a error message in the source code.
virtual void removeXattr(const std::string &)
waitQueues::simple< block > blockQueue
static statCollector::typed< double > speedStat
static statCollector::typed< copyRequest::clock_type::duration > tEnqueueStat
static options::single< bool > perFileThreads
options::single< std::string > statPrefix
static statCollector::typed< decltype(tWorkDone - tInotify) > tTotalStat
static options::single< std::regex > sourcePrefixRegex
class for exceptions that should result in program termination
static options::map< std::string > success_dest_attrs
virtual bool fIsSet() const
check if this option was set, regardless of from command line or config file
const std::string & getSuffix() const
static options::single< size_t > maxMemoryBlockSize
virtual void writeBlockP(const block &)
static backupModeType backupMode
virtual std::unique_ptr< writer > newWriter(const std::string &path, bool mightAppend, size_t sourceSize, size_t readBlockSize, copyRequest::stateType &state, bool noWrite, std::unique_ptr< ioHandle::attrDataType > attrData, std::unique_ptr< acl::list > aclData)=0
virtual std::unique_ptr< writer > newTmpWriter(std::string &path, size_t sourceSize, bool noWrite, std::unique_ptr< ioHandle::attrDataType > attrData, std::unique_ptr< acl::list > aclData)
static statCollector::typed< decltype(tWorkDone - tInotify) > tInotifyStat
bool makeSymLink(inputHandler::base *InputHandler, outputHandler::base *OutputHandler)
static std::mutex advisoryWaitMapMutex
virtual std::string getXattr(const std::string &)
get one extended attribute value
static statCollector::typed< decltype(readInitialStat->size) > bytesStat
decltype(fileInWork::filesInWork.begin()) filesInWorkIterator
static bool retry(std::unique_ptr< base > &request, timedQueue &delayedRequests)
clock_type::time_point tWorkStart
void changeRequestType(stateBitType newType)
static options::map< std::string > check_source_attrs
static options::map< std::string > failure_source_attrs
static std::vector< std::atomic< unsigned int > > * processMultiplicities
clock_type::time_point tEnqueue
void doBlockSizeSetup(ioHandle &input, ioHandle &output)
clock_type::time_point tInotify
void attrset(ioHandle &writeHandle, const std::map< std::string, std::string > &attrs)
bool expandAttrValue(std::string &value)
expand the keywords in value, see Expansion of key words in attributes
void reader(inputHandler::base::reader &input, blockQueue &freeBlocks, blockQueue &readBlocks, bool mayParallelize, exceptionList &exceptions)
virtual size_t getBlockSize() const
const singleMap & mapEntry
void doUnthreadedCopy(inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle)
const std::string origSource
static options::map< std::string > success_source_attrs
static options::map< std::string > process_source_attrs
std::string getBackupSuffix()
static options::container< std::string > logFields
static void print(std::ostream &out, const std::string &prefix)
static options::single< unsigned > nThreads('n', "nThreads", "number of threads", 0)
static options::single< double > maxDelayTime
static options::single< bool > setAttributesAfterClose
static statCollector::typed< decltype(tWorkDone - tWorkStart) > tCopyStat
static void attrdel(ioHandle &, const std::map< std::string, std::string > &attrs)
static options::container< std::string > append_attrs
throttle::watch fileRateLimit
void init(inputHandler::base *InputHandler)
virtual void send(const std::string &aMessage, const std::string &aTopic="")=0
static options::container< std::string > prohibitive_attrs
std::unique_ptr< const genericStat > readInitialStat
static options::single< unsigned > maxRetries
virtual bool pathExists(const std::string &path)=0
virtual void remove(const std::string &path, copyRequest::stateType &state)=0
static statCollector::typed< decltype(tEnqueue - tInotify) > tPipeStat
static options::map< std::string > start_source_attrs
void removeFileOrDirectory(inputHandler::base *InputHandler, outputHandler::base *OutputHandler)
virtual bool renameSimple(const std::string &fromPath, const std::string &toPath)=0
static options::single< bool > syncWrittenFiles
waitQueues::simple< blockReadRequest > blockReadRequestQueue
static options::single< std::string > backupSuffix
static options::single< unsigned > nWriteThreads
static options::single< size_t > userReadBlockSize
static statCollector::typed< decltype(tWorkDone - tInotify) > tTotal2Stat
static std::condition_variable objectCountCondVar
decltype(preserve) preserve
set of properties to preserve in the copy
perThreadData * threadData
virtual void seek(size_t position)
void checkAttributes(ioHandle &, const std::map< std::string, std::string > &check_source_attr)
virtual std::string getXattr(const std::string &, const std::string &)
static const escapism * newEscaper(const std::string &name)
static options::single< bool > forceParallelRead
virtual std::unique_ptr< const genericStat > getStat(const std::string &path, bool followLink=true)=0
static std::atomic< unsigned int > nConcurrentProcesses
decltype(checksumCreators) checksumCreators
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
#define timerInst(subfunc)
virtual void doAttributePreservations(const genericStat &readInitialStat)=0
outputHandler::base * OutputHandler
clock_type::time_point tWorkDone
options::single< bool > continueOnError
static unsigned objectCount
static options::single< std::regex > destinationPrefixRegex
static std::map< std::string, clock_type::duration > advisoryWaitMap
virtual size_t getSize() const
static factoryClass * newFactory(const std::string &aName)
static options::single< std::regex > appendableFiles
static std::function< void(std::ostream &)> statPrinter
static bool adviseDelay(clock_type::duration dt, const std::string &suffix)
update the advised delay in the map.
std::unique_ptr< T > dequeue(bool mayCreateNew, Types ... args)
virtual void writeBlock(const block &b)=0
static std::mutex objectCountMutex
void readWorker(inputHandler::base::reader &input, blockReadRequestQueue &blockRequests, blockQueue &freeBlocks, blockQueue &readBlocks, exceptionList &exceptions)
messageQueue::queue * resultOutput
void clear(const T aBits)
static options::single< bool > ignoreExisting
static void RequestStop(const std::string &aReason)
const std::string destination
static clock_type::time_point tProgramStart
class to get copyRequest out of the fileInWork list.
static options::single< std::string > sparse
std::list< checksum::base * > checkSums
virtual void doAttributePreservations(const std::string &, const genericStat &)
virtual void setBlockSize(size_t newSize)
virtual void update(void *data, size_t size, size_t offset)=0
void doThreadedCopy(inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle)
static std::map< std::string, void(base::*)(std::string &) const > & keywordMap()
static options::single< bool > changeAttrsOnIgnoreExisting
static options::single< size_t > minMemoryBlockSize
virtual bool parallelizable() const
tell if this handler is capable of parallel IO. Usually not the case