 |
ewmscp
..
|
(v0.19-24-g0617ca1 with changes)
#include <thread>
#include <mutex>
#include <condition_variable>
#include <string>
#include <vector>
#include <deque>
#include <iostream>
#include <iomanip>
#include <system_error>
#include <sstream>
#include <chrono>
#include <algorithm>
#include <random>
#include <map>
#include <cstdlib>
#include <list>
#include <Options.h>
#include "throwcall.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
Go to the source code of this file.
|
testFile * | createFile (const char *workdir, unsigned int index, buffstuff &buffer, size_t size, std::chrono::system_clock::time_point &stat_time) |
|
void | readAndDeleteFile (testFile *f, buffstuff &buffer) |
|
void | bunch_wait (long int bunchTime) |
|
void | creator (buffqueue *queue, const char *workdir, unsigned int repeat, unsigned long blockSize, unsigned int maxQueueDepth) |
|
void | consumer (buffqueue *queue, buffqueue *results, unsigned long blockSize) |
|
template<typename T , typename TC , class ... Types> |
statCollection< T, TC > * | statCollectionFactory (T TC::*member, Types...args) |
|
void | collectStats (buffqueue *results) |
|
int | main (int argc, const char *argv[]) |
|
|
static options::single< int > | bunchTime ('B', "bunchTime", "bunching time for requests (seconds)", 0) |
|
static options::single< int > | bunchStart ('s', "startTime", "bunching time for starts (seconds)", 0) |
|
static timeRecords | creationRecords |
|
static timeRecords | mtimeRecords |
|
static timeRecords | consumptionRecords |
|
static options::single< std::string > | distribution ('d', "distribution", "distribution type", "constant", {"constant", "uniform", "geometric", "gauss"}) |
|
static options::single< size_t > | parameter1 ('\0', "parameter1", "1st distribution paramater, avg/min file size", 1024) |
|
static options::single< size_t > | parameter2 ('\0', "parameter2", "2nd distribution paramater, sigma/max file size", 2048) |
|
◆ clock_type
◆ bunch_wait()
void bunch_wait |
( |
long int |
bunchTime | ) |
|
Definition at line 164 of file fileopstest.cpp.
166 auto now = std::chrono::system_clock::now();
167 auto seconds = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
168 auto divresult = std::div(seconds,
bunchTime);
169 auto then = std::chrono::system_clock::time_point(std::chrono::seconds(divresult.quot *
bunchTime +
bunchTime));
170 std::this_thread::sleep_until(then);
References bunchTime.
Referenced by consumer(), and creator().
◆ collectStats()
Definition at line 460 of file fileopstest.cpp.
464 std::vector<statCollectionBase<testFile>*> stats;
474 stats.emplace_back(fileSize);
482 std::map<size_t, unsigned int> sizeHisto;
492 tWrite.
update(
f->tCreate,
f->tClose1);
499 while (i < f->bytes) {
506 for (
auto stat : stats) {
510 writeSpeed.addValue(
f->bytes / (1024 * 1024 *
511 std::chrono::duration_cast<std::chrono::duration<double>>(
f->deltaTWrite).count()));
512 readSpeed.addValue(
f->bytes / (1024 * 1024 *
513 std::chrono::duration_cast<std::chrono::duration<double>>(
f->deltaTRead).count()));
528 std::cout <<
"file sizes in bytes:\n";
531 for (
auto bin : sizeHisto) {
532 std::cout << std::setw(10) << std::right << from <<
" to " << bin.first <<
"\t" << bin.second <<
"\n";
536 stats.push_back(&writeSpeed);
537 stats.push_back(&readSpeed);
539 for (
auto stat : stats) {
543 auto deltaTWrite = tWrite.
span().count();
544 std::cout <<
"total time spent writing: "
547 << fileSize->sum / (1024 * 1024 * deltaTWrite)
549 auto deltaTRead = tRead.
span().count();
550 std::cout <<
"total time spent reading: "
553 << fileSize->sum / (1024 * 1024 * deltaTRead)
555 auto deltaTCreate = tCreations.
span().count();
556 auto deltaTOpen = tOpens.
span().count();
557 std::cout << n <<
" files created in " << deltaTCreate <<
"s: " << n / deltaTCreate <<
"Creations/s\n";
558 std::cout << n <<
" files opene in " << deltaTOpen <<
"s: " << n / deltaTOpen <<
"Opens/s\n";
References statCollection< T, TC >::addValue(), testFile::bytes, testFile::deltaTClose1, testFile::deltaTClose2, testFile::deltaTCreate, testFile::deltaTOpen, testFile::deltaTQueue, testFile::deltaTRead, testFile::deltaTUnlink, testFile::deltaTWrite, buffqueue::dequeue(), f(), timeSpan::span(), statCollectionFactory(), and timeSpan::update().
Referenced by main().
◆ consumer()
◆ createFile()
testFile* createFile |
( |
const char * |
workdir, |
|
|
unsigned int |
index, |
|
|
buffstuff & |
buffer, |
|
|
size_t |
size, |
|
|
std::chrono::system_clock::time_point & |
stat_time |
|
) |
| |
Definition at line 104 of file fileopstest.cpp.
106 std::ostringstream name;
109 << std::this_thread::get_id()
112 f->name = name.str();
114 f->tCreate = clock_type::now();
116 "can't open ",
f->name);
117 auto now = clock_type::now();
118 f->deltaTCreate = now -
f->tCreate;
122 stat_time = std::chrono::time_point<std::chrono::system_clock>(std::chrono::nanoseconds(newstat.st_mtim.tv_sec * 1000000000 + newstat.st_mtim.tv_nsec));
124 auto before = clock_type::now();
127 now = clock_type::now();
131 f->tClose1 = clock_type::now();
132 f->deltaTClose1 =
f->tClose1 -
before;
References throwcall::badval(), f(), throwcall::good0(), and buffstuff::writeBuffer.
Referenced by creator().
◆ creator()
void creator |
( |
buffqueue * |
queue, |
|
|
const char * |
workdir, |
|
|
unsigned int |
repeat, |
|
|
unsigned long |
blockSize, |
|
|
unsigned int |
maxQueueDepth |
|
) |
| |
Definition at line 290 of file fileopstest.cpp.
294 std::list<std::chrono::system_clock::time_point> startTimes;
295 std::list<std::chrono::system_clock::time_point> statTimes;
299 for (decltype(repeat) index = 0; index < repeat; index++) {
300 while (queue->
size() > maxQueueDepth) {
301 std::this_thread::yield();
304 auto size = generator.getNumber();
306 startTimes.emplace_back(std::chrono::system_clock::now());
307 std::chrono::system_clock::time_point stat_time;
308 auto f =
createFile(workdir, index, buffer, size, stat_time);
309 statTimes.emplace_back(stat_time);
References bunch_wait(), bunchStart, bunchTime, createFile(), creationRecords, distribution, buffqueue::enqueue(), f(), fileSizeGenerator::getNumber(), mtimeRecords, parameter1, parameter2, timeRecords::record(), and buffqueue::size().
Referenced by main().
◆ main()
int main |
( |
int |
argc, |
|
|
const char * |
argv[] |
|
) |
| |
Definition at line 562 of file fileopstest.cpp.
572 parser.fParse(argc, argv);
574 if (blockSize == 0) {
575 struct statvfs statbuf;
576 throwcall::good0(statvfs(workdir.c_str(), &statbuf),
"can't stat workdir fs");
577 size_t& bla = blockSize;
578 bla = statbuf.f_bsize;
581 std::cout <<
"Will create " << nRepeat <<
" files in " << nCreators <<
" threads with " << blockSize << std::hex <<
" (0x" << blockSize <<
", " << std::dec << blockSize / 1024 <<
"k, " << blockSize / (1024 * 1024) <<
"M) blocks," << std::endl;
582 std::cout <<
"Consuming them with " << nConsumers <<
" threads, the queue should not grow above " << maxQueueDepth << std::endl;
586 std::vector<std::thread> creators;
587 std::vector<std::thread> consumers;
589 for (
unsigned index = 0; index < nCreators; index++) {
590 creators.emplace_back(std::thread(
creator, &queue, workdir.c_str(), nRepeat, blockSize, maxQueueDepth));
593 std::this_thread::sleep_for(std::chrono::seconds(consumerDelay.fGetValue()));
595 for (
unsigned index = 0; index < nConsumers; index++) {
596 consumers.emplace_back(std::thread(
consumer, &queue, &results, blockSize));
601 for (
auto & tr : creators) {
607 for (
auto & tr : consumers) {
613 std::cout <<
"creation times:\n";
615 std::cout <<
"\nstat times:\n";
617 std::cout <<
"\nconsumption times:\n";
References collectStats(), consumer(), consumptionRecords, creationRecords, creator(), buffqueue::endOfInput(), options::single< T >::fGetValue(), throwcall::good0(), mtimeRecords, and timeRecords::print().
◆ readAndDeleteFile()
◆ statCollectionFactory()
template<typename T , typename TC , class ... Types>
statCollection<T, TC>* statCollectionFactory |
( |
T TC::* |
member, |
|
|
Types... |
args |
|
) |
| |
◆ bunchStart
options::single<int> bunchStart( 's', "startTime", "bunching time for starts (seconds)", 0) |
|
static |
◆ bunchTime
options::single<int> bunchTime( 'B', "bunchTime", "bunching time for requests (seconds)", 0) |
|
static |
◆ consumptionRecords
◆ creationRecords
◆ distribution
options::single<std::string> distribution( 'd', "distribution", "distribution type", "constant", {"constant", "uniform", "geometric", "gauss"}) |
|
static |
◆ mtimeRecords
◆ parameter1
◆ parameter2
static options::single< int > bunchStart('s', "startTime", "bunching time for starts (seconds)", 0)
class that contains the parser, i.e. does that option handling
clock_type::duration deltaTClose2
clock_type::duration deltaTCreate
void readAndDeleteFile(testFile *f, buffstuff &buffer)
clock_type::duration deltaTUnlink
T badval(T call, t badvalue, const Args &... args)
template function to wrap system calls that return a special bad value on failure
clock_type::duration deltaTWrite
decltype(writeBuffer) readBuffer
void print(const std::string &prefix)
clock_type::duration deltaTQueue
static timeRecords mtimeRecords
clock_type::duration deltaTRead
void bunch_wait(long int bunchTime)
static options::single< size_t > parameter2('\0', "parameter2", "2nd distribution paramater, sigma/max file size", 2048)
void update(clock_type::time_point aStart, clock_type::time_point aStop)
void record(InputIt begin, InputIt end)
clock_type::duration deltaTOpen
static options::single< int > bunchTime('B', "bunchTime", "bunching time for requests (seconds)", 0)
statCollection< T, TC > * statCollectionFactory(T TC::*member, Types...args)
void enqueue(testFile *f)
std::vector< int > writeBuffer
void consumer(buffqueue *queue, buffqueue *results, unsigned long blockSize)
void collectStats(buffqueue *results)
static options::single< size_t > parameter1('\0', "parameter1", "1st distribution paramater, avg/min file size", 1024)
static timeRecords creationRecords
testFile * createFile(const char *workdir, unsigned int index, buffstuff &buffer, size_t size, std::chrono::system_clock::time_point &stat_time)
void creator(buffqueue *queue, const char *workdir, unsigned int repeat, unsigned long blockSize, unsigned int maxQueueDepth)
decltype(queue.size()) size()
clock_type::duration deltaTClose1
static timeRecords consumptionRecords
void good0(T call, const Args &... args)
template function to wrap system calls that return 0 on success
static options::single< std::string > distribution('d', "distribution", "distribution type", "constant", {"constant", "uniform", "geometric", "gauss"})
std::chrono::duration< double > span()
statCollection(decltype(sourceMember) aSourceMember, Types ... args)