ewmscp  ..
Classes | Typedefs | Functions | Variables
fileopstest.cpp File Reference

(v0.19-24-g0617ca1 with changes)

#include <thread>
#include <mutex>
#include <condition_variable>
#include <string>
#include <vector>
#include <deque>
#include <iostream>
#include <iomanip>
#include <system_error>
#include <sstream>
#include <chrono>
#include <algorithm>
#include <random>
#include <map>
#include <cstdlib>
#include <list>
#include <Options.h>
#include "throwcall.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
Include dependency graph for fileopstest.cpp:

Go to the source code of this file.

Classes

class  testFile
 
class  buffqueue
 
class  buffstuff
 
class  timeRecords
 
class  fileSizeGenerator
 
class  statCollectionBase< TC >
 
class  dummyMember
 
class  statCollection< T, TC >
 
class  timeSpan
 

Typedefs

typedef std::chrono::system_clock clock_type
 

Functions

testFile * createFile (const char *workdir, unsigned int index, buffstuff &buffer, size_t size, std::chrono::system_clock::time_point &stat_time)
 
void readAndDeleteFile (testFile *f, buffstuff &buffer)
 
void bunch_wait (long int bunchTime)
 
void creator (buffqueue *queue, const char *workdir, unsigned int repeat, unsigned long blockSize, unsigned int maxQueueDepth)
 
void consumer (buffqueue *queue, buffqueue *results, unsigned long blockSize)
 
template<typename T , typename TC , class ... Types>
statCollection< T, TC > * statCollectionFactory (T TC::*member, Types...args)
 
void collectStats (buffqueue *results)
 
int main (int argc, const char *argv[])
 

Variables

static options::single< int > bunchTime ('B', "bunchTime", "bunching time for requests (seconds)", 0)
 
static options::single< int > bunchStart ('s', "startTime", "bunching time for starts (seconds)", 0)
 
static timeRecords creationRecords
 
static timeRecords mtimeRecords
 
static timeRecords consumptionRecords
 
static options::single< std::string > distribution ('d', "distribution", "distribution type", "constant", {"constant", "uniform", "geometric", "gauss"})
 
static options::single< size_t > parameter1 ('\0', "parameter1", "1st distribution paramater, avg/min file size", 1024)
 
static options::single< size_t > parameter2 ('\0', "parameter2", "2nd distribution paramater, sigma/max file size", 2048)
 

Typedef Documentation

◆ clock_type

typedef std::chrono::system_clock clock_type

Definition at line 27 of file fileopstest.cpp.

Function Documentation

◆ bunch_wait()

void bunch_wait ( long int  bunchTime)

Definition at line 164 of file fileopstest.cpp.

164  {
165  if (bunchTime > 0) {
166  auto now = std::chrono::system_clock::now();
167  auto seconds = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
168  auto divresult = std::div(seconds, bunchTime);
169  auto then = std::chrono::system_clock::time_point(std::chrono::seconds(divresult.quot * bunchTime + bunchTime));
170  std::this_thread::sleep_until(then);
171  }
172 }

References bunchTime.

Referenced by consumer(), and creator().

Here is the caller graph for this function:

◆ collectStats()

void collectStats ( buffqueue * results)

Definition at line 460 of file fileopstest.cpp.

460  {
461  unsigned long n(0);
462  statCollection<double> writeSpeed("write speed ", "MiB/s");
463  statCollection<double> readSpeed("read speed ", "MiB/s");
464  std::vector<statCollectionBase<testFile>*> stats;
465  stats.emplace_back(statCollectionFactory(&testFile::deltaTCreate, "File creation"));
466  stats.emplace_back(statCollectionFactory(&testFile::deltaTWrite, "Write to file"));
467  stats.emplace_back(statCollectionFactory(&testFile::deltaTClose1, "Close file 1 "));
468  stats.emplace_back(statCollectionFactory(&testFile::deltaTOpen, "Open for read"));
469  stats.emplace_back(statCollectionFactory(&testFile::deltaTRead, "Read fr. file"));
470  stats.emplace_back(statCollectionFactory(&testFile::deltaTClose2, "Close file 2 "));
471  stats.emplace_back(statCollectionFactory(&testFile::deltaTUnlink, "Unlink file "));
472  stats.emplace_back(statCollectionFactory(&testFile::deltaTQueue, "Wait wr/rd "));
473  auto fileSize = statCollectionFactory(&testFile::bytes, "file size ", "Bytes");
474  stats.emplace_back(fileSize);
475 
476  timeSpan tWrite;
477  timeSpan tRead;
478  timeSpan tCreations;
479  timeSpan tOpens;
480 
481 
482  std::map<size_t, unsigned int> sizeHisto;
483 
484  while (true) {
485  auto f = results->dequeue();
486 
487  if (f == nullptr) {
488  break;
489  }
490 
491  n++;
492  tWrite.update(f->tCreate, f->tClose1);
493  tRead.update(f->tOpen, f->tClose2);
494  tCreations.update(f->tCreate);
495  tOpens.update(f->tOpen);
496  {
497  unsigned int i = 1;
498 
499  while (i < f->bytes) {
500  i <<= 1;
501  }
502 
503  ++sizeHisto[i];
504  }
505 
506  for (auto stat : stats) {
507  stat->add(f);
508  }
509 
510  writeSpeed.addValue(f->bytes / (1024 * 1024 *
511  std::chrono::duration_cast<std::chrono::duration<double>>(f->deltaTWrite).count()));
512  readSpeed.addValue(f->bytes / (1024 * 1024 *
513  std::chrono::duration_cast<std::chrono::duration<double>>(f->deltaTRead).count()));
514  /*
515  std::cout << f->name
516  << " cr: " << std::chrono::duration_cast<std::chrono::nanoseconds>(f->deltaTCreate).count()
517  << " wr: " << std::chrono::duration_cast<std::chrono::nanoseconds>(f->deltaTWrite).count()
518  << " cl: " << std::chrono::duration_cast<std::chrono::nanoseconds>(f->deltaTClose1).count()
519  << " op: " << std::chrono::duration_cast<std::chrono::nanoseconds>(f->deltaTOpen).count()
520  << " rd: " << std::chrono::duration_cast<std::chrono::nanoseconds>(f->deltaTRead).count()
521  << " cl: " << std::chrono::duration_cast<std::chrono::nanoseconds>(f->deltaTClose2).count()
522  << " rm: " << std::chrono::duration_cast<std::chrono::nanoseconds>(f->deltaTUnlink).count()
523  << " qu: " << std::chrono::duration_cast<std::chrono::nanoseconds>(f->deltaTQueue).count()
524  << "\n";
525  */
526  }
527 
528  std::cout << "file sizes in bytes:\n";
529  size_t from = 0;
530 
531  for (auto bin : sizeHisto) {
532  std::cout << std::setw(10) << std::right << from << " to " << bin.first << "\t" << bin.second << "\n";
533  from = bin.first;
534  }
535 
536  stats.push_back(&writeSpeed);
537  stats.push_back(&readSpeed);
538 
539  for (auto stat : stats) {
540  stat->print(n);
541  }
542 
543  auto deltaTWrite = tWrite.span().count();
544  std::cout << "total time spent writing: "
545  << deltaTWrite
546  << "s, with "
547  << fileSize->sum / (1024 * 1024 * deltaTWrite)
548  << "MB/s \n";
549  auto deltaTRead = tRead.span().count();
550  std::cout << "total time spent reading: "
551  << deltaTRead
552  << "s, with "
553  << fileSize->sum / (1024 * 1024 * deltaTRead)
554  << "MB/s \n";
555  auto deltaTCreate = tCreations.span().count();
556  auto deltaTOpen = tOpens.span().count();
557  std::cout << n << " files created in " << deltaTCreate << "s: " << n / deltaTCreate << "Creations/s\n";
558  std::cout << n << " files opene in " << deltaTOpen << "s: " << n / deltaTOpen << "Opens/s\n";
559 }

References statCollection< T, TC >::addValue(), testFile::bytes, testFile::deltaTClose1, testFile::deltaTClose2, testFile::deltaTCreate, testFile::deltaTOpen, testFile::deltaTQueue, testFile::deltaTRead, testFile::deltaTUnlink, testFile::deltaTWrite, buffqueue::dequeue(), f(), timeSpan::span(), statCollectionFactory(), and timeSpan::update().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ consumer()

void consumer ( buffqueue * queue,
buffqueue * results,
unsigned long  blockSize 
)

Definition at line 317 of file fileopstest.cpp.

317  {
318  buffstuff buffer(blockSize);
319  std::list<std::chrono::system_clock::time_point> startTimes;
320 
322 
323  while (true) {
324  auto f = queue->dequeue();
325 
326  if (f == nullptr) {
327  break;
328  }
329 
331  startTimes.emplace_back(std::chrono::system_clock::now());
332  readAndDeleteFile(f, buffer);
333  results->enqueue(f);
334  }
335 
336  consumptionRecords.record(startTimes.begin(), startTimes.end());
337 }

References bunch_wait(), bunchStart, bunchTime, consumptionRecords, buffqueue::dequeue(), buffqueue::enqueue(), f(), readAndDeleteFile(), and timeRecords::record().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ createFile()

testFile* createFile ( const char *  workdir,
unsigned int  index,
buffstuff & buffer,
size_t  size,
std::chrono::system_clock::time_point &  stat_time 
)

Definition at line 104 of file fileopstest.cpp.

104  {
105  testFile* f = new testFile();
106  std::ostringstream name;
107  name << workdir
108  << "/testfile_"
109  << std::this_thread::get_id()
110  << "_"
111  << index;
112  f->name = name.str();
113  f->bytes = size;
114  f->tCreate = clock_type::now();
115  auto fd = throwcall::badval(open(f->name.c_str(), O_CREAT | O_WRONLY, 0755), -1,
116  "can't open ", f->name);
117  auto now = clock_type::now();
118  f->deltaTCreate = now - f->tCreate;
119  {
120  struct stat newstat;
121  throwcall::good0(fstat(fd, &newstat), "can't stat file");
122  stat_time = std::chrono::time_point<std::chrono::system_clock>(std::chrono::nanoseconds(newstat.st_mtim.tv_sec * 1000000000 + newstat.st_mtim.tv_nsec));
123  }
124  auto before = clock_type::now();
125  throwcall::badval(write(fd, buffer.writeBuffer.data(), f->bytes), -1,
126  "write ", f->name);
127  now = clock_type::now();
128  f->deltaTWrite = now - before;
129  before = now;
130  throwcall::good0(close(fd), "can't close ", f->name);
131  f->tClose1 = clock_type::now();
132  f->deltaTClose1 = f->tClose1 - before;
133  return f;
134 }

References throwcall::badval(), f(), throwcall::good0(), and buffstuff::writeBuffer.

Referenced by creator().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ creator()

void creator ( buffqueue * queue,
const char *  workdir,
unsigned int  repeat,
unsigned long  blockSize,
unsigned int  maxQueueDepth 
)

Definition at line 290 of file fileopstest.cpp.

290  {
291  buffstuff buffer(blockSize);
292  fileSizeGenerator generator(distribution, parameter1, parameter2, blockSize);
293 
294  std::list<std::chrono::system_clock::time_point> startTimes;
295  std::list<std::chrono::system_clock::time_point> statTimes;
296 
298 
299  for (decltype(repeat) index = 0; index < repeat; index++) {
300  while (queue->size() > maxQueueDepth) {
301  std::this_thread::yield();
302  }
303 
304  auto size = generator.getNumber();
306  startTimes.emplace_back(std::chrono::system_clock::now());
307  std::chrono::system_clock::time_point stat_time;
308  auto f = createFile(workdir, index, buffer, size, stat_time);
309  statTimes.emplace_back(stat_time);
310  queue->enqueue(f);
311  }
312 
313  creationRecords.record(startTimes.begin(), startTimes.end());
314  mtimeRecords.record(statTimes.begin(), statTimes.end());
315 }

References bunch_wait(), bunchStart, bunchTime, createFile(), creationRecords, distribution, buffqueue::enqueue(), f(), fileSizeGenerator::getNumber(), mtimeRecords, parameter1, parameter2, timeRecords::record(), and buffqueue::size().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ main()

int main ( int  argc,
const char *  argv[] 
)

Definition at line 562 of file fileopstest.cpp.

562  {
563  options::parser parser("fileopstest", "", {});
564  options::single<unsigned>nCreators('w', "nCreators", "number of creator threads", sysconf(_SC_NPROCESSORS_ONLN));
565  options::single<unsigned>nConsumers('r', "nConsumers", "number of consumer threads", sysconf(_SC_NPROCESSORS_ONLN));
566  options::single<unsigned>nRepeat('c', "nRepeat", "number of files per thread", 1);
567  options::single<unsigned>maxQueueDepth('q', "maxQueueDepth", "maximum depth of file queue", 100);
568  options::positional<options::single<std::string>>workdir(10, "workdir", "directory to work in");
569  options::single<size_t>blockSize('b', "blockSize", "block size: 0 fs native block size", 0);
570  options::single<int>consumerDelay('D', "consumerDelay", "consumer delay", 0);
571 
572  parser.fParse(argc, argv);
573 
574  if (blockSize == 0) {
575  struct statvfs statbuf;
576  throwcall::good0(statvfs(workdir.c_str(), &statbuf), "can't stat workdir fs");
577  size_t& bla = blockSize;
578  bla = statbuf.f_bsize;
579  }
580 
581  std::cout << "Will create " << nRepeat << " files in " << nCreators << " threads with " << blockSize << std::hex << " (0x" << blockSize << ", " << std::dec << blockSize / 1024 << "k, " << blockSize / (1024 * 1024) << "M) blocks," << std::endl;
582  std::cout << "Consuming them with " << nConsumers << " threads, the queue should not grow above " << maxQueueDepth << std::endl;
583 
584  buffqueue queue;
585  buffqueue results;
586  std::vector<std::thread> creators;
587  std::vector<std::thread> consumers;
588 
589  for (unsigned index = 0; index < nCreators; index++) {
590  creators.emplace_back(std::thread(creator, &queue, workdir.c_str(), nRepeat, blockSize, maxQueueDepth));
591  }
592 
593  std::this_thread::sleep_for(std::chrono::seconds(consumerDelay.fGetValue()));
594 
595  for (unsigned index = 0; index < nConsumers; index++) {
596  consumers.emplace_back(std::thread(consumer, &queue, &results, blockSize));
597  }
598 
599  auto collector = std::thread(collectStats, &results);
600 
601  for (auto & tr : creators) {
602  tr.join();
603  }
604 
605  queue.endOfInput();
606 
607  for (auto & tr : consumers) {
608  tr.join();
609  }
610 
611  results.endOfInput();
612  collector.join();
613  std::cout << "creation times:\n";
614  creationRecords.print("file creation time: ");
615  std::cout << "\nstat times:\n";
616  mtimeRecords.print("file stat time: ");
617  std::cout << "\nconsumption times:\n";
618  consumptionRecords.print("file consumption time: ");
619  return 0;
620 }

References collectStats(), consumer(), consumptionRecords, creationRecords, creator(), buffqueue::endOfInput(), options::single< T >::fGetValue(), throwcall::good0(), mtimeRecords, and timeRecords::print().

Here is the call graph for this function:

◆ readAndDeleteFile()

void readAndDeleteFile ( testFile * f,
buffstuff & buffer
)

Definition at line 135 of file fileopstest.cpp.

135  {
136  f->tOpen = clock_type::now();
137  auto fd = throwcall::badval(open(f->name.c_str(), O_RDONLY), -1,
138  "can't open fotr read", f->name);
139  auto now = clock_type::now();
140  f->deltaTOpen = now - f->tOpen;
141  f->deltaTQueue = f->tOpen - f->tClose1;
142  auto before = now;
143  throwcall::badval(read(fd, buffer.readBuffer.data(), f->bytes), -1,
144  "can't read from ", f->name);
145  now = clock_type::now();
146  f->deltaTRead = now - before;
147 
148  if (memcmp(buffer.writeBuffer.data(), buffer.readBuffer.data(), f->bytes) != 0) {
149  throw std::runtime_error("values in file " + f->name);
150  }
151 
152  before = clock_type::now();
153  throwcall::good0(close(fd), "can't close after reading ", f->name);
154  now = clock_type::now();
155  f->tClose2 = now;
156  f->deltaTClose2 = now - before;
157  before = now;
158  throwcall::good0(unlink(f->name.c_str()), "can't unlink ", f->name);
159  now = clock_type::now();
160  f->deltaTUnlink = now - before;
161 }

References throwcall::badval(), f(), throwcall::good0(), buffstuff::readBuffer, and buffstuff::writeBuffer.

Referenced by consumer().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ statCollectionFactory()

template<typename T , typename TC , class ... Types>
statCollection<T, TC>* statCollectionFactory ( T TC::*  member,
Types...  args 
)

Definition at line 425 of file fileopstest.cpp.

425  {
426  return new typename statCollection<T, TC>::statCollection(member, args...);
427 }

References statCollection< T, TC >::statCollection().

Referenced by collectStats().

Here is the call graph for this function:
Here is the caller graph for this function:

Variable Documentation

◆ bunchStart

options::single<int> bunchStart( 's', "startTime", "bunching time for starts (seconds)", 0)
static

Referenced by consumer(), and creator().

◆ bunchTime

options::single<int> bunchTime( 'B', "bunchTime", "bunching time for requests (seconds)", 0)
static

Referenced by bunch_wait(), consumer(), and creator().

◆ consumptionRecords

timeRecords consumptionRecords
static

Definition at line 205 of file fileopstest.cpp.

Referenced by consumer(), and main().

◆ creationRecords

timeRecords creationRecords
static

Definition at line 203 of file fileopstest.cpp.

Referenced by creator(), and main().

◆ distribution

options::single<std::string> distribution( 'd', "distribution", "distribution type", "constant", {"constant", "uniform", "geometric", "gauss"})
static

◆ mtimeRecords

timeRecords mtimeRecords
static

Definition at line 204 of file fileopstest.cpp.

Referenced by creator(), and main().

◆ parameter1

options::single<size_t> parameter1('\0', "parameter1", "1st distribution paramater, avg/min file size", 1024)
static

Referenced by creator().

◆ parameter2

options::single<size_t> parameter2('\0', "parameter2", "2nd distribution paramater, sigma/max file size", 2048)
static

Referenced by creator().

bunchStart
static options::single< int > bunchStart('s', "startTime", "bunching time for starts (seconds)", 0)
options::parser
class that contains the parser, i.e. does that option handling
Definition: Options.h:363
testFile
Definition: fileopstest.cpp:33
options::single< unsigned >
copyRequest::backupModeType::before
@ before
testFile::deltaTClose2
clock_type::duration deltaTClose2
Definition: fileopstest.cpp:41
testFile::deltaTCreate
clock_type::duration deltaTCreate
Definition: fileopstest.cpp:36
readAndDeleteFile
void readAndDeleteFile(testFile *f, buffstuff &buffer)
Definition: fileopstest.cpp:135
buffqueue
Definition: fileopstest.cpp:51
testFile::deltaTUnlink
clock_type::duration deltaTUnlink
Definition: fileopstest.cpp:42
throwcall::badval
T badval(T call, t badvalue, const Args &... args)
template function to wrap system calls that return a special bad value on failure
Definition: throwcall.h:54
testFile::deltaTWrite
clock_type::duration deltaTWrite
Definition: fileopstest.cpp:37
testFile::bytes
size_t bytes
Definition: fileopstest.cpp:48
buffstuff::readBuffer
decltype(writeBuffer) readBuffer
Definition: fileopstest.cpp:100
statCollection
Definition: fileopstest.cpp:355
timeRecords::print
void print(const std::string &prefix)
Definition: fileopstest.cpp:191
testFile::deltaTQueue
clock_type::duration deltaTQueue
Definition: fileopstest.cpp:43
fileSizeGenerator
Definition: fileopstest.cpp:208
mtimeRecords
static timeRecords mtimeRecords
Definition: fileopstest.cpp:204
testFile::deltaTRead
clock_type::duration deltaTRead
Definition: fileopstest.cpp:40
buffstuff
Definition: fileopstest.cpp:89
bunch_wait
void bunch_wait(long int bunchTime)
Definition: fileopstest.cpp:164
parameter2
static options::single< size_t > parameter2('\0', "parameter2", "2nd distribution paramater, sigma/max file size", 2048)
timeSpan::update
void update(clock_type::time_point aStart, clock_type::time_point aStop)
Definition: fileopstest.cpp:436
timeRecords::record
void record(InputIt begin, InputIt end)
Definition: fileopstest.cpp:182
testFile::deltaTOpen
clock_type::duration deltaTOpen
Definition: fileopstest.cpp:39
bunchTime
static options::single< int > bunchTime('B', "bunchTime", "bunching time for requests (seconds)", 0)
timeSpan
Definition: fileopstest.cpp:429
statCollectionFactory
statCollection< T, TC > * statCollectionFactory(T TC::*member, Types...args)
Definition: fileopstest.cpp:425
buffqueue::enqueue
void enqueue(testFile *f)
Definition: fileopstest.cpp:59
buffstuff::writeBuffer
std::vector< int > writeBuffer
Definition: fileopstest.cpp:99
consumer
void consumer(buffqueue *queue, buffqueue *results, unsigned long blockSize)
Definition: fileopstest.cpp:317
collectStats
void collectStats(buffqueue *results)
Definition: fileopstest.cpp:460
parameter1
static options::single< size_t > parameter1('\0', "parameter1", "1st distribution paramater, avg/min file size", 1024)
creationRecords
static timeRecords creationRecords
Definition: fileopstest.cpp:203
createFile
testFile * createFile(const char *workdir, unsigned int index, buffstuff &buffer, size_t size, std::chrono::system_clock::time_point &stat_time)
Definition: fileopstest.cpp:104
creator
void creator(buffqueue *queue, const char *workdir, unsigned int repeat, unsigned long blockSize, unsigned int maxQueueDepth)
Definition: fileopstest.cpp:290
options::positional
Definition: Options.h:876
buffqueue::size
decltype(queue.size()) size()
Definition: fileopstest.cpp:79
testFile::deltaTClose1
clock_type::duration deltaTClose1
Definition: fileopstest.cpp:38
consumptionRecords
static timeRecords consumptionRecords
Definition: fileopstest.cpp:205
throwcall::good0
void good0(T call, const Args &... args)
template function to wrap system calls that return 0 on success
Definition: throwcall.h:40
f
int f(int a, int line)
Definition: cctest.cpp:4
distribution
static options::single< std::string > distribution('d', "distribution", "distribution type", "constant", {"constant", "uniform", "geometric", "gauss"})
buffqueue::endOfInput
void endOfInput()
Definition: fileopstest.cpp:82
timeSpan::span
std::chrono::duration< double > span()
Definition: fileopstest.cpp:455
statCollection::statCollection
statCollection(decltype(sourceMember) aSourceMember, Types ... args)
Definition: fileopstest.cpp:376
buffqueue::dequeue
testFile * dequeue()
Definition: fileopstest.cpp:64