16 #include "sio/definitions.h"
17 #include "sio/buffer.h"
18 #include <sio/exception.h>
20 #include <sio/compression/zlib.h>
32 _rawBuffer( std::make_shared<sio::buffer>(1*sio::mbyte) ),
33 _compBuffer( std::make_shared<sio::buffer>(2*sio::mbyte) ),
34 _eventHandlerMgr( std::make_shared<SIO::SIOHandlerMgr>() ),
35 _readEventMap( lcReaderFlag &
LCReader::directAccess ),
36 _lazyUnpack( lcReaderFlag &
LCReader::lazyUnpack ),
37 _raMgr(std::make_shared<SIO::LCIORandomAccessMgr>()) {
49 SIO_THROW( sio::error_code::not_open,
"Couldn't open input stream '" + filename +
"'" ) ;
63 if( filenames.
empty() ) {
64 throw IO::IOException(
"[LCReader::open()] Provided file list is empty") ;
66 struct stat fileinfo ;
69 for(
unsigned int i=0 ; i < filenames.
size(); i++) {
70 if ( ::stat( filenames[i].c_str(), &fileinfo ) != 0 ) {
71 missing_files += filenames[i] ;
72 missing_files +=
" " ;
76 if( missing_files.size() != 0 ) {
88 auto validator = [&](
const sio::record_info &recinfo ) {
92 auto processor = [&](
const sio::record_info &recinfo,
const sio::buffer_span& recdata ) {
93 const bool compressed = sio::api::is_compressed( recinfo._options ) ;
95 _compBuffer->resize( recinfo._uncompressed_length ) ;
96 sio::zlib_compression compressor ;
99 auto rundata = compressed ?
_compBuffer->span() : recdata ;
100 auto runheader = std::make_unique<IOIMPL::LCRunHeaderIOImpl>() ;
102 runheader->setReadOnly( accessMode == EVENT::LCIO::READ_ONLY ) ;
104 runhdr = std::move(runheader) ;
110 catch( sio::exception &e ) {
112 if( e.code() == sio::error_code::eof ) {
119 catch( sio::exception &e2 ) {
120 if( e2.code() == sio::error_code::eof ) {
123 SIO_RETHROW( e2, e2.code(),
"Couldn't read next run header!" ) ;
128 SIO_RETHROW( e, e.code(),
"Couldn't read next run header!" ) ;
138 auto validator = [&](
const sio::record_info &recinfo ) {
141 auto processor = [&](
const sio::record_info &recinfo,
const sio::buffer_span& recdata ) {
142 const bool compressed = sio::api::is_compressed( recinfo._options ) ;
146 _compBuffer->resize( recinfo._uncompressed_length ) ;
147 sio::zlib_compression compressor ;
150 auto data = uncomp ?
_compBuffer->span() : recdata ;
154 event.reset( lazyEvent ) ;
157 event = std::make_unique<IOIMPL::LCEventIOImpl>() ;
163 if(
nullptr ==
event ) {
164 throw IO::IOException(
"SIOReader::readNextEvent: Event record before an EventHeader record ..." ) ;
183 catch( sio::exception &e ) {
185 if( e.code() == sio::error_code::eof ) {
192 catch( sio::exception &e2 ) {
193 if( e2.code() == sio::error_code::eof ) {
196 SIO_RETHROW( e2, e2.code(),
"Couldn't read next event!" ) ;
201 SIO_RETHROW( e, e.code(),
"Couldn't read next event!" ) ;
203 if(
nullptr !=
event ) {
204 event->setAccessMode( accessMode ) ;
220 return _raMgr->getEventMap()->getNumberOfEventRecords() ;
231 return _raMgr->getEventMap()->getNumberOfRunRecords() ;
239 auto map =
_raMgr->getEventMap() ;
240 auto it = map->begin() ;
242 for(
int i=0 ; i <nRun ; ++i, ++it) {
243 runs[i] = it->first.RunNum ;
244 assert( it->first.EvtNum == -1 ) ;
253 events.
resize( 2 * nEvt ) ;
254 auto map =
_raMgr->getEventMap() ;
255 auto it = map->begin() ;
259 for(
int i=0 ; i < nEvt ; ++i , ++it ) {
260 events[ 2*i ] = it->first.RunNum ;
261 events[ 2*i + 1 ] = it->first.EvtNum ;
262 assert( it->first.EvtNum != -1 ) ;
278 int eventsSkipped = 0 ;
281 sio::api::skip_records(
_stream, [&] (
const sio::record_info &recinfo ) {
285 return ( eventsSkipped < n ) ;
288 sio::api::skip_n_records(
_stream, 1 ) ;
290 catch( sio::exception &e ) {
291 if( e.code() != sio::error_code::eof ) {
292 SIO_RETHROW( e, e.code(),
"SIOReader::skipNEvents: Couldn't skip events" ) ;
300 if( _readEventMap ) {
303 _stream.seekg( pos ) ;
304 if( not _stream.good() ) {
305 throw IO::IOException(
"[SIOReader::readRunHeader()] Can't seek stream to requested position" ) ;
307 return readNextRunHeader( accessMode ) ;
314 std::cout <<
" WARNING : LCReader::readRunHeader(run) called but not in direct access Mode - " <<
std::endl
315 <<
" Too avoid this WARNING create the LCReader with: " <<
std::endl
316 <<
" LCFactory::getInstance()->createLCReader( IO::LCReader::directAccess ) ; " <<
std::endl ;
324 if( _readEventMap ) {
327 _stream.seekg( pos ) ;
328 if( not _stream.good() ) {
329 throw IO::IOException(
"[LCReader::readEvent()] Can't seek stream to requested position" ) ;
331 return readNextEvent( accessMode ) ;
338 std::cout <<
" WARNING : LCReader::readEvent(run,evt) called but not in direct access Mode - " <<
std::endl
339 <<
" use fast skip mechanism instead ..." <<
std::endl
340 <<
" Too avoid this WARNING create the LCReader with: " <<
std::endl
341 <<
" LCFactory::getInstance()->createLCReader( IO::LCReader::directAccess ) ; " <<
std::endl ;
343 bool evtFound = false ;
347 auto validator = [&](
const sio::record_info &recinfo ) {
350 auto processor = [&](
const sio::record_info &recinfo,
const sio::buffer_span& recdata ) {
351 const bool compressed = sio::api::is_compressed( recinfo._options ) ;
354 _compBuffer->resize( recinfo._uncompressed_length ) ;
355 sio::zlib_compression compressor ;
356 compressor.uncompress( recdata, *_compBuffer ) ;
358 auto data = uncomp ? _compBuffer->span() : recdata ;
362 event.reset( lazyEvent ) ;
365 event = std::make_unique<IOIMPL::LCEventIOImpl>() ;
371 if(
nullptr ==
event ) {
372 throw IO::IOException(
"SIOReader::readNextEvent: Event record before an EventHeader record ..." ) ;
375 if(
event->getEventNumber() == evtNumber &&
event->getRunNumber() == runNumber ) {
377 _bufferMaxSize =
std::max( _bufferMaxSize, _rawBuffer->size() ) ;
379 lazyEvent->
setBuffer( recinfo, std::move(*_rawBuffer) ) ;
381 *_rawBuffer = sio::buffer( _bufferMaxSize ) ;
396 sio::api::read_records( _stream, *_rawBuffer, validator, processor ) ;
398 catch( sio::exception &e ) {
399 if( e.code() != sio::error_code::eof ) {
400 SIO_RETHROW( e, e.code(),
"Exception caucht while searching for event!" ) ;
406 event->setAccessMode( EVENT::LCIO::READ_ONLY ) ;
407 if( not _lazyUnpack ) {
408 postProcessEvent(
event.get() ) ;
416 void LCReader::close() {
418 _readEventMap = false ;
434 int recordsRead = 0 ;
436 auto validator = [&] (
const sio::record_info &recinfo ) {
437 return ( records.
find( recinfo._name ) != records.
end() ) ;
439 auto processor = [&] (
const sio::record_info &recinfo,
const sio::buffer_span& recdata ) {
440 const bool compressed = sio::api::is_compressed( recinfo._options ) ;
443 _compBuffer->resize( recinfo._uncompressed_length ) ;
444 sio::zlib_compression compressor ;
445 compressor.uncompress( recdata, *_compBuffer ) ;
447 auto data = uncomp ? _compBuffer->span() : recdata ;
451 auto runheader = std::make_shared<IOIMPL::LCRunHeaderIOImpl>() ;
453 runheader->parameters().setValue(
"LCIOFileName" , _myFilenames[ _currentFileIndex ] ) ;
454 auto iter = listeners.
begin() ;
455 while( iter != listeners.
end() ){
456 runheader->setReadOnly(
false ) ;
457 (*iter)->processRunHeader( runheader ) ;
464 lazyEvent = std::make_shared<IOIMPL::LCEventLazyImpl>() ;
468 event = std::make_shared<IOIMPL::LCEventIOImpl>() ;
473 if(
nullptr ==
event ) {
474 throw IO::IOException(
"SIOReader::readStream: Event record before an EventHeader record ..." ) ;
478 _bufferMaxSize =
std::max( _bufferMaxSize, _rawBuffer->size() ) ;
480 lazyEvent->setBuffer( recinfo, std::move(*_rawBuffer) ) ;
482 *_rawBuffer = sio::buffer( _bufferMaxSize ) ;
487 auto iter = listeners.
begin() ;
488 while( iter != listeners.
end() ) {
489 if( not _lazyUnpack ) {
490 postProcessEvent(
event.get() ) ;
492 event->setAccessMode( EVENT::LCIO::UPDATE ) ;
493 (*iter)->processEvent(
event ) ;
499 return (recordsRead < maxRecord) ;
502 sio::api::read_records( _stream, *_rawBuffer, validator , processor ) ;
504 catch( sio::exception &e ) {
505 if( e.code() != sio::error_code::eof ) {
506 SIO_RETHROW( e, e.code(),
"SIOReader::readStream: Couldn't read stream" ) ;
510 bool nextFileAvailable = (!_myFilenames.empty() && _currentFileIndex+1 < _myFilenames.size()) ;
511 if( nextFileAvailable ) {
513 open( _myFilenames[ ++_currentFileIndex ] ) ;
516 readStream( listeners, maxRecord ) ;
520 readStream( listeners, maxRecord - recordsRead ) ;
529 message <<
"SIOReader::readStream(int maxRecord) : EOF before "
530 << maxRecord <<
" records read from file" <<
std::ends ;
540 readStream( listeners, 1 );
567 char* rColChar = getenv (
"LCIO_IGNORE_NULL_IN_SUBSET_COLLECTIONS") ;
568 if(
nullptr != rColChar ) {
572 for(
auto name = strVec->
begin() ; name != strVec->
end() ; name++) {
578 sts <<
" SIOReader::postProcessEvent: null pointer in subset collection "
579 << *name <<
" at position: " << i <<
std::endl ;
589 void LCReader::getEventMap() {
590 _raMgr->createEventMap( _stream ) ;
Base exception class for LCIO - all other exceptions extend this.
void setBuffer(const sio::record_info &recinfo, sio::buffer &&recordBuffer)
Set the event record buffer.
static void restoreParentDaughterRelations(EVENT::LCEvent *evt)
Restore the MCParticle parent <-> daughter relations.
std::size_t _bufferMaxSize
void open(const std::string &filename)
Opens a file for reading (read-only).
int getNumberOfEvents()
Return the number of events in the file - the file has to be open.
void setReadCollectionNames(const std::vector< std::string > &colnames)
Limit the collection names that are going to be read to the subset given in the vector - all other co...
bool _readEventMap
Whether to read the event map using the random access manager.
static constexpr const char * RunRecordName
std::ifstream _stream
The input file stream.
Helper struct that stores run and event positions in the file.
virtual const std::vector< std::string > * getCollectionNames() const =0
Returns the names of the collections in the event.
std::vector< std::string > _myFilenames
The list of files to open and read.
std::vector< std::string > _readCollectionNames
A restricted list of collections to read only.
static void readBlocks(const sio::buffer_span &buffer, EVENT::LCEvent *event, const SIOHandlerMgr &handlerMgr)
Read the blocks (collections) from the sio buffer.
long long long64
64 bit signed integer,e.g.to be used for timestamps
virtual LCObject * getElementAt(int index) const =0
Returns pointer to element at index - no range check, use getNumberOfEntries().
void skipNEvents(int n)
Skips the next n events from the current position.
bool _lazyUnpack
Whether to perform the lazy unpacking of event records.
static constexpr const int npos
LCReader(const LCReader &)=delete
void getRuns(EVENT::IntVec &runs)
Return the run numbers of the runs (run headers) in the file - the file has to be open...
virtual LCCollection * getCollection(const std::string &name) const =0
Returns the collection for the given name.
virtual bool isSubset() const =0
True if the collection holds a subset of objects from other collections.
IOException used for reading/writing errors.
Implementation of a LCReader for parallel processing use.
std::unique_ptr< EVENT::LCEvent > readNextEvent(int accessMode=EVENT::LCIO::READ_ONLY)
Reads the next event from the file.
std::shared_ptr< sio::buffer > _rawBuffer
The raw buffer for extracting bytes from the stream.
virtual int getNumberOfElements() const =0
Returns the number of elements in the collection.
std::shared_ptr< SIO::SIOHandlerMgr > _eventHandlerMgr
The collection block handler manager for events.
std::unique_ptr< EVENT::LCRunHeader > readNextRunHeader(int accessMode=EVENT::LCIO::READ_ONLY)
Reads the next run header from the file.
Implementation of the event class with a lazy record unpacking.
The main event interface.
The generic collection used in LCIO.
std::shared_ptr< SIO::LCIORandomAccessMgr > _raMgr
The random access manager for event/run random access in the file.
LCReaderListener class Interface for MT::LCReader::readStream() callbacks.
std::shared_ptr< sio::buffer > _compBuffer
The raw buffer for uncompression.
EndOfDataException for signaling the end of a data stream.
static constexpr const char * HeaderRecordName
void postProcessEvent(EVENT::LCEvent *evt)
unsigned int _currentFileIndex
The current file list index when opening multiple files.
int getNumberOfRuns()
Return the number of runs (run headers) in the file - the file has to be open.
static constexpr const char * EventRecordName
void getEvents(EVENT::IntVec &events)
Return the run and event numbers of the events in the file - the file has to be open.
void close()
Closes the output file/stream etc.