otsdaq  v2_05_02_indev
DataManager.cc
1 #include "otsdaq/DataManager/DataManager.h"
2 #include "otsdaq/ConfigurationInterface/ConfigurationManager.h"
3 #include "otsdaq/DataManager/CircularBuffer.h"
4 #include "otsdaq/DataManager/DataConsumer.h"
5 #include "otsdaq/DataManager/DataProducerBase.h"
6 #include "otsdaq/Macros/CoutMacros.h"
7 #include "otsdaq/MessageFacility/MessageFacility.h"
8 #include "otsdaq/PluginMakers/MakeDataProcessor.h"
9 
10 #include <unistd.h> //usleep
11 #include <iostream>
12 #include <vector>
13 
14 using namespace ots;
15 
16 // clang-format off
17 //==============================================================================
18 DataManager::DataManager(const ConfigurationTree& theXDAQContextConfigTree, const std::string& supervisorConfigurationPath)
19  : Configurable(theXDAQContextConfigTree, supervisorConfigurationPath)
20  , VStateMachine(Configurable::theConfigurationRecordName_)
21  , parentSupervisorHasFrontends_(false)
22 {
23  __CFG_COUT__ << "Constructed." << __E__;
24 } // end constructor
25 // clang-format on
26 
27 //==============================================================================
28 DataManager::~DataManager(void)
29 {
30  __CFG_COUT__ << "Destructor." << __E__;
31  DataManager::destroyBuffers();
32  __CFG_COUT__ << "Destructed." << __E__;
33 } // end destructor
34 
35 //==============================================================================
36 void DataManager::dumpStatus(std::ostream* out) const
37 {
38  *out << "Buffer count: " << buffers_.size() << __E__;
39  for(auto& bufferPair : buffers_)
40  {
41  *out << "\t"
42  << "Buffer '" << bufferPair.first << "' status=" << bufferPair.second.status_ << " producers=" << bufferPair.second.producers_.size()
43  << " consumers=" << bufferPair.second.consumers_.size() << __E__;
44 
45  *out << "\t\t"
46  << "Producers:" << __E__;
47  for(auto& producer : bufferPair.second.producers_)
48  {
49  *out << "\t\t\t" << producer->getProcessorID() << " [" << bufferPair.second.buffer_->getProducerBufferSize(producer->getProcessorID()) << "]"
50  << __E__;
51  }
52  *out << "\t\t"
53  << "Consumers:" << __E__;
54  for(auto& consumer : bufferPair.second.consumers_)
55  {
56  *out << "\t\t\t" << consumer->getProcessorID() << __E__;
57  }
58  }
59 } // end dumpStatus()
60 
61 //==============================================================================
62 void DataManager::configure(void)
63 {
64  const std::string transitionName = "Configuring";
65 
66  const std::string COL_NAME_bufferGroupLink = "LinkToDataBufferTable";
67  const std::string COL_NAME_processorGroupLink = "LinkToDataProcessorTable";
68  const std::string COL_NAME_processorType = "ProcessorType";
69  const std::string COL_NAME_processorPlugin = "ProcessorPluginName";
70  const std::string COL_NAME_processorLink = "LinkToProcessorTable";
71  const std::string COL_NAME_appUID = "ApplicationUID";
72 
73  __CFG_COUT__ << transitionName << " DataManager" << __E__;
74  __CFG_COUT__ << "Path: " << theConfigurationPath_ + "/" + COL_NAME_bufferGroupLink << __E__;
75 
76  destroyBuffers();
77 
78  // get all buffer definitions from configuration tree
79  for(const auto& buffer :
80  theXDAQContextConfigTree_.getNode(theConfigurationPath_ + "/" + COL_NAME_bufferGroupLink).getChildren()) //"/LinkToDataManagerTable").getChildren())
81  {
82  __CFG_COUT__ << "Data Buffer Name: " << buffer.first << __E__;
83  if(buffer.second.getNode(TableViewColumnInfo::COL_NAME_STATUS).getValue<bool>())
84  {
85  std::vector<unsigned int> producersVectorLocation;
86  std::vector<unsigned int> consumersVectorLocation;
87  auto bufferConfigurationList = buffer.second.getNode(COL_NAME_processorGroupLink).getChildren(); //"LinkToDataBufferTable").getChildren();
88  unsigned int location = 0;
89  for(const auto& bufferConfiguration : bufferConfigurationList)
90  {
91  __CFG_COUT__ << "Processor id: " << bufferConfiguration.first << __E__;
92  if(bufferConfiguration.second.getNode(TableViewColumnInfo::COL_NAME_STATUS).getValue<bool>())
93  {
94  if(bufferConfiguration.second.getNode(COL_NAME_processorType).getValue<std::string>() == "Producer")
95  {
96  producersVectorLocation.push_back(location);
97  }
98  else if(bufferConfiguration.second.getNode(COL_NAME_processorType).getValue<std::string>() == "Consumer")
99  {
100  consumersVectorLocation.push_back(location);
101  }
102  else
103  {
104  __CFG_SS__ << "Node ProcessorType in " << bufferConfiguration.first << " of type "
105  << bufferConfiguration.second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
106  << " is invalid. The only accepted types are Producer "
107  "and Consumer"
108  << __E__;
109  __CFG_MOUT_ERR__ << ss.str();
110  __CFG_SS_THROW__;
111  }
112  }
113  ++location;
114 
115  } // end loop sorting by producer and consumer
116 
117  if(!parentSupervisorHasFrontends_ && producersVectorLocation.size() == 0) // || consumersVectorLocation.size() == 0)
118  {
119  __CFG_SS__ << "Node Data Buffer " << buffer.first << " has " << producersVectorLocation.size() << " Producers"
120  << " and " << consumersVectorLocation.size() << " Consumers"
121  << " there must be at least 1 Producer " << // of both configured
122  "for the buffer!" << __E__;
123  __CFG_MOUT_ERR__ << ss.str();
124  __CFG_SS_THROW__;
125  }
126 
127  if(parentSupervisorHasFrontends_)
128  __CFG_COUT__ << "Parent supervisor has front-ends, so FE-producers may "
129  << "be instantiated in the configure steps of the FESupervisor." << __E__;
130 
131  configureBuffer<std::string, std::map<std::string, std::string> >(buffer.first);
132 
133  for(auto& producerLocation : producersVectorLocation)
134  {
135  // __CFG_COUT__ << theConfigurationPath_ << __E__;
136  // __CFG_COUT__ << buffer.first << __E__;
137  __CFG_COUT__ << "Creating producer... " << bufferConfigurationList[producerLocation].first << __E__;
138  // __CFG_COUT__ <<
139  // bufferConfigurationMap[producer].getNode(COL_NAME_processorPlugin).getValue<std::string>()
140  //<< __E__;
141  // __CFG_COUT__ <<
142  // bufferConfigurationMap[producer].getNode("LinkToProcessorTable") <<
143  // __E__;
144  // __CFG_COUT__ << "THIS DATA MANAGER POINTER: " << this <<
145  // __E__;
146  // __CFG_COUT__ << "PASSED" << __E__;
147 
148  try
149  {
150  // buffers_[buffer.first].producers_.push_back(std::shared_ptr<DataProducerBase>(
151  DataProducerBase* tmpCastCheck = dynamic_cast<DataProducerBase*>(
152  makeDataProcessor(bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>(),
153  theXDAQContextConfigTree_.getBackNode(theConfigurationPath_).getNode(COL_NAME_appUID).getValue<std::string>(),
154  buffer.first,
155  bufferConfigurationList[producerLocation].first,
156  theXDAQContextConfigTree_,
157  theConfigurationPath_ + "/" + COL_NAME_bufferGroupLink + "/" + buffer.first + "/" + COL_NAME_processorGroupLink +
158  "/" + bufferConfigurationList[producerLocation].first + "/" + COL_NAME_processorLink)); //));
159 
160  if(!tmpCastCheck)
161  {
162  __CFG_SS__ << "Construction failed for producer '" << bufferConfigurationList[producerLocation].first << "!' Null pointer returned."
163  << __E__;
164  __CFG_SS_THROW__;
165  }
166  __CFG_COUT__ << tmpCastCheck->getProcessorID() << __E__;
167 
168  {
169  __CFG_SS__;
170  dumpStatus((std::ostream*)&ss);
171  __CFG_COUT__ << ss.str() << __E__;
172  }
173  }
174  catch(const std::bad_cast& e)
175  {
176  __CFG_SS__ << "Failed to instantiate producer plugin named '" << bufferConfigurationList[producerLocation].first << "' of type '"
177  << bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
178  << "' due to the following error: \n"
179  << e.what() << __E__;
180  __CFG_MOUT_ERR__ << ss.str();
181  __CFG_SS_THROW__;
182  }
183  catch(const cet::exception& e)
184  {
185  __CFG_SS__ << "Failed to instantiate producer plugin named '" << bufferConfigurationList[producerLocation].first << "' of type '"
186  << bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
187  << "' due to the following error: \n"
188  << e.what() << __E__;
189  __CFG_MOUT_ERR__ << ss.str();
190  __CFG_SS_THROW__;
191  }
192  catch(const std::runtime_error& e)
193  {
194  __CFG_SS__ << "Failed to instantiate producer plugin named '" << bufferConfigurationList[producerLocation].first << "' of type '"
195  << bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
196  << "' due to the following error: \n"
197  << e.what() << __E__;
198  __CFG_MOUT_ERR__ << ss.str();
199  __CFG_SS_THROW__;
200  }
201  catch(...)
202  {
203  __CFG_SS__ << "Failed to instantiate producer plugin named '" << bufferConfigurationList[producerLocation].first << "' of type '"
204  << bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
205  << "' due to an unknown error." << __E__;
206  __CFG_MOUT_ERR__ << ss.str();
207  throw; // if we do not throw, it is hard to tell what is causing the
208  // problem..
209  //__CFG_SS_THROW__;
210  }
211  __CFG_COUT__ << bufferConfigurationList[producerLocation].first << " has been created!" << __E__;
212  } // end producer creation loop
213 
214  for(auto& consumerLocation : consumersVectorLocation)
215  {
216  // __CFG_COUT__ << theConfigurationPath_ << __E__;
217  // __CFG_COUT__ << buffer.first << __E__;
218  __CFG_COUT__ << "Creating consumer... " << bufferConfigurationList[consumerLocation].first << __E__;
219  // __CFG_COUT__ <<
220  // bufferConfigurationMap[consumer].getNode(COL_NAME_processorPlugin).getValue<std::string>()
221  //<< __E__;
222  // __CFG_COUT__ <<
223  // bufferConfigurationMap[consumer].getNode("LinkToProcessorTable") <<
224  // __E__;
225  // __CFG_COUT__ <<
226  // theXDAQContextConfigTree_.getBackNode(theConfigurationPath_) <<
227  // __E__;
228  // __CFG_COUT__ << "THIS DATA MANAGER POINTER: " << this <<
229  // __E__;
230  // __CFG_COUT__ << "PASSED" << __E__;
231  try
232  {
233  // buffers_[buffer.first].consumers_.push_back(std::shared_ptr<DataConsumer>(
234  DataConsumer* tmpCastCheck = dynamic_cast<DataConsumer*>(
235  makeDataProcessor(bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>(),
236  theXDAQContextConfigTree_.getBackNode(theConfigurationPath_).getNode(COL_NAME_appUID).getValue<std::string>(),
237  buffer.first,
238  bufferConfigurationList[consumerLocation].first,
239  theXDAQContextConfigTree_,
240  theConfigurationPath_ + "/" + COL_NAME_bufferGroupLink + "/" + buffer.first + "/" + COL_NAME_processorGroupLink +
241  "/" + bufferConfigurationList[consumerLocation].first + "/" + COL_NAME_processorLink)); //));
242 
243  if(!tmpCastCheck)
244  {
245  __CFG_SS__ << "Construction failed for consumer '" << bufferConfigurationList[consumerLocation].first << "!' Null pointer returned."
246  << __E__;
247  __CFG_SS_THROW__;
248  }
249 
250  {
251  __CFG_SS__;
252  dumpStatus((std::ostream*)&ss);
253  __CFG_COUT__ << ss.str() << __E__;
254  }
255  }
256  catch(const std::bad_cast& e)
257  {
258  __CFG_SS__ << "Failed to instantiate consumer plugin named '" << bufferConfigurationList[consumerLocation].first << "' of type '"
259  << bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
260  << "' due to the following error: \n"
261  << e.what() << __E__;
262  __CFG_MOUT_ERR__ << ss.str();
263  __CFG_SS_THROW__;
264  }
265  catch(const cet::exception& e)
266  {
267  __CFG_SS__ << "Failed to instantiate consumer plugin named '" << bufferConfigurationList[consumerLocation].first << "' of type '"
268  << bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
269  << "' due to the following error: \n"
270  << e.what() << __E__;
271  __CFG_MOUT_ERR__ << ss.str();
272  __CFG_SS_THROW__;
273  }
274  catch(const std::runtime_error& e)
275  {
276  __CFG_SS__ << "Failed to instantiate consumer plugin named '" << bufferConfigurationList[consumerLocation].first << "' of type '"
277  << bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
278  << "' due to the following error: \n"
279  << e.what() << __E__;
280  __CFG_MOUT_ERR__ << ss.str();
281  __CFG_SS_THROW__;
282  }
283  catch(...)
284  {
285  __CFG_SS__ << "Failed to instantiate consumer plugin named '" << bufferConfigurationList[consumerLocation].first << "' of type '"
286  << bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
287  << "' due to an unknown error." << __E__;
288  __CFG_MOUT_ERR__ << ss.str();
289  throw; // if we do not throw, it is hard to tell what is happening..
290  //__CFG_SS_THROW__;
291  }
292  __CFG_COUT__ << bufferConfigurationList[consumerLocation].first << " has been created!" << __E__;
293  } // end consumer creation loop
294  }
295  }
296 } // end configure()
297 
298 //==============================================================================
299 void DataManager::halt(void)
300 {
301  const std::string transitionName = "Halting";
302 
303  __CFG_COUT__ << transitionName << " DataManager " << __E__;
304 
305  // since halting also occurs on errors, ignore more errors
306  try
307  {
308  stop();
309  }
310  catch(...)
311  {
312  __CFG_COUT_WARN__ << "An error occurred while halting the Data Manager, ignoring." << __E__;
313  }
314 
315  stop();
316 
317  __CFG_COUT__ << transitionName << " DataManager stopped. Now destruct buffers..." << __E__;
318 
319  DataManager::destroyBuffers(); // Stop all Buffers, deletes all pointers, and delete
320  // Buffer struct
321 } // end halt()
322 
323 //==============================================================================
324 void DataManager::pause(void)
325 {
326  const std::string transitionName = "Pausing";
327 
328  __CFG_COUT__ << transitionName << " DataManager " << __E__;
329 
330  DataManager::pauseAllBuffers();
331 } // end pause()
332 
333 //==============================================================================
334 void DataManager::resume(void)
335 {
336  const std::string transitionName = "Resuming";
337 
338  __CFG_COUT__ << transitionName << " DataManager " << __E__;
339 
340  DataManager::resumeAllBuffers();
341 } // end resume()
342 
343 //==============================================================================
344 void DataManager::start(std::string runNumber)
345 {
346  const std::string transitionName = "Starting";
347 
348  __CFG_COUT__ << transitionName << " DataManager " << __E__;
349 
350  DataManager::startAllBuffers(runNumber);
351 } // end start()
352 
353 //==============================================================================
354 void DataManager::stop()
355 {
356  const std::string transitionName = "Stopping";
357 
358  __CFG_COUT__ << transitionName << " DataManager " << __E__;
359 
360  DataManager::stopAllBuffers();
361 } // end stop()
362 
363 //==============================================================================
364 // destroyBuffers
365 // Stop all Buffers, deletes all pointers, and delete Buffer struct
366 void DataManager::destroyBuffers(void)
367 {
368  DataManager::stopAllBuffers();
369 
370  for(auto& bufferPair : buffers_)
371  {
372  // delete all producers/consumers
373  // then delete CircularBuffer
374  for(auto& producer : bufferPair.second.producers_)
375  delete producer;
376  bufferPair.second.producers_.clear();
377 
378  for(auto& consumer : bufferPair.second.consumers_)
379  delete consumer;
380  bufferPair.second.consumers_.clear();
381 
382  delete bufferPair.second.buffer_;
383  } // end delete buffer loop
384 
385  buffers_.clear();
386 } // end destroyBuffers()
387 
389 // void DataManager::eraseBuffer(const std::string& bufferUID)
390 //{
391 // if (deleteBuffer(bufferUID))
392 // buffers_.erase(bufferUID);
393 //} //end eraseBuffer()
394 //
396 //
397 // bool DataManager::deleteBuffer(const std::string& bufferUID)
398 //{
399 // auto it = buffers_.find(bufferUID);
400 // if (it != buffers_.end())
401 // {
402 // auto aBuffer = it->second;
403 // if (aBuffer.status_ == Running)
404 // stopBuffer(bufferUID);
405 //
411 // aBuffer.consumers_.clear();
412 // // for(auto& itp: aBuffer.producers_)
413 // // delete itp;
414 // aBuffer.producers_.clear();
415 //
416 // delete aBuffer.buffer_;
417 // return true;
418 // }
419 // return false;
420 //} //end deleteBuffer()
421 //
428 // void DataManager::unregisterConsumer(const std::string& bufferID, const std::string&
429 // consumerID)
430 //{
431 // __CFG_COUT__ << "Un-Registering consumer '" << consumerID <<
432 // "' from buffer '" << bufferID << "'..." << __E__;
433 //
434 // auto bufferIt = buffers_.find(bufferID);
435 // if(bufferIt == buffers_.end())
436 // {
437 // __CFG_SS__ << "While Un-Registering consumer '" << consumerID <<
438 // ",' buffer '" << bufferID << "' not found!" << __E__;
439 // __CFG_SS_THROW__;
440 // }
441 //
442 // //just destroy consumer, and it unregisters itself
443 // for (auto consumerIt = bufferIt->second.consumers_.begin();
444 // consumerIt != bufferIt->second.consumers_.end(); consumerIt++)
445 // {
446 // if((*consumerIt)->getProcessorID() == consumerID)
447 // {
448 // bufferIt->second.consumers_.erase(consumerIt);
449 // break;
450 // }
451 // }
452 // //DO NOT DO ANY STRING BASED UNREGISTERING.. leave it to end of DataManager halt
453 // //bufferIt->second.buffer_->unregisterConsumer(consumerID);
454 //
455 // __CFG_COUT__ << "Un-Registered consumer '" << consumerID <<
456 // "' from buffer '" << bufferID << ".'" << __E__;
457 // {__CFG_SS__; dumpStatus((std::ostream*)&ss); __CFG_COUT__ << ss.str() << __E__;}
496 //
512 //
513 // __CFG_SS__ << "While Un-Registering, consumer '" << consumerID <<
514 // "' not found!" << __E__;
515 // __CFG_SS_THROW__;
516 //} //end unregisterConsumer()
517 //
519 // void DataManager::unregisterProducer(const std::string& bufferID, const std::string&
520 // producerID)
521 //{
522 // __CFG_COUT__ << "Un-Registering producer '" << producerID <<
523 // "' from buffer '" << bufferID << "'..." << __E__;
524 //
525 // auto bufferIt = buffers_.find(bufferID);
526 // if(bufferIt == buffers_.end())
527 // {
528 // __CFG_SS__ << "While Un-Registering producer '" << producerID <<
529 // ",' buffer '" << bufferID << "' not found!" << __E__;
530 // __CFG_SS_THROW__;
531 // }
532 //
533 // //DO NOT DO ANY STRING BASED UNREGISTERING.. leave it to end of DataManager halt
534 // //bufferIt->second.unregisterProducer(producerID);
535 //
536 // __CFG_COUT__ << "Un-Registered producer '" << producerID <<
537 // "' from buffer '" << bufferID << ".'" << __E__;
538 // {__CFG_SS__; dumpStatus((std::ostream*)&ss); __CFG_COUT__ << ss.str() << __E__;}
539 //} //end unregisterProducer()
540 
541 //==============================================================================
542 void DataManager::unregisterFEProducer(const std::string& bufferID, const std::string& feProducerID)
543 {
544  __CFG_COUT__ << "Un-Registering FE-producer '" << feProducerID << "' from buffer '" << bufferID << "'..." << __E__;
545 
546  auto bufferIt = buffers_.find(bufferID);
547  if(bufferIt == buffers_.end())
548  {
549  __CFG_SS__ << "While Un-Registering FE-producer '" << feProducerID << ",' buffer '" << bufferID << "' not found!" << __E__;
550  __CFG_SS_THROW__;
551  }
552 
553  // DO NOT DO ANY STRING BASED UNREGISTERING.. leave it to end of DataManager halt
554  // DO NOT DO bufferIt->second.unregisterProducer(producerID);
555 
556  // remove from producer vector
557  // just destroy consumer, and it unregisters itself
558  for(auto feProducerIt = bufferIt->second.producers_.begin(); feProducerIt != bufferIt->second.producers_.end(); feProducerIt++)
559  {
560  if((*feProducerIt)->getProcessorID() == feProducerID)
561  {
562  // do not delete pointer before erasing
563  // because FEVInterfacesManager will delete FEProducer instance
564  bufferIt->second.producers_.erase(feProducerIt);
565  break;
566  }
567  }
568 
569  __CFG_COUT__ << "Un-Registered FE-producer '" << feProducerID << "' from buffer '" << bufferID << ".'" << __E__;
570  {
571  __CFG_SS__;
572  dumpStatus((std::ostream*)&ss);
573  __CFG_COUT__ << ss.str() << __E__;
574  }
575 
576 } // end unregisterFEProducer()
577 
578 //==============================================================================
579 // registerProducer
580 // DataManager takes ownership of producer pointer
581 // and is now responsible for destructing.
582 // Note: in the future, we could pass a shared_ptr, so that source of pointer could
583 // share in destructing responsibility.
584 void DataManager::registerProducer(const std::string& bufferUID, DataProducerBase* producer)
585 {
586  __CFG_COUT__ << "Registering producer '" << producer->getProcessorID() << "' to buffer '" << bufferUID << "'..." << __E__;
587 
588  auto bufferIt = buffers_.find(bufferUID);
589  if(bufferIt == buffers_.end())
590  {
591  __CFG_SS__ << "Can't find buffer UID '" + bufferUID << "' for producer '" << producer->getProcessorID()
592  << ".' Make sure that your configuration is correct!" << __E__;
593 
594  ss << "\n\n Here is the list of buffers:" << __E__;
595  for(const auto bufferPair : buffers_)
596  ss << bufferPair.first << __E__;
597  ss << "\n\n";
598 
599  __CFG_SS_THROW__;
600  }
601 
602  {
603  __CFG_SS__ << "Before!" << __E__;
604  dumpStatus((std::ostream*)&ss);
605  __CFG_COUT__ << ss.str() << __E__;
606  }
607 
608  __CFG_COUTV__(producer->getBufferSize());
609  bufferIt->second.buffer_->registerProducer(producer, producer->getBufferSize());
610  bufferIt->second.producers_.push_back(producer); // this is where ownership is taken!
611 
612  {
613  __CFG_SS__ << "After!" << __E__;
614  dumpStatus((std::ostream*)&ss);
615  __CFG_COUT__ << ss.str() << __E__;
616  }
617 }
618 
619 //==============================================================================
620 void DataManager::registerConsumer(const std::string& bufferUID, DataConsumer* consumer)
621 {
622  __CFG_COUT__ << "Registering consumer '" << consumer->getProcessorID() << "' to buffer '" << bufferUID << "'..." << __E__;
623 
624  auto bufferIt = buffers_.find(bufferUID);
625  if(bufferIt == buffers_.end())
626  {
627  __CFG_SS__ << "Can't find buffer UID '" + bufferUID << "' for consumer '" << consumer->getProcessorID()
628  << ".' Make sure that your configuration is correct!" << __E__;
629 
630  ss << "\n\n Here is the list of buffers:" << __E__;
631  for(const auto bufferPair : buffers_)
632  ss << bufferPair.first << __E__;
633 
634  __CFG_SS_THROW__;
635  }
636 
637  {
638  __CFG_SS__ << "Before!" << __E__;
639  dumpStatus((std::ostream*)&ss);
640  __CFG_COUT__ << ss.str() << __E__;
641  }
642 
643  bufferIt->second.buffer_->registerConsumer(consumer);
644  bufferIt->second.consumers_.push_back(consumer); // this is where ownership is taken!
645 
646  {
647  __CFG_SS__ << "After!" << __E__;
648  dumpStatus((std::ostream*)&ss);
649  __CFG_COUT__ << ss.str() << __E__;
650  }
651 }
652 
653 //==============================================================================
654 void DataManager::startAllBuffers(const std::string& runNumber)
655 {
656  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
657  startBuffer(it->first, runNumber);
658 }
659 
660 //==============================================================================
661 void DataManager::stopAllBuffers(void)
662 {
663  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
664  stopBuffer(it->first);
665 }
666 
667 //==============================================================================
668 void DataManager::resumeAllBuffers(void)
669 {
670  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
671  resumeBuffer(it->first);
672 }
673 
674 //==============================================================================
675 void DataManager::pauseAllBuffers(void)
676 {
677  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
678  pauseBuffer(it->first);
679 }
680 
681 //==============================================================================
682 void DataManager::startBuffer(const std::string& bufferUID, std::string runNumber)
683 {
684  __CFG_COUT__ << "Starting... " << bufferUID << __E__;
685 
686  buffers_[bufferUID].buffer_->reset();
687  for(auto& it : buffers_[bufferUID].consumers_)
688  {
689  // use try..catch to make sure there is some identifying trail for errors
690  try
691  {
692  it->startProcessingData(runNumber);
693  }
694  catch(...)
695  {
696  __CFG_COUT_WARN__ << "An error occurred while starting consumer '" << it->getProcessorID() << "'..." << __E__;
697  throw;
698  }
699  }
700 
701  for(auto& it : buffers_[bufferUID].producers_)
702  {
703  // use try..catch to make sure there is some identifying trail for errors
704  try
705  {
706  it->startProcessingData(runNumber);
707  }
708  catch(...)
709  {
710  __CFG_COUT_WARN__ << "An error occurred while starting producer '" << it->getProcessorID() << "'..." << __E__;
711  throw;
712  }
713  }
714 
715  buffers_[bufferUID].status_ = Running;
716 
717 } // end startBuffer()
718 
719 //==============================================================================
720 void DataManager::stopBuffer(const std::string& bufferUID)
721 {
722  __CFG_COUT__ << "Stopping... " << bufferUID << __E__;
723 
724  __CFG_COUT__ << "Stopping producers..." << __E__;
725  for(auto& it : buffers_[bufferUID].producers_)
726  {
727  // use try..catch to make sure there is some identifying trail for errors
728  try
729  {
730  it->stopProcessingData();
731  }
732  catch(...)
733  {
734  __CFG_COUT_WARN__ << "An error occurred while stopping producer '" << it->getProcessorID() << "'..." << __E__;
735  throw;
736  }
737  }
738 
739  // Wait until all buffers are flushed
740  unsigned int timeOut = 0;
741  const unsigned int ratio = 100;
742  const unsigned int sleepTime = 1000 * ratio;
743  unsigned int totalSleepTime = sleepTime / ratio * buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers(); // 1 milliseconds for each buffer!!!!
744  if(totalSleepTime < 5000000)
745  totalSleepTime = 5000000; // At least 5 seconds
746  while(!buffers_[bufferUID].buffer_->isEmpty())
747  {
748  usleep(sleepTime);
749  timeOut += sleepTime;
750  if(timeOut > totalSleepTime)
751  {
752  __CFG_COUT__ << "Couldn't flush all buffers! Timing out after " << totalSleepTime / 1000000. << " seconds!" << __E__;
753  buffers_[bufferUID].buffer_->isEmpty();
754  break;
755  }
756  }
757  __CFG_COUT__ << "Stopping consumers, buffer MUST BE EMPTY. Is buffer empty? " << (buffers_[bufferUID].buffer_->isEmpty() ? "yes" : "no") << __E__;
758 
759  for(auto& it : buffers_[bufferUID].consumers_)
760  {
761  // use try..catch to make sure there is some identifying trail for errors
762  try
763  {
764  it->stopProcessingData();
765  }
766  catch(...)
767  {
768  __CFG_COUT_WARN__ << "An error occurred while stopping consumer '" << it->getProcessorID() << "'..." << __E__;
769  throw;
770  }
771  }
772 
773  buffers_[bufferUID].buffer_->reset();
774  buffers_[bufferUID].status_ = Initialized;
775 } // end stopBuffer()
776 
777 //==============================================================================
778 void DataManager::resumeBuffer(const std::string& bufferUID)
779 {
780  __CFG_COUT__ << "Resuming... " << bufferUID << __E__;
781 
782  for(auto& it : buffers_[bufferUID].consumers_)
783  it->resumeProcessingData();
784  for(auto& it : buffers_[bufferUID].producers_)
785  it->resumeProcessingData();
786 
787  buffers_[bufferUID].status_ = Running;
788 } // end resumeBuffer()
789 
790 //==============================================================================
791 void DataManager::pauseBuffer(const std::string& bufferUID)
792 {
793  __CFG_COUT__ << "Pausing... " << bufferUID << __E__;
794 
795  for(auto& it : buffers_[bufferUID].producers_)
796  it->pauseProcessingData();
797  // Wait until all buffers are flushed
798  unsigned int timeOut = 0;
799  const unsigned int sleepTime = 1000;
800  while(!buffers_[bufferUID].buffer_->isEmpty())
801  {
802  usleep(sleepTime);
803  timeOut += sleepTime;
804  if(timeOut > sleepTime * buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers()) // 1
805  // milliseconds
806  // for each
807  // buffer!!!!
808  {
809  __CFG_COUT__ << "Couldn't flush all buffers! Timing out after " << buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers() * sleepTime / 1000000.
810  << " seconds!" << __E__;
811  break;
812  }
813  }
814  for(auto& it : buffers_[bufferUID].consumers_)
815  it->pauseProcessingData();
816  buffers_[bufferUID].status_ = Initialized;
817 } // end pauseBuffer()
void unregisterFEProducer(const std::string &bufferID, const std::string &feProducerID)
Definition: DataManager.cc:542
void startAllBuffers(const std::string &runNumber)
Definition: DataManager.cc:654