19 using namespace lcio ;
24 #define SAFE_PRINT( message ) { std::lock_guard<std::mutex> lock(printMutex); std::cout << message << std::endl; }
25 #define SAFE_CODE( code ) { std::lock_guard<std::mutex> lock(printMutex); code; }
35 _maxNTasks(maxNTasks) {
36 SAFE_PRINT(
"Scheduler created with maxNTasks = " << _maxNTasks ) ;
44 while ( not canStartNewTask() ) {
45 processFinishedTasks() ;
52 << evt->getRunNumber()
54 << evt->getEventNumber() ) ;
57 int t = (
rand() / (float) RAND_MAX) * 1000000 ;
64 for (
unsigned int i=0 ; i<_futures.size() ; ++i ) {
73 auto iter = _futures.begin() ;
74 while ( iter != _futures.end() ) {
76 if ( status == std::future_status::ready ) {
79 iter = _futures.erase( iter );
88 return ( _futures.size() < _maxNTasks ) ;
103 unsigned int _maxNTasks {} ;
109 int main(
int argc,
char** argv ){
113 cout <<
" start tasks to process events in parallel" <<
endl <<
endl;
114 cout <<
" usage: lcio_parallel_processing <input-file1> [[input-file2],...]" <<
endl ;
118 int nFiles = argc-1 ;
121 for(
int i=1 ; i <= nFiles ; i++) {
127 reader.
open( inputFiles );
130 char *nthreadsenv =
getenv(
"LCIO_MAX_THREADS" ) ;
132 nThreads =
atoi( nthreadsenv );
133 if ( nThreads <= 0 ) {
141 listeners.
insert( &scheduler ) ;
Scheduler(unsigned int maxNTasks=std::thread::hardware_concurrency())
std::future< task_return_type > future_type
static std::vector< std::string > FILEN
void readNextRecord(const LCReaderListenerList &listeners)
Reads the input stream and notifies listeners according to the object type found in the stream...
void open(const std::string &filename)
Opens a file for reading (read-only).
void processFinishedTasks()
void processEvent(LCEventPtr event) override
process an event
std::vector< future_type > future_list
T hardware_concurrency(T...args)
std::shared_ptr< EVENT::LCRunHeader > LCRunHeaderPtr
void processRunHeader(LCRunHeaderPtr hdr) override
process a run header
static constexpr int directAccess
Bit for direct access.
void startTask(LCEventPtr event)
int main(int argc, char **argv)
Simple program that opens existing LCIO files and appends the records needed for direct access - if t...
Implementation of a LCReader for parallel processing use.
bool canStartNewTask() const
LCReaderListener class Interface for MT::LCReader::readStream() callbacks.
EndOfDataException for signaling the end of a data stream.
#define SAFE_PRINT(message)
std::shared_ptr< EVENT::LCEvent > LCEventPtr