otsdaq  v2_05_02_indev
ARTDAQConsumer_processor.cc
1 
2 //#include "artdaq/Application/Commandable.hh"
3 //#include "fhiclcpp/make_ParameterSet.h"
4 //#include "otsdaq/DataManager/DataManager.h"
5 //#include "otsdaq/DataManager/DataManagerSingleton.h"
6 #include "otsdaq/DataProcessorPlugins/ARTDAQConsumer.h"
7 //#include "otsdaq/Macros/CoutMacros.h"
8 #include "otsdaq/Macros/ProcessorPluginMacros.h"
9 //#include "otsdaq/MessageFacility/MessageFacility.h"
10 //
11 //#include <cstdint>
12 //#include <fstream>
13 //#include <iostream>
14 //#include <set>
15 
16 using namespace ots;
17 //
18 //#define ARTDAQ_FCL_PATH std::string(__ENV__("USER_DATA")) + "/" +
19 //"ARTDAQConfigurations/" #define ARTDAQ_FILE_PREAMBLE "boardReader"
20 
21 //==============================================================================
22 ARTDAQConsumer::ARTDAQConsumer(std::string supervisorApplicationUID,
23  std::string bufferUID,
24  std::string processorUID,
25  const ConfigurationTree& theXDAQContextConfigTree,
26  const std::string& configurationPath)
27  : WorkLoop(processorUID)
28  , DataConsumer(supervisorApplicationUID, bufferUID, processorUID, LowConsumerPriority)
29  , ARTDAQReaderProcessorBase(supervisorApplicationUID, bufferUID, processorUID, theXDAQContextConfigTree, configurationPath)
30 // : WorkLoop(processorUID)
31 // , DataConsumer(supervisorApplicationUID, bufferUID, processorUID,
32 // LowConsumerPriority) , Configurable(theXDAQContextConfigTree, configurationPath)
33 {
34  __COUT__ << "ARTDAQ Consumer constructed." << __E__;
35  //__COUT__ << "Configuration string:-" <<
36  // theXDAQContextConfigTree.getNode(configurationPath).getNode("ConfigurationString").getValue<std::string>()
37  //<< "-" << __E__;
38  //
39  // std::string filename = ARTDAQ_FCL_PATH + ARTDAQ_FILE_PREAMBLE + "-";
40  // std::string uid =
41  // theXDAQContextConfigTree.getNode(configurationPath).getValue();
42  //
43  // __COUT__ << "uid: " << uid << __E__;
44  // for(unsigned int i = 0; i < uid.size(); ++i)
45  // if((uid[i] >= 'a' && uid[i] <= 'z') || (uid[i] >= 'A' && uid[i] <= 'Z') ||
46  // (uid[i] >= '0' && uid[i] <= '9')) // only allow alpha numeric in file name
47  // filename += uid[i];
48  // filename += ".fcl";
49  //
50  // __COUT__ << __E__;
51  // __COUT__ << __E__;
52  // __COUT__ << "filename: " << filename << __E__;
53  //
54  // std::string fileFclString;
55  // {
56  // std::ifstream in(filename, std::ios::in | std::ios::binary);
57  // if(in)
58  // {
59  // std::string contents;
60  // in.seekg(0, std::ios::end);
61  // fileFclString.resize(in.tellg());
62  // in.seekg(0, std::ios::beg);
63  // in.read(&fileFclString[0], fileFclString.size());
64  // in.close();
65  // }
66  // }
67  // //__COUT__ << fileFclString << __E__;
68  //
69  // // find fragment_receiver {
70  // // and insert e.g.,
71  // // SupervisorApplicationUID:"ARTDataManager0"
72  // // BufferUID:"ART_S0_DM0_DataBuffer0"
73  // // ProcessorUID:"ART_S0_DM0_DB0_ARTConsumer0"
74  // size_t fcli =
75  // fileFclString.find("fragment_receiver: {") + +strlen("fragment_receiver: {");
76  // if(fcli == std::string::npos)
77  // {
78  // __SS__ << "Could not find 'fragment_receiver: {' in Board Reader fcl string!"
79  // << __E__;
80  // __COUT__ << "\n" << ss.str();
81  // __SS_THROW__;
82  // }
83  //
84  // // get the parent IDs from configurationPath
85  // __COUT__ << "configurationPath " << configurationPath << __E__;
86  //
87  // std::string consumerID, bufferID, appID;
88  // unsigned int backSteps; // at 2, 4, and 7 are the important parent IDs
89  // size_t backi = -1, backj;
90  // backSteps = 7;
91  // for(unsigned int i = 0; i < backSteps; i++)
92  // {
93  // //__COUT__ << "backsteps: " << i+1 << __E__;
94  //
95  // backj = backi;
96  // backi = configurationPath.rfind('/', backi - 1);
97  //
98  // //__COUT__ << "backi:" << backi << " backj:" << backj << __E__;
99  // //__COUT__ << "substr: " << configurationPath.substr(backi+1,backj-backi-1) <<
100  // // __E__;
101  //
102  // if(i + 1 == 2)
103  // consumerID = configurationPath.substr(backi + 1, backj - backi - 1);
104  // else if(i + 1 == 4)
105  // bufferID = configurationPath.substr(backi + 1, backj - backi - 1);
106  // else if(i + 1 == 7)
107  // appID = configurationPath.substr(backi + 1, backj - backi - 1);
108  // }
109  //
110  // // insert parent IDs into fcl string
111  // fileFclString = fileFclString.substr(0, fcli) + "\n\t\t" +
112  // "SupervisorApplicationUID: \"" + appID + "\"\n\t\t" +
113  // "BufferUID: \"" + bufferID + "\"\n\t\t" + "ProcessorUID: \"" +
114  // consumerID + "\"\n" + fileFclString.substr(fcli);
115  //
116  // __COUT__ << fileFclString << __E__;
117  //
118  // fhicl::make_ParameterSet(fileFclString, fhiclConfiguration_);
119  //
120  // //
121  // fhicl::make_ParameterSet(theXDAQContextConfigTree.getNode(configurationPath).getNode("ConfigurationString").getValue<std::string>(),
122  // // fhiclConfiguration_);
123 }
124 
125 //==============================================================================
126 // ARTDAQConsumer::ARTDAQConsumer(std::string interfaceID, MPI_Comm local_group_comm,
127 // std::string name) :FEVInterface (feId, 0) ,local_group_comm_(local_group_comm)
128 //,name_ (name)
129 //{}
130 
131 //==============================================================================
132 ARTDAQConsumer::~ARTDAQConsumer(void)
133 {
134  halt();
135  __COUT__ << "Destructed." << __E__;
136 }
137 //
139 // void ARTDAQConsumer::initLocalGroup(int rank)
140 //{
141 // name_ = "BoardReader_" + DataConsumer::processorUID_;
142 // configure(rank);
143 //}
144 //
145 //#define ARTDAQ_FCL_PATH std::string(__ENV__("USER_DATA")) + "/" +
146 //"ARTDAQConfigurations/" #define ARTDAQ_FILE_PREAMBLE "boardReader"
147 //
149 // void ARTDAQConsumer::configure(int rank)
150 //{
151 // __COUT__ << "\tConfigure" << __E__;
152 //
153 // report_string_ = "";
154 // external_request_status_ = true;
155 //
156 // // in the following block, we first destroy the existing BoardReader
157 // // instance, then create a new one. Doing it in one step does not
158 // // produce the desired result since that creates a new instance and
159 // // then deletes the old one, and we need the opposite order.
160 // fragment_receiver_ptr_.reset(nullptr);
161 // __COUT__ << "\tNew core" << __E__;
162 // my_rank = rank;
163 // app_name = name_;
164 // fragment_receiver_ptr_.reset(new artdaq::BoardReaderApp());
165 // // FIXME These are passed as parameters
166 // uint64_t timeout = 45;
167 // // uint64_t timestamp = 184467440737095516;
168 // uint64_t timestamp = 184467440737095516;
169 // __COUT__ << "\tInitialize: "
170 // << __E__; //<< fhiclConfiguration_.to_string() << __E__;
171 // external_request_status_ =
172 // fragment_receiver_ptr_->initialize(fhiclConfiguration_, timeout, timestamp);
173 // __COUT__ << "\tDone Initialize" << __E__;
174 // if(!external_request_status_)
175 // {
176 // report_string_ = "Error initializing ";
177 // report_string_.append(name_ + " ");
178 // report_string_.append("with ParameterSet = \"" + fhiclConfiguration_.to_string() +
179 // "\".");
180 // }
181 // __COUT__ << "\tDone Configure" << __E__;
182 //}
183 //
185 // void ARTDAQConsumer::halt(void)
186 //{
187 // __COUT__ << "\tHalt" << __E__;
188 // // FIXME These are passed as parameters
189 // uint64_t timeout = 45;
190 // // uint64_t timestamp = 184467440737095516;
191 // report_string_ = "";
192 // external_request_status_ = fragment_receiver_ptr_->shutdown(timeout);
193 // if(!external_request_status_)
194 // {
195 // report_string_ = "Error shutting down ";
196 // report_string_.append(name_ + ".");
197 // }
198 //}
199 //
200 //==============================================================================
201 void ARTDAQConsumer::pauseProcessingData(void) { ARTDAQReaderProcessorBase::pause(); }
202 // __COUT__ << "\tPause" << __E__;
203 // // FIXME These are passed as parameters
204 // uint64_t timeout = 45;
205 // uint64_t timestamp = 184467440737095516;
206 // report_string_ = "";
207 // external_request_status_ = fragment_receiver_ptr_->pause(timeout, timestamp);
208 // if(!external_request_status_)
209 // {
210 // report_string_ = "Error pausing ";
211 // report_string_.append(name_ + ".");
212 // }
213 //}
214 //
215 //==============================================================================
216 void ARTDAQConsumer::resumeProcessingData(void) { ARTDAQReaderProcessorBase::resume(); }
217 // __COUT__ << "\tResume" << __E__;
218 // // FIXME These are passed as parameters
219 // uint64_t timeout = 45;
220 // uint64_t timestamp = 184467440737095516;
221 // report_string_ = "";
222 // external_request_status_ = fragment_receiver_ptr_->resume(timeout, timestamp);
223 // if(!external_request_status_)
224 // {
225 // report_string_ = "Error resuming ";
226 // report_string_.append(name_ + ".");
227 // }
228 //}
229 //
230 //==============================================================================
231 void ARTDAQConsumer::startProcessingData(std::string runNumber) { ARTDAQReaderProcessorBase::start(runNumber); }
232 // __COUT__ << "\tStart" << __E__;
233 //
234 // art::RunID runId((art::RunNumber_t)boost::lexical_cast<art::RunNumber_t>(runNumber));
235 //
236 // // FIXME These are passed as parameters
237 // uint64_t timeout = 45;
238 // uint64_t timestamp = 184467440737095516;
239 //
240 // report_string_ = "";
241 // __COUT__ << "\tStart run: " << runId << __E__;
242 // external_request_status_ = fragment_receiver_ptr_->start(runId, timeout, timestamp);
243 // __COUT__ << "\tStart already crashed " << __E__;
244 // if(!external_request_status_)
245 // {
246 // report_string_ = "Error starting ";
247 // report_string_.append(name_ + " ");
248 // report_string_.append("for run number ");
249 // report_string_.append(boost::lexical_cast<std::string>(runId.run()));
250 // report_string_.append(", timeout ");
251 // report_string_.append(boost::lexical_cast<std::string>(timeout));
252 // report_string_.append(", timestamp ");
253 // report_string_.append(boost::lexical_cast<std::string>(timestamp));
254 // report_string_.append(".");
255 // }
256 //
257 // __COUT__ << "STARTING BOARD READER THREAD" << __E__;
258 //}
259 //
260 //==============================================================================
261 void ARTDAQConsumer::stopProcessingData(void) { ARTDAQReaderProcessorBase::stop(); }
262 // __COUT__ << "\tStop" << __E__;
263 // // FIXME These are passed as parameters
264 // uint64_t timeout = 45;
265 // uint64_t timestamp = 184467440737095516;
266 // report_string_ = "";
267 //
268 // auto sts = fragment_receiver_ptr_->status();
269 // if(sts == "Ready")
270 // return; // Already stopped/never started
271 //
272 // external_request_status_ = fragment_receiver_ptr_->stop(timeout, timestamp);
273 // if(!external_request_status_)
274 // {
275 // report_string_ = "Error stopping ";
276 // report_string_.append(name_ + ".");
277 // // return false;
278 // }
279 //}
280 
281 DEFINE_OTS_PROCESSOR(ARTDAQConsumer)