ewmscp  ..
inputHandler::daosFs::readerDaosFs Class Reference

#include <inputHandlerDaosFs.h>


Public Member Functions

 readerDaosFs (const std::string &aPath, copyRequest::stateType &state, const genericStat &inititalStat, daosFsCommon &aHandler)
 
 ~readerDaosFs () override
 
bool parallelizable () const override
 Tells whether this handler is capable of parallel IO. Usually not the case.
 
bool readBlock (block &b) override
 
void readBlockP (block &b, size_t bytesToRead, off_t offset) override
 
void checkUnchangedness () override
 
Public Member Functions inherited from daosFsIoCommon
 daosFsIoCommon (const std::string &aPath, daosFsCommon &aHandler)
 
void setXattr (const std::string &name, const std::string &value) override
 
std::string getXattr (const std::string &name) override
 get one extended attribute value
 
void removeXattr (const std::string &name) override
 
std::unique_ptr< const genericStat > getStat () override
 
std::unique_ptr< ioHandle::attrDataType > getAttrData (const outputHandler::base *aOutputHandler) override
 get attributes in the optimal way for setting with aOutputHandler
 
Public Member Functions inherited from ioHandle
 ioHandle ()
 
virtual ~ioHandle () noexcept(false)
 
virtual size_t getBlockSize () const
 
virtual void setBlockSize (size_t newSize)
 
virtual std::unique_ptr< acl::list > getAclData ()
 get ACLs
 

Additional Inherited Members

Protected Attributes inherited from daosFsIoCommon
const std::string & path
 
dfs_obj_t * obj
 
daosFsCommon & handler
 
Protected Attributes inherited from ioHandle
size_t blockSize
 in bytes, block size to be used when reading or writing
 

Detailed Description

Definition at line 19 of file inputHandlerDaosFs.h.

Constructor & Destructor Documentation

◆ readerDaosFs()

inputHandler::daosFs::readerDaosFs::readerDaosFs ( const std::string & aPath,
copyRequest::stateType & state,
const genericStat & inititalStat,
daosFsCommon & aHandler
)

Definition at line 48 of file inputHandlerDaosFs.cpp.

51      :
52      daosFsIoCommon(aPath, aHandler),
53      reader(inititalStat) {
54      int retval;
55      {
56          timerInst(open);
57          retval = dfs_open(handler.dfs, handler.getDirObj(path), path.c_str(), 0444, O_RDONLY,
58                            0, readInitialStat.blksize, nullptr, &obj);
59      }
60      if (retval) {
61          if (errno == ENOENT
62              || (errno == ENOTDIR && !S_ISDIR(readInitialStat.mode))) {
64          }
65      }
66      throwcall::good0(retval, "can't open ", path, " for reading");
67      blockSize = readInitialStat.blksize;
68  }

References ioHandle::blockSize, daosFsCommon::dfs, daosFsCommon::getDirObj(), throwcall::good0(), daosFsIoCommon::handler, daosFsIoCommon::obj, daosFsIoCommon::path, timerInst, and copyRequest::vanished.
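
The constructor routes the result of the open call through throwcall::good0, which this page only describes as a "template function to wrap system calls that return 0 on success". As a rough illustration of that idea, the sketch below shows one way such a wrapper could look. The name good0Sketch, the use of std::system_error, and the assumption that the return value itself is an errno-style code are all illustrative choices, not the ewmscp implementation.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <system_error>

// Hypothetical stand-in for a good0-style wrapper (not the ewmscp code).
// Assumption: the wrapped call returns 0 on success and an errno-style
// code on failure.
template <typename T, typename... Args>
void good0Sketch(T result, const Args&... args) {
    if (result == 0) {
        return;  // success: nothing to report
    }
    std::ostringstream message;
    // Append every message fragment in order (pack expansion, valid in C++11).
    int expand[] = {0, ((message << args), 0)...};
    (void)expand;
    throw std::system_error(static_cast<int>(result), std::generic_category(),
                            message.str());
}

// Usage: a zero result passes through, a non-zero result becomes an exception.
inline void demoGood0Sketch() {
    good0Sketch(0, "not thrown");
    try {
        good0Sketch(2, "can't open ", std::string("/some/file"), " for reading");
        assert(false && "expected an exception");
    } catch (const std::system_error& e) {
        assert(e.code().value() == 2);
    }
}
```

Keeping the message fragments as separate arguments means the message string is only assembled on the failure path.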


◆ ~readerDaosFs()

inputHandler::daosFs::readerDaosFs::~readerDaosFs ( )
override

Definition at line 70 of file inputHandlerDaosFs.cpp.

70  {
71      if (obj) {
72          if (isUnwinding()) {
73              timerInst(release);
74              if (dfs_release(obj) != 0) {
76                               path, "release during unwind ",
77                               std::system_category().default_error_condition(errno).message());
78              }
79              return;
80          }
81          {
82              timerInst(release);
83              throwcall::good0(dfs_release(obj), "can't release ", path, " after reading");
84          }
85      }
86  }

References errMsg::debug, errMsg::emit(), throwcall::good0(), and timerInst.
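
The destructor distinguishes between normal destruction, where a failed release is reported by throwing, and destruction during stack unwinding, where the failure is only logged. The sketch below illustrates that pattern in isolation. GuardedHandle and its log vector are hypothetical, and detecting unwinding via std::uncaught_exceptions() (C++17) is an assumption; how isUnwinding() is implemented is not shown on this page.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical resource whose release step can fail.
class GuardedHandle {
  public:
    GuardedHandle(bool releaseFails, std::vector<std::string>& log)
        : releaseFails_(releaseFails),
          log_(log),
          exceptionsAtConstruction_(std::uncaught_exceptions()) {}

    // noexcept(false) so that a failed release may throw in the normal case.
    ~GuardedHandle() noexcept(false) {
        if (!releaseFails_) {
            return;
        }
        if (std::uncaught_exceptions() > exceptionsAtConstruction_) {
            // Another exception is already propagating: only record the failure.
            log_.push_back("release failed during unwind");
            return;
        }
        throw std::runtime_error("can't release handle");
    }

  private:
    bool releaseFails_;
    std::vector<std::string>& log_;
    int exceptionsAtConstruction_;
};

// Usage: the same failing release throws in one case and is logged in the other.
inline void demoGuardedHandle() {
    std::vector<std::string> log;
    bool threw = false;
    try {
        GuardedHandle handle(true, log);
    } catch (const std::runtime_error&) {
        threw = true;
    }
    assert(threw && log.empty());

    try {
        GuardedHandle handle(true, log);
        throw std::logic_error("primary failure");
    } catch (const std::logic_error&) {
        // The primary failure survives; the release failure was only logged.
    }
    assert(log.size() == 1);
}
```

Throwing from a destructor while another exception is in flight would terminate the program, which is why the unwinding branch must not throw.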


Member Function Documentation

◆ checkUnchangedness()

void inputHandler::daosFs::readerDaosFs::checkUnchangedness ( )
override

Definition at line 171 of file inputHandlerDaosFs.cpp.

171  {
172      struct stat readFinalStatBuf;
173      {
174          timerInst(fstat);
175          throwcall::good0(dfs_ostat(handler.dfs, obj, &readFinalStatBuf), "can't stat path file ", path);
176      }
177      genericStat readFinalStat(readFinalStatBuf, std::chrono::nanoseconds(1));
178      if (readFinalStat.size != readInitialStat.size) {
179          throw delayAdvisingError("file size has changed (" +
180                                   std::to_string(readInitialStat.size) +
181                                   " -> " +
182                                   std::to_string(readFinalStat.size) +
183                                   ") during reading on " + path);
184      }
185
186      if (!readFinalStat.isSameMtimeAs(readInitialStat)) {
187          throw delayAdvisingError("file " + path + " was modified (" +
188                                   std::to_string(std::chrono::duration_cast<std::chrono::duration<double>>(readFinalStat.getMtime() - readInitialStat.getMtime()).count()) +
189                                   "s different mtime) during reading");
190      }
191  }

References genericStat::getMtime(), throwcall::good0(), genericStat::isSameMtimeAs(), genericStat::size, and timerInst.
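
The check compares a stat taken after reading against the one taken before: a size difference or a different modification time means the file changed during the copy. A minimal stand-alone sketch of that comparison follows. FileSnapshot and checkUnchangedSketch are hypothetical names, std::runtime_error stands in for delayAdvisingError, and the exact mtime comparison is an assumption (genericStat::isSameMtimeAs may apply a tolerance).

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical reduced stat record: only the fields the check needs.
struct FileSnapshot {
    std::uint64_t size;
    std::chrono::nanoseconds mtime;
};

// Throws if the file looks different after reading than before.
inline void checkUnchangedSketch(const FileSnapshot& before,
                                 const FileSnapshot& after,
                                 const std::string& path) {
    if (after.size != before.size) {
        throw std::runtime_error("file size has changed (" +
                                 std::to_string(before.size) + " -> " +
                                 std::to_string(after.size) +
                                 ") during reading on " + path);
    }
    if (after.mtime != before.mtime) {  // assumption: exact comparison
        const double seconds =
            std::chrono::duration_cast<std::chrono::duration<double>>(
                after.mtime - before.mtime).count();
        throw std::runtime_error("file " + path + " was modified (" +
                                 std::to_string(seconds) +
                                 "s different mtime) during reading");
    }
}

// Usage: identical snapshots pass, a changed size is rejected.
inline void demoCheckUnchanged() {
    const FileSnapshot before{100, std::chrono::nanoseconds(5)};
    checkUnchangedSketch(before, before, "/some/file");
    bool rejected = false;
    try {
        checkUnchangedSketch(before, FileSnapshot{101, before.mtime}, "/some/file");
    } catch (const std::runtime_error&) {
        rejected = true;
    }
    assert(rejected);
}
```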


◆ parallelizable()

bool inputHandler::daosFs::readerDaosFs::parallelizable ( ) const
override virtual

Tells whether this handler is capable of parallel IO. Usually not the case; this implementation returns true.

Reimplemented from ioHandle.

Definition at line 91 of file inputHandlerDaosFs.cpp.

91  {
92      return true;
93  }
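
Because parallelizable() returns true for this handler, a caller could in principle serve disjoint byte ranges of the file through separate readBlockP() calls. The sketch below shows one way such ranges might be planned. ReadRange and planRangesSketch are hypothetical names, and how ewmscp actually schedules parallel reads is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical description of one positional read: where it starts and
// how many bytes it is expected to deliver.
struct ReadRange {
    std::size_t offset;
    std::size_t length;
};

// Split a file of fileSize bytes into consecutive ranges of at most
// blockCapacity bytes each; only the last range may be shorter.
inline std::vector<ReadRange> planRangesSketch(std::size_t fileSize,
                                               std::size_t blockCapacity) {
    assert(blockCapacity > 0);
    std::vector<ReadRange> ranges;
    for (std::size_t offset = 0; offset < fileSize; offset += blockCapacity) {
        const std::size_t remaining = fileSize - offset;
        ranges.push_back(
            {offset, remaining < blockCapacity ? remaining : blockCapacity});
    }
    return ranges;
}
```

Each planned range maps naturally onto the (bytesToRead, offset) pair that readBlockP() takes, so the ranges can be handed to independent workers.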

◆ readBlock()

bool inputHandler::daosFs::readerDaosFs::readBlock ( block & b )
override

Definition at line 94 of file inputHandlerDaosFs.cpp.

94   {
95       b.clear(totalBytesRead);
96       bool lastblock = false;
97
98       auto bytesToRead = b.max_size();
99
100      while (b.size() + blockSize <= bytesToRead) {
102          d_iov_t iov;
103          iov.iov_buf = b.bufferAt(b.size());
104          iov.iov_buf_len = blockSize;
105          d_sg_list_t sg_list;
106          sg_list.sg_nr=1;
107          sg_list.sg_iovs = &iov;
108          timerInst(read);
109          daos_size_t bytes_read;
110          throwcall::good0(dfs_read(handler.dfs, obj, &sg_list, totalBytesRead, &bytes_read, nullptr),
111                           "read failed on ", path);
112          readRateLimit.update(bytes_read);
113          if (bytes_read == 0) {
114              lastblock = true;
115              if (totalBytesRead < readInitialStat.size) {
116                  throw delayAdvisingError(path + " has shrunk while reading, (" +
117                                           std::to_string(readInitialStat.size) +
118                                           " -> " +
119                                           std::to_string(totalBytesRead) +
120                                           ")");
121              }
122              break;
123          }
124          totalBytesRead += bytes_read;
125          if (totalBytesRead > readInitialStat.size) {
126              throw delayAdvisingError(path + " has grown while reading, (" +
127                                       std::to_string(readInitialStat.size) +
128                                       " -> " +
129                                       std::to_string(totalBytesRead) +
130                                       ")");
131          }
132
133          b.bump_size(bytes_read);
134      }
135
136      return lastblock;
137  }

References block::bufferAt(), block::bump_size(), block::clear(), throwcall::good0(), block::max_size(), readRateLimit, block::size(), timerInst, throttle::watch::update(), and throttle::watch::wait().
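
The loop in readBlock() fills the block in blockSize-sized reads at the running file offset, stops when a read returns zero bytes, and compares the running total against the initially recorded size to detect a file that shrank or grew. The sketch below replays that control flow against an in-memory string. SequentialReaderSketch is a hypothetical class, the string stands in for dfs_read, std::runtime_error stands in for delayAdvisingError, and rate limiting and timing are left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

class SequentialReaderSketch {
  public:
    SequentialReaderSketch(std::string data, std::size_t expectedSize,
                           std::size_t chunkSize)
        : data_(std::move(data)), expectedSize_(expectedSize),
          chunkSize_(chunkSize) {
        assert(chunkSize_ > 0);
    }

    // Fills block with up to capacity bytes; returns true once the end of
    // the data has been seen.
    bool readBlock(std::vector<char>& block, std::size_t capacity) {
        block.clear();
        bool lastBlock = false;
        while (block.size() + chunkSize_ <= capacity) {
            const std::size_t filled = block.size();
            block.resize(filled + chunkSize_);
            const std::size_t n =
                readAt(block.data() + filled, chunkSize_, totalBytesRead_);
            block.resize(filled + n);  // keep only the bytes actually read
            if (n == 0) {
                lastBlock = true;
                if (totalBytesRead_ < expectedSize_) {
                    throw std::runtime_error("source has shrunk while reading");
                }
                break;
            }
            totalBytesRead_ += n;
            if (totalBytesRead_ > expectedSize_) {
                throw std::runtime_error("source has grown while reading");
            }
        }
        return lastBlock;
    }

  private:
    // Stand-in for a positional read: copies up to len bytes from offset.
    std::size_t readAt(char* dst, std::size_t len, std::size_t offset) const {
        if (offset >= data_.size()) {
            return 0;
        }
        const std::size_t n = std::min(len, data_.size() - offset);
        std::memcpy(dst, data_.data() + offset, n);
        return n;
    }

    std::string data_;
    std::size_t expectedSize_;
    std::size_t chunkSize_;
    std::size_t totalBytesRead_ = 0;
};
```

With 10 bytes of data, a chunk size of 4 and a capacity of 8, the first call delivers 8 bytes and returns false; the second delivers the remaining 2 bytes and returns true.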


◆ readBlockP()

void inputHandler::daosFs::readerDaosFs::readBlockP ( block & b,
size_t bytesToRead,
off_t offset
)
override

Definition at line 140 of file inputHandlerDaosFs.cpp.

140  {
141      b.clear(offset);
142      while (b.size() + blockSize <= b.max_size()) {
143          timerInst(pread);
144          d_iov_t iov;
145          iov.iov_buf = b.bufferAt(b.size());
146          iov.iov_buf_len = blockSize;
147          d_sg_list_t sg_list;
148          sg_list.sg_nr=1;
149          sg_list.sg_iovs = &iov;
150          timerInst(read);
151          daos_size_t bytes_read;
152          throwcall::good0(dfs_read(handler.dfs, obj, &sg_list, offset + b.size() , &bytes_read, nullptr),
153                           "read failed on ", path);
154          if (bytes_read == 0) {
155              break;
156          }
157          b.bump_size(bytes_read);
158          if (b.size() > bytesToRead) {
159              throw delayAdvisingError(path + " has grown while reading");
160          }
161      }
162      if (b.size() < bytesToRead) {
163          throw delayAdvisingError(path + " has shrunk while reading "
164                                   + std::to_string(bytesToRead)
165                                   + " "
166                                   + std::to_string(b.size()));
167      }
168  }

References block::bufferAt(), block::bump_size(), block::clear(), throwcall::good0(), block::max_size(), block::size(), and timerInst.
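
Unlike readBlock(), readBlockP() keeps no running offset: every read is addressed as offset + b.size(), and the result is checked against the bytesToRead the caller expects for that range. The sketch below mirrors that logic on an in-memory string. readRangeSketch is a hypothetical function, the string stands in for dfs_read, and std::runtime_error stands in for delayAdvisingError.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Reads the range starting at offset in chunkSize-sized steps, using at
// most capacity bytes of buffer, and verifies that exactly bytesToRead
// bytes were delivered.
inline std::vector<char> readRangeSketch(const std::string& data,
                                         std::size_t offset,
                                         std::size_t bytesToRead,
                                         std::size_t chunkSize,
                                         std::size_t capacity) {
    assert(chunkSize > 0);
    std::vector<char> block;
    while (block.size() + chunkSize <= capacity) {
        const std::size_t position = offset + block.size();
        if (position >= data.size()) {
            break;  // a read here would return zero bytes: end of data
        }
        const std::size_t n = std::min(chunkSize, data.size() - position);
        block.insert(block.end(), data.data() + position,
                     data.data() + position + n);
        if (block.size() > bytesToRead) {
            throw std::runtime_error("source has grown while reading");
        }
    }
    if (block.size() < bytesToRead) {
        throw std::runtime_error("source has shrunk while reading " +
                                 std::to_string(bytesToRead) + " " +
                                 std::to_string(block.size()));
    }
    return block;
}
```

Because the function touches no shared position, calls for different ranges do not interfere with each other, which is what makes this style of read suitable for parallel use.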


The documentation for this class was generated from the following files:
inputHandlerDaosFs.h
inputHandlerDaosFs.cpp