otsdaq  v2_05_02_indev
RawDataSaverConsumerBase.cc
1 #include "otsdaq/DataManager/RawDataSaverConsumerBase.h"
2 #include "otsdaq/Macros/BinaryStringMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 
5 #include <unistd.h>
6 #include <cassert>
7 #include <iostream>
8 //#include <string.h> //memcpy
9 #include <fstream>
10 #include <sstream>
11 
12 using namespace ots;
13 
14 //==============================================================================
15 RawDataSaverConsumerBase::RawDataSaverConsumerBase(std::string supervisorApplicationUID,
16  std::string bufferUID,
17  std::string processorUID,
18  const ConfigurationTree& theXDAQContextConfigTree,
19  const std::string& configurationPath)
20  : WorkLoop(processorUID)
21  , DataConsumer(supervisorApplicationUID, bufferUID, processorUID, HighConsumerPriority)
22  , Configurable(theXDAQContextConfigTree, configurationPath)
23  , filePath_(theXDAQContextConfigTree.getNode(configurationPath).getNode("FilePath").getValue<std::string>())
24  , fileRadix_(theXDAQContextConfigTree.getNode(configurationPath).getNode("RadixFileName").getValue<std::string>())
25  , maxFileSize_(theXDAQContextConfigTree.getNode(configurationPath).getNode("MaxFileSize").getValue<long>() * 1000000) // Instead of 2^6=1048576
26  , currentSubRunNumber_(0)
27 
28 {
29  // FILE *fp = fopen( "/home/otsdaq/tsave.txt","w");
30  // if(fp)fclose(fp);
31 }
32 
33 //==============================================================================
34 RawDataSaverConsumerBase::~RawDataSaverConsumerBase(void) {}
35 
36 //==============================================================================
37 void RawDataSaverConsumerBase::startProcessingData(std::string runNumber)
38 {
39  if(runNumber != "") // If there is no number it means it was paused
40  {
41  currentSubRunNumber_ = 0;
42  openFile(runNumber);
43  }
44  DataConsumer::startProcessingData(runNumber);
45 }
46 
47 //==============================================================================
48 void RawDataSaverConsumerBase::stopProcessingData(void)
49 {
50  DataConsumer::stopProcessingData();
51  closeFile();
52 }
53 
54 //==============================================================================
55 void RawDataSaverConsumerBase::openFile(std::string runNumber)
56 {
57  currentRunNumber_ = runNumber;
58  // std::string fileName = "Run" + runNumber + "_" + processorUID_ + "_Raw.dat";
59  std::stringstream fileName;
60  fileName << filePath_ << "/" << fileRadix_ << "_Run" << runNumber;
61  // if split file is there then subrunnumber must be set!
62  if(maxFileSize_ > 0)
63  fileName << "_" << currentSubRunNumber_;
64  fileName << "_Raw.dat";
65  __CFG_COUT__ << "Saving file: " << fileName.str() << std::endl;
66  outFile_.open(fileName.str().c_str(), std::ios::out | std::ios::binary);
67  if(!outFile_.is_open())
68  {
69  __CFG_SS__ << "Can't open file " << fileName.str() << std::endl;
70  __CFG_SS_THROW__;
71  }
72 
73  writeHeader(); // write start of file header
74 }
75 
76 //==============================================================================
77 void RawDataSaverConsumerBase::closeFile(void)
78 {
79  if(outFile_.is_open())
80  {
81  writeFooter(); // write end of file footer
82  outFile_.close();
83  }
84 }
85 
86 //==============================================================================
87 void RawDataSaverConsumerBase::save(const std::string& data)
88 {
89  std::ofstream output;
90 
91  // std::string outputPath = "/home/otsdaq/tsave.txt";
92  // if(0)
93  // {
94  // output.open(outputPath, std::ios::out | std::ios::app | std::ios::binary);
95  // output << data;
96  // }
97  // else
98  // {
99  // output.open(outputPath, std::ios::out | std::ios::app);
100  // output << data;
101  //
102  // char str[5];
103  // for(unsigned int j=0;j<data.length();++j)
104  // {
105  // sprintf(str,"%2.2x",((unsigned int)data[j]) & ((unsigned int)(0x0FF)));
106  //
107  // if(j%64 == 0) std::cout << "SAVE " << j << "\t: 0x\t";
108  // std::cout << str;
109  // if(j%8 == 7) std::cout << " ";
110  // if(j%64 == 63) std::cout << std::endl;
111  // }
112  // std::cout << std::endl;
113  // std::cout << std::endl;
114  // }
115  //
116  // if(1)
117  // {
118  // char str[5];
119  // for(unsigned int j=0;j<data.length();++j)
120  // {
121  // sprintf(str,"%2.2x",((unsigned int)data[j]) & ((unsigned int)(0x0FF)));
122  //
123  // if(j%64 == 0) std::cout << "SAVE " << j << "\t: 0x\t";
124  // std::cout << str;
125  // if(j%8 == 7) std::cout << " ";
126  // if(j%64 == 63) std::cout << std::endl;
127  // }
128  // std::cout << std::endl;
129  // std::cout << std::endl;
130  // }
131 
132  if(maxFileSize_ > 0)
133  {
134  long length = outFile_.tellp();
135  if(length >= maxFileSize_ / 1000)
136  {
137  closeFile();
138  ++currentSubRunNumber_;
139  openFile(currentRunNumber_);
140  }
141  }
142 
143  writePacketHeader(data); // write start of packet header
144  outFile_.write((char*)&data[0], data.length());
145  writePacketFooter(data); // write start of packet footer
146 }
147 
148 //==============================================================================
149 bool RawDataSaverConsumerBase::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
150 {
151  //__CFG_COUT__ << DataProcessor::processorUID_ << " running, because workloop: "
152  //<< WorkLoop::continueWorkLoop_ << std::endl;
153  fastRead();
154  return WorkLoop::continueWorkLoop_;
155 }
156 
157 //==============================================================================
158 void RawDataSaverConsumerBase::fastRead(void)
159 {
160  //__CFG_COUT__ << processorUID_ << " running!" << std::endl;
161  if(DataConsumer::read(dataP_, headerP_) < 0)
162  {
163  usleep(100);
164  return;
165  }
166  //__CFG_COUTV__(dataP_->length());
167  //std::string& buffer = *dataP_;
168 
169  //__CFG_COUT__ << "Reading from buffer length " << buffer.length() << " bytes!" <<
170  //__E__;
171  //__CFG_COUT__ << "Buffer Data: " <<
172  // BinaryStringMacros::binaryNumberToHexString(buffer) << __E__;
173 
174  save(*dataP_);
175  DataConsumer::setReadSubBuffer<std::string, std::map<std::string, std::string> >();
176 }
177 
178 //==============================================================================
179 void RawDataSaverConsumerBase::slowRead(void)
180 {
181  //__CFG_COUT__ << processorUID_ << " running!" << std::endl;
182  // This is making a copy!!!
183  if(DataConsumer::read(data_, header_) < 0)
184  {
185  usleep(1000);
186  return;
187  }
188  save(data_);
189 }