LCIO  02.17
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
LCWriter.cc
Go to the documentation of this file.
1 #include <MT/LCWriter.h>
2 
3 // -- lcio headers
4 #include "EVENT/LCEvent.h"
5 #include "EVENT/LCRunHeader.h"
6 #include "EVENT/LCIO.h"
7 #include "EVENT/LCCollection.h"
8 #include "SIO/LCSIO.h"
10 
11 // -- sio headers
12 #include <sio/exception.h>
13 #include <sio/api.h>
14 #include <sio/compression/zlib.h>
15 
16 // -- std headers
17 #include <sys/stat.h>
18 
19 namespace MT {
20 
21  void LCWriter::open( const std::string &filename ) {
22  auto sioFilename = getSIOFileName( filename ) ;
23  struct stat fileinfo ;
24  // if the file exists we throw an exception
25  if ( ::stat( sioFilename.c_str(), &fileinfo ) == 0 ) {
26  throw IO::IOException( std::string( "[SIOWriter::open()] File already exists: "
27  + sioFilename
28  + " \n open it in append or new mode !\n"
29  )) ;
30  }
31  // open new file for writing
32  open( filename, EVENT::LCIO::WRITE_NEW ) ;
33  }
34 
35  //----------------------------------------------------------------------------
36 
// Opens `filename` for writing in the requested mode (EVENT::LCIO::WRITE_NEW or
// EVENT::LCIO::WRITE_APPEND). A fresh LCIORandomAccessMgr is created first. In append
// mode the file is read once through initAppend() before the output stream is opened.
// Throws IO::IOException when the file cannot be opened for reading in append mode or
// when its end position is below SIO::LCSIO::RandomAccessSize. Raises
// sio::error_code::not_open through SIO_THROW when the output stream is not open and
// good at the end; that check is also what a writeMode matching neither case runs into,
// because no stream is opened for it here.
37  void LCWriter::open( const std::string &filename, int writeMode ) {
38  // make sure filename has the proper extension (.slcio)
39  auto sioFilename = getSIOFileName( filename ) ;
// any previously held random access manager is replaced
41  _raMgr = std::make_shared<SIO::LCIORandomAccessMgr>() ;
42  switch( writeMode ) {
43  case EVENT::LCIO::WRITE_NEW :
44  _stream.open( sioFilename , std::ios::binary ) ;
45  break ;
46  case EVENT::LCIO::WRITE_APPEND :
47  // try to read the last LCIORandomAccess record at the end --------------------
48  SIO_DEBUG( "SIOWriter::open: Opening in write append mode" );
// a separate input stream is used only for this initial read and closed again below
49  sio::ifstream istr ;
50  istr.open( sioFilename, std::ios::binary ) ;
51  // status = _stream->open( sioFilename.c_str() , SIO_MODE_READ ) ;
52  if( not istr.is_open() ) {
53  throw IO::IOException( std::string( "[SIOWriter::open()] Can't open stream for reading TOC: " + sioFilename ) ) ;
54  }
55  SIO_DEBUG( "SIOWriter::open: Opening in write append mode 1" );
// the return value selects how the output stream is opened below
56  bool hasRandomAccess = _raMgr->initAppend( istr ) ;
57  istr.close() ;
58  SIO_DEBUG( "SIOWriter::open: Opening in write append mode 2" );
59  if( hasRandomAccess ) {
// in | out keeps the existing content so that the write position can be moved back
60  _stream.open( sioFilename.c_str() , std::ios::binary | std::ios::out | std::ios::in ) ;
61  // position at the beginning of the file record which will then be overwritten with the next record ...
62  _stream.seekp( 0 , std::ios_base::end ) ;
63  auto endg = _stream.tellp() ;
// endg is the end-of-file position; it must be at least RandomAccessSize to step back
64  if( endg < SIO::LCSIO::RandomAccessSize ) {
65  std::stringstream s ; s << "[SIOWriter::open()] Can't seek stream to " << SIO::LCSIO::RandomAccessSize ;
66  throw IO::IOException( s.str() ) ;
67  }
// NOTE(review): `tpos` is not declared anywhere in this listing (listing line 68 is
// absent); presumably it holds SIO::LCSIO::RandomAccessSize as a stream offset - confirm
// against the real source.
69  _stream.seekp( endg - tpos , std::ios_base::beg ) ;
70  }
71  else {
// NOTE(review): a std::ofstream opened with out | ate but without in or app normally
// truncates the file - confirm that this branch really keeps the existing records.
72  _stream.open( sioFilename.c_str() , std::ios::binary | std::ios::out | std::ios::ate ) ;
73  }
74  SIO_DEBUG( "SIOWriter::open: Opening in write append mode ... OK" );
75  break ;
76  }
// common post-condition for every mode: the output stream has to be open and good
77  if( not _stream.good() or not _stream.is_open() ) {
78  SIO_THROW( sio::error_code::not_open, "[SIOWriter::open()] Couldn't open file: '" + sioFilename + "'" ) ;
79  }
80  }
81 
82  //----------------------------------------------------------------------------
83 
86  }
87 
88  //----------------------------------------------------------------------------
89 
// Body of LCWriter::writeRunHeader( EVENT::LCRunHeader *hdr ); the signature line is
// absent from this listing. The run header is serialised into a local buffer, optionally
// zlib-compressed, then written to _stream and registered with the random access manager.
// Throws IO::IOException if _stream is not open at write time.
91  // NOTE: the lock is acquired only on stream writing.
92  // create a buffer with a sufficient length
93  auto buflen = _maxBufferSize.load() ;
94  sio::buffer rawBuffer( buflen ) ;
95  // write the record in the buffer
96  sio::record_info recinfo {} ;
97  SIO::SIORunHeaderRecord::writeRecord( rawBuffer, hdr, recinfo, 0 ) ;
98  // deal with zlib compression here
// a compression level of 0 selects the uncompressed branch further down
99  if( _compressionLevel.load() != 0 ) {
100  sio::zlib_compression compressor {} ;
101  compressor.set_level( _compressionLevel.load() ) ;
102  // the compressed buffer will have a len
103  // smaller than the uncompressed one
104  sio::buffer compBuffer( rawBuffer.size() ) ;
105  // compress !
106  sio::api::compress_record( recinfo, rawBuffer, compBuffer, compressor ) ;
107  // locked scope
// NOTE(review): no lock acquisition is visible here (listing line 109 is absent);
// presumably a guard on _mutex is taken at this point - confirm against the real source.
108  {
110  if( not _stream.is_open() ) {
111  throw IO::IOException( "[MT::LCWriter::writeRunHeader] stream not opened") ;
112  }
// the first recinfo._header_length bytes of the raw buffer are passed along with the
// compressed buffer
113  sio::api::write_record( _stream, rawBuffer.span(0, recinfo._header_length), compBuffer.span(), recinfo ) ;
114  // random access treatment
// run headers are registered with -1 as the event number
115  _raMgr->add( SIO::RunEvent( hdr->getRunNumber(), -1 ) , recinfo._file_start ) ;
116  // update the maximum size of buffer
117  if( _maxBufferSize.load() < rawBuffer.size() ) {
118  _maxBufferSize = rawBuffer.size() ;
119  }
120  }
121  }
122  else {
123  // locked scope
// NOTE(review): as above, the line that would take the lock (listing line 124) is absent.
125  if( not _stream.is_open() ) {
126  throw IO::IOException( "[MT::LCWriter::writeRunHeader] stream not opened") ;
127  }
128  sio::api::write_record( _stream, rawBuffer.span(), recinfo ) ;
129  // random access treatment
130  _raMgr->add( SIO::RunEvent( hdr->getRunNumber(), -1 ) , recinfo._file_start ) ;
131  // update the maximum size of buffer
132  if( _maxBufferSize.load() < rawBuffer.size() ) {
133  _maxBufferSize = rawBuffer.size() ;
134  }
135  }
136  }
137 
138  //----------------------------------------------------------------------------
139 
// Body of LCWriter::writeEvent( EVENT::LCEvent *evt ); the signature line is absent from
// this listing. It forwards to the two-argument overload with an empty second argument.
141  writeEvent( evt, {} ) ;
142  }
143 
144  //----------------------------------------------------------------------------
145 
// Body of the two-argument writeEvent overload, which uses `evt` and `colsOnly`; the
// signature line is absent from this listing. Two records are produced per event, an
// event header record and the event record itself. Both are optionally zlib-compressed,
// then written to _stream in that order. Throws IO::IOException if _stream is not open
// at write time.
147  // create buffers with a sufficient length
148  auto buflen = _maxBufferSize.load() ;
149  sio::buffer hdrRawBuffer( buflen ) ;
150  sio::buffer evtRawBuffer( buflen ) ;
151  // 1) write the event header record
152  sio::record_info rechdrinfo {} ;
153  SIO::SIOEventHeaderRecord::writeRecord( hdrRawBuffer, evt, rechdrinfo, colsOnly, 0 ) ;
154  // 2) write the event record
155  sio::record_info recinfo {} ;
// a handler manager local to this call is used for the event record
156  SIO::SIOHandlerMgr eventHandlerMgr {} ;
157  SIO::SIOEventRecord::writeRecord( evtRawBuffer, evt, eventHandlerMgr, recinfo, colsOnly, 0 ) ;
158  // deal with zlib compression here
159  // and write in locked scope
160  if( _compressionLevel.load() != 0 ) {
161  sio::zlib_compression compressor {} ;
162  compressor.set_level( _compressionLevel.load() ) ;
163  sio::buffer hdrCompBuffer( hdrRawBuffer.size() ) ;
164  sio::buffer evtCompBuffer( evtRawBuffer.size() ) ;
165  // compress records
166  sio::api::compress_record( rechdrinfo, hdrRawBuffer, hdrCompBuffer, compressor ) ;
167  sio::api::compress_record( recinfo, evtRawBuffer, evtCompBuffer, compressor ) ;
// NOTE(review): no lock acquisition is visible here (listing line 169 is absent);
// presumably a guard on _mutex is taken at this point - confirm against the real source.
168  {
170  if( not _stream.is_open() ) {
171  throw IO::IOException( "[SIOWriter::writeEvent] stream not opened") ;
172  }
173  // write to disk
174  sio::api::write_record( _stream, hdrRawBuffer.span(0, rechdrinfo._header_length), hdrCompBuffer.span(), rechdrinfo ) ;
175  sio::api::write_record( _stream, evtRawBuffer.span(0, recinfo._header_length), evtCompBuffer.span(), recinfo ) ;
176  // random access treatment
// the entry is registered at the start of the header record, not of the event record
177  _raMgr->add( SIO::RunEvent( evt->getRunNumber(), evt->getEventNumber() ) , rechdrinfo._file_start ) ;
178  // update the maximum size of buffer
// only the event buffer size is considered here, not the header buffer size
179  if( _maxBufferSize.load() < evtRawBuffer.size() ) {
180  _maxBufferSize = evtRawBuffer.size() ;
181  }
182  }
183  }
184  else {
// NOTE(review): as above, the line that would take the lock (listing line 185) is absent.
186  if( not _stream.is_open() ) {
187  throw IO::IOException( "[SIOWriter::writeEvent] stream not opened") ;
188  }
189  sio::api::write_record( _stream, hdrRawBuffer.span(), rechdrinfo ) ;
190  sio::api::write_record( _stream, evtRawBuffer.span(), recinfo ) ;
191  // random access treatment
192  _raMgr->add( SIO::RunEvent( evt->getRunNumber(), evt->getEventNumber() ) , rechdrinfo._file_start ) ;
193  // update the maximum size of buffer
194  if( _maxBufferSize.load() < evtRawBuffer.size() ) {
195  _maxBufferSize = evtRawBuffer.size() ;
196  }
197  }
198  }
199 
200  //----------------------------------------------------------------------------
201 
// Body of LCWriter::close(); the signature line is absent from this listing.
// The random access records are written to the stream, the manager is emptied and
// released, and the stream is closed last.
// NOTE(review): _raMgr is dereferenced without a null check in the visible lines and is
// reset to nullptr below, so a second close() (or a close() without open()) looks unsafe
// unless one of the absent lines guards against it - confirm.
204  _raMgr->writeRandomAccessRecords( _stream ) ;
205  _raMgr->clear() ;
206  _raMgr = nullptr ;
207  _stream.close();
208  }
209 
210  //----------------------------------------------------------------------------
211 
// Body of LCWriter::getSIOFileName( const std::string &filename ); the signature line is
// absent from this listing. Returns `filename` unchanged when it already ends with
// SIO::LCSIO::FileExtension, otherwise returns it with the extension appended.
// rfind() yields the last occurrence, so the second test checks whether that occurrence
// sits exactly at the end of the name.
213  if( filename.rfind( SIO::LCSIO::FileExtension ) == std::string::npos || // .slcio not found at all
214  !( filename.rfind( SIO::LCSIO::FileExtension ) + strlen( SIO::LCSIO::FileExtension ) == filename.length() ) ) { // found, but not at end
215  return filename + SIO::LCSIO::FileExtension ;
216  }
217  else {
218  return filename ;
219  }
220  }
221 
222 }
static void writeRecord(sio::buffer &outbuf, EVENT::LCEvent *event, sio::record_info &rec_info, sio::options_type opts=0)
Write an event header to the raw sio buffer.
Definition: LCIORecords.cc:28
std::shared_ptr< SIO::LCIORandomAccessMgr > _raMgr
The random access manager for event/run random access in the file.
Definition: LCWriter.h:111
T open(T...args)
static constexpr int RandomAccessSize
Definition: LCSIO.h:31
Interface for the run header.
Definition: LCRunHeader.h:23
T good(T...args)
T rfind(T...args)
void close()
Closes the output file/stream.
Definition: LCWriter.cc:202
static void writeRecord(sio::buffer &outbuf, EVENT::LCRunHeader *rhdr, sio::record_info &rec_info, sio::options_type opts=0)
Write a run header record to the sio buffer.
Definition: LCIORecords.cc:105
Helper struct that stores run and event positions in the file.
Definition: RunEventMap.h:14
virtual int getRunNumber() const =0
Return the run number of this event.
void open(const std::string &filename)
Opens a file for writing.
Definition: LCWriter.cc:21
static void writeRecord(sio::buffer &outbuf, EVENT::LCEvent *event, const SIOHandlerMgr &handlerMgr, sio::record_info &rec_info, sio::options_type opts=0)
Write an event record.
Definition: LCIORecords.cc:78
STL class.
void writeEvent(EVENT::LCEvent *evt)
Writes the given event to file.
Definition: LCWriter.cc:140
T seekp(T...args)
virtual int getRunNumber() const =0
Returns the run number.
T close(T...args)
T str(T...args)
IOException used for reading/writing errors.
Definition: Exceptions.h:92
std::mutex _mutex
Synchronization mutex.
Definition: LCWriter.h:113
static constexpr const char * FileExtension
The lcio file extension name.
Definition: LCSIO.h:20
T tellp(T...args)
T length(T...args)
std::atomic< unsigned int > _maxBufferSize
The maximum buffer size, used as the initial size of the write buffers.
Definition: LCWriter.h:107
The main event interface.
Definition: LCEvent.h:31
std::atomic< int > _compressionLevel
The compression level.
Definition: LCWriter.h:109
void setCompressionLevel(int level)
Set the compression level - needs to be called before open() otherwise call will have no effect...
Definition: LCWriter.cc:84
Holds instances of all types of SIOObjectHandlers.
Definition: SIOHandlerMgr.h:20
virtual int getEventNumber() const =0
Returns this event's number.
pair< int, string > level
Definition: lsh.cc:50
void writeRunHeader(EVENT::LCRunHeader *hdr)
Writes the given run header to file.
Definition: LCWriter.cc:90
std::ofstream _stream
The output file stream.
Definition: LCWriter.h:105
T is_open(T...args)
static std::string getSIOFileName(const std::string &filename)
Definition: LCWriter.cc:212
T load(T...args)