LCIO  02.17
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
lcio_parallel_processing.cc
Go to the documentation of this file.
1 #include "lcio.h"
2 
3 #include "MT/Types.h"
4 #include "MT/LCReader.h"
5 #include "MT/LCReaderListener.h"
6 #include "UTIL/LCTOOLS.h"
7 #include "IMPL/LCEventImpl.h"
8 
9 #include <cstdlib>
10 #include <mutex>
11 #include <future>
12 #include <functional>
13 #include <thread>
14 #include <unistd.h>
15 
17 
18 using namespace std ;
19 using namespace lcio ;
22 
24 #define SAFE_PRINT( message ) { std::lock_guard<std::mutex> lock(printMutex); std::cout << message << std::endl; }
25 #define SAFE_CODE( code ) { std::lock_guard<std::mutex> lock(printMutex); code; }
26 
27 class Scheduler final : public MT::LCReaderListener {
28 private:
29  typedef void task_return_type ;
32 
33 public:
34  Scheduler( unsigned int maxNTasks = std::thread::hardware_concurrency() ) :
35  _maxNTasks(maxNTasks) {
36  SAFE_PRINT( "Scheduler created with maxNTasks = " << _maxNTasks ) ;
37  }
38 
40  waitForAll();
41  }
42 
44  while ( not canStartNewTask() ) {
45  processFinishedTasks() ;
46  usleep(1000) ;
47  }
48  SAFE_PRINT( "Starting new task ..." ) ;
49  _futures.push_back( std::async( std::launch::async, [](LCEventPtr evt) {
50 
51  SAFE_PRINT( "Task processing run no "
52  << evt->getRunNumber()
53  << ", event no "
54  << evt->getEventNumber() ) ;
55 
56  // n usec to sleep within the task
57  int t = (rand() / (float) RAND_MAX) * 1000000 ;
58  usleep( t ) ;
59  }, event)) ;
60  }
61 
62  void waitForAll() {
63  SAFE_PRINT( "waitForAll()" ) ;
64  for ( unsigned int i=0 ; i<_futures.size() ; ++i ) {
65  _futures.at(i).get();
66  }
67  _futures.clear();
68  }
69 
70 private:
71 
73  auto iter = _futures.begin() ;
74  while ( iter != _futures.end() ) {
75  auto status = iter->wait_for( std::chrono::seconds(0) ) ;
76  if ( status == std::future_status::ready ) {
77  // process finished task and remove it for the pending task list
78  iter->get();
79  iter = _futures.erase( iter );
80  }
81  else {
82  iter++ ;
83  }
84  }
85  }
86 
87  bool canStartNewTask() const {
88  return ( _futures.size() < _maxNTasks ) ;
89  }
90 
91  void processEvent( LCEventPtr event ) override {
92  startTask( event ) ;
93  }
94 
95  void processRunHeader( LCRunHeaderPtr hdr ) override {
96  // Wait for all event task to finish
97  // and then process the new run header
98  waitForAll();
100  }
101 
102 private:
103  unsigned int _maxNTasks {} ;
104  future_list _futures {} ;
105 };
106 
109 int main(int argc, char** argv ){
110 
111  // read file names from command line (only argument)
112  if( argc < 2) {
113  cout << " start tasks to process events in parallel" << endl << endl;
114  cout << " usage: lcio_parallel_processing <input-file1> [[input-file2],...]" << endl ;
115  exit(1) ;
116  }
117 
118  int nFiles = argc-1 ;
119  std::vector<std::string> inputFiles ;
120 
121  for(int i=1 ; i <= nFiles ; i++) {
122  inputFiles.push_back( argv[i] ) ;
123  }
124 
125  // The LCReader to read events and runs
127  reader.open( inputFiles );
128 
129  unsigned int nThreads = std::thread::hardware_concurrency() ;
130  char *nthreadsenv = getenv( "LCIO_MAX_THREADS" ) ;
131  if ( nthreadsenv ) {
132  nThreads = atoi( nthreadsenv );
133  if ( nThreads <= 0 ) {
134  nThreads = 1;
135  }
136  }
137 
138  // The task scheduler
139  Scheduler scheduler (nThreads);
140  MT::LCReaderListenerList listeners ;
141  listeners.insert( &scheduler ) ;
142 
143  // Read stream and process events and run headers
144  while (1) {
145  try {
146  reader.readNextRecord( listeners );
147  }
148  catch ( const IO::EndOfDataException& ) {
149  break;
150  }
151  }
152 
153  return 0 ;
154 }
155 
156 
Scheduler(unsigned int maxNTasks=std::thread::hardware_concurrency())
T atoi(T...args)
std::future< task_return_type > future_type
static std::vector< std::string > FILEN
void readNextRecord(const LCReaderListenerList &listeners)
Reads the input stream and notifies listeners according to the object type found in the stream...
Definition: LCReader.cc:539
void open(const std::string &filename)
Opens a file for reading (read-only).
Definition: LCReader.cc:43
T rand(T...args)
std::mutex printMutex
T endl(T...args)
void processEvent(LCEventPtr event) override
process an event
LCEvent * event
Definition: lsh.cc:80
std::vector< future_type > future_list
T hardware_concurrency(T...args)
std::shared_ptr< EVENT::LCRunHeader > LCRunHeaderPtr
Definition: Types.h:20
void processRunHeader(LCRunHeaderPtr hdr) override
process a run header
T getenv(T...args)
static constexpr int directAccess
Bit for direct access.
Definition: LCReader.h:45
T push_back(T...args)
T exit(T...args)
void startTask(LCEventPtr event)
int main(int argc, char **argv)
Simple program that opens existing LCIO files and appends the records needed for direct access - if t...
Implementation of a LCReader for parallel processing use.
Definition: LCReader.h:37
T get(T...args)
T insert(T...args)
static void dumpRunHeader(const EVENT::LCRunHeader *run)
Simple function to dump the run header to the screen.
Definition: LCTOOLS.cc:1383
bool canStartNewTask() const
LCReaderListener class Interface for MT::LCReader::readStream() callbacks.
EndOfDataException for signaling the end of a data stream.
Definition: Exceptions.h:108
#define SAFE_PRINT(message)
T async(T...args)
std::shared_ptr< EVENT::LCEvent > LCEventPtr
Definition: Types.h:16