ewmscp  ..
Classes | Public Member Functions | Static Public Member Functions | Public Attributes | Static Public Attributes | Protected Member Functions | Protected Attributes | Static Protected Attributes | Private Types | Private Member Functions | Static Private Member Functions | Static Private Attributes | Friends | List of all members
copyRequest::base Class Reference

class for copy requests. More...

#include <copyRequest.h>

Inheritance diagram for copyRequest::base:
[legend]
Collaboration diagram for copyRequest::base:
[legend]

Classes

class  blockReadRequest
 
class  hashCalculator
 
class  pointerCompare
 
class  registerme
 class to register the keyword expansion member functions in the map More...
 
class  writeActor
 

Public Member Functions

const genericStat & getInitialStat () const
 
const std::string & getSource () const
 
clock_type::duration getAdvisedDelay () const
 returns the advised delay More...
 
void registerIterator (decltype(filesInWorkIterator) iter)
 
decltype(filesInWorkIterator) getFilesInWorkIteraror () const
 
fileInWork::slotTypes getSlotType () const
 
void changeRequestType (stateBitType newType)
 
 base (inputHandler::base *InputHandler, const std::string &aSource, const std::string &aDestination, std::unique_ptr< const genericStat > &aStat, const singleMap &aMapEntry, bool remove, clock_type::time_point timestamp)
 constructor for copy, link and remove requests. More...
 
 base (inputHandler::base *InputHandler, const std::string &aSource, const std::string &aDestination, const singleMap &aMapEntry, bool remove, clock_type::time_point timestamp)
 constructor for copy, link and remove requests. More...
 
 base (inputHandler::base *InputHandler, const std::string &aSource, const std::string &aDestination, const std::string &aMoveSource, const std::string &aOrigSource, const singleMap &aMapEntry, clock_type::time_point timestamp)
 constructor for rename requests only. More...
 
virtual ~base () noexcept(false)
 
virtual void process (perThreadData &threadData)
 
virtual void printResults (std::ostream &hashStream, std::ostream &logStream)
 
const std::string & getSuffix () const
 
void addExpectedChecksumResult (const std::string &checkSumType, const std::string &expectedValue)
 
void kw_size (std::string &value) const
 
void kw_mtime (std::string &value) const
 
void kw_inow (std::string &value) const
 
void kw_now (std::string &value) const
 
void kw_version (std::string &value) const
 
void kw_commit (std::string &value) const
 
void kw_prefix (std::string &value) const
 
void kw_reason (std::string &value) const
 
void kw_state (std::string &value) const
 
void kw_finishtime (std::string &value) const
 

Static Public Member Functions

static std::map< std::string, void(base::*)(std::string &) const > & keywordMap ()
 
static bool adviseDelay (clock_type::duration dt, const std::string &suffix)
 update the advised delay in the map. More...
 
static void printStats (std::ostream &stream)
 
static void resetStats ()
 
static bool retry (std::unique_ptr< base > &request, timedQueue &delayedRequests)
 
static void waitForAllInstancesGone ()
 
static bool checkForInstances ()
 
static std::function< void(std::ostream &)> & getStatPrinter ()
 we can't have static functions virtual, so we explicitly use a function to give us the proper print function, which can be changed in derived classes. More...
 
static void setStatPrinter (void(*f)(std::ostream &))
 
static void processQueue (simpleQueue &queue, simpleQueue &resultQueue, timedQueue &delayedRequests)
 
static void getSuffix (const std::string &path, std::string &suffix)
 

Public Attributes

decltype(fileInWork::filesInWork.begin()) filesInWorkIterator = fileInWork::filesInWork.end()
 

Static Public Attributes

static options::single< bool > setAttributesAfterClose
 
static options::single< bool > perFileThreads
 
static options::single< size_t > userWriteBlockSize
 
static options::single< size_t > userReadBlockSize
 
static options::single< size_t > maxMemoryBlockSize
 
static options::single< size_t > minMemoryBlockSize
 
static statCollector::typed< copyRequest::clock_type::duration > tEnqueueStat
 
static std::vector< std::atomic< unsigned int > > * processMultiplicities
 

Protected Member Functions

void adviseDelay (clock_type::duration dt=clock_type::duration::zero())
 

Protected Attributes

std::unique_ptr< const genericStat > readInitialStat
 
std::list< checksum::base * > checkSums
 
perThreadData * threadData
 
stateType state
 
std::string errorMessage
 
const std::string source
 
const std::string destination
 
const std::string moveSource
 
const std::string origSource
 
const singleMap & mapEntry
 
std::string suffix
 
std::string prefix
 
size_t memoryBlockSize
 
clock_type::time_point tInotify
 
clock_type::time_point tEnqueue
 
clock_type::time_point tWorkStart
 
clock_type::time_point tWorkDone
 
unsigned retries
 

Static Protected Attributes

static statCollector::typed< decltype(tWorkDone - tInotify) > tTotalStat
 
static statCollector::typed< decltype(tWorkDone - tInotify) > tTotal2Stat
 
static statCollector::typed< decltype(tWorkDone - tInotify) > tInotifyStat
 
static statCollector::typed< decltype(tEnqueue - tInotify) > tPipeStat
 
static statCollector::typed< decltype(tWorkStart - tEnqueue) > tWaitStat
 
static statCollector::typed< decltype(tWorkDone - tWorkStart) > tCopyStat
 
static statCollector::typed< decltype(readInitialStat->size) > bytesStat
 
static statCollector::typed< double > speedStat
 
static clock_type::time_point tProgramStart
 
static std::atomic< unsigned int > nConcurrentProcesses
 
static options::single< double > maxDelayTime
 
static std::function< void(std::ostream &)> statPrinter
 

Private Types

typedef waitQueues::simple< blockReadRequest > blockReadRequestQueue
 

Private Member Functions

bool expandAttrValue (std::string &value)
 expand the keywords in value, see Expansion of key words in attributes More...
 
void attrset (ioHandle &writeHandle, const std::map< std::string, std::string > &attrs)
 
void checkAttributes (ioHandle &, const std::map< std::string, std::string > &check_source_attr)
 
void hash_worker (checksum::parallel *sum, blockQueue &blocksToHash, blockQueue &hashedBlocks, exceptionList &exceptions)
 
void hasher (blockQueue &blocksToHash, blockQueue &hashedBlocks, exceptionList &exceptions)
 
void readWorker (inputHandler::base::reader &input, blockReadRequestQueue &blockRequests, blockQueue &freeBlocks, blockQueue &readBlocks, exceptionList &exceptions)
 
void reader (inputHandler::base::reader &input, blockQueue &freeBlocks, blockQueue &readBlocks, bool mayParallelize, exceptionList &exceptions)
 
void writer (std::unique_ptr< outputHandler::base::writer > &writeHandle, blockQueue &blocksToWrite, blockQueue &writtenBlocks, exceptionList &exceptions)
 
void writeWorker (std::unique_ptr< outputHandler::base::writer > &writeHandle, blockQueue &blocksToWrite, blockQueue &writtenBlocks, exceptionList &exceptions)
 
void doThreadedCopy (inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle)
 
void doUnthreadedCopy (inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle)
 
void doBlockSizeSetup (ioHandle &input, ioHandle &output)
 
bool makeSymLink (inputHandler::base *InputHandler, outputHandler::base *OutputHandler)
 
void removeFileOrDirectory (inputHandler::base *InputHandler, outputHandler::base *OutputHandler)
 
std::string getBackupSuffix ()
 
void init (inputHandler::base *InputHandler)
 
virtual bool compare (const base &rhs) const
 

Static Private Member Functions

static const std::map< std::string, backupModeType > & getBackupModeNameMap ()
 
static void attrdel (ioHandle &, const std::map< std::string, std::string > &attrs)
 

Static Private Attributes

static options::map< std::string > success_source_attrs
 
static options::map< std::string > failure_source_attrs
 
static options::map< std::string > process_source_attrs
 
static options::map< std::string > start_source_attrs
 
static options::map< std::string > check_source_attrs
 
static options::map< std::string > success_dest_attrs
 
static options::container< std::string > prohibitive_attrs
 
static options::container< std::string > append_attrs
 
static options::single< std::regex > appendableFiles
 
static options::single< bool > ignoreExisting
 
static options::single< bool > changeAttrsOnIgnoreExisting
 
static options::single< unsigned > maxRetries
 
static options::map< std::string > linkBaseMap
 
static options::single< std::string > sparse
 
static options::withAction< options::single< std::string > > backupModeName
 
static backupModeType backupMode
 
static options::single< std::string > backupSuffix
 
static options::single< bool > syncWrittenFiles
 
static options::single< unsigned > nSumThreads
 
static options::single< unsigned > nReadThreads
 
static options::single< bool > forceParallelRead
 
static options::single< unsigned > nWriteThreads
 
static options::container< std::string > logFields
 
static options::single< std::regex > sourcePrefixRegex
 
static options::single< std::regex > destinationPrefixRegex
 
static std::map< std::string, clock_type::duration > advisoryWaitMap
 
static std::mutex advisoryWaitMapMutex
 
static std::mutex objectCountMutex
 
static unsigned objectCount
 
static std::condition_variable objectCountCondVar
 

Friends

class fileInWork
 
std::ostream & operator<< (std::ostream &out, const base &request)
 

Detailed Description

class for copy requests.

Well, in fact not only copy but also rename, link and delete requests. Objects of this class are created either from the command line or, in the follow-mode, from a command stream, kept in queues and processed as resources become available. The instances hold all data needed to perform the required operation and store performance data and a result.

Definition at line 99 of file copyRequest.h.

Member Typedef Documentation

◆ blockReadRequestQueue

Definition at line 223 of file copyRequest.h.

Constructor & Destructor Documentation

◆ base() [1/3]

copyRequest::base::base ( inputHandler::base * InputHandler,
const std::string &  aSource,
const std::string &  aDestination,
std::unique_ptr< const genericStat > &  aStat,
const singleMap & aMapEntry,
bool  remove,
clock_type::time_point  timestamp 
)

constructor for copy, link and remove requests.

Definition at line 1517 of file copyRequest.cpp.

1523  :
1524  readInitialStat(std::move(aStat)),
1526  source(aSource),
1527  destination(aDestination),
1528  mapEntry(aMapEntry),
1529  tInotify(timestamp),
1530  retries(0) {
1531  init(InputHandler);
1532 }

References copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, init(), and copyRequest::remove.

Here is the call graph for this function:

◆ base() [2/3]

copyRequest::base::base ( inputHandler::base * InputHandler,
const std::string &  aSource,
const std::string &  aDestination,
const singleMap & aMapEntry,
bool  remove,
clock_type::time_point  timestamp 
)

constructor for copy, link and remove requests.

Definition at line 1533 of file copyRequest.cpp.

1538  :
1540  source(aSource),
1541  destination(aDestination),
1542  mapEntry(aMapEntry),
1543  tInotify(timestamp),
1544  retries(0) {
1545  init(InputHandler);
1546 }

References copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, init(), and copyRequest::remove.

Here is the call graph for this function:

◆ base() [3/3]

copyRequest::base::base ( inputHandler::base * InputHandler,
const std::string &  aSource,
const std::string &  aDestination,
const std::string &  aMoveSource,
const std::string &  aOrigSource,
const singleMap & aMapEntry,
clock_type::time_point  timestamp 
)

constructor for rename requests only.

Definition at line 1547 of file copyRequest.cpp.

1553  :
1555  source(aSource),
1556  destination(aDestination),
1557  moveSource(aMoveSource),
1558  origSource(aOrigSource),
1559  mapEntry(aMapEntry),
1560  tInotify(timestamp),
1561  retries(0) {
1562  init(InputHandler);
1563 }

References copyRequest::fileToBeRenamed, and init().

Here is the call graph for this function:

◆ ~base()

copyRequest::base::~base ( )
virtual noexcept

Definition at line 1565 of file copyRequest.cpp.

1565  {
1566  {
1567  std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1568  --objectCount;
1569  if (objectCount == 0) {
1570  objectCountCondVar.notify_all();
1571  }
1572  }
1573 
1574  if (state & stateBitType::ignore) {
1575  return;
1576  }
1577 
1581 
1583  auto dtCopy = tWorkDone - tWorkStart;
1584  auto tModification = readInitialStat->getMtime();
1585  auto d = tInotify - tModification;
1586  tTotal2Stat.addValue(tWorkDone - tModification);
1588  tCopyStat.addValue(dtCopy);
1589  if (readInitialStat) {
1590  bytesStat.addValue(readInitialStat->size);
1591  }
1592  auto dtCopyReal = std::chrono::duration_cast<std::chrono::duration<double>>(dtCopy).count();
1593 
1594  if (dtCopyReal > 0) {
1595  speedStat.addValue(readInitialStat->size / (1024. * 1024) / dtCopyReal);
1596  }
1597  }
1598 }

References copyRequest::done, copyRequest::fileToBeCopied, and copyRequest::ignore.

Member Function Documentation

◆ addExpectedChecksumResult()

void copyRequest::base::addExpectedChecksumResult ( const std::string &  checkSumType,
const std::string &  expectedValue 
)

Definition at line 1601 of file copyRequest.cpp.

1602  {
1603  for (auto& cksum : checkSums) {
1604  if (cksum->getName() == checkSumType) {
1605  cksum->setExpectedResult(expectedValue);
1606  return;
1607  }
1608  }
1609  auto checksumCreator = checksum::base::newFactory(checkSumType);
1610  checksumCreators.push_front(checksumCreator);
1611  auto cksum = checksumCreator->create();
1612  cksum->setExpectedResult(expectedValue);
1613  checkSums.push_front(cksum);
1614 }

References checksumCreators, and checksum::base::newFactory().

Here is the call graph for this function:

◆ adviseDelay() [1/2]

bool copyRequest::base::adviseDelay ( clock_type::duration  dt,
const std::string &  suffix 
)
static

update the advised delay in the map.

Creates entries in the advisoryWaitMap or updates them. Is to be called when a copy request failed due to changes to the source file; the advised delay will be the time difference between the notification by the inotify_watch command and the current time.

Definition at line 1011 of file copyRequest.cpp.

1012  {
1013  if (suffix.empty() || dt < clock_type::duration::zero()) {
1014  return false;
1015  }
1016  if (std::chrono::duration_cast<std::chrono::duration<double>>(dt).count() > maxDelayTime) {
1017  dt = std::chrono::duration_cast<decltype(dt)>(std::chrono::duration<double>(maxDelayTime));
1018  }
1019  bool delayChanged = false;
1020  {
1021  std::unique_lock<decltype(advisoryWaitMapMutex)> lock(advisoryWaitMapMutex);
1022  auto result = advisoryWaitMap.emplace(suffix, dt);
1023  if (result.second == false) { // we already have an entry
1024  auto it = result.first;
1025  if (it->second < dt) { // apparently we must wait longer
1026  it->second = dt;
1027  delayChanged = true;
1028  }
1029  } else {
1030  delayChanged = true;
1031  }
1032  }
1033  if (delayChanged) {
1035  "-- no path --", "delay change",
1036  "set wait time for '", suffix, "' files to ",
1037  std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(dt).count(), "s");
1038  }
1039  return delayChanged;
1040 }

References errMsg::emit(), and errMsg::notice.

Referenced by followRequestProvider::enqueueOrAppend().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ adviseDelay() [2/2]

void copyRequest::base::adviseDelay ( clock_type::duration  dt = clock_type::duration::zero())
protected

Definition at line 1043 of file copyRequest.cpp.

1043  {
1044  if (dt == decltype(dt)::zero()) {
1045  dt = clock_type::now() - tInotify;
1046  }
1047 
1048  if (adviseDelay(dt, getSuffix())) {
1050  return;
1051  }
1052  filesInWorkIterator->second.setWaitTime(dt);
1053  }
1054 }

References copyRequest::fileInWork::filesInWork.

◆ attrdel()

void copyRequest::base::attrdel ( ioHandle & handle,
const std::map< std::string, std::string > &  attrs 
)
static private

Definition at line 481 of file copyRequest.cpp.

481  {
482  for (const auto& attr : attrs) {
483  handle.removeXattr(attr.first);
484  }
485 }

References ioHandle::removeXattr().

Here is the call graph for this function:

◆ attrset()

void copyRequest::base::attrset ( ioHandle & writeHandle,
const std::map< std::string, std::string > &  attrs 
)
private

Definition at line 446 of file copyRequest.cpp.

447  {
448  for (const auto& attr : attrs) {
449  auto name = attr.first;
450  auto value = attr.second;
451 
452  if (!expandAttrValue(value)) {
453  if (value == "%count") { // special expansion, only useful here
454  unsigned int count = 0;
455  auto retval = writeHandle.getXattr(name);
456  if (!retval.empty()) {
457  try {
458  count = std::stoul(retval);
459  } catch (...) {
461  source, "xattr count",
462  "strange value '", retval, "'");
463  count = 0;
464  }
465  }
466  value = std::to_string(count + 1);
467  } else if (value == "%sums") { // special expansion only useful here
468  for (const auto& item : checkSums) {
469  std::string fullName(name);
470  fullName += item->getName();
471  writeHandle.setXattr(fullName, item->getResult());
472  }
473  continue;
474  }
475  }
476  writeHandle.setXattr(name, value);
477  }
478 }

References errMsg::debug, errMsg::emit(), ioHandle::getXattr(), and ioHandle::setXattr().

Here is the call graph for this function:

◆ changeRequestType()

void copyRequest::base::changeRequestType ( stateBitType  newType)

Definition at line 1072 of file copyRequest.cpp.

1072  {
1073  fileInWork::typeChanger changer(*this, newType);
1074 }

◆ checkAttributes()

void copyRequest::base::checkAttributes ( ioHandle & handle,
const std::map< std::string, std::string > &  check_source_attr 
)
private

Definition at line 486 of file copyRequest.cpp.

486  {
487  for (const auto& attr : attrs) {
488  auto name = attr.first;
489  auto value = attr.second;
490  expandAttrValue(value);
491  auto fileValue = handle.getXattr(name);
492 
493  if (fileValue.empty()) {
495  errorMessage += "attr " + name + " missing ";
496  continue;
497  }
498 
499  if (value != fileValue) {
501  errorMessage += "attr " + name + " mismatch " + value + " i.o. " + fileValue;
502  }
503  }
504 
506  throw std::runtime_error(errorMessage);
507  }
508 }

References copyRequest::attributeMismatch, and ioHandle::getXattr().

Here is the call graph for this function:

◆ checkForInstances()

bool copyRequest::base::checkForInstances ( )
static

Definition at line 1427 of file copyRequest.cpp.

1427  {
1428  std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1429  return (objectCount > 0);
1430 }

◆ compare()

virtual bool copyRequest::base::compare ( const base & rhs) const
inline private virtual

Reimplemented in copyRequest::listingRequest.

Definition at line 374 of file copyRequest.h.

374  {
375  return readInitialStat->size > rhs.readInitialStat->size;
376  }

References readInitialStat.

Referenced by copyRequest::base::pointerCompare::operator()().

Here is the caller graph for this function:

◆ doBlockSizeSetup()

void copyRequest::base::doBlockSizeSetup ( ioHandle & input,
ioHandle & output 
)
private

Definition at line 1404 of file copyRequest.cpp.

1405  {
1408  }
1411  }
1412 
1413  memoryBlockSize = input.getBlockSize() * output.getBlockSize();
1414  // sanitize block sizes, ensure they at least fit
1417  input.setBlockSize(std::min(input.getBlockSize(), memoryBlockSize));
1418  output.setBlockSize(std::min(output.getBlockSize(), memoryBlockSize));
1419 }

References ioHandle::getBlockSize(), maxMemoryBlockSize, minMemoryBlockSize, ioHandle::setBlockSize(), userReadBlockSize, and userWriteBlockSize.

Here is the call graph for this function:

◆ doThreadedCopy()

void copyRequest::base::doThreadedCopy ( inputHandler::base::reader & input,
std::unique_ptr< outputHandler::base::writer > &  writeHandle 
)
private

Definition at line 813 of file copyRequest.cpp.

814  {
815  blockQueue blocksToWrite;
816  blockQueue blocksToHash;
817  #ifdef WithMagic
818  blockQueue blocksForMagic;
819  #endif
820  exceptionList exceptions;
821 
822  scoped::generic < decltype(threadData->freeBlocks) > freeBlockReseter(threadData->freeBlocks,
823  [](decltype(threadData->freeBlocks)& freeBlocks) {
824  freeBlocks.resetDone();
825  });
826  {
827  #ifdef WithMagic
828  magicCalculator magicThread(blocksForMagic, threadData->freeBlocks, this, exceptions);
829  auto& lastQueue = magicThread.joinable() ?
830  blocksForMagic : // with magic thread
831  threadData->freeBlocks; // without magic thread
832  #else
833  auto& lastQueue = threadData->freeBlocks;
834  #endif
835  hashCalculator hashThread(blocksToHash, lastQueue, this,
836  (writeHandle->parallelizable() && nWriteThreads > 1) || noCopy, exceptions);
837  auto& writerOutputQueue = hashThread.joinable() ?
838  blocksToHash : // with hash thread
839  lastQueue; // without hash thread
840  writeActor writeThread(writeHandle,
841  blocksToWrite,
842  writerOutputQueue,
843  hashThread.parallelized(),
844  this,
845  exceptions);
846  auto& readerOutputQueue = writeThread.joinable() ? // with write thread
847  blocksToWrite :
848  ( // without write thread
849  hashThread.joinable() ? // with hash thred
850  blocksToHash : // we read and hash
851  lastQueue // apparently we just read and discard the result
852  );
853 
854  reader(input, threadData->freeBlocks, readerOutputQueue,
855  hashThread.parallelized() && (writeHandle->parallelizable() || noCopy), exceptions);
856  } // hashThread and writeThread go out of scope and are joined
857 
858  if (!exceptions.empty()) {
859  auto n = exceptions.size();
860  if (n > 1) {
862  source, "threaded copy", "we have ", n, " exceptions at once");
863  }
864  std::rethrow_exception(exceptions.front());
865  }
866 }

References errMsg::emit(), copyRequest::exceptionList::empty(), copyRequest::exceptionList::front(), copyRequest::base::hashCalculator::joinable(), copyRequest::base::writeActor::joinable(), noCopy, ioHandle::parallelizable(), copyRequest::base::hashCalculator::parallelized(), copyRequest::exceptionList::size(), and errMsg::warning.

Here is the call graph for this function:

◆ doUnthreadedCopy()

void copyRequest::base::doUnthreadedCopy ( inputHandler::base::reader & input,
std::unique_ptr< outputHandler::base::writer > &  writeHandle 
)
private

Definition at line 868 of file copyRequest.cpp.

868  {
869  auto& freeBlocks = threadData->freeBlocks;
870  auto b = freeBlocks.dequeue(freeBlocks.empty(), memoryBlockSize);
871  if (b == nullptr) {
872  throw std::runtime_error("got no block for unthreaded copy");
873  }
874  for (;;) {
875  auto lastblock = input.readBlock(*b);
876 
877  if (!checkSums.empty() && !(state & stateBitType::append)) {
879  if (b->isHole()) {
880  for (auto sum : checkSums) {
881  sum->update(b->size());
882  }
883  } else {
884  for (auto sum : checkSums) {
885  sum->update(b->bufferAt(0), b->size());
886  }
887  }
888  }
889 
890  if (!noCopy) {
891  writeHandle->writeBlock(*b);
892  }
893  #ifdef WithMagic
894  if (doMagic) {
895  innerMagic(*b);
896  }
897  #endif
898  if (lastblock) {
899  break;
900  }
901  }
902 
903  for (auto sum : checkSums) {
904  sum->finish();
905  }
906  if (b != nullptr) {
907  freeBlocks.enqueue(b);
908  } else {
910  source, "unthreaded copy", "block ptr is nullptr after copy");
911  }
912 }

References copyRequest::append, block::bufferAt(), errMsg::crit, errMsg::emit(), block::isHole(), noCopy, inputHandler::base::reader::readBlock(), block::size(), timerInst, and outputHandler::base::writer::writeBlock().

Here is the call graph for this function:

◆ expandAttrValue()

bool copyRequest::base::expandAttrValue ( std::string &  value)
private

expand the keywords in value, see Expansion of key words in attributes

Returns
true if a keyword was found, false if not

Definition at line 365 of file copyRequest.cpp.

365  {
366  auto it = keywordMap().find(value);
367  if (it != keywordMap().end()) {
368  auto f = it->second;
369  (this ->* f)(value);
370  return true;
371  }
372 
373  if (value.size() > 1 && value.at(0) == '%') {
374  auto keyword = value.substr(1);
375  for (const auto& cksum : checkSums) {
376  if (cksum->getName() == keyword) {
377  value = cksum->getResult();
378  return true;
379  }
380  }
381  }
382 
383  return false; // no change...
384 }

References f().

Here is the call graph for this function:

◆ getAdvisedDelay()

copyRequest::clock_type::duration copyRequest::base::getAdvisedDelay ( ) const

returns the advised delay

Returns the special duration value zero if no delay is advised.

Locking the map is not needed here because read access to containers where no elements are removed (which does not happen here) is thread safe.

Definition at line 1059 of file copyRequest.cpp.

1059  {
1060  auto it = advisoryWaitMap.find(getSuffix());
1061  if (it == advisoryWaitMap.end()) {
1062  return clock_type::duration::zero();
1063  }
1064  return it->second;
1065 }

Referenced by followRequestProvider::enqueueOrAppend().

Here is the caller graph for this function:

◆ getBackupModeNameMap()

const std::map< std::string, copyRequest::backupModeType > & copyRequest::base::getBackupModeNameMap ( )
static private

Definition at line 235 of file copyRequest.cpp.

235  {
236  static std::map<std::string, backupModeType> backupModeNameMap = {
242  };
243  return backupModeNameMap;
244 }

References copyRequest::after, copyRequest::before, copyRequest::during, copyRequest::none, and copyRequest::remove.

◆ getBackupSuffix()

std::string copyRequest::base::getBackupSuffix ( )
private

Definition at line 1077 of file copyRequest.cpp.

1077  {
1078  if (backupSuffix.empty()) {
1079  throw std::logic_error("backup suffix must not be empty");
1080  }
1081  std::string bkgSuffix;
1082  decltype(backupSuffix.find("bla")) index = 0;
1083  while (index != decltype(backupSuffix)::npos) {
1084  auto newIndex = backupSuffix.find_first_of('%', index);
1085  bkgSuffix += backupSuffix.substr(index, newIndex - index);
1086  if (newIndex != decltype(backupSuffix)::npos) {
1087  newIndex++;
1088  if (newIndex < backupSuffix.size()) {
1089  if (backupSuffix.at(newIndex) == '{') {
1090  newIndex++;
1091  auto endmarker = backupSuffix.find_first_of('}', newIndex);
1092  auto part = "%" + backupSuffix.substr(newIndex, endmarker - newIndex);
1093  expandAttrValue(part);
1094  bkgSuffix += part;
1095  newIndex = endmarker + 1;
1096  } else {
1097  auto part = backupSuffix.substr(newIndex - 1);
1098  expandAttrValue(part);
1099  bkgSuffix += part;
1100  newIndex = decltype(backupSuffix)::npos;
1101  }
1102  }
1103  }
1104  index = newIndex;
1105  }
1106  return bkgSuffix;
1107 }

◆ getFilesInWorkIteraror()

decltype(filesInWorkIterator) copyRequest::base::getFilesInWorkIteraror ( ) const
inline

Definition at line 345 of file copyRequest.h.

345  {
346  return filesInWorkIterator;
347  }

References filesInWorkIterator.

◆ getInitialStat()

const genericStat& copyRequest::base::getInitialStat ( ) const
inline

Definition at line 273 of file copyRequest.h.

273  {
274  return *readInitialStat;
275  }

References readInitialStat.

Referenced by copyRequest::listingRequest::compare().

Here is the caller graph for this function:

◆ getSlotType()

fileInWork::slotTypes copyRequest::base::getSlotType ( ) const
inline

Definition at line 348 of file copyRequest.h.

348  {
352  }
355  }
358  }
359  throw std::logic_error("no slot type detected");
360  }

References copyRequest::fileInWork::copy, copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, copyRequest::fileToBeRenamed, copyRequest::linkToBeMade, copyRequest::fileInWork::move, state, and copyRequest::fileInWork::unlink.

Referenced by followRequestProvider::enqueueOrAppend(), copyRequest::fileInWork::inserter::enRegister(), and copyRequest::fileInWork::inserter::markForQueueing().

Here is the caller graph for this function:

◆ getSource()

const std::string& copyRequest::base::getSource ( ) const
inline

Definition at line 292 of file copyRequest.h.

292  {
293  return source;
294  };

References source.

Referenced by copyRequest::listingRequest::compare().

Here is the caller graph for this function:

◆ getStatPrinter()

std::function< void(std::ostream &)> & copyRequest::base::getStatPrinter ( )
static

we can't have static functions virtual, so we explicitly use a function to give us the proper print function, which can be changed in derived classes.

Definition at line 1816 of file copyRequest.cpp.

1816  {
1817  return statPrinter;
1818 }

Referenced by printResults().

Here is the caller graph for this function:

◆ getSuffix() [1/2]

const std::string& copyRequest::base::getSuffix ( ) const
inline

Definition at line 421 of file copyRequest.h.

421  {
422  return (suffix);
423  }

References suffix.

Referenced by followRequestProvider::enqueueOrAppend().

Here is the caller graph for this function:

◆ getSuffix() [2/2]

void copyRequest::base::getSuffix ( const std::string &  path,
std::string &  suffix 
)
static

Definition at line 1432 of file copyRequest.cpp.

1433  {
1434  auto fileNameStart = path.find_last_of('/');
1435  if (fileNameStart == std::string::npos) {
1436  fileNameStart = 0;
1437  }
1438  auto const fileName = path.substr(fileNameStart);
1439  auto const suffixStart = fileName.find_last_of('.');
1440  if (suffixStart != decltype(fileName)::npos) {
1441  suffix = fileName.substr(suffixStart);
1442  }
1443 }

◆ hash_worker()

void copyRequest::base::hash_worker ( checksum::parallel * sum,
blockQueue & blocksToHash,
blockQueue & hashedBlocks,
exceptionList & exceptions 
)
private

Definition at line 720 of file copyRequest.cpp.

723  {
724  try {
725  while (auto b = blocksToHash.dequeue()) {
726  timerInst(hash);
727  if (b->isHole()) {
728  sum->update(b->size(), b->offset());
729  } else {
730  sum->update(b->bufferAt(0), b->size(), b->offset());
731  }
732  hashedBlocks.enqueue(b);
733  }
734  } catch (...) {
735  exceptions.add(std::current_exception());
736  }
737 }

References copyRequest::exceptionList::add(), waitQueues::simple< T >::dequeue(), waitQueues::simple< T >::enqueue(), timerInst, and checksum::parallel::update().

Referenced by copyRequest::base::hashCalculator::hashCalculator().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ hasher()

void copyRequest::base::hasher ( blockQueue & blocksToHash,
blockQueue & hashedBlocks,
exceptionList & exceptions 
)
private

Definition at line 738 of file copyRequest.cpp.

740  {
741  try {
742  while (auto b = blocksToHash.dequeue()) {
743  timerInst(hash);
744  if (b->isHole()) {
745  for (auto sum : checkSums) {
746  sum->update(b->size());
747  }
748  } else {
749  for (auto sum : checkSums) {
750  sum->update(b->bufferAt(0), b->size());
751  }
752  }
753  hashedBlocks.enqueue(b);
754  }
755 
756  for (auto sum : checkSums) {
757  sum->finish();
758  }
759  } catch (...) {
760  exceptions.add(std::current_exception());
761  }
762 }

References copyRequest::exceptionList::add(), checkSums, waitQueues::simple< T >::dequeue(), waitQueues::simple< T >::enqueue(), and timerInst.

Referenced by copyRequest::base::hashCalculator::hashCalculator().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ init()

void copyRequest::base::init ( inputHandler::base InputHandler)
private

Definition at line 1445 of file copyRequest.cpp.

1445  {
1446  threadData = nullptr;
1447  tEnqueue = clock_type::now();
1448  {
1449  std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1450  ++objectCount;
1451  }
1453 
1454  for (const auto& item : std::vector<std::pair<const std::string&,
1455  const decltype(sourcePrefixRegex)&>>({
1458 })) {
1459  if (item.second.fIsSet()) {
1460  std::smatch match;
1461  if (std::regex_match(item.first, match, item.second)) {
1462  if (match.size() == 2) { // we have a sub match as required
1463  prefix = match[1].str();
1464  }
1465  }
1466  }
1467  }
1468  // set this up before any return can happen, otherwise we can get empty lists..
1469  for (auto checksumCreator : checksumCreators) {
1470  checkSums.push_front(checksumCreator->create());
1471  }
1472 
1473  if (std::any_of(prohibitive_attrs.cbegin(), prohibitive_attrs.cend(),
1474  [this, InputHandler](decltype(prohibitive_attrs)::value_type attr) {
1475  return ! InputHandler->getXattr(source, attr).empty();
1476  })) {
1479  return;
1480  }
1481  if (readInitialStat == nullptr) {
1482  readInitialStat = InputHandler->getStat(source, false);
1483  }
1484  if (readInitialStat == nullptr) {
1487  }
1488  return;
1489  }
1490  if (readInitialStat->isLink()) {
1491  if (dereference) {
1492  readInitialStat = InputHandler->getStat(source, true);
1493  if (readInitialStat == nullptr) {
1494  throw std::runtime_error("can't dereference " + source);
1495  }
1496  } else if (state & stateBitType::fileToBeCopied) {
1498  }
1499  }
1500 
1502  return; // no directory test needed if we move
1503  }
1504 
1505  if (readInitialStat->isDir()) {
1508  } else {
1511  source, "init",
1512  "is a directory ", destination, " inotified at ", std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(tInotify.time_since_epoch()).count());
1513  //throw std::runtime_error(aSource + " is a directory, not expected here");
1514  }
1515  }
1516 }

References checksumCreators, dereference, errMsg::emit(), copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, copyRequest::fileToBeRenamed, pathHandler::getStat(), pathHandler::getXattr(), copyRequest::ignore, copyRequest::linkToBeMade, errMsg::notice, copyRequest::trucated, and copyRequest::vanished.

Referenced by base().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ keywordMap()

std::map< std::string, void(copyRequest::base::*)(std::string &) const > & copyRequest::base::keywordMap ( )
static

Definition at line 246 of file copyRequest.cpp.

// Accessor for the keyword -> member-function map.
// The map is a function-local static, so it is constructed on the first call
// and the same instance is returned (by reference) on every later call.
246  {
247  static std::map<std::string, void (copyRequest::base::*)(std::string&) const> map;
248  return map;
249 }

Referenced by copyRequest::base::registerme::registerme().

Here is the caller graph for this function:

◆ makeSymLink()

bool copyRequest::base::makeSymLink ( inputHandler::base InputHandler,
outputHandler::base OutputHandler 
)
private

◆ printResults()

void copyRequest::base::printResults ( std::ostream &  hashStream,
std::ostream &  logStream 
)
virtual
Bug:
too little structure, unreadable code
Bug:
was follow before, ugly condition

Reimplemented in copyRequest::listingRequest, and copyRequest::checksumTest.

Definition at line 1618 of file copyRequest.cpp.

1619  {
1620  timerInst(all);
1622  if (!(state & stateBitType::append)
1623  && (state & stateBitType::done)
1624  && !continueOnError) {
1625  static auto escaper = escapism::newEscaper("C");
1626  for (const auto& sum : checkSums) {
1627  std::string name;
1628  escaper->escape(noCopy ? source : destination, name);
1629  if (checkSums.size() > 1) {
1630  hashStream << sum->getName() << ": ";
1631  }
1632  hashStream << sum->getResult() << " " << name << "\n";
1633  }
1634  }
1635 
1636  bool needSpace = false;
1637 
1638  if (!logFields.empty() && !(state & stateBitType::done)) {
1639  logStream << logstream::level::warning;
1640  }
1641  std::string operation;
1643  operation = "copy";
1644  } else if (state & stateBitType::linkToBeMade) {
1645  operation = "link";
1646  } else if (state & stateBitType::fileToBeRemoved) {
1647  operation = "remove";
1648  } else if (state & stateBitType::fileToBeRenamed) {
1649  operation = "move";
1650  }
1651  if (resultOutput && (state & stateBitType::done)) {
1652  timerInst(sendMsg);
1653  #ifdef WithJsonCpp
1654  static std::atomic<Json::UInt64> messageCounter(0);
1655  Json::Value root;
1656  root["msgnr"] = messageCounter++;
1657  root["operation"] = operation;
1658  root["path"] = destination;
1659  root["source"] = source;
1660  if (readInitialStat != nullptr) {
1661  root["size"] = static_cast<Json::Int64>(readInitialStat->size);
1662  }
1663  for (const auto& sum : checkSums) {
1664  root[sum->getName()] = sum->getResult();
1665  }
1666  #ifdef WithMagic
1667  if (doMagic) {
1668  root["fileType"] = fileMagic;
1669  }
1670  #endif
1671  root["finishTime"] = std::chrono::duration_cast<std::chrono::duration<double>>(tWorkDone.time_since_epoch()).count();
1672  root["inotifyTime"] = std::chrono::duration_cast<std::chrono::duration<double>>(tInotify.time_since_epoch()).count();
1673  root["retries"] = retries;
1674  if (!prefixJsonName.empty()) {
1675  root[prefixJsonName] = prefix.empty() ? statPrefix : prefix;
1676  }
1677  for (auto& item : jsonExtraFields) {
1678  root[item.first] = item.second;
1679  }
1680  static Json::FastWriter jsonWriter;
1681  resultOutput->send(jsonWriter.write(root), prefix);
1682  #else
1684  #endif
1685  }
1687  timerInst(errmsgEmit);
1689  source, operation, errorMessage);
1690  }
1691  timerInst(toLogStream);
1692  for (const auto& field : logFields) {
1693  std::string value(field);
1694  static auto escaper = escapism::newEscaper("Url");
1695  if (!expandAttrValue(value)) { // not expanded, try special ones
1696  if (value == "%source") {
1697  value = "'";
1698 
1701  value += moveSource;
1702  } else {
1703  value += source;
1704  }
1705 
1706  value.push_back('\'');
1707  } else if (value == "%destination") {
1708  value = "'";
1709  value += destination;
1710  value.push_back('\'');
1711  } else if (value == "%urlsource") {
1712  value.clear();
1713 
1716  escaper->escape(moveSource, value);
1717  } else {
1718  escaper->escape(source, value);
1719  }
1720  } else if (value == "%urldestination") {
1721  value.clear();
1722  escaper->escape(destination, value);
1723  } else if (value == "%retries") {
1724  value = std::to_string(retries);
1725  } else if (value == "%error") {
1726  value = errorMessage;
1727 
1730  value += " vanished before action started";
1731  }
1732  } else if (value == "%op") {
1734  value = "->";
1735  } else if (state & stateBitType::linkToBeMade) {
1736  value = "ln";
1737  } else if (state & stateBitType::fileToBeRemoved) {
1738  value = "rm";
1739  } else if (state & stateBitType::fileToBeRenamed) {
1740  value = "mv";
1741  }
1742  }
1743  }
1744 
1745  if (needSpace) {
1746  logStream << " ";
1747  }
1748 
1749  logStream << value;
1750  needSpace = true;
1751  }
1752 
1753  if (needSpace) {
1754  logStream << "\n";
1755  }
1756 
1757 
1758  if (verbose) {
1759  logStream << "'" << source << "' ";
1760 
1762  logStream << " -> ";
1763  }
1764 
1766  logStream << " link ";
1767  }
1768 
1770  logStream << " rm ";
1771  }
1772 
1774  logStream << " mv ";
1775  }
1776 
1777  if (state & stateBitType::vanished) {
1778  logStream << " vanished before action started";
1779  }
1780 
1781  if (state & stateBitType::append) {
1782  logStream << " was appended to";
1783  }
1784 
1785  if (state & stateBitType::done) {
1786  logStream << " '" << destination << "' after " << retries << " attempts";
1787  }
1788 
1789  if (state & stateBitType::ignore) {
1790  logStream << " is ignored";
1791  }
1792 
1793  if (state & stateBitType::trucated) {
1794  logStream << " due to truncation";
1795  }
1796 
1797  if (state & stateBitType::failed) {
1798  logStream << " failed due to " << errorMessage;
1799  }
1800 
1801  if (state & stateBitType::inWork) {
1802  logStream << " inWork not cleared";
1803  }
1804 
1806  logStream << " attribute mismatch: " << errorMessage;
1807  }
1808 
1809  logStream << "\n";
1810  }
1811 }

References copyRequest::append, copyRequest::attributeMismatch, continueOnError, copyRequest::done, errMsg::emit(), copyRequest::failed, copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, copyRequest::fileToBeRenamed, copyRequest::ignore, errMsg::info, copyRequest::inWork, copyRequest::linkToBeMade, escapism::newEscaper(), noCopy, resultOutput, messageQueue::queue::send(), statPrefix, timerInst, copyRequest::trucated, copyRequest::vanished, verbose, and logstream::warning.

Here is the call graph for this function:

◆ printStats()

void copyRequest::base::printStats ( std::ostream &  stream)
static

Definition at line 1822 of file copyRequest.cpp.

// Writes the collected statistics to `stream`; every output line starts with
// statPrefix.
// NOTE: source line 1849 is not present in this listing.
1822  {
// one line per statistics collector
1823  stream << statPrefix << tTotal2Stat << "\n";
1824  stream << statPrefix << tTotalStat << "\n";
1825  stream << statPrefix << tInotifyStat << "\n";
1826  stream << statPrefix << tPipeStat << "\n";
1827  stream << statPrefix << tWaitStat << "\n";
1828  stream << statPrefix << tCopyStat << "\n";
1829  stream << statPrefix << tEnqueueStat << "\n";
1830  stream << statPrefix << bytesStat << "\n";
1831  stream << statPrefix << speedStat << "\n";
1832  stream << statPrefix << bytesStat.getN() << " files with " << bytesStat.sum << "B transferred\n";
1833 
// average rate: bytesStat.sum divided by the seconds elapsed since tProgramStart, scaled to MiB
1834  auto tElapsed = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - tProgramStart).count();
1835  stream << statPrefix << "avg rate: " << bytesStat.sum / (tElapsed * 1024 * 1024) << " MiB/s in " << tElapsed << " s\n";
// one line per index of processMultiplicities
1836  for (unsigned int i = 0; i < processMultiplicities->size(); i++) {
1837  stream << statPrefix << i << " processes at once: "
1838  << (*processMultiplicities)[i] << " times\n";
1839  }
1840 
// re-key advisoryWaitMap by duration so the lines below come out ordered by wait time
// NOTE(review): no lock is taken in the lines shown here while advisoryWaitMap is
// iterated — confirm that callers rule out concurrent modification.
1841  std::multimap<clock_type::duration, std::string> waitMap;
1842  for (const auto& item : advisoryWaitMap) {
1843  waitMap.emplace(item.second, item.first);
1844  }
1845  for (const auto& item : waitMap) {
1846  stream << statPrefix << " wait time for " << item.second << " files: "
1847  << std::chrono::duration_cast<std::chrono::duration<double>>(item.first).count() << "s\n";
1848  }
// NOTE(review): the ';' after the closing brace is redundant
1850 };

References timer::anchor::print(), and statPrefix.

Referenced by quietStatPrinter().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ process()

void copyRequest::base::process ( perThreadData threadData)
virtual

Reimplemented in copyRequest::listingRequest.

Definition at line 1109 of file copyRequest.cpp.

1109  {
1111  scoped::generic<decltype(*this)> concurrencyCount(*this,
1112  [](decltype(*this)& ) {
1114  });
1115 
1116  tWorkStart = clock_type::now();
1118  [](decltype(tWorkDone)& workDone) {
1119  workDone = clock_type::now();
1120  });
1121 
1123  scoped::generic<decltype(state)> stateReset(state,
1124  [](decltype(state)& State) {
1125  State.clear(stateBitType::inWork);
1126  });
1127 
1128  threadData = &aThreadData;
1129  scoped::generic<decltype(threadData)> unsetThreadData(threadData,
1130  [](decltype(threadData)& ThreadData) {
1131  ThreadData = nullptr;
1132  });
1133 
1134  errorMessage.clear(); // forget messages from previous attempts
1135  retries++;
1136 
1137  // do this before check for noAction needed to not be fooled by vanished bit
1140  return; // no more things to do for stuff to remove
1141  }
1142 
1144  if (state & stateBitType::ignore) {
1145  errorMessage = "ignored";
1146  } else if (state & stateBitType::vanished) {
1147  errorMessage = "vanished before action";
1148  } else {
1149  errorMessage = "oops";
1150  }
1151  return;
1152  }
1153 
1154  try {
1155  if (!noCopy) {
1157  }
1160  if (state & stateBitType::vanished) {
1161  errorMessage = "vanished before action";
1162  } else {
1164  }
1165  return;
1166  }
1168  }
1169 
1172  switch (result) {
1174  if (state & stateBitType::ignore) {
1175  errorMessage = "directory move ignored";
1176  }
1177  return;
1179  if (threadData->InputHandler->getStat(origSource) == nullptr) {
1180  stateType dummyState;
1182  moveSource, "remove obsolete mv source",
1183  "can't find source '", origSource, "'");
1184  threadData->OutputHandler->remove(moveSource, dummyState);
1185  } else {
1187  moveSource, "rename",
1188  "littering with obsolete mv source");
1189  }
1190  }
1191 #if __GNUC__ >= 7
1192  __attribute__ ((fallthrough));
1193 #endif
1195  // file was probably not copied yet, try to copy it from the move target
1197  break;
1198  default:
1199  throw std::runtime_error("impossible rename retval");
1200  }
1201  }
1202  } catch (const fatalException&) { // always throw, never return
1203  throw;
1204  } catch (const std::exception& e) {
1205  errorMessage = e.what();
1207  if (continueOnError) {
1208  return;
1209  } else {
1210  throw;
1211  }
1212  }
1213 
1214  try { // try scope for input file setup/cleanup
1215  if (readInitialStat == nullptr ||// may happen with failed rename events
1216  retries > 1 || // for second or worse attempts always refresh stat
1217  clock_type::now() - tEnqueue > std::chrono::seconds(1) ||
1218  (! readInitialStat->isRegularFile())) {
1220  if (readInitialStat == nullptr) {
1222  errorMessage = "vanished in action";
1223  return;
1224  }
1225  }
1226  if (! readInitialStat->isRegularFile()) {
1228  errorMessage = "ignored non regular file";
1229  return;
1230  }
1231 
1232  if (ignoreExisting) { // check if copy exists and has same stat values
1233  auto destStat = threadData->OutputHandler->getStat(destination);
1234  if (destStat) { // destination exists, check stat values
1235  if (readInitialStat->size == destStat->size &&
1236  readInitialStat->isSameMtimeAs(*destStat)) {
1238  errorMessage = "ignored due to existing copy with same stat";
1240  if ((preserve.mode && readInitialStat->mode == destStat->mode)
1241  || (preserve.ownership && readInitialStat->ownerUid == destStat->ownerUid
1242  && readInitialStat->ownerGid == destStat->ownerGid)) {
1243  errorMessage += ", preserved attrs are already fine";
1244  } else {
1246  errorMessage += ", attributes updated";
1247  }
1248  }
1249  return;
1250  }
1251  }
1252  }
1253 
1255  std::unique_ptr<ioHandle::attrDataType> attrData;
1256  if (preserve.attrs) {
1257  attrData = input->getAttrData(threadData->OutputHandler);
1258  }
1259  std::unique_ptr<acl::list> aclData;
1260  if (preserve.acls) {
1261  aclData = input->getAclData();
1262  }
1263 
1264  attrset(*input, process_source_attrs);
1265  attrset(*input, start_source_attrs);
1266  try {
1267  std::string destNameDuringWrite(destination);
1268  std::string backupFile;
1269  switch (backupMode) {
1270  case backupModeType::none:
1271  break;
1273  case backupModeType::during: {
1274  backupFile = destination + getBackupSuffix();
1276  break;
1277  }
1278  case backupModeType::after: {
1279  if (!backupSuffix.empty()) {
1280  destNameDuringWrite += getBackupSuffix();
1281  }
1282  break;
1283  }
1284  case backupModeType::remove: {
1285  stateType dummy;
1286  threadData->OutputHandler->remove(destNameDuringWrite, dummy);
1287  break;
1288  }
1289  default:
1290  throw std::runtime_error("illegal backupModeType");
1291  }
1292  {
1293  // scope for writeHandle
1294  std::unique_ptr<outputHandler::base::writer> writeHandle;
1295  if (backupSuffix.empty() && backupMode == backupModeType::after) {
1296  writeHandle = threadData->OutputHandler->newTmpWriter(destNameDuringWrite,
1297  readInitialStat->size,
1298  noCopy,
1299  std::move(attrData),
1300  std::move(aclData));
1301  } else {
1302  writeHandle = threadData->OutputHandler->newWriter(destNameDuringWrite,
1303  (appendableFiles.fIsSet() &&
1304  regex_match(destination, appendableFiles)) ||
1305  std::any_of(append_attrs.cbegin(), append_attrs.cend(),
1306  [this](decltype(append_attrs)::value_type attr) {
1307  return ! threadData->InputHandler->getXattr(source, attr).empty();
1308  }),
1309  readInitialStat->size,
1310  input->getBlockSize(),
1311  state,
1312  noCopy,
1313  std::move(attrData),
1314  std::move(aclData));
1315  }
1316  doBlockSizeSetup(*input, *writeHandle);
1317 
1318  input->setupSparseRegions(sparse);
1319 
1320  if (state & stateBitType::append) { // we want to append
1321  auto startPosition = writeHandle->getSize();
1322  // align start position to a disk block
1323  startPosition = (startPosition / input->getBlockSize()) * input->getBlockSize();
1324  writeHandle->seek(startPosition);
1325  input->seek(startPosition);
1326  }
1327 
1328  if (perFileThreads
1329  && readInitialStat->size / memoryBlockSize > 1 // no point in threading small files
1330  ) {
1331  doThreadedCopy(*input, writeHandle);
1332  } else {
1333  doUnthreadedCopy(*input, writeHandle);
1334  }
1335 
1336  input->checkUnchangedness();
1337 
1339 
1340  if (!noCopy) {
1341  if (syncWrittenFiles) {
1342  writeHandle->sync();
1343  }
1344  if (!setAttributesAfterClose) {
1346  }
1347  attrset(*writeHandle, success_dest_attrs);
1348  }
1349  // writeHandle is deleted here when the unique_ptr goes out of scope
1350  } // scope for writeHandle
1351 
1352  // special workaround for dCache that sets mtime on close()
1353  if (setAttributesAfterClose && !noCopy) {
1355  }
1356  switch (backupMode) {
1357  case backupModeType::during: {
1358  stateType dummy;
1359  threadData->OutputHandler->remove(backupFile, dummy);
1360  break;
1361  }
1362  case backupModeType::after:
1363  threadData->OutputHandler->renameSimple(destNameDuringWrite, destination);
1364  default:
1365  break;
1366  }
1367  } catch (const delayAdvisingError& e) {
1368  adviseDelay();
1369  errorMessage = e.what();
1370  attrset(*input, failure_source_attrs); // mark on input file
1371  throw; // continue in outer try/catch
1372  } catch (const std::exception& e) {
1373  errorMessage = e.what();
1374  attrset(*input, failure_source_attrs); // mark on input file
1375  throw; // continue in outer try/catch
1376  }
1377  attrdel(*input, process_source_attrs);
1378  attrdel(*input, failure_source_attrs); // we succeeded, remove old failure marks
1379  attrset(*input, success_source_attrs);
1380  } catch (const fatalException&) { // always throw, never return
1381  throw;
1382  } catch (const std::exception& e) { // no attribute massage for input, we don't have that
1383  errorMessage = e.what();
1385  if (continueOnError) {
1386  return;
1387  } else {
1388  throw;
1389  }
1390  }
1391  // check if earliest process time changed
1392  if (tWorkStart < filesInWorkIterator->second.getEarliestprocessTime()) {
1393  adviseDelay(); // try to avoid this next time
1394  errorMessage = "changed earliest process time, ";
1395  auto deltaT = filesInWorkIterator->second.getEarliestprocessTime() - tWorkStart;
1396  errorMessage += std::to_string(std::chrono::duration_cast<std::chrono::duration<double>>(deltaT).count());
1398  return;
1399  }
1401 }

References copyRequest::after, copyRequest::append, copyRequest::before, continueOnError, outputHandler::base::writer::doAttributePreservations(), copyRequest::done, copyRequest::during, errMsg::emit(), copyRequest::failed, outputHandler::base::fileChanged, copyRequest::fileToBeCopied, copyRequest::fileToBeRemoved, copyRequest::fileToBeRenamed, outputHandler::base::fileVanished, outputHandler::base::writer::getSize(), copyRequest::ignore, copyRequest::inWork, copyRequest::linkToBeMade, nConcurrentProcesses, copyRequest::noActionNeeded, noCopy, copyRequest::none, errMsg::notice, outputHandler::base::ok, preserve, copyRequest::remove, outputHandler::base::writer::seek(), outputHandler::base::writer::sync(), and copyRequest::vanished.

Here is the call graph for this function:

◆ processQueue()

void copyRequest::base::processQueue ( simpleQueue queue,
simpleQueue resultQueue,
timedQueue delayedRequests 
)
static

Definition at line 1895 of file copyRequest.cpp.

1897  {
1898  try {
1899  perThreadData threadData;
1900 
1901  while (auto request = queue.dequeue()) {
1903  break;
1904  }
1905  while (request) {
1906  auto dt = request->filesInWorkIterator->second.getEarliestprocessTime()
1907  - clock_type::now();
1908  if (dt > decltype(dt)::zero()) {
1910  request->source, "delay",
1911  "scheduled too early by ", std::chrono::duration_cast<std::chrono::duration<double>>(dt).count(), "s");
1912  auto newDelay = request->filesInWorkIterator->second.getEarliestprocessTime() - request->tInotify;
1913  request->adviseDelay(newDelay);
1914  delayedRequests.enqueue(request, dt);
1915  if (request != nullptr) {
1916  throw std::runtime_error("request not nullptr");
1917  }
1918  break;
1919  }
1920 
1921 
1923  request->process(threadData);
1924  if (! retry(request, delayedRequests)) {
1925  copyRequest::fileInWork::eraser eraser(*request);
1926  resultQueue.enqueue(request);
1927  if (request != nullptr) {
1928  throw std::runtime_error("request not nullptr");
1929  }
1930  request = eraser.getNext();
1931  }
1932  }
1933  }
1934  } catch (const std::exception& e) {
1935  stopRequest::RequestStop(e.what());
1937  "--", "process", "stop requested due to ", e.what());
1938  } catch (...) {
1939  stopRequest::RequestStop("unknown exception");
1941  "--", "process", "stop requested due to unknown exception");
1942  }
1943 }

References copyRequest::action, errMsg::debug, waitQueues::simple< T >::dequeue(), errMsg::emit(), waitQueues::simple< T >::enqueue(), waitQueues::timed< T, clock_type >::enqueue(), fileRateLimit, copyRequest::fileInWork::eraser::getNext(), stopRequest::processLoop, stopRequest::Requested(), and stopRequest::RequestStop().

Here is the call graph for this function:

◆ reader()

void copyRequest::base::reader ( inputHandler::base::reader input,
blockQueue freeBlocks,
blockQueue readBlocks,
bool  mayParallelize,
exceptionList exceptions 
)
private

Definition at line 530 of file copyRequest.cpp.

// Read side of a copy: fills blocks from `input` and puts them on readBlocks.
// Two paths are visible: a parallel one that starts nReadThreads threads
// running readWorker, and a sequential one that calls input.readBlock() in a
// loop. Exceptions are stored in `exceptions`; readBlocks.signalDone() is
// called at the end on both the normal and the exceptional path.
// NOTE: source line 540 (the end of the if-condition) is not present in this
// listing, so the condition shown below is incomplete.
534  {
535  try {
// integer division: number of whole memoryBlockSize blocks in the initial stat size
536  unsigned nBlocks = readInitialStat->size / memoryBlockSize;
537  if (mayParallelize
538  && input.parallelizable()
539  && (nConcurrentProcesses < nThreads / nReadThreads + 1 // don't load loaded systems
541  && nReadThreads > 1
542  && nBlocks >= nReadThreads) {
// parallel path: queue one (size, offset) request per memoryBlockSize chunk;
// std::min shortens the final chunk to the remaining bytes
543  blockReadRequestQueue readRequests;
544  for (size_t o = 0; o < readInitialStat->size; o += memoryBlockSize) {
545  auto s = readInitialStat->size - o;
546  s = std::min(s, memoryBlockSize);
547  readRequests.emplace(s, o);
548  }
// all requests are queued before any worker starts
// (presumably signalDone() lets the workers' dequeue() end once the queue is empty — confirm)
549  readRequests.signalDone();
550  std::vector<std::thread> workers;
551  {
552  timerInst(readThreadSetup);
553  workers.resize(nReadThreads);
554  for (auto& worker : workers) {
555  worker = std::thread(&copyRequest::base::readWorker, this,
556  std::ref(input),
557  std::ref(readRequests),
558  std::ref(freeBlocks),
559  std::ref(readBlocks),
560  std::ref(exceptions));
561  }
562  }
// wait for every worker thread before leaving the parallel path
563  for (auto& worker : workers) {
564  worker.join();
565  }
566  } else {
// sequential path: read block by block until readBlock() reports the last block
567  while (true) {
// NOTE(review): meaning of the first dequeue() argument is not visible here — confirm against the block queue
568  auto b = freeBlocks.dequeue(readBlocks.size() < 3, memoryBlockSize);
569  if (b == nullptr) {
570  throw std::runtime_error("got no free block in reader");
571  }
572  auto lastblock = input.readBlock(*b);
573 
574  if (b->empty()) { // we are done, recycle the block in the free block list
575  freeBlocks.enqueue(b);
576  } else {
577  readBlocks.enqueue(b);
578  }
579 
580  if (lastblock) {
581  break;
582  }
583  }
584  }
585  } catch (...) {
586  exceptions.add(std::current_exception());
587  }
// reached after success and after a caught exception alike
588  readBlocks.signalDone();
589 }

References copyRequest::exceptionList::add(), waitQueues::simple< T >::dequeue(), waitQueues::simple< T >::emplace(), waitQueues::simple< T >::enqueue(), nThreads, ioHandle::parallelizable(), inputHandler::base::reader::readBlock(), readWorker(), waitQueues::simple< T >::signalDone(), waitQueues::simple< T >::size(), and timerInst.

Here is the call graph for this function:

◆ readWorker()

void copyRequest::base::readWorker ( inputHandler::base::reader input,
blockReadRequestQueue blockRequests,
blockQueue freeBlocks,
blockQueue readBlocks,
exceptionList exceptions 
)
private

Definition at line 510 of file copyRequest.cpp.

// Loop run by each read thread started in reader().
// Takes (size, offset) requests from blockRequests until dequeue() yields an
// empty result, obtains a block from freeBlocks, fills it via
// input.readBlockP() and puts it on readBlocks. Any exception is stored in
// `exceptions` and ends this worker's loop.
514  {
515  try {
516  while (auto request = blockRequests.dequeue()) {
// NOTE(review): meaning of the first dequeue() argument is not visible here — confirm against the block queue
517  auto b = freeBlocks.dequeue(readBlocks.size() < 3, memoryBlockSize);
518  if (b == nullptr) {
519  throw std::runtime_error("got no free block in readWorker");
520  }
// read request->size bytes at request->offset into the block
521  input.readBlockP(*b, request->size, request->offset);
522  readBlocks.enqueue(b);
523  }
524  } catch (...) {
525  exceptions.add(std::current_exception());
526  }
527 }

References copyRequest::exceptionList::add(), waitQueues::simple< T >::dequeue(), waitQueues::simple< T >::enqueue(), inputHandler::base::reader::readBlockP(), and waitQueues::simple< T >::size().

Referenced by reader().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ registerIterator()

void copyRequest::base::registerIterator ( decltype(filesInWorkIterator iter)
inline

Definition at line 342 of file copyRequest.h.

// Stores `iter` in the member filesInWorkIterator.
342  {
343  filesInWorkIterator = iter;
344  }

References filesInWorkIterator.

Referenced by copyRequest::fileInWork::inserter::enRegister(), and copyRequest::fileInWork::inserter::markForQueueing().

Here is the caller graph for this function:

◆ removeFileOrDirectory()

void copyRequest::base::removeFileOrDirectory ( inputHandler::base InputHandler,
outputHandler::base OutputHandler 
)
private

Definition at line 914 of file copyRequest.cpp.

915  {
916  if (InputHandler->pathExists(source)) {// source really vanished
917  errorMessage = "source " + source + " still exists, not deleting " + destination;
919  } else {
920  OutputHandler->remove(destination, state);
922  errorMessage = "vanished before processing";
923  } else if (state & stateBitType::failed) {
924  errorMessage = "directory not empty";
925  }
927  }
928 }

References copyRequest::done, copyRequest::failed, pathHandler::pathExists(), outputHandler::base::remove(), and copyRequest::vanished.

Here is the call graph for this function:

◆ resetStats()

void copyRequest::base::resetStats ( )
static

Definition at line 1852 of file copyRequest.cpp.

// Calls reset() on the statistics collectors listed below.
// NOTE: source line 1863 is not present in this listing.
1852  {
1853  tTotal2Stat.reset();
1854  tTotalStat.reset();
1855  tInotifyStat.reset();
1856  tPipeStat.reset();
1857  tWaitStat.reset();
1858  tCopyStat.reset();
1859  tEnqueueStat.reset();
1860  bytesStat.reset();
1861  speedStat.reset();
// NOTE(review): bytesStat was already reset two lines above; this second call
// looks redundant — check whether a different collector was intended.
1862  bytesStat.reset();
1864 }

References timer::anchor::reset().

Referenced by printResults().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ retry()

bool copyRequest::base::retry ( std::unique_ptr< base > &  request,
timedQueue delayedRequests 
)
static

Definition at line 1866 of file copyRequest.cpp.

// Decides whether a failed request is rescheduled.
// A request is rescheduled when its state has the failed bit, it is not a
// removal, and its retries count is still below maxRetries. In that case the
// failed bit is cleared, the request is put on delayedRequests with a wait
// time of (tWorkDone - tWorkStart + 1s) * retries, and true is returned.
// Otherwise nothing is changed and false is returned.
// NOTE: source line 1881 (the start of the logging call) is not present in
// this listing.
1867  {
1868  if ((request->state & stateBitType::failed) &&
1869  !(request->state & stateBitType::fileToBeRemoved) &&
1870  (request->retries < maxRetries)) {
1871  static ssize_t lastSize;
// NOTE(review): `size` has no initializer; if /proc/self/statm cannot be
// opened the extraction below may leave it unset — confirm.
1872  ssize_t size;
1873  {
// read the first value of /proc/self/statm (logged below as "vmsize")
1874  std::ifstream statm("/proc/self/statm");
1875  statm >> size;
1876  }
// NOTE(review): lastSize is overwritten here, before the log arguments below
// are evaluated, so the logged lastSize equals size and the logged difference
// size - lastSize is always 0.
1877  lastSize = size;
1878  request->state.clear(stateBitType::failed);
// back-off grows linearly with the number of attempts
1879  auto waitTime = request->tWorkDone - request->tWorkStart + std::chrono::seconds(1);
1880  waitTime *= request->retries;
1882  request->source, "re-sched",
1883  request->errorMessage,
1884  ", delaying by ", std::fixed, std::chrono::duration_cast<std::chrono::duration<double>>(waitTime).count(),
1885  "s vmsize: ", size, "(", lastSize, ", ", size - lastSize, "), retries: ", request->retries);
1886  delayedRequests.enqueue(request, waitTime);
// enqueue() is expected to leave `request` empty; anything else is treated as an error
1887  if (request != nullptr) {
1888  throw std::runtime_error("requesy not null");
1889  }
1890  return true;
1891  }
1892  return false;
1893 }

References enumAsBitmask< T >::clear(), errMsg::debug, errMsg::emit(), waitQueues::timed< T, clock_type >::enqueue(), errorMessage, copyRequest::failed, copyRequest::fileToBeRemoved, retries, source, state, tWorkDone, and tWorkStart.

Here is the call graph for this function:

◆ setStatPrinter()

void copyRequest::base::setStatPrinter ( void(*)(std::ostream &)  f)
static

Definition at line 1819 of file copyRequest.cpp.

// Stores the function pointer `f` in statPrinter.
1819  {
1820  statPrinter = f;
1821 }

References f().

Referenced by policyRunRequestProvider::processSource().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ waitForAllInstancesGone()

void copyRequest::base::waitForAllInstancesGone ( )
static

Definition at line 1421 of file copyRequest.cpp.

// Blocks the calling thread until objectCount is no longer greater than 0.
// objectCountMutex is held while objectCount is checked; the wait on
// objectCountCondVar is repeated in a loop, so a wake-up with objectCount
// still positive simply waits again.
1421  {
1422  std::unique_lock<decltype(objectCountMutex)> lock(objectCountMutex);
1423  while (objectCount > 0) {
1424  objectCountCondVar.wait(lock);
1425  }
1426 }

◆ writer()

void copyRequest::base::writer ( std::unique_ptr< outputHandler::base::writer > &  writeHandle,
blockQueue blocksToWrite,
blockQueue writtenBlocks,
exceptionList exceptions 
)
private

Definition at line 592 of file copyRequest.cpp.

// Ordered write loop.
// Takes blocks from blocksToWrite until dequeue() yields an empty result,
// writes each one with writeHandle->writeBlock() and forwards it to
// writtenBlocks. A block whose offset is smaller than the previous block's
// offset raises std::logic_error. All exceptions, including that one, are
// stored in `exceptions` and end the loop.
595  {
596  try {
// offset of the most recently written block; starts at 0
597  size_t lastOffset = 0;
598  while (auto b = blocksToWrite.dequeue()) {
// equal offsets are accepted, only a decrease is rejected
599  if (b->offset() < lastOffset) {
600  throw std::logic_error("blocks not in order");
601  }
602  writeHandle->writeBlock(*b);
603  lastOffset = b->offset();
604  writtenBlocks.enqueue(b);
605  }
606  } catch (...) {
607  exceptions.add(std::current_exception());
608  }
609 }

References copyRequest::exceptionList::add(), waitQueues::simple< T >::dequeue(), waitQueues::simple< T >::enqueue(), and outputHandler::base::writer::writeBlock().

Referenced by copyRequest::base::writeActor::writeActor().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ writeWorker()

void copyRequest::base::writeWorker ( std::unique_ptr< outputHandler::base::writer > &  writeHandle,
blockQueue blocksToWrite,
blockQueue writtenBlocks,
exceptionList exceptions 
)
private

Definition at line 610 of file copyRequest.cpp.

// Write loop without an ordering check.
// Takes blocks from blocksToWrite until dequeue() yields an empty result,
// writes each one with writeHandle->writeBlockP() and forwards it to
// writtenBlocks. Any exception is stored in `exceptions` and ends the loop.
613  {
614  try {
615  while (auto b = blocksToWrite.dequeue()) {
616  writeHandle->writeBlockP(*b);
617  writtenBlocks.enqueue(b);
618  }
619  } catch (...) {
620  exceptions.add(std::current_exception());
621  }
622 }

References copyRequest::exceptionList::add(), waitQueues::simple< T >::dequeue(), waitQueues::simple< T >::enqueue(), and outputHandler::base::writer::writeBlockP().

Referenced by copyRequest::base::writeActor::writeActor().

Here is the call graph for this function:
Here is the caller graph for this function:

Friends And Related Function Documentation

◆ fileInWork

friend class fileInWork
friend

Definition at line 100 of file copyRequest.h.

◆ operator<<

std::ostream& operator<< ( std::ostream &  out,
const base request 
)
friend

Definition at line 1952 of file copyRequest.cpp.

// Streams a description of `request` to `out`: its address, source and
// destination, the state as a hexadecimal number, tInotify, the earliest
// process time taken from filesInWorkIterator, and the current time,
// followed by a newline. Returns `out`.
1952  {
// std::fixed is set on the stream here and is not reset afterwards
1953  out << std::fixed;
1954  out << " request " << static_cast<const void*>(&request);
1955  out << " for '" << request.source << "' to '" << request.destination << "'";
// hex is switched on only for the state value and switched back to dec right after
1956  out << " state " << std::hex << static_cast<unsigned>(request.state) << std::dec;
1957  out << " tInotify " << request.tInotify;
// dereferences filesInWorkIterator
// NOTE(review): presumably requires that an iterator was registered before — confirm
1958  out << " tEarliest " << request.filesInWorkIterator->second.getEarliestprocessTime();
1959  out << " now: " << clock_type::now() << "\n";
1960  return out;
1961  }

Member Data Documentation

◆ advisoryWaitMap

std::map<std::string, clock_type::duration> copyRequest::base::advisoryWaitMap
staticprivate

Definition at line 336 of file copyRequest.h.

◆ advisoryWaitMapMutex

std::mutex copyRequest::base::advisoryWaitMapMutex
staticprivate

Definition at line 337 of file copyRequest.h.

◆ append_attrs

options::container<std::string> copyRequest::base::append_attrs
staticprivate

Definition at line 112 of file copyRequest.h.

◆ appendableFiles

options::single<std::regex> copyRequest::base::appendableFiles
staticprivate

Definition at line 113 of file copyRequest.h.

◆ backupMode

backupModeType copyRequest::base::backupMode
staticprivate

Definition at line 120 of file copyRequest.h.

◆ backupModeName

options::withAction<options::single<std::string> > copyRequest::base::backupModeName
staticprivate

Definition at line 119 of file copyRequest.h.

◆ backupSuffix

options::single<std::string> copyRequest::base::backupSuffix
staticprivate

Definition at line 121 of file copyRequest.h.

◆ bytesStat

statCollector::typed< decltype(readInitialStat->size) > copyRequest::base::bytesStat
staticprotected

Definition at line 314 of file copyRequest.h.

◆ changeAttrsOnIgnoreExisting

options::single<bool> copyRequest::base::changeAttrsOnIgnoreExisting
staticprivate

Definition at line 115 of file copyRequest.h.

◆ check_source_attrs

options::map<std::string> copyRequest::base::check_source_attrs
staticprivate

Definition at line 109 of file copyRequest.h.

◆ checkSums

std::list<checksum::base*> copyRequest::base::checkSums
protected

◆ destination

const std::string copyRequest::base::destination
protected

Definition at line 285 of file copyRequest.h.

Referenced by copyRequest::operator<<().

◆ destinationPrefixRegex

options::single<std::regex> copyRequest::base::destinationPrefixRegex
staticprivate

Definition at line 133 of file copyRequest.h.

◆ errorMessage

std::string copyRequest::base::errorMessage
protected

Definition at line 283 of file copyRequest.h.

Referenced by retry().

◆ failure_source_attrs

options::map<std::string> copyRequest::base::failure_source_attrs
staticprivate

Definition at line 106 of file copyRequest.h.

◆ filesInWorkIterator

decltype(fileInWork::filesInWork.begin()) copyRequest::base::filesInWorkIterator = fileInWork::filesInWork.end()

◆ forceParallelRead

options::single<bool> copyRequest::base::forceParallelRead
staticprivate

Definition at line 129 of file copyRequest.h.

◆ ignoreExisting

options::single<bool> copyRequest::base::ignoreExisting
staticprivate

Definition at line 114 of file copyRequest.h.

◆ linkBaseMap

options::map<std::string> copyRequest::base::linkBaseMap
staticprivate

Definition at line 117 of file copyRequest.h.

◆ logFields

options::container<std::string> copyRequest::base::logFields
staticprivate

Definition at line 131 of file copyRequest.h.

◆ mapEntry

const singleMap& copyRequest::base::mapEntry
protected

Definition at line 288 of file copyRequest.h.

◆ maxDelayTime

options::single<double> copyRequest::base::maxDelayTime
staticprotected

Definition at line 328 of file copyRequest.h.

◆ maxMemoryBlockSize

options::single<size_t> copyRequest::base::maxMemoryBlockSize
static

Definition at line 297 of file copyRequest.h.

Referenced by doBlockSizeSetup().

◆ maxRetries

options::single<unsigned> copyRequest::base::maxRetries
staticprivate

Definition at line 116 of file copyRequest.h.

◆ memoryBlockSize

size_t copyRequest::base::memoryBlockSize
protected

◆ minMemoryBlockSize

options::single<size_t> copyRequest::base::minMemoryBlockSize
static

Definition at line 298 of file copyRequest.h.

Referenced by doBlockSizeSetup().

◆ moveSource

const std::string copyRequest::base::moveSource
protected

Definition at line 286 of file copyRequest.h.

◆ nConcurrentProcesses

std::atomic<unsigned int> copyRequest::base::nConcurrentProcesses
staticprotected

Definition at line 318 of file copyRequest.h.

Referenced by process(), and copyRequest::base::writeActor::writeActor().

◆ nReadThreads

options::single<unsigned> copyRequest::base::nReadThreads
staticprivate

Definition at line 128 of file copyRequest.h.

◆ nSumThreads

options::single<unsigned> copyRequest::base::nSumThreads
staticprivate

Definition at line 127 of file copyRequest.h.

Referenced by copyRequest::base::hashCalculator::hashCalculator().

◆ nWriteThreads

options::single<unsigned> copyRequest::base::nWriteThreads
staticprivate

Definition at line 130 of file copyRequest.h.

Referenced by copyRequest::base::writeActor::writeActor().

◆ objectCount

unsigned copyRequest::base::objectCount
staticprivate

Definition at line 371 of file copyRequest.h.

◆ objectCountCondVar

std::condition_variable copyRequest::base::objectCountCondVar
staticprivate

Definition at line 372 of file copyRequest.h.

◆ objectCountMutex

std::mutex copyRequest::base::objectCountMutex
staticprivate

Definition at line 370 of file copyRequest.h.

◆ origSource

const std::string copyRequest::base::origSource
protected

Definition at line 287 of file copyRequest.h.

◆ perFileThreads

options::single<bool> copyRequest::base::perFileThreads
static

Definition at line 125 of file copyRequest.h.

◆ prefix

std::string copyRequest::base::prefix
protected

Definition at line 290 of file copyRequest.h.

◆ process_source_attrs

options::map<std::string> copyRequest::base::process_source_attrs
staticprivate

Definition at line 107 of file copyRequest.h.

◆ processMultiplicities

std::vector<std::atomic<unsigned int> >* copyRequest::base::processMultiplicities
static

Definition at line 321 of file copyRequest.h.

◆ prohibitive_attrs

options::container<std::string> copyRequest::base::prohibitive_attrs
staticprivate

Definition at line 111 of file copyRequest.h.

◆ readInitialStat

std::unique_ptr<const genericStat> copyRequest::base::readInitialStat
protected

◆ retries

unsigned copyRequest::base::retries
protected

Definition at line 323 of file copyRequest.h.

Referenced by retry().

◆ setAttributesAfterClose

options::single<bool> copyRequest::base::setAttributesAfterClose
static

Definition at line 124 of file copyRequest.h.

Referenced by outputHandler::dcap::dcap().

◆ source

const std::string copyRequest::base::source
protected

◆ sourcePrefixRegex

options::single<std::regex> copyRequest::base::sourcePrefixRegex
staticprivate

Definition at line 132 of file copyRequest.h.

◆ sparse

options::single<std::string> copyRequest::base::sparse
staticprivate

Definition at line 118 of file copyRequest.h.

◆ speedStat

statCollector::typed<double> copyRequest::base::speedStat
staticprotected

Definition at line 315 of file copyRequest.h.

◆ start_source_attrs

options::map<std::string> copyRequest::base::start_source_attrs
staticprivate

Definition at line 108 of file copyRequest.h.

◆ state

stateType copyRequest::base::state
protected

◆ statPrinter

std::function<void(std::ostream&)> copyRequest::base::statPrinter
staticprotected

◆ success_dest_attrs

options::map<std::string> copyRequest::base::success_dest_attrs
staticprivate

Definition at line 110 of file copyRequest.h.

◆ success_source_attrs

options::map<std::string> copyRequest::base::success_source_attrs
staticprivate

Definition at line 105 of file copyRequest.h.

◆ suffix

std::string copyRequest::base::suffix
protected

Definition at line 289 of file copyRequest.h.

Referenced by getSuffix().

◆ syncWrittenFiles

options::single<bool> copyRequest::base::syncWrittenFiles
staticprivate

Definition at line 122 of file copyRequest.h.

◆ tCopyStat

statCollector::typed< decltype(tWorkDone - tWorkStart) > copyRequest::base::tCopyStat
staticprotected

Definition at line 313 of file copyRequest.h.

◆ tEnqueue

clock_type::time_point copyRequest::base::tEnqueue
protected

Definition at line 304 of file copyRequest.h.

◆ tEnqueueStat

statCollector::typed< copyRequest::clock_type::duration > copyRequest::base::tEnqueueStat
static

Definition at line 320 of file copyRequest.h.

Referenced by followInotifyWatchRequestProvider::followStream().

◆ threadData

perThreadData* copyRequest::base::threadData
protected

Definition at line 281 of file copyRequest.h.

◆ tInotify

clock_type::time_point copyRequest::base::tInotify
protected

Definition at line 303 of file copyRequest.h.

Referenced by copyRequest::operator<<().

◆ tInotifyStat

statCollector::typed< decltype(tWorkDone - tInotify) > copyRequest::base::tInotifyStat
staticprotected

Definition at line 310 of file copyRequest.h.

◆ tPipeStat

statCollector::typed< decltype(tEnqueue - tInotify) > copyRequest::base::tPipeStat
staticprotected

Definition at line 311 of file copyRequest.h.

◆ tProgramStart

clock_type::time_point copyRequest::base::tProgramStart
staticprotected

Definition at line 316 of file copyRequest.h.

◆ tTotal2Stat

statCollector::typed< decltype(tWorkDone - tInotify) > copyRequest::base::tTotal2Stat
staticprotected

Definition at line 309 of file copyRequest.h.

◆ tTotalStat

statCollector::typed< decltype(tWorkDone - tInotify) > copyRequest::base::tTotalStat
staticprotected

Definition at line 308 of file copyRequest.h.

◆ tWaitStat

statCollector::typed< decltype(tWorkStart - tEnqueue) > copyRequest::base::tWaitStat
staticprotected

Definition at line 312 of file copyRequest.h.

◆ tWorkDone

clock_type::time_point copyRequest::base::tWorkDone
protected

Definition at line 306 of file copyRequest.h.

Referenced by retry().

◆ tWorkStart

clock_type::time_point copyRequest::base::tWorkStart
protected

Definition at line 305 of file copyRequest.h.

Referenced by retry().

◆ userReadBlockSize

options::single<size_t> copyRequest::base::userReadBlockSize
static

Definition at line 296 of file copyRequest.h.

Referenced by doBlockSizeSetup().

◆ userWriteBlockSize

options::single<size_t> copyRequest::base::userWriteBlockSize
static

Definition at line 294 of file copyRequest.h.

Referenced by doBlockSizeSetup().


The documentation for this class was generated from the following files:
copyRequest::base::userWriteBlockSize
static options::single< size_t > userWriteBlockSize
Definition: copyRequest.h:294
copyRequest::fileInWork::filesInWork
static std::map< std::string, fileInWork > filesInWork
maps file names to filesInWork objects
Definition: fileInWork.h:30
copyRequest::base::tWaitStat
static statCollector::typed< decltype(tWorkStart - tEnqueue) > tWaitStat
Definition: copyRequest.h:312
copyRequest::fileInWork::slotTypes::copy
@ copy
statCollector::typed::reset
void reset() override
Definition: statCollector.h:109
copyRequest::backupModeType::during
@ during
copyRequest::base::moveSource
const std::string moveSource
Definition: copyRequest.h:286
copyRequest::fileInWork::slotTypes::move
@ move
verbose
options::single< bool > verbose
dereference
options::single< bool > dereference
delayAdvisingError
class for exceptions that advise for delays. Exceptions of this kind are to be thrown when circumstanc...
Definition: inputHandler.h:22
ioHandle::setXattr
virtual void setXattr(const std::string &, const std::string &)
Definition: ioHandle.h:43
copyRequest::perThreadData::InputHandler
inputHandler::base * InputHandler
Definition: copyRequest.h:69
copyRequest::base::nReadThreads
static options::single< unsigned > nReadThreads
Definition: copyRequest.h:128
copyRequest::backupModeType::before
@ before
outputHandler::base::ensureParentDirs
virtual void ensureParentDirs(const std::string &path, const std::string &srcPath, inputHandler::base *InputHandler)=0
copyRequest::backupModeType::after
@ after
errMsg::level::warning
@ warning
noCopy
options::single< bool > noCopy
copyRequest::stateBitType::attributeMismatch
@ attributeMismatch
outputHandler::base::rename
virtual renameRetvalType rename(const std::string &fromPath, const std::unique_ptr< const genericStat > &readInitialStat, const std::string &toPath, copyRequest::stateType &state)=0
errMsg::location
class for defining the location of an error message in the source code.
Definition: errMsgQueue.h:14
copyRequest::base::errorMessage
std::string errorMessage
Definition: copyRequest.h:283
copyRequest::stateBitType::fileToBeCopied
@ fileToBeCopied
ioHandle::removeXattr
virtual void removeXattr(const std::string &)
Definition: ioHandle.h:55
copyRequest::blockQueue
waitQueues::simple< block > blockQueue
Definition: copyRequest.h:63
copyRequest::base::speedStat
static statCollector::typed< double > speedStat
Definition: copyRequest.h:315
copyRequest::base::tEnqueueStat
static statCollector::typed< copyRequest::clock_type::duration > tEnqueueStat
Definition: copyRequest.h:320
copyRequest::base::perFileThreads
static options::single< bool > perFileThreads
Definition: copyRequest.h:125
statPrefix
options::single< std::string > statPrefix
copyRequest::base::tTotalStat
static statCollector::typed< decltype(tWorkDone - tInotify) > tTotalStat
Definition: copyRequest.h:308
copyRequest::base::sourcePrefixRegex
static options::single< std::regex > sourcePrefixRegex
Definition: copyRequest.h:132
inputHandler::base::reader::readBlockP
virtual void readBlockP(block &, size_t, off_t)
read one block from the file, starting at offset.
Definition: inputHandler.h:100
fatalException
class for exceptions that should result in program termination
Definition: copyRequestTypes.h:11
copyRequest::base::success_dest_attrs
static options::map< std::string > success_dest_attrs
Definition: copyRequest.h:110
options::base::fIsSet
virtual bool fIsSet() const
check if this option was set, regardless of whether it came from the command line or a config file
Definition: Options.h:263
copyRequest::base::getSuffix
const std::string & getSuffix() const
Definition: copyRequest.h:421
copyRequest::base::prefix
std::string prefix
Definition: copyRequest.h:290
copyRequest::base::maxMemoryBlockSize
static options::single< size_t > maxMemoryBlockSize
Definition: copyRequest.h:297
copyRequest::stateBitType::inWork
@ inWork
outputHandler::base::writer::writeBlockP
virtual void writeBlockP(const block &)
Definition: outputHandler.h:60
copyRequest::base::backupMode
static backupModeType backupMode
Definition: copyRequest.h:120
copyRequest::stateBitType::ignore
@ ignore
errMsg::level::crit
@ crit
outputHandler::base::newWriter
virtual std::unique_ptr< writer > newWriter(const std::string &path, bool mightAppend, size_t sourceSize, size_t readBlockSize, copyRequest::stateType &state, bool noWrite, std::unique_ptr< ioHandle::attrDataType > attrData, std::unique_ptr< acl::list > aclData)=0
outputHandler::base::newTmpWriter
virtual std::unique_ptr< writer > newTmpWriter(std::string &path, size_t sourceSize, bool noWrite, std::unique_ptr< ioHandle::attrDataType > attrData, std::unique_ptr< acl::list > aclData)
Definition: outputHandler.cpp:9
errMsg::level::info
@ info
copyRequest::base::tInotifyStat
static statCollector::typed< decltype(tWorkDone - tInotify) > tInotifyStat
Definition: copyRequest.h:310
copyRequest::base::makeSymLink
bool makeSymLink(inputHandler::base *InputHandler, outputHandler::base *OutputHandler)
copyRequest::base::advisoryWaitMapMutex
static std::mutex advisoryWaitMapMutex
Definition: copyRequest.h:337
ioHandle::getXattr
virtual std::string getXattr(const std::string &)
get one extended attribute value
Definition: ioHandle.h:51
copyRequest::base
class for copy requests.
Definition: copyRequest.h:99
copyRequest::base::bytesStat
static statCollector::typed< decltype(readInitialStat->size) > bytesStat
Definition: copyRequest.h:314
copyRequest::base::filesInWorkIterator
decltype(fileInWork::filesInWork.begin()) filesInWorkIterator
Definition: copyRequest.h:341
copyRequest::stateBitType::done
@ done
copyRequest::base::retry
static bool retry(std::unique_ptr< base > &request, timedQueue &delayedRequests)
Definition: copyRequest.cpp:1866
copyRequest::base::tWorkStart
clock_type::time_point tWorkStart
Definition: copyRequest.h:305
scoped::generic
Definition: scoped.h:34
copyRequest::base::changeRequestType
void changeRequestType(stateBitType newType)
Definition: copyRequest.cpp:1072
timer
Definition: timer.cpp:3
copyRequest::base::check_source_attrs
static options::map< std::string > check_source_attrs
Definition: copyRequest.h:109
outputHandler::base::renameRetvalType::fileChanged
@ fileChanged
copyRequest::stateBitType::failed
@ failed
copyRequest::base::failure_source_attrs
static options::map< std::string > failure_source_attrs
Definition: copyRequest.h:106
inputHandler::base::newReader
virtual std::unique_ptr< reader > newReader(const std::string &path, copyRequest::stateType &state, const genericStat &inititalStat)=0
get a reader for the file at path
copyRequest::base::processMultiplicities
static std::vector< std::atomic< unsigned int > > * processMultiplicities
Definition: copyRequest.h:321
copyRequest::base::tEnqueue
clock_type::time_point tEnqueue
Definition: copyRequest.h:304
copyRequest::backupModeType::none
@ none
logstream::level::warning
@ warning
copyRequest::base::doBlockSizeSetup
void doBlockSizeSetup(ioHandle &input, ioHandle &output)
Definition: copyRequest.cpp:1404
copyRequest::base::tInotify
clock_type::time_point tInotify
Definition: copyRequest.h:303
copyRequest::base::attrset
void attrset(ioHandle &writeHandle, const std::map< std::string, std::string > &attrs)
Definition: copyRequest.cpp:446
copyRequest::base::expandAttrValue
bool expandAttrValue(std::string &value)
expand the keywords in value, see Expansion of key words in attributes
Definition: copyRequest.cpp:365
copyRequest::stateBitType::vanished
@ vanished
copyRequest::base::reader
void reader(inputHandler::base::reader &input, blockQueue &freeBlocks, blockQueue &readBlocks, bool mayParallelize, exceptionList &exceptions)
Definition: copyRequest.cpp:530
ioHandle::getBlockSize
virtual size_t getBlockSize() const
Definition: ioHandle.h:58
copyRequest::stateBitType::fileToBeRemoved
@ fileToBeRemoved
copyRequest::base::mapEntry
const singleMap & mapEntry
Definition: copyRequest.h:288
copyRequest::fileInWork::slotTypes::unlink
@ unlink
errMsg::level::debug
@ debug
copyRequest::base::doUnthreadedCopy
void doUnthreadedCopy(inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle)
Definition: copyRequest.cpp:868
copyRequest::base::origSource
const std::string origSource
Definition: copyRequest.h:287
copyRequest::base::success_source_attrs
static options::map< std::string > success_source_attrs
Definition: copyRequest.h:105
stopRequest::handlerIdType::processLoop
@ processLoop
copyRequest::base::process_source_attrs
static options::map< std::string > process_source_attrs
Definition: copyRequest.h:107
copyRequest::stateBitType::trucated
@ trucated
copyRequest::base::getBackupSuffix
std::string getBackupSuffix()
Definition: copyRequest.cpp:1077
copyRequest::base::logFields
static options::container< std::string > logFields
Definition: copyRequest.h:131
timer::anchor::print
static void print(std::ostream &out, const std::string &prefix)
Definition: timer.cpp:5
nThreads
static options::single< unsigned > nThreads('n', "nThreads", "number of threads", 0)
copyRequest::stateBitType::fileToBeRenamed
@ fileToBeRenamed
throttle::action
Definition: throttle.h:71
copyRequest::base::maxDelayTime
static options::single< double > maxDelayTime
Definition: copyRequest.h:328
copyRequest::base::setAttributesAfterClose
static options::single< bool > setAttributesAfterClose
Definition: copyRequest.h:124
copyRequest::base::tCopyStat
static statCollector::typed< decltype(tWorkDone - tWorkStart) > tCopyStat
Definition: copyRequest.h:313
outputHandler::base::renameRetvalType::fileVanished
@ fileVanished
copyRequest::base::attrdel
static void attrdel(ioHandle &, const std::map< std::string, std::string > &attrs)
Definition: copyRequest.cpp:481
copyRequest::base::append_attrs
static options::container< std::string > append_attrs
Definition: copyRequest.h:112
outputHandler::base::renameRetvalType::ok
@ ok
fileRateLimit
throttle::watch fileRateLimit
copyRequest::base::init
void init(inputHandler::base *InputHandler)
Definition: copyRequest.cpp:1445
messageQueue::queue::send
virtual void send(const std::string &aMessage, const std::string &aTopic="")=0
copyRequest::base::prohibitive_attrs
static options::container< std::string > prohibitive_attrs
Definition: copyRequest.h:111
copyRequest::base::suffix
std::string suffix
Definition: copyRequest.h:289
timer::anchor::reset
static void reset()
Definition: timer.h:86
copyRequest::base::readInitialStat
std::unique_ptr< const genericStat > readInitialStat
Definition: copyRequest.h:271
copyRequest::base::maxRetries
static options::single< unsigned > maxRetries
Definition: copyRequest.h:116
stopRequest::Requested
static bool Requested()
Definition: ewmscp.cpp:153
pathHandler::pathExists
virtual bool pathExists(const std::string &path)=0
outputHandler::base::remove
virtual void remove(const std::string &path, copyRequest::stateType &state)=0
copyRequest::base::tPipeStat
static statCollector::typed< decltype(tEnqueue - tInotify) > tPipeStat
Definition: copyRequest.h:311
copyRequest::base::start_source_attrs
static options::map< std::string > start_source_attrs
Definition: copyRequest.h:108
copyRequest::base::removeFileOrDirectory
void removeFileOrDirectory(inputHandler::base *InputHandler, outputHandler::base *OutputHandler)
Definition: copyRequest.cpp:914
outputHandler::base::renameSimple
virtual bool renameSimple(const std::string &fromPath, const std::string &toPath)=0
copyRequest::base::syncWrittenFiles
static options::single< bool > syncWrittenFiles
Definition: copyRequest.h:122
copyRequest::stateBitType::action
@ action
copyRequest::base::blockReadRequestQueue
waitQueues::simple< blockReadRequest > blockReadRequestQueue
Definition: copyRequest.h:223
copyRequest::base::backupSuffix
static options::single< std::string > backupSuffix
Definition: copyRequest.h:121
copyRequest::base::nWriteThreads
static options::single< unsigned > nWriteThreads
Definition: copyRequest.h:130
copyRequest::base::userReadBlockSize
static options::single< size_t > userReadBlockSize
Definition: copyRequest.h:296
copyRequest::base::tTotal2Stat
static statCollector::typed< decltype(tWorkDone - tInotify) > tTotal2Stat
Definition: copyRequest.h:309
copyRequest::base::retries
unsigned retries
Definition: copyRequest.h:323
copyRequest::base::objectCountCondVar
static std::condition_variable objectCountCondVar
Definition: copyRequest.h:372
preserve
decltype(preserve) preserve
set of properties to preserve in the copy
Definition: ewmscp.cpp:111
copyRequest::base::threadData
perThreadData * threadData
Definition: copyRequest.h:281
enumAsBitmask::set
void set(const T aBits)
Definition: enumAsBitmask.h:34
outputHandler::base::writer::seek
virtual void seek(size_t position)
Definition: outputHandler.cpp:27
copyRequest::base::state
stateType state
Definition: copyRequest.h:282
copyRequest::base::checkAttributes
void checkAttributes(ioHandle &, const std::map< std::string, std::string > &check_source_attr)
Definition: copyRequest.cpp:486
pathHandler::getXattr
virtual std::string getXattr(const std::string &, const std::string &)
Definition: pathHandler.h:12
copyRequest::perThreadData::freeBlocks
blockQueue freeBlocks
Definition: copyRequest.h:71
escapism::newEscaper
static const escapism * newEscaper(const std::string &name)
Definition: escapism.cpp:25
copyRequest::stateBitType::noActionNeeded
@ noActionNeeded
copyRequest::base::forceParallelRead
static options::single< bool > forceParallelRead
Definition: copyRequest.h:129
pathHandler::getStat
virtual std::unique_ptr< const genericStat > getStat(const std::string &path, bool followLink=true)=0
copyRequest::base::nConcurrentProcesses
static std::atomic< unsigned int > nConcurrentProcesses
Definition: copyRequest.h:318
checksumCreators
decltype(checksumCreators) checksumCreators
Definition: ewmscp.cpp:113
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message; this is the only way that messages should be created!
Definition: errMsgQueue.h:148
copyRequest::stateBitType::linkToBeMade
@ linkToBeMade
timerInst
#define timerInst(subfunc)
Definition: timer.h:157
statCollector::typed::addValue
void addValue(T value)
Definition: statCollector.h:37
copyRequest::base::source
const std::string source
Definition: copyRequest.h:284
outputHandler::base::writer::doAttributePreservations
virtual void doAttributePreservations(const genericStat &readInitialStat)=0
copyRequest::perThreadData::OutputHandler
outputHandler::base * OutputHandler
Definition: copyRequest.h:70
copyRequest::base::tWorkDone
clock_type::time_point tWorkDone
Definition: copyRequest.h:306
continueOnError
options::single< bool > continueOnError
copyRequest::base::objectCount
static unsigned objectCount
Definition: copyRequest.h:371
copyRequest::base::destinationPrefixRegex
static options::single< std::regex > destinationPrefixRegex
Definition: copyRequest.h:133
copyRequest::base::advisoryWaitMap
static std::map< std::string, clock_type::duration > advisoryWaitMap
Definition: copyRequest.h:336
copyRequest::base::memoryBlockSize
size_t memoryBlockSize
Definition: copyRequest.h:300
outputHandler::base::writer::getSize
virtual size_t getSize() const
Definition: outputHandler.cpp:30
outputHandler::base::writer::sync
virtual void sync()=0
checksum::base::newFactory
static factoryClass * newFactory(const std::string &aName)
Definition: checksumBase.h:68
copyRequest::base::appendableFiles
static options::single< std::regex > appendableFiles
Definition: copyRequest.h:113
copyRequest::base::statPrinter
static std::function< void(std::ostream &)> statPrinter
Definition: copyRequest.h:330
copyRequest::base::adviseDelay
static bool adviseDelay(clock_type::duration dt, const std::string &suffix)
update the advised delay in the map.
Definition: copyRequest.cpp:1011
waitQueues::simple::dequeue
std::unique_ptr< T > dequeue(bool mayCreateNew, Types ... args)
Definition: waitQueues.h:59
f
int f(int a, int line)
Definition: cctest.cpp:4
outputHandler::base::writer::writeBlock
virtual void writeBlock(const block &b)=0
copyRequest::base::objectCountMutex
static std::mutex objectCountMutex
Definition: copyRequest.h:370
errMsg::level::notice
@ notice
copyRequest::base::readWorker
void readWorker(inputHandler::base::reader &input, blockReadRequestQueue &blockRequests, blockQueue &freeBlocks, blockQueue &readBlocks, exceptionList &exceptions)
Definition: copyRequest.cpp:510
resultOutput
messageQueue::queue * resultOutput
enumAsBitmask::clear
void clear(const T aBits)
Definition: enumAsBitmask.h:31
copyRequest::base::ignoreExisting
static options::single< bool > ignoreExisting
Definition: copyRequest.h:114
stopRequest::RequestStop
static void RequestStop(const std::string &aReason)
Definition: ewmscp.cpp:162
inputHandler::base::reader::readBlock
virtual bool readBlock(block &b)=0
read one block from the file
checksum
Definition: adler32.cpp:9
copyRequest::base::destination
const std::string destination
Definition: copyRequest.h:285
copyRequest::base::tProgramStart
static clock_type::time_point tProgramStart
Definition: copyRequest.h:316
copyRequest::fileInWork::eraser
class to get copyRequest out of the fileInWork list.
Definition: fileInWork.h:103
copyRequest::base::sparse
static options::single< std::string > sparse
Definition: copyRequest.h:118
copyRequest::stateBitType::append
@ append
copyRequest::base::checkSums
std::list< checksum::base * > checkSums
Definition: copyRequest.h:277
outputHandler::base::doAttributePreservations
virtual void doAttributePreservations(const std::string &, const genericStat &)
Definition: outputHandler.h:117
ioHandle::setBlockSize
virtual void setBlockSize(size_t newSize)
Definition: ioHandle.h:61
checksum::parallel::update
virtual void update(void *data, size_t size, size_t offset)=0
copyRequest::base::doThreadedCopy
void doThreadedCopy(inputHandler::base::reader &input, std::unique_ptr< outputHandler::base::writer > &writeHandle)
Definition: copyRequest.cpp:813
copyRequest::base::keywordMap
static std::map< std::string, void(base::*)(std::string &) const > & keywordMap()
Definition: copyRequest.cpp:246
copyRequest::base::changeAttrsOnIgnoreExisting
static options::single< bool > changeAttrsOnIgnoreExisting
Definition: copyRequest.h:115
copyRequest::backupModeType::remove
@ remove
copyRequest::base::minMemoryBlockSize
static options::single< size_t > minMemoryBlockSize
Definition: copyRequest.h:298
ioHandle::parallelizable
virtual bool parallelizable() const
tell if this handler is capable of parallel IO. Usually not the case
Definition: ioHandle.h:38