ewmscp  ..
inputHandlerDavix.cpp
Go to the documentation of this file.
1 #include "inputHandlerDavix.h"
2 #include "timer.h"
3 
4 #include <memory>
5 #include "block.h"
6 #include "ewmscp.h"
7 
8 #include <errMsgQueue.h>
9 
10 namespace inputHandler {
// Out-of-class definition of the static member davix::factory, constructed
// with the name "davix".
11  decltype(davix::factory) davix::factory("davix");
12  std::unique_ptr<base::reader> davix::newReader(const std::string& aPath,
14  const genericStat& inititalStat) {
15  return std::unique_ptr<reader>(new readerDavix(*this, aPath, state, inititalStat));
16  }
17 
// File-local davixConfigObject constructed with the name "davixIn".
18  static davixConfigObject inputConfig("davixIn");
19 
22  };
23 
24 
25  std::unique_ptr<base::Directory> davix::getDirectory(const std::string& path) {
26  return std::unique_ptr<Directory>(new DavixDirectory(path, *this));
27  }
28  davix::DavixDirectory::DavixDirectory(const std::string& aPath, davixCommon& aHandler):
29  Directory(aPath),
30  handler(aHandler) {
31  errorReport report(__func__, "opendirpp", path);
32  dir = handler.posix.opendirpp(&handler.params, path, report);
33  if (dir == nullptr) {
34  report.throwUp();
35  }
36  }
// Body of ~DavixDirectory(): closes the directory handle `dir` via closedirpp.
// NOTE(review): the definition line (listing line 37) and listing line 41, the
// start of the call whose arguments continue on lines 42-43, are missing from
// this extract.
38  errorReport report(__func__, "closedirpp", path);
// A non-zero result from closedirpp is handled as a failure.
39  if (handler.posix.closedirpp(dir, report)) {
// When isUnwinding() is true the failure is not passed to throwUp(); instead
// path, a fixed text and report.getMessage() go to the call begun on the
// missing line (presumably a log message - confirm against the full source).
40  if (isUnwinding()) {
42  path, "close directory during unwind ",
43  report.getMessage());
44  } else {
45  report.throwUp();
46  }
47  }
48  }
49  std::unique_ptr<base::Directory::Entry> davix::DavixDirectory::getNextEntry(bool /*ignoreMissing*/) {
50  errorReport report(__func__, "readdirpp", path);
51  struct stat statbuf;
52  auto entry = handler.posix.readdirpp(dir, &statbuf, report);
53  if (entry == nullptr) {
54  return nullptr;
55  }
56  auto genstat = std::unique_ptr<const genericStat>(new genericStat(statbuf, std::chrono::seconds(1)));
57  return std::unique_ptr<Entry>(new Entry(entry->d_name, genstat));
58  }
59 
// readerDavix constructor: opens `path` read-only through the davix handler.
// NOTE(review): listing lines 60 (start of the definition), 62 and 70 are
// missing from this extract. The cross-reference index in this file gives the
// signature as
//   readerDavix(davixCommon &aHandler, const std::string &aPath,
//               copyRequest::stateType &state, const genericStat &inititalStat)
61  const std::string& aPath,
63  const genericStat& inititalStat):
64  davixIoCommon(aPath, aHandler),
65  reader(inititalStat) {
66  errorReport report(__func__, "open", path);
67  fd = handler.posix.open(&handler.params, path.c_str(), O_RDONLY, report);
// A null fd means the open did not succeed.
68  if (fd == nullptr) {
// Special handling for a file that does not exist; the statement inside this
// branch (listing line 70) is missing from this extract.
69  if (report->getStatus() == Davix::StatusCode::FileNotFound) {
71  }
72  report.throwUp();
73  }
// Take the block size from the initial stat data.
74  blockSize = readInitialStat.blksize;
75  }
// Body of ~readerDavix(): closes the file handle if one is held.
// NOTE(review): the definition line (listing line 76) and listing line 81, the
// start of the call whose arguments continue on lines 82-83, are missing from
// this extract.
77  if (fd) {
78  errorReport report(__func__, "close", path);
// A non-zero result from close is handled as a failure.
79  if (handler.posix.close(fd, report) != 0) {
// When isUnwinding() is true the destructor returns instead of calling
// throwUp(), after passing path, a fixed text and report.getMessage() to the
// call begun on the missing line (presumably a log message - confirm).
80  if (isUnwinding()) {
82  path, "close during unwind ",
83  report.getMessage());
84  return;
85  }
86  report.throwUp();
87  }
88  }
89  }
90 
91 
// Body of readBlock (listed in the cross-reference index of this file as
// `bool readBlock(block &b) override`): fills b with consecutive reads and
// returns true once a read delivered 0 bytes.
// Throws delayAdvisingError when the total number of bytes read falls short of
// or exceeds readInitialStat.size.
// NOTE(review): the definition line (listing line 92) and listing line 96 are
// missing from this extract.
// Reset the block, passing the running total of bytes read as its offset.
93  b.clear(totalBytesRead);
94  bool lastblock = false;
// Continue only while another chunk of blockSize bytes still fits into b.
95  while (b.size() + blockSize <= b.max_size()) {
97  errorReport report(__func__, "read", path);
98  timerInst(read);
// Read up to blockSize bytes directly behind the data already held in b.
99  auto bytes_read = handler.posix.read(fd, b.bufferAt(b.size()), blockSize, report);
100  if (bytes_read < 0) {
101  report.throwUp();
102  }
// A 0-byte read ends the loop and marks this as the last block. Ending before
// readInitialStat.size bytes were read is reported as the file having shrunk.
103  if (bytes_read == 0) {
104  lastblock = true;
105  if (totalBytesRead < readInitialStat.size) {
106  throw delayAdvisingError(path + " has shrunk while reading, (" +
107  std::to_string(readInitialStat.size) +
108  " -> " +
109  std::to_string(totalBytesRead) +
110  ")");
111  }
112  break;
113  }
// Pass the number of bytes just read to the readRateLimit throttle.
114  readRateLimit.update(bytes_read);
115  totalBytesRead += bytes_read;
// Reading past readInitialStat.size is reported as the file having grown.
116  if (totalBytesRead > readInitialStat.size) {
117  throw delayAdvisingError(path + " has grown while reading, (" +
118  std::to_string(readInitialStat.size) +
119  " -> " +
120  std::to_string(totalBytesRead) +
121  ")");
122  }
// Account for the new bytes in the block.
123  b.bump_size(bytes_read);
124  }
125 
126  return lastblock;
127  };
128 
130  return true;
131  }
132 
133  void davix::readerDavix::readBlockP(block& b, size_t bytesToRead, off_t offset) {
134  b.clear(offset);
135  while (b.size() + blockSize <= b.max_size()) {
136  errorReport report(__func__, "pread", path);
137  timerInst(pread);
138  auto bytes_read = handler.posix.pread64(fd, b.bufferAt(b.size()), blockSize, offset + b.size(),report);
139  if (bytes_read < 0)
140  report.throwUp();
141  if (bytes_read == 0) {
142  break;
143  }
144  b.bump_size(bytes_read);
145  if (b.size() > bytesToRead) {
146  throw delayAdvisingError(path + " has grown while reading");
147  }
148  }
149  if (b.size() < bytesToRead) {
150  throw delayAdvisingError(path + " has shrunk while reading "
151  + std::to_string(bytesToRead)
152  + " "
153  + std::to_string(b.size()));
154  }
155  }
156 
157 
159  };
160 } // end namespace inputHandler
block.h
inputHandler::davix::newReader
std::unique_ptr< reader > newReader(const std::string &aPath, copyRequest::stateType &state, const genericStat &inititalStat) override
get a reader for the file at path
Definition: inputHandlerDavix.cpp:12
delayAdvisingError
class for exceptions that advise for delays Exceptions of this kind are to be thrown when circumstanc...
Definition: inputHandler.h:22
inputHandler::davix::readerDavix::readBlock
bool readBlock(block &b) override
Definition: inputHandlerDavix.cpp:92
errMsgQueue.h
block::max_size
size_t max_size() const
Definition: block.h:22
inputHandlerDavix.h
inputHandler::davix::factory
static factoryTemplate< davix > factory
Definition: inputHandlerDavix.h:14
inputHandler
Definition: inputHandler.h:29
errMsg::location
class for defining the location of an error message in the source code.
Definition: errMsgQueue.h:14
davixIoCommon::fd
DAVIX_FD * fd
Definition: davixCommon.h:228
inputHandler::inputConfig
static daosFsCommon::daosOptions inputConfig("daosFsIn")
genericStat
generic stat abstraction class. Used to abstract the variants of the stat structure.
Definition: genericStat.h:12
inputHandler::davix::readerDavix::checkUnchangedness
void checkUnchangedness() override
Definition: inputHandlerDavix.cpp:158
davixCommon
Definition: davixCommon.h:193
inputHandler::davix::DavixDirectory::handler
davixCommon & handler
Definition: inputHandlerDavix.h:35
copyRequest::stateBitType::vanished
@ vanished
block::bump_size
void bump_size(size_t additionalBytes)
Definition: block.h:33
copyRequest::stateType
Definition: copyRequestTypes.h:66
inputHandler::davix::DavixDirectory
Definition: inputHandlerDavix.h:33
readRateLimit
throttle::watch readRateLimit
errMsg::level::debug
@ debug
inputHandler::davix::readerDavix::parallelizable
bool parallelizable() const override
tell if this handler is capable of parallel IO. Usually not the case
Definition: inputHandlerDavix.cpp:129
ioHandle::blockSize
size_t blockSize
in bytes, block size to be used when reading or writing
Definition: ioHandle.h:17
inputHandler::davix::readerDavix::readerDavix
readerDavix(davixCommon &aHandler, const std::string &aPath, copyRequest::stateType &state, const genericStat &inititalStat)
Definition: inputHandlerDavix.cpp:60
inputHandler::davix::readerDavix::readBlockP
void readBlockP(block &b, size_t bytesToRead, off_t offset) override
Definition: inputHandlerDavix.cpp:133
block::bufferAt
void * bufferAt(size_t offset)
only way to access the data in the block
Definition: block.cpp:28
errorReport
class for easy error handling with davix; ensures proper cleanup of the error report when going out of...
Definition: davixCommon.h:12
davixIoCommon
Definition: davixCommon.h:224
timer.h
davixIoCommon::handler
davixCommon & handler
Definition: davixCommon.h:227
block
data block, used to hold the data that are being copied (or checksummed).
Definition: block.h:7
inputHandler::davix::DavixDirectory::getNextEntry
std::unique_ptr< Entry > getNextEntry(bool ignoreMissing) override
Definition: inputHandlerDavix.cpp:49
inputHandler::base::Directory
Definition: inputHandler.h:159
inputHandler::davix::DavixDirectory::DavixDirectory
DavixDirectory(const std::string &aPath, davixCommon &aHandler)
Definition: inputHandlerDavix.cpp:28
davixCommon::posix
Davix::DavPosix posix
Definition: davixCommon.h:198
inputHandler::davix::DavixDirectory::~DavixDirectory
~DavixDirectory() noexcept(false) override
Definition: inputHandlerDavix.cpp:37
inputHandler::base::Directory::path
const std::string path
Definition: inputHandler.h:161
throttle::watch::wait
void wait()
Definition: throttle.h:50
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
timerInst
#define timerInst(subfunc)
Definition: timer.h:157
inputHandler::davix::readerDavix
Definition: inputHandlerDavix.h:16
block::size
size_t size() const
Definition: block.h:16
inputHandler::davix::davix
davix()
Definition: inputHandlerDavix.cpp:20
inputHandler::davix::DavixDirectory::dir
DAVIX_DIR * dir
Definition: inputHandlerDavix.h:34
block::clear
void clear(size_t aOffset)
Definition: block.h:28
davixCommon::params
Davix::RequestParams params
Definition: davixCommon.h:197
inputHandler::base::Directory::Entry
Definition: inputHandler.h:163
errorReport::getMessage
const std::string & getMessage() const
Definition: davixCommon.h:37
throttle::watch::update
void update(double units=1.0)
Definition: throttle.h:35
davixConfigObject
class for configuring one davix instance; holds all options necessary for that
Definition: davixCommon.h:169
inputHandler::davix::getDirectory
std::unique_ptr< Directory > getDirectory(const std::string &path) override
Definition: inputHandlerDavix.cpp:25
ewmscp.h
davixIoCommon::path
const std::string & path
Definition: davixCommon.h:226
inputHandler::davix::readerDavix::~readerDavix
~readerDavix() noexcept(false) override
Definition: inputHandlerDavix.cpp:76
errorReport::throwUp
void throwUp()
Definition: davixCommon.h:45