LCIO  02.17
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
LCIORandomAccessMgr.cc
Go to the documentation of this file.
2 
3 #include "SIO/LCSIO.h"
5 #include "SIO/SIOIndexHandler.h"
6 #include "SIO/SIOEventHandler.h"
8 #include "IOIMPL/LCEventIOImpl.h"
10 #include "Exceptions.h"
11 
12 // -- std headers
13 #include <sstream>
14 #include <stdio.h>
15 #include <string.h>
16 
17 // -- sio headers
18 #include <sio/api.h>
19 #include <sio/compression/zlib.h>
20 
21 namespace SIO {
22 
// Body of the manager's clear() member. The member index at the end of this
// listing describes it as "void clear(): Clear all lists and maps before
// closing a file".
// NOTE(review): the signature line (listing line 23) is absent from this extract.
// Empties the run/event map and the list of random access records, then drops
// the cached file record.
24  _runEvtMap->clear() ;
25  _list.clear() ;
26  _fileRecord = nullptr ;
27  }
28 
29  //----------------------------------------------------------------------------
30 
// Body of createFromEventMap(). The member index at the end of this listing
// gives its signature as
// "std::shared_ptr< LCIORandomAccess > createFromEventMap() const".
// NOTE(review): the signature line (listing line 31) is absent from this extract.
// Builds a fresh LCIORandomAccess summarising the current run/event map:
// min/max run-event and the run/event record counts are copied from the map,
// while every file location is left at 0 for the caller to fill in.
32  auto ra = std::make_shared<LCIORandomAccess>() ;
33  ra->_minRunEvt = _runEvtMap->minRunEvent() ;
34  ra->_maxRunEvt = _runEvtMap->maxRunEvent() ;
35  ra->_nRunHeaders = _runEvtMap->getNumberOfRunRecords() ;
36  ra->_nEvents = _runEvtMap->getNumberOfEventRecords() ;
// The flag is unconditionally set to true; the original author's open question
// about its exact definition is kept below.
37  ra->_recordsAreInOrder = true ; // ???? how is this defined ????
38  ra->_indexLocation = 0 ;
39  ra->_prevLocation = 0 ;
40  ra->_nextLocation = 0 ;
41  ra->_firstRecordLocation = 0 ;
42  return ra ;
43  }
44 
45  //----------------------------------------------------------------------------
46 
// Body of createFileRecord(). The member index at the end of this listing
// describes it as "void createFileRecord(): Create file record from all
// LCIORandomAccess records".
// NOTE(review): the signature line (listing line 47) is absent from this extract.
//
// Step 1: if no file record exists yet, create one initialised with sentinel
// values so that the min/max folding below works (largest values for the
// minima, 0 for the maxima and counters).
48  if( nullptr == _fileRecord ) {
49  _fileRecord = std::make_shared<LCIORandomAccess>() ;
50  _fileRecord->_minRunEvt = RunEvent( 2147483647L, 2147483647L ) ; // 2**31-1 - largest 32 bit signed
51  _fileRecord->_maxRunEvt = 0 ;
52  _fileRecord->_nRunHeaders = 0 ;
53  _fileRecord->_nEvents = 0 ;
54  _fileRecord->_recordsAreInOrder = 1 ;
55  _fileRecord->_indexLocation = 0 ; // defines file record
56  _fileRecord->_prevLocation = 9223372036854775807LL ; // 2**63-1 - largest 64 bit signed
57  _fileRecord->_nextLocation = 0 ;
58  _fileRecord->_firstRecordLocation = 0 ;
59  }
60 
// Step 2: fold every LCIORandomAccess record of the list into the file record.
// Note that the counters are accumulated with +=, i.e. on top of whatever the
// file record already holds when this is called.
61  for( auto i = _list.begin() ; i != _list.end() ; ++i ) {
62  auto ra = *i ;
// overall smallest / largest run-event seen
63  _fileRecord->_minRunEvt = ( ra->_minRunEvt < _fileRecord->_minRunEvt ? ra->_minRunEvt : _fileRecord->_minRunEvt ) ;
64  _fileRecord->_maxRunEvt = ( ra->_maxRunEvt > _fileRecord->_maxRunEvt ? ra->_maxRunEvt : _fileRecord->_maxRunEvt ) ;
65  _fileRecord->_nRunHeaders += ra->_nRunHeaders ;
66  _fileRecord->_nEvents += ra->_nEvents ;
// the product acts as a logical AND over all records
67  _fileRecord->_recordsAreInOrder = ( _fileRecord->_recordsAreInOrder * ra->_recordsAreInOrder ) ; // should be 0 if any record is 0
68  _fileRecord->_indexLocation = 0 ; // defines file record
// _nextLocation keeps the largest next-location of all records
69  if( ra->_nextLocation > _fileRecord->_nextLocation ) {
70  _fileRecord->_nextLocation = ra->_nextLocation ;
71  }
// _prevLocation keeps the smallest strictly positive previous-location
72  if( 0 < ra->_prevLocation && ra->_prevLocation < _fileRecord->_prevLocation ) {
73  _fileRecord->_prevLocation = ra->_prevLocation ;
74  }
75  }
76  }
77 
78  //----------------------------------------------------------------------------
79 
// Body of addLCIORandomAccess(). The member index at the end of this listing
// gives its signature as
// "void addLCIORandomAccess(std::shared_ptr< LCIORandomAccess > ra)".
// NOTE(review): the signature line (listing line 80) is absent from this extract.
// Appends the given record to the end of the manager's list.
81  _list.push_back( ra );
82  }
83 
84  //----------------------------------------------------------------------------
85 
86  bool LCIORandomAccessMgr::readLCIORandomAccessAt( sio::ifstream& stream , long64 pos) {
87  seekStream( stream, pos ) ;
88  return readLCIORandomAccess( stream ) ;
89  }
90 
91  //----------------------------------------------------------------------------
92 
93  bool LCIORandomAccessMgr::readLCIORandomAccess( sio::ifstream &stream ) {
94  // first extract the record from the stream (header + data)
95  sio::record_info recinfo ;
96  try {
97  sio::api::read_record( stream, recinfo, _rawBuffer ) ;
98  }
99  catch( sio::exception &e ) {
100  // RE: Need to clear stream state.
101  // Maybe the record is not there and we want to move ahead
102  stream.clear() ;
103  seekStream( stream, 0 ) ;
104  return false ;
105  }
106  // we got a record but it's not an LCIORandomAccess record...
107  if( recinfo._name != LCSIO::AccessRecordName ) {
108  return false ;
109  }
110  // setup the rando access block
111  sio::block_list blocks {} ;
112  auto raBlock = std::make_shared<SIORandomAccessHandler>() ;
113  blocks.push_back( raBlock ) ;
114  // get a valid buffer span for reading
115  auto recordData = _rawBuffer.span( recinfo._header_length, recinfo._data_length ) ;
116  // read the record blocks
117  sio::api::read_blocks( recordData, blocks ) ;
118  // get our random access object and add it to our list
119  addLCIORandomAccess( raBlock->randomAccess() ) ;
120  return true ;
121  }
122 
123  //----------------------------------------------------------------------------
124 
125  //----------------------------------------------------------------------------
126 
127  bool LCIORandomAccessMgr::readLCIOIndexAt( sio::ifstream &stream , long64 pos) {
128  seekStream( stream, pos ) ;
129  return readLCIOIndex( stream ) ;
130  }
131 
132  //----------------------------------------------------------------------------
133 
134  bool LCIORandomAccessMgr::readLCIOIndex( sio::ifstream &stream ) {
135  SIO_DEBUG( "LCIORandomAccessMgr::readLCIOIndex: Reading at " << stream.tellg() );
136  // first extract the record from the stream (header + data)
137  sio::record_info recinfo ;
138  try {
139  sio::api::read_record( stream, recinfo, _rawBuffer ) ;
140  }
141  catch( sio::exception &e ) {
142  // no way to extract a record !
143  // RE: Need to clear stream state.
144  // Maybe the record is not there and we want to move ahead
145  stream.clear() ;
146  seekStream( stream, 0 ) ;
147  return false ;
148  }
149  // we got a record but it's not an LCIOIndex record...
150  if( recinfo._name != LCSIO::IndexRecordName ) {
151  return false ;
152  }
153  // setup the rando access block
154  sio::block_list blocks {} ;
155  auto indexBlock = std::make_shared<SIOIndexHandler>() ;
156  indexBlock->setRunEventMap( _runEvtMap ) ;
157  blocks.push_back( indexBlock ) ;
158  const bool compressed = sio::api::is_compressed( recinfo._options ) ;
159  if( compressed ) {
160  sio::buffer compBuffer( recinfo._uncompressed_length ) ;
161  sio::zlib_compression compressor ;
162  compressor.uncompress( _rawBuffer.span( recinfo._header_length, recinfo._data_length ), compBuffer ) ;
163  // read the record blocks
164  sio::api::read_blocks( compBuffer.span(), blocks ) ;
165  }
166  else {
167  // read the record blocks
168  sio::api::read_blocks( _rawBuffer.span( recinfo._header_length, recinfo._data_length ), blocks ) ;
169  }
170  return true ;
171  }
172 
173  //----------------------------------------------------------------------------
174 
175  bool LCIORandomAccessMgr::initAppend( sio::ifstream & stream ) {
176  // check if the last record is LCIORandomAccess (the file record )
177  if( ! readLCIORandomAccessAt( stream , -LCSIO::RandomAccessSize ) ) {
178  recreateEventMap( stream ) ;
179  return false;
180  }
181  // store the file record separately
182  _fileRecord = _list.back() ;
183  _list.pop_back() ;
184  // now read first LCIORandomAccess record
185  readLCIORandomAccessAt( stream , _fileRecord->_nextLocation ) ; // start of last LCIORandomAccessRecord
186  return true ;
187  }
188 
189  //----------------------------------------------------------------------------
190 
191  bool LCIORandomAccessMgr::createEventMap( sio::ifstream & stream ) {
192  // check if the last record is LCIORandomAccess ( the file record )
193  if( ! readLCIORandomAccessAt( stream , -LCSIO::RandomAccessSize) ) {
194  return recreateEventMap( stream ) ;
195  }
196  // store the file record separately
197  _fileRecord = _list.back() ;
198  _list.pop_back() ;
199 
200  auto raPos = _fileRecord->_nextLocation ;
201  // start of last LCIORandomAccessRecord
202  readLCIORandomAccessAt( stream , raPos ) ;
203  auto ra = lastLCIORandomAccess() ;
204  // and then read all remaining LCIORandomAccess records
205  raPos = ra->getPrevLocation() ;
206  auto indexPos = ra->getIndexLocation() ;
207  readLCIOIndexAt( stream , indexPos ) ;
208 
209  while( raPos != 0 ) {
210  if( readLCIORandomAccessAt( stream , raPos) ) {
211  ra = lastLCIORandomAccess() ;
212  raPos = ra->getPrevLocation() ;
213  auto idxPos = ra->getIndexLocation() ;
214  readLCIOIndexAt( stream , idxPos ) ;
215  }
216  else {
217  throw IO::IOException( std::string( "[LCIORandomAccessMgr::ReadEventMap()] Could not read previous LCIORandomAccess record" ) ) ;
218  }
219  }
220  seekStream( stream, 0 ) ;// go to start of file
221  return true ;
222  }
223 
224  //----------------------------------------------------------------------------
225 
226  bool LCIORandomAccessMgr::recreateEventMap( sio::ifstream & stream ) {
227 
228  std::cout << " LCIORandomAccessMgr::getEventMap() recreating event map for direct access (old file)" << std::endl ;
229  // read the next record from the stream
230  if( not stream.is_open() ) {
231  throw IO::IOException( "LCIORandomAccessMgr::recreateEventMap: stream is not open" ) ;
232  }
233  // go to start of file
234  seekStream( stream, 0 ) ;
236  while( true ) {
237  sio::record_info recinfo {} ;
238  try {
239  sio::api::skip_records( stream, [&]( const sio::record_info &ri ){
240  return ( recordlist.find(ri._name) != recordlist.end() ) ;
241  }) ;
242  // we read something valid here. Read the record from the stream
243  sio::api::read_record( stream, recinfo, _rawBuffer ) ;
244  // Get the record data
245  auto recdata = _rawBuffer.span( recinfo._header_length, recinfo._data_length ) ;
246  // deal with zlib uncompression
247  const bool compressed = sio::api::is_compressed( recinfo._options ) ;
248  if( compressed ) {
249  sio::zlib_compression compressor ;
250  _compressBuffer.resize( recinfo._uncompressed_length ) ;
251  compressor.uncompress( recdata, _compressBuffer ) ;
252  }
253  // get the correct buffer depending of compression settings
254  auto databuf = compressed ? _compressBuffer.span() : recdata ;
255  // event case
256  if( recinfo._name == LCSIO::HeaderRecordName ) {
257  auto event = std::make_shared<IOIMPL::LCEventIOImpl>() ;
258  // pass a dummy list of read collection with only '__dummy__' inside.
259  // We only need the event and run numbers there, no need to settup all collections
260  SIOEventHeaderRecord::readBlocks( databuf, event.get(), {"__dummy__"} ) ;
261  _runEvtMap->add( RunEvent( event->getRunNumber() , event->getEventNumber() ) , static_cast<long64>(recinfo._file_start) ) ;
262  }
263  // run case
264  else if( recinfo._name == LCSIO::RunRecordName ) {
265  auto run = std::make_shared<IOIMPL::LCRunHeaderIOImpl>() ;
266  SIORunHeaderRecord::readBlocks( recdata, run.get() ) ;
267  _runEvtMap->add( RunEvent( run->getRunNumber() , -1 ) , static_cast<long64>(recinfo._file_start) ) ;
268  }
269  }
270  catch( sio::exception &e ) {
271  // reached end of file
272  if( e.code() == sio::error_code::eof ) {
273  // RE: clear EOF bit in stream. Normal exit in this case
274  stream.clear() ;
275  break ;
276  }
277  SIO_RETHROW( e, e.code(), "Couldn't recreate event map !" ) ;
278  }
279  } // while
280  // go to start of file
281  seekStream( stream, 0 ) ;
282  return true ;
283  }
284 
285  //----------------------------------------------------------------------------
286 
287  //----------------------------------------------------------------------------
288 
289  void LCIORandomAccessMgr::writeRandomAccessRecords( sio::ofstream & stream ) {
290  // nothing to write ?
291  if( _runEvtMap->empty() ) {
292  return ;
293  }
294  if( not stream.is_open() ) {
295  throw IO::IOException( "[LCIORandomAccessMgr::writeRandomAccessRecords] stream not opened" ) ;
296  }
297  sio::block_list blocks {} ;
298 
299  // 1) write the first index in the file
300  auto indexHandler = std::make_shared<SIOIndexHandler>() ;
301  indexHandler->setRunEventMap( _runEvtMap ) ;
302  blocks.push_back( indexHandler ) ;
303  // write in the buffer first, then the buffer to the file
304  auto recinfo = sio::api::write_record( LCSIO::IndexRecordName, _rawBuffer, blocks, 0 ) ;
305  sio::api::write_record( stream, _rawBuffer.span(), recinfo ) ;
306 
307  // 2) create the LCIORandomAccess object ( linked list of records )
308  auto ra = createFromEventMap() ;
309  ra->setIndexLocation( recinfo._file_start ) ;
310  long64 thisPos = stream.tellp() ;
311  ra->setFirstRecordLocation( thisPos ) ;
312  auto lRa = lastLCIORandomAccess() ;
313  SIO_DEBUG( "LCIORandomAccessMgr::writeRandomAccessRecords: lRa is " << lRa );
314  long64 prevPos = ( lRa ? lRa->getFirstRecordLocation() : 0 ) ;
315  ra->setPreviousLocation( prevPos ) ;
316  addLCIORandomAccess( ra ) ;
317 
318  // 3) Write the random access record
319  blocks.clear() ;
320  auto raHandler = std::make_shared<SIORandomAccessHandler>() ;
321  raHandler->setRandomAccess( lastLCIORandomAccess() ) ;
322  blocks.push_back( raHandler ) ;
323  // write in the buffer first, then the buffer to the file
324  recinfo = sio::api::write_record( LCSIO::AccessRecordName, _rawBuffer, blocks, 0 ) ;
325  sio::api::write_record( stream, _rawBuffer.span(), recinfo ) ;
326 
327  // 4) Create the file record
328  createFileRecord() ;
329  if( thisPos > _fileRecord->_nextLocation ) {
330  _fileRecord->_nextLocation = thisPos ;
331  }
332  if( _fileRecord->_prevLocation > thisPos ) {
333  _fileRecord->_prevLocation = thisPos ;
334  }
335 
336  // 5) Write the file random access record to stream
337  raHandler->setRandomAccess( _fileRecord ) ;
338  // write in the buffer first, then the buffer to the file
339  recinfo = sio::api::write_record( LCSIO::AccessRecordName, _rawBuffer, blocks, 0 ) ;
340  sio::api::write_record( stream, _rawBuffer.span(), recinfo ) ;
341 
342  SIO_DEBUG( "Random access manager: =====\n" << *this );
343  }
344 
345  //----------------------------------------------------------------------------
346 
347  void LCIORandomAccessMgr::seekStream( sio::ifstream &stream, long64 pos ) {
348  if( not stream.is_open() ) {
349  throw IO::IOException( "[LCIORandomAccessMgr::seekStream] Stream not open") ;
350  }
351  if( pos < 0 ) {
352  stream.seekg( 0 , std::ios_base::end ) ;
353  auto endg = stream.tellg() ;
354  if( endg < -pos ) {
355  std::stringstream s ; s << "[LCIORandomAccessMgr::seekStream] Can't seek stream to " << pos ;
356  throw IO::IOException( s.str() ) ;
357  }
358  std::streampos tpos = -pos ;
359  // pos is negative, so addition make sense here !
360  stream.seekg( endg - tpos , std::ios_base::beg ) ;
361  }
362  else {
363  stream.seekg( pos ) ;
364  }
365  if( not stream.good() ) {
366  std::stringstream s ; s << "[LCIORandomAccessMgr::seekStream] Can't seek stream to " << pos << ". rdstate is: " << stream.rdstate() ;
367  throw IO::IOException( s.str() ) ;
368  }
369  }
370 
371  //----------------------------------------------------------------------------
372 
// Body of a stream output operator.
// NOTE(review): the signature line (listing line 373) is absent from this
// extract; from the body it takes an output stream `os` and an object `ra`
// exposing `_list` and `_runEvtMap` - presumably a LCIORandomAccessMgr; confirm
// against the header.
// Prints a banner line, every LCIORandomAccess record of the list, then the
// run/event map, and returns the stream.
374  os << " LCIORandomAccessMgr: ----------------------- " << std::endl ;
375  for( auto i = ra._list.begin() ; i != ra._list.end() ; ++i ) {
// *i is a pointer-like list element; **i is the record itself
376  os << **i ;
377  }
378  os << *(ra._runEvtMap) << std::endl ;
379  return os ;
380  }
381 }
Manager class for LCIO direct access.
std::shared_ptr< LCIORandomAccess > _fileRecord
The file record summary.
bool createEventMap(sio::ifstream &stream)
Get the run and event header map from the stream - either by reading the random access records or by ...
static constexpr int RandomAccessSize
Definition: LCSIO.h:31
T endl(T...args)
static constexpr const char * RunRecordName
Definition: LCSIO.h:23
LCEvent * event
Definition: lsh.cc:80
bool initAppend(sio::ifstream &stream)
Initialize random access for append mode: read last LCIORandomAccess record if it exists - recreate t...
T rdstate(T...args)
std::shared_ptr< LCIORandomAccess > createFromEventMap() const
Prepare an LCIORandomAccess object from the current contents of RunEventMap (all file locations set t...
T end(T...args)
Helper struct that stores run and event positions in the file.
Definition: RunEventMap.h:14
bool readLCIOIndex(sio::ifstream &stream)
Helper for reading the next LCIOIndex record (needs a preceding call to LCSIO::seek() ) ...
static constexpr const char * AccessRecordName
Definition: LCSIO.h:29
static void readBlocks(const sio::buffer_span &buffer, EVENT::LCRunHeader *rhdr)
Read the block(s) from the sio buffer and decode a run header.
Definition: LCIORecords.cc:95
void addLCIORandomAccess(std::shared_ptr< LCIORandomAccess > ra)
Add a new LCIORandomAccess object to the list.
std::ostream & operator<<(std::ostream &os, const LCIORandomAccess &ra)
bool recreateEventMap(sio::ifstream &stream)
Fill the RunEventMap from the event and run header records in the file.
STL class.
bool readLCIORandomAccess(sio::ifstream &stream)
Helper for reading the next LCIORandomAccess record (needs a preceding call to LCSIO::seek() ) ...
T push_back(T...args)
T pop_back(T...args)
T str(T...args)
IOException used for reading/writing errors.
Definition: Exceptions.h:92
T clear(T...args)
T find(T...args)
const std::shared_ptr< LCIORandomAccess > lastLCIORandomAccess() const
Pointer to the last LCIORandomAccess in the list.
bool readLCIORandomAccessAt(sio::ifstream &stream, long64 pos)
Read the LCIORandomAccess record at the specified position.
bool readLCIOIndexAt(sio::ifstream &stream, long64 pos)
Read the LCIOIndex record at the specified position.
T begin(T...args)
void writeRandomAccessRecords(sio::ofstream &stream)
Write the current random access records LCIOIndex and LCIORandomAccess to the stream.
void createFileRecord()
Create file record from all LCIORandomAccess records.
T back(T...args)
LCIORandomAccessList _list
List of LCIORandomAccess objects.
static constexpr const char * IndexRecordName
Definition: LCSIO.h:32
std::shared_ptr< RunEventMap > _runEvtMap
Map with RunHeader and EventHeader record positions.
static constexpr const char * HeaderRecordName
Definition: LCSIO.h:27
void clear()
Clear all lists and maps before closing a file.
void seekStream(sio::ifstream &stream, long64 pos)
Seek the stream at the given position.
STL class.
static void readBlocks(const sio::buffer_span &buffer, EVENT::LCEvent *event, const std::vector< std::string > &readCol)
Read an event header record.
Definition: LCIORecords.cc:17