tdaq-develop-2025-02-12
RawDataSaverConsumerBase.cc
1 #include "otsdaq/DataManager/RawDataSaverConsumerBase.h"
2 #include "otsdaq/Macros/BinaryStringMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 
5 #include <unistd.h>
6 #include <cassert>
7 #include <iostream>
8 // #include <string.h> //memcpy
9 #include <fstream>
10 #include <sstream>
11 
12 using namespace ots;
13 
14 //==============================================================================
15 RawDataSaverConsumerBase::RawDataSaverConsumerBase(
16  std::string supervisorApplicationUID,
17  std::string bufferUID,
18  std::string processorUID,
19  const ConfigurationTree& theXDAQContextConfigTree,
20  const std::string& configurationPath)
21  : WorkLoop(processorUID)
22  , DataConsumer(
23  supervisorApplicationUID, bufferUID, processorUID, HighConsumerPriority)
24  , Configurable(theXDAQContextConfigTree, configurationPath)
25  , filePath_(theXDAQContextConfigTree.getNode(configurationPath)
26  .getNode("FilePath")
27  .getValue<std::string>())
28  , fileRadix_(theXDAQContextConfigTree.getNode(configurationPath)
29  .getNode("RadixFileName")
30  .getValue<std::string>())
31  , maxFileSize_(theXDAQContextConfigTree.getNode(configurationPath)
32  .getNode("MaxFileSize")
33  .getValue<long>() *
34  1000000) // Instead of 2^6=1048576
35  , currentSubRunNumber_(0)
36 
37 {
38  // FILE *fp = fopen( "/home/otsdaq/tsave.txt","w");
39  // if(fp)fclose(fp);
40 }
41 
42 //==============================================================================
43 RawDataSaverConsumerBase::~RawDataSaverConsumerBase(void) {}
44 
45 //==============================================================================
47 {
48  if(runNumber != "") // If there is no number it means it was paused
49  {
50  currentSubRunNumber_ = 0;
51  openFile(runNumber);
52  }
54 }
55 
56 //==============================================================================
57 void RawDataSaverConsumerBase::stopProcessingData(void)
58 {
59  DataConsumer::stopProcessingData();
60  closeFile();
61 }
62 
63 //==============================================================================
64 void RawDataSaverConsumerBase::openFile(std::string runNumber)
65 {
66  currentRunNumber_ = runNumber;
67  // std::string fileName = "Run" + runNumber + "_" + processorUID_ + "_Raw.dat";
68  std::stringstream fileName;
69  fileName << filePath_ << "/" << fileRadix_ << "_Run" << runNumber;
70  // if split file is there then subrunnumber must be set!
71  if(maxFileSize_ > 0)
72  fileName << "_" << currentSubRunNumber_;
73  fileName << "_Raw.dat";
74  __CFG_COUT__ << "Saving file: " << fileName.str() << std::endl;
75  outFile_.open(fileName.str().c_str(), std::ios::out | std::ios::binary);
76  if(!outFile_.is_open())
77  {
78  __CFG_SS__ << "Can't open file " << fileName.str() << std::endl;
79  __CFG_SS_THROW__;
80  }
81 
82  writeHeader(); // write start of file header
83 }
84 
85 //==============================================================================
86 void RawDataSaverConsumerBase::closeFile(void)
87 {
88  if(outFile_.is_open())
89  {
90  writeFooter(); // write end of file footer
91  outFile_.close();
92  }
93 }
94 
95 //==============================================================================
96 void RawDataSaverConsumerBase::save(const std::string& data)
97 {
98  std::ofstream output;
99 
100  // std::string outputPath = "/home/otsdaq/tsave.txt";
101  // if(0)
102  // {
103  // output.open(outputPath, std::ios::out | std::ios::app | std::ios::binary);
104  // output << data;
105  // }
106  // else
107  // {
108  // output.open(outputPath, std::ios::out | std::ios::app);
109  // output << data;
110  //
111  // char str[5];
112  // for(unsigned int j=0;j<data.length();++j)
113  // {
114  // sprintf(str,"%2.2x",((unsigned int)data[j]) & ((unsigned int)(0x0FF)));
115  //
116  // if(j%64 == 0) std::cout << "SAVE " << j << "\t: 0x\t";
117  // std::cout << str;
118  // if(j%8 == 7) std::cout << " ";
119  // if(j%64 == 63) std::cout << std::endl;
120  // }
121  // std::cout << std::endl;
122  // std::cout << std::endl;
123  // }
124  //
125  // if(1)
126  // {
127  // char str[5];
128  // for(unsigned int j=0;j<data.length();++j)
129  // {
130  // sprintf(str,"%2.2x",((unsigned int)data[j]) & ((unsigned int)(0x0FF)));
131  //
132  // if(j%64 == 0) std::cout << "SAVE " << j << "\t: 0x\t";
133  // std::cout << str;
134  // if(j%8 == 7) std::cout << " ";
135  // if(j%64 == 63) std::cout << std::endl;
136  // }
137  // std::cout << std::endl;
138  // std::cout << std::endl;
139  // }
140 
141  if(maxFileSize_ > 0)
142  {
143  long length = outFile_.tellp();
144  if(length >= maxFileSize_ / 1000)
145  {
146  closeFile();
147  ++currentSubRunNumber_;
148  openFile(currentRunNumber_);
149  }
150  }
151 
152  writePacketHeader(data); // write start of packet header
153  outFile_.write((char*)&data[0], data.length());
154  writePacketFooter(data); // write start of packet footer
155 }
156 
157 //==============================================================================
158 bool RawDataSaverConsumerBase::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
159 {
160  //__CFG_COUT__ << DataProcessor::processorUID_ << " running, because workloop: "
161  //<< WorkLoop::continueWorkLoop_ << std::endl;
162  fastRead();
163  return WorkLoop::continueWorkLoop_;
164 }
165 
166 //==============================================================================
167 void RawDataSaverConsumerBase::fastRead(void)
168 {
169  //__CFG_COUT__ << processorUID_ << " running!" << std::endl;
170  if(DataConsumer::read(dataP_, headerP_) < 0)
171  {
172  usleep(100);
173  return;
174  }
175  //__CFG_COUTV__(dataP_->length());
176  // std::string& buffer = *dataP_;
177 
178  //__CFG_COUT__ << "Reading from buffer length " << buffer.length() << " bytes!" <<
179  //__E__;
180  //__CFG_COUT__ << "Buffer Data: " <<
181  // BinaryStringMacros::binaryNumberToHexString(buffer) << __E__;
182 
183  save(*dataP_);
184  DataConsumer::setReadSubBuffer<std::string, std::map<std::string, std::string> >();
185 }
186 
187 //==============================================================================
188 void RawDataSaverConsumerBase::slowRead(void)
189 {
190  //__CFG_COUT__ << processorUID_ << " running!" << std::endl;
191  // This is making a copy!!!
192  if(DataConsumer::read(data_, header_) < 0)
193  {
194  usleep(1000);
195  return;
196  }
197  save(data_);
198 }
int read(D &buffer, H &header)
Copies the buffer into the passed parameters.
Definition: DataConsumer.h:38
virtual void startProcessingData(std::string runNumber)
virtual void startProcessingData(std::string runNumber) override
std::string * dataP_
For fast read.