ewmscp  ..
inputHandlerDaosFs.cpp
Go to the documentation of this file.
1 #include "inputHandlerDaosFs.h"
2 #include "block.h"
3 #include "copyRequestTypes.h"
4 #include "ewmscp.h"
5 #include "timer.h"
6 
7 #include <fcntl.h>
8 #include <errMsgQueue.h>
9 #include <throwcall.h>
10 #include <unistd.h>
11 
12 #include <memory>
13 
14 namespace inputHandler {
15  decltype(daosFs::factory) daosFs::factory("daosFs");
16 
17  static daosFsCommon::daosOptions inputConfig("daosFsIn");
18 
20 
21  std::unique_ptr<base::reader> daosFs::newReader(const std::string& aPath,
23  const genericStat& inititalStat) {
24  return std::unique_ptr<base::reader>(new readerDaosFs(aPath,
25  state,
26  inititalStat,
27  *this));
28  }
29 
30 
31  bool daosFs::readLinkTarget(const std::string& path,
32  std::vector<char>& target) {
33  timerInst(readlink);
34  auto linklength = readlink(path.c_str(), target.data(), target.size());
35  if (linklength == -1 && errno == ENOENT) {
36  return false;
37  }
38  throwcall::badval(linklength, -1, "could not read link ", path);
39  if (linklength >= static_cast<ssize_t>(target.size())) {
40  throw std::runtime_error("link size increased after stat for " + path);
41  }
42  target[linklength] = '\0';
43  return true;
44  }
45 
46 
47 
48  daosFs::readerDaosFs::readerDaosFs(const std::string& aPath,
50  const genericStat&inititalStat,
51  daosFsCommon& aHandler):
52  daosFsIoCommon(aPath, aHandler),
53  reader(inititalStat) {
54  int retval;
55  {
56  timerInst(open);
57  retval = dfs_open(handler.dfs, handler.getDirObj(path), path.c_str(), 0444, O_RDONLY,
58  0, readInitialStat.blksize, nullptr, &obj);
59  }
60  if (retval) {
61  if (errno == ENOENT
62  || (errno == ENOTDIR && !S_ISDIR(readInitialStat.mode))) {
64  }
65  }
66  throwcall::good0(retval, "can't open ", path, " for reading");
67  blockSize = readInitialStat.blksize;
68  }
69 
71  if (obj) {
72  if (isUnwinding()) {
73  timerInst(release);
74  if (dfs_release(obj) != 0) {
76  path, "release during unwind ",
77  std::system_category().default_error_condition(errno).message());
78  }
79  return;
80  }
81  {
82  timerInst(release);
83  throwcall::good0(dfs_release(obj), "can't release ", path, " after reading");
84  }
85  }
86  }
87 
88 
89 
90 
92  return true;
93  }
95  b.clear(totalBytesRead);
96  bool lastblock = false;
97 
98  auto bytesToRead = b.max_size();
99 
100  while (b.size() + blockSize <= bytesToRead) {
102  d_iov_t iov;
103  iov.iov_buf = b.bufferAt(b.size());
104  iov.iov_buf_len = blockSize;
105  d_sg_list_t sg_list;
106  sg_list.sg_nr=1;
107  sg_list.sg_iovs = &iov;
108  timerInst(read);
109  daos_size_t bytes_read;
110  throwcall::good0(dfs_read(handler.dfs, obj, &sg_list, totalBytesRead, &bytes_read, nullptr),
111  "read failed on ", path);
112  readRateLimit.update(bytes_read);
113  if (bytes_read == 0) {
114  lastblock = true;
115  if (totalBytesRead < readInitialStat.size) {
116  throw delayAdvisingError(path + " has shrunk while reading, (" +
117  std::to_string(readInitialStat.size) +
118  " -> " +
119  std::to_string(totalBytesRead) +
120  ")");
121  }
122  break;
123  }
124  totalBytesRead += bytes_read;
125  if (totalBytesRead > readInitialStat.size) {
126  throw delayAdvisingError(path + " has grown while reading, (" +
127  std::to_string(readInitialStat.size) +
128  " -> " +
129  std::to_string(totalBytesRead) +
130  ")");
131  }
132 
133  b.bump_size(bytes_read);
134  }
135 
136  return lastblock;
137  }
138 
139 
140  void daosFs::readerDaosFs::readBlockP(block& b, size_t bytesToRead, off_t offset) {
141  b.clear(offset);
142  while (b.size() + blockSize <= b.max_size()) {
143  timerInst(pread);
144  d_iov_t iov;
145  iov.iov_buf = b.bufferAt(b.size());
146  iov.iov_buf_len = blockSize;
147  d_sg_list_t sg_list;
148  sg_list.sg_nr=1;
149  sg_list.sg_iovs = &iov;
150  timerInst(read);
151  daos_size_t bytes_read;
152  throwcall::good0(dfs_read(handler.dfs, obj, &sg_list, offset + b.size() , &bytes_read, nullptr),
153  "read failed on ", path);
154  if (bytes_read == 0) {
155  break;
156  }
157  b.bump_size(bytes_read);
158  if (b.size() > bytesToRead) {
159  throw delayAdvisingError(path + " has grown while reading");
160  }
161  }
162  if (b.size() < bytesToRead) {
163  throw delayAdvisingError(path + " has shrunk while reading "
164  + std::to_string(bytesToRead)
165  + " "
166  + std::to_string(b.size()));
167  }
168  }
169 
170 
172  struct stat readFinalStatBuf;
173  {
174  timerInst(fstat);
175  throwcall::good0(dfs_ostat(handler.dfs, obj, &readFinalStatBuf), "can't stat path file ", path);
176  }
177  genericStat readFinalStat(readFinalStatBuf, std::chrono::nanoseconds(1));
178  if (readFinalStat.size != readInitialStat.size) {
179  throw delayAdvisingError("file size has changed (" +
180  std::to_string(readInitialStat.size) +
181  " -> " +
182  std::to_string(readFinalStat.size) +
183  ") during reading on " + path);
184  }
185 
186  if (!readFinalStat.isSameMtimeAs(readInitialStat)) {
187  throw delayAdvisingError("file " + path + " was modified (" +
188  std::to_string(std::chrono::duration_cast<std::chrono::duration<double>>(readFinalStat.getMtime() - readInitialStat.getMtime()).count()) +
189  "s different mtime) during reading");
190  }
191  }
192  daosFs::DaosFsDirectory::DaosFsDirectory(daosFs& aHandler, const std::string& aPath): Directory(aPath), handler(aHandler) {
193  timerInst(opendir);
194  throwcall::good0(dfs_open(handler.dfs, handler.getDirObj(path), path.c_str(), 0666, O_RDONLY, 0, 0, nullptr, &dir),
195  "can't open direcory ", path);
196  }
198  if (isUnwinding()) {
199  if (dfs_release(dir) != 0) {
201  "", "close directory during unwind ",
202  std::system_category().default_error_condition(errno).message());
203  }
204  } else {
205  timerInst(closedir);
206  throwcall::good0(dfs_release(dir), "can't close directory ", path);
207  }
208  }
209  std::unique_ptr<base::Directory::Entry> daosFs::DaosFsDirectory::getNextEntry(bool ignoreMissing) {
210  while (true) {
211  struct dirent entry;
212  uint32_t n=1;
213  throwcall::good0(dfs_readdir(handler.dfs, dir, &anchor, &n, &entry),"can't read dir enry");
214  if (n==1) {
215  if (entry.d_name[entry.d_name[0] != '.' ? 0 : entry.d_name[1] != '.' ? 1 : 2] == '\0') {
216  continue; // skip . .. and empty strings
217  }
218  struct stat statbuf;
219  {
220  timerInst(fstatat);
221  auto result = dfs_stat(handler.dfs, dir, entry.d_name, &statbuf);
222  if (result != 0 && errno == ENOENT && ignoreMissing) {
223  continue;
224  }
225  throwcall::good0(result, "can't stat ", entry.d_name);
226  }
227  auto genStat = std::unique_ptr<const genericStat>(new genericStat(statbuf, std::chrono::nanoseconds(1)));
228  return std::unique_ptr<Entry>(new Entry(entry.d_name, genStat));
229  } else {
230  break;
231  }
232  }
233  return nullptr;
234  }
235  std::unique_ptr<base::Directory> daosFs::getDirectory(const std::string& path) {
236  return std::unique_ptr<Directory>(new DaosFsDirectory(*this, path));
237  }
238 
239 } //end namespace inputHandler
daosFsCommon::daosOptions
Definition: daosFsCommon.h:20
daosFsIoCommon::obj
dfs_obj_t * obj
Definition: daosFsCommon.h:53
block.h
delayAdvisingError
class for exceptions that advise for delays Exceptions of this kind are to be thrown when circumstanc...
Definition: inputHandler.h:22
inputHandler::daosFs::readerDaosFs
Definition: inputHandlerDaosFs.h:19
errMsgQueue.h
inputHandler::daosFs::readerDaosFs::parallelizable
bool parallelizable() const override
tell if this handler is capable of parallel IO. Unsually not the case
Definition: inputHandlerDaosFs.cpp:91
block::max_size
size_t max_size() const
Definition: block.h:22
inputHandler
Definition: inputHandler.h:29
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
inputHandler::inputConfig
static daosFsCommon::daosOptions inputConfig("daosFsIn")
throwcall::badval
T badval(T call, t badvalue, const Args &... args)
template function to wrap system calls that return a special bad value on failure
Definition: throwcall.h:54
inputHandler::daosFs::newReader
std::unique_ptr< reader > newReader(const std::string &aPath, copyRequest::stateType &state, const genericStat &inititalStat) override
get a reader for the file at path
Definition: inputHandlerDaosFs.cpp:21
genericStat
generic stat abstraction class Used to abstract the variants of the stat structure.
Definition: genericStat.h:12
copyRequestTypes.h
inputHandler::daosFs::DaosFsDirectory::~DaosFsDirectory
~DaosFsDirectory() noexcept(false) override
Definition: inputHandlerDaosFs.cpp:197
genericStat::getMtime
void getMtime(struct timespec &spec) const
Definition: genericStat.cpp:65
copyRequest::stateBitType::vanished
@ vanished
block::bump_size
void bump_size(size_t additionalBytes)
Definition: block.h:33
inputHandlerDaosFs.h
copyRequest::stateType
Definition: copyRequestTypes.h:66
genericStat::isSameMtimeAs
bool isSameMtimeAs(const genericStat &that) const
Definition: genericStat.cpp:87
readRateLimit
throttle::watch readRateLimit
errMsg::level::debug
@ debug
daosFsCommon
Definition: daosFsCommon.h:11
ioHandle::blockSize
size_t blockSize
in bytes, block size to be used when reading or writing
Definition: ioHandle.h:17
inputHandler::daosFs
Definition: inputHandlerDaosFs.h:15
genericStat::size
size_t size
Definition: genericStat.h:16
inputHandler::daosFs::daosFs
daosFs()
Definition: inputHandlerDaosFs.cpp:19
daosFsCommon::getDirObj
dfs_obj_t * getDirObj(const std::string &path)
Definition: daosFsCommon.cpp:21
block::bufferAt
void * bufferAt(size_t offset)
only way to access the data in the block
Definition: block.cpp:28
timer.h
throwcall.h
block
data block, used to hold the data that are being copied (or checksummed).
Definition: block.h:7
inputHandler::daosFs::factory
static factoryTemplate< daosFs > factory
Definition: inputHandlerDaosFs.h:16
inputHandler::daosFs::DaosFsDirectory::DaosFsDirectory
DaosFsDirectory(daosFs &aHandler, const std::string &path)
Definition: inputHandlerDaosFs.cpp:192
daosFsCommon::dfs
dfs_t * dfs
Definition: daosFsCommon.h:15
inputHandler::daosFs::readerDaosFs::~readerDaosFs
~readerDaosFs() override
Definition: inputHandlerDaosFs.cpp:70
inputHandler::daosFs::readLinkTarget
bool readLinkTarget(const std::string &path, std::vector< char > &target) override
read link target from a symlink
Definition: inputHandlerDaosFs.cpp:31
inputHandler::daosFs::DaosFsDirectory
Definition: inputHandlerDaosFs.h:40
inputHandler::daosFs::DaosFsDirectory::getNextEntry
std::unique_ptr< Entry > getNextEntry(bool ignoreMissing) override
Definition: inputHandlerDaosFs.cpp:209
inputHandler::daosFs::readerDaosFs::checkUnchangedness
void checkUnchangedness() override
Definition: inputHandlerDaosFs.cpp:171
throttle::watch::wait
void wait()
Definition: throttle.h:50
inputHandler::daosFs::readerDaosFs::readerDaosFs
readerDaosFs(const std::string &aPath, copyRequest::stateType &state, const genericStat &inititalStat, daosFsCommon &aHandler)
Definition: inputHandlerDaosFs.cpp:48
daosFsIoCommon
base class for daosFs reader and writer class with the common stuff like fd, path and xattr handling
Definition: daosFsCommon.h:50
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
timerInst
#define timerInst(subfunc)
Definition: timer.h:157
inputHandler::daosFs::readerDaosFs::readBlockP
void readBlockP(block &b, size_t bytesToRead, off_t offset) override
Definition: inputHandlerDaosFs.cpp:140
block::size
size_t size() const
Definition: block.h:16
inputHandler::daosFs::getDirectory
std::unique_ptr< Directory > getDirectory(const std::string &path) override
Definition: inputHandlerDaosFs.cpp:235
inputHandler::daosFs::DaosFsDirectory::handler
daosFs & handler
Definition: inputHandlerDaosFs.h:41
throwcall::good0
void good0(T call, const Args &... args)
template function to wrap system calls that return 0 on success
Definition: throwcall.h:40
block::clear
void clear(size_t aOffset)
Definition: block.h:28
inputHandler::daosFs::DaosFsDirectory::dir
dfs_obj_t * dir
Definition: inputHandlerDaosFs.h:42
daosFsIoCommon::path
const std::string & path
Definition: daosFsCommon.h:52
throttle::watch::update
void update(double units=1.0)
Definition: throttle.h:35
daosFsIoCommon::handler
daosFsCommon & handler
Definition: daosFsCommon.h:54
ewmscp.h
inputHandler::daosFs::readerDaosFs::readBlock
bool readBlock(block &b) override
Definition: inputHandlerDaosFs.cpp:94