3 #include <condition_variable>
9 #include <system_error>
21 #include <sys/types.h>
23 #include <sys/statvfs.h>
55 std::condition_variable
cv;
67 while (
queue.empty()) {
95 for (
unsigned int i = 0; i <
writeBuffer.capacity(); i++) {
106 std::ostringstream name;
109 << std::this_thread::get_id()
112 f->name = name.str();
114 f->tCreate = clock_type::now();
116 "can't open ",
f->name);
117 auto now = clock_type::now();
118 f->deltaTCreate = now -
f->tCreate;
122 stat_time = std::chrono::time_point<std::chrono::system_clock>(std::chrono::nanoseconds(newstat.st_mtim.tv_sec * 1000000000 + newstat.st_mtim.tv_nsec));
124 auto before = clock_type::now();
127 now = clock_type::now();
128 f->deltaTWrite = now - before;
131 f->tClose1 = clock_type::now();
132 f->deltaTClose1 =
f->tClose1 - before;
136 f->tOpen = clock_type::now();
138 "can't open fotr read",
f->name);
139 auto now = clock_type::now();
140 f->deltaTOpen = now -
f->tOpen;
141 f->deltaTQueue =
f->tOpen -
f->tClose1;
144 "can't read from ",
f->name);
145 now = clock_type::now();
146 f->deltaTRead = now - before;
149 throw std::runtime_error(
"values in file " +
f->name);
152 before = clock_type::now();
154 now = clock_type::now();
156 f->deltaTClose2 = now - before;
159 now = clock_type::now();
160 f->deltaTUnlink = now - before;
166 auto now = std::chrono::system_clock::now();
167 auto seconds = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
168 auto divresult = std::div(seconds,
bunchTime);
169 auto then = std::chrono::system_clock::time_point(std::chrono::seconds(divresult.quot *
bunchTime +
bunchTime));
170 std::this_thread::sleep_until(then);
178 typedef std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>
mapTimeType;
182 template <
typename InputIt>
void record(InputIt begin, InputIt end) {
183 for (
auto it = begin; it != end; ++it) {
187 void record(std::chrono::system_clock::time_point when) {
189 ++(
timeMap[std::chrono::time_point_cast<mapTimeType::duration>(when)]);
191 void print(
const std::string& prefix) {
192 for (
const auto& it :
timeMap) {
193 auto then = std::chrono::system_clock::to_time_t(it.first);
194 auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(it.first.time_since_epoch()).count() % 1000;
195 std::cout << prefix <<
198 <<
"." << std::setw(3) << std::setfill(
'0') << milliseconds <<
" " << it.second <<
"\n";
227 size_t size_parameter1,
228 size_t size_parameter2,
233 static std::random_device rd;
236 if (aDistribution ==
"constant") {
238 }
else if (aDistribution ==
"uniform") {
241 }
else if (aDistribution ==
"geometric") {
244 }
else if (aDistribution ==
"gauss") {
249 throw std::runtime_error(
"bad distribution parameter");
285 {
"constant",
"uniform",
"geometric",
"gauss"});
290 void creator(
buffqueue* queue,
const char *workdir,
unsigned int repeat,
unsigned long blockSize,
unsigned int maxQueueDepth) {
294 std::list<std::chrono::system_clock::time_point> startTimes;
295 std::list<std::chrono::system_clock::time_point> statTimes;
299 for (decltype(repeat) index = 0; index < repeat; index++) {
300 while (queue->
size() > maxQueueDepth) {
301 std::this_thread::yield();
306 startTimes.emplace_back(std::chrono::system_clock::now());
307 std::chrono::system_clock::time_point stat_time;
308 auto f =
createFile(workdir, index, buffer, size, stat_time);
309 statTimes.emplace_back(stat_time);
319 std::list<std::chrono::system_clock::time_point> startTimes;
331 startTimes.emplace_back(std::chrono::system_clock::now());
344 const std::string& aUnit =
"s") :
name(aName),
unit(aUnit) {};
345 virtual void print(
unsigned long n) = 0;
346 virtual void add(TC* ) {};
375 template < typename TT = T, typename TCC = TC, typename std::enable_if < std::is_arithmetic<TT>::value && ! std::is_void<TCC>::value >::type...,
class ... Types >
380 min(std::numeric_limits<T>::max()),
381 max(std::numeric_limits<T>::lowest()) {
383 template <typename TT = T, typename std::enable_if <std::is_arithmetic<TT>::value>::type...,
class ... Types>
387 min(std::numeric_limits<T>::max()),
388 max(std::numeric_limits<T>::lowest()) {
390 template < typename TT = T, typename std::enable_if < !std::is_arithmetic<TT>::value >::type...,
class ... Types >
398 template<
typename TT = T>
399 typename std::enable_if < !std::is_arithmetic<TT>::value >::type
401 std::cout << this->
getName() <<
" avg: " << std::scientific
402 << std::chrono::duration_cast<std::chrono::duration<double>>(
sum / n).count()
403 << this->
getUnit() <<
", min: "
404 << std::chrono::duration_cast<std::chrono::duration<double>>(
min).count()
405 << this->
getUnit() <<
", max: "
406 << std::chrono::duration_cast<std::chrono::duration<double>>(
max).count()
409 template<
typename TT = T>
410 typename std::enable_if <std::is_arithmetic<TT>::value>::type
412 std::cout << this->
getName() <<
" avg: "
414 << this->
getUnit() <<
", min: "
416 << this->
getUnit() <<
", max: "
420 void print(
unsigned long n)
override {
436 void update(clock_type::time_point aStart,
437 clock_type::time_point aStop) {
438 if (aStart <
start) {
446 void update(clock_type::time_point aTime) {
455 std::chrono::duration<double>
span() {
456 return std::chrono::duration_cast<std::chrono::duration<double>>(
stop -
start);
464 std::vector<statCollectionBase<testFile>*> stats;
474 stats.emplace_back(fileSize);
482 std::map<size_t, unsigned int> sizeHisto;
492 tWrite.
update(
f->tCreate,
f->tClose1);
499 while (i < f->bytes) {
506 for (
auto stat : stats) {
510 writeSpeed.
addValue(
f->bytes / (1024 * 1024 *
511 std::chrono::duration_cast<std::chrono::duration<double>>(
f->deltaTWrite).count()));
512 readSpeed.
addValue(
f->bytes / (1024 * 1024 *
513 std::chrono::duration_cast<std::chrono::duration<double>>(
f->deltaTRead).count()));
528 std::cout <<
"file sizes in bytes:\n";
531 for (
auto bin : sizeHisto) {
532 std::cout << std::setw(10) << std::right << from <<
" to " << bin.first <<
"\t" << bin.second <<
"\n";
536 stats.push_back(&writeSpeed);
537 stats.push_back(&readSpeed);
539 for (
auto stat : stats) {
543 auto deltaTWrite = tWrite.
span().count();
544 std::cout <<
"total time spent writing: "
547 << fileSize->sum / (1024 * 1024 * deltaTWrite)
549 auto deltaTRead = tRead.
span().count();
550 std::cout <<
"total time spent reading: "
553 << fileSize->sum / (1024 * 1024 * deltaTRead)
555 auto deltaTCreate = tCreations.
span().count();
556 auto deltaTOpen = tOpens.
span().count();
557 std::cout << n <<
" files created in " << deltaTCreate <<
"s: " << n / deltaTCreate <<
"Creations/s\n";
558 std::cout << n <<
" files opene in " << deltaTOpen <<
"s: " << n / deltaTOpen <<
"Opens/s\n";
562 int main(
int argc,
const char *argv[]) {
572 parser.fParse(argc, argv);
574 if (blockSize == 0) {
575 struct statvfs statbuf;
576 throwcall::good0(statvfs(workdir.c_str(), &statbuf),
"can't stat workdir fs");
577 size_t& bla = blockSize;
578 bla = statbuf.f_bsize;
581 std::cout <<
"Will create " << nRepeat <<
" files in " << nCreators <<
" threads with " << blockSize << std::hex <<
" (0x" << blockSize <<
", " << std::dec << blockSize / 1024 <<
"k, " << blockSize / (1024 * 1024) <<
"M) blocks," << std::endl;
582 std::cout <<
"Consuming them with " << nConsumers <<
" threads, the queue should not grow above " << maxQueueDepth << std::endl;
586 std::vector<std::thread> creators;
587 std::vector<std::thread> consumers;
589 for (
unsigned index = 0; index < nCreators; index++) {
590 creators.emplace_back(std::thread(
creator, &queue, workdir.c_str(), nRepeat, blockSize, maxQueueDepth));
593 std::this_thread::sleep_for(std::chrono::seconds(consumerDelay.
fGetValue()));
595 for (
unsigned index = 0; index < nConsumers; index++) {
596 consumers.emplace_back(std::thread(
consumer, &queue, &results, blockSize));
601 for (
auto & tr : creators) {
607 for (
auto & tr : consumers) {
613 std::cout <<
"creation times:\n";
615 std::cout <<
"\nstat times:\n";
617 std::cout <<
"\nconsumption times:\n";