tdaq-develop-2025-02-12
DataManager.cc
1 #include "otsdaq/DataManager/DataManager.h"
2 #include "otsdaq/ConfigurationInterface/ConfigurationManager.h"
3 #include "otsdaq/DataManager/CircularBuffer.h"
4 #include "otsdaq/DataManager/DataConsumer.h"
5 #include "otsdaq/DataManager/DataProducerBase.h"
6 #include "otsdaq/DataManager/MakeDataProcessor.h"
7 #include "otsdaq/Macros/CoutMacros.h"
8 #include "otsdaq/MessageFacility/MessageFacility.h"
9 
10 #include <unistd.h> //usleep
11 #include <iostream>
12 #include <vector>
13 
14 using namespace ots;
15 
16 // clang-format off
17 //==============================================================================
18 DataManager::DataManager(const ConfigurationTree& theXDAQContextConfigTree, const std::string& supervisorConfigurationPath)
19  : Configurable(theXDAQContextConfigTree, supervisorConfigurationPath)
20  , VStateMachine(Configurable::theConfigurationRecordName_)
21  , parentSupervisorHasFrontends_(false)
22 {
23  __CFG_COUT__ << "Constructed." << __E__;
24 } // end constructor
25 // clang-format on
26 
27 //==============================================================================
28 DataManager::~DataManager(void)
29 {
30  __CFG_COUT__ << "Destructor." << __E__;
32  __CFG_COUT__ << "Destructed." << __E__;
33 } // end destructor
34 
35 //==============================================================================
36 void DataManager::dumpStatus(std::ostream* out) const
37 {
38  *out << "Buffer count: " << buffers_.size() << __E__;
39  for(auto& bufferPair : buffers_)
40  {
41  *out << "\t"
42  << "Buffer '" << bufferPair.first << "' status=" << bufferPair.second.status_
43  << " producers=" << bufferPair.second.producers_.size()
44  << " consumers=" << bufferPair.second.consumers_.size() << __E__;
45 
46  *out << "\t\t"
47  << "Producers:" << __E__;
48  for(auto& producer : bufferPair.second.producers_)
49  {
50  *out << "\t\t\t" << producer->getProcessorID() << " ["
51  << bufferPair.second.buffer_->getProducerBufferSize(
52  producer->getProcessorID())
53  << "]" << __E__;
54  }
55  *out << "\t\t"
56  << "Consumers:" << __E__;
57  for(auto& consumer : bufferPair.second.consumers_)
58  {
59  *out << "\t\t\t" << consumer->getProcessorID() << __E__;
60  }
61  }
62 } // end dumpStatus()
63 
64 //==============================================================================
66 {
67  const std::string transitionName = "Configuring";
68 
69  const std::string COL_NAME_bufferGroupLink = "LinkToDataBufferTable";
70  const std::string COL_NAME_processorGroupLink = "LinkToDataProcessorTable";
71  const std::string COL_NAME_processorType = "ProcessorType";
72  const std::string COL_NAME_processorPlugin = "ProcessorPluginName";
73  const std::string COL_NAME_processorLink = "LinkToProcessorTable";
74  const std::string COL_NAME_appUID = "ApplicationUID";
75 
76  __CFG_COUT__ << transitionName << " DataManager" << __E__;
77  __CFG_COUT__ << "Path: " << theConfigurationPath_ + "/" + COL_NAME_bufferGroupLink
78  << __E__;
79 
81 
82  // get all buffer definitions from configuration tree
83  for(const auto& buffer :
84  theXDAQContextConfigTree_
85  .getNode(theConfigurationPath_ + "/" + COL_NAME_bufferGroupLink)
86  .getChildren()) //"/LinkToDataManagerTable").getChildren())
87  {
88  __CFG_COUT__ << "Data Buffer Name: " << buffer.first << __E__;
89  if(buffer.second.getNode(TableViewColumnInfo::COL_NAME_STATUS).getValue<bool>())
90  {
91  std::vector<unsigned int> producersVectorLocation;
92  std::vector<unsigned int> consumersVectorLocation;
93  auto bufferConfigurationList =
94  buffer.second.getNode(COL_NAME_processorGroupLink)
95  .getChildren(); //"LinkToDataBufferTable").getChildren();
96  unsigned int location = 0;
97  for(const auto& bufferConfiguration : bufferConfigurationList)
98  {
99  __CFG_COUT__ << "Processor id: " << bufferConfiguration.first << __E__;
100  if(bufferConfiguration.second
101  .getNode(TableViewColumnInfo::COL_NAME_STATUS)
102  .getValue<bool>())
103  {
104  if(bufferConfiguration.second.getNode(COL_NAME_processorType)
105  .getValue<std::string>() == "Producer")
106  {
107  producersVectorLocation.push_back(location);
108  }
109  else if(bufferConfiguration.second.getNode(COL_NAME_processorType)
110  .getValue<std::string>() == "Consumer")
111  {
112  consumersVectorLocation.push_back(location);
113  }
114  else
115  {
116  __CFG_SS__ << "Node ProcessorType in "
117  << bufferConfiguration.first << " of type "
118  << bufferConfiguration.second
119  .getNode(COL_NAME_processorPlugin)
120  .getValue<std::string>()
121  << " is invalid. The only accepted types are Producer "
122  "and Consumer"
123  << __E__;
124  __CFG_COUT_ERR__ << ss.str();
125  __CFG_SS_THROW__;
126  }
127  }
128  ++location;
129 
130  } // end loop sorting by producer and consumer
131 
133  producersVectorLocation.size() ==
134  0) // || consumersVectorLocation.size() == 0)
135  {
136  __CFG_SS__ << "Node Data Buffer " << buffer.first << " has "
137  << producersVectorLocation.size() << " Producers"
138  << " and " << consumersVectorLocation.size() << " Consumers"
139  << " there must be at least 1 Producer "
140  << // of both configured
141  "for the buffer!" << __E__;
142  __CFG_COUT_ERR__ << ss.str();
143  __CFG_SS_THROW__;
144  }
145 
147  __CFG_COUT__
148  << "Parent supervisor has front-ends, so FE-producers may "
149  << "be instantiated in the configure steps of the FESupervisor."
150  << __E__;
151 
152  configureBuffer<std::string, std::map<std::string, std::string> >(
153  buffer.first);
154 
155  for(auto& producerLocation : producersVectorLocation)
156  {
157  // __CFG_COUT__ << theConfigurationPath_ << __E__;
158  // __CFG_COUT__ << buffer.first << __E__;
159  __CFG_COUT__ << "Creating producer... "
160  << bufferConfigurationList[producerLocation].first << __E__;
161  // __CFG_COUT__ <<
162  // bufferConfigurationMap[producer].getNode(COL_NAME_processorPlugin).getValue<std::string>()
163  //<< __E__;
164  // __CFG_COUT__ <<
165  // bufferConfigurationMap[producer].getNode("LinkToProcessorTable") <<
166  // __E__;
167  // __CFG_COUT__ << "THIS DATA MANAGER POINTER: " << this <<
168  // __E__;
169  // __CFG_COUT__ << "PASSED" << __E__;
170 
171  try
172  {
173  // buffers_[buffer.first].producers_.push_back(std::shared_ptr<DataProducerBase>(
174  DataProducerBase* tmpCastCheck =
175  dynamic_cast<DataProducerBase*>(makeDataProcessor(
176  bufferConfigurationList[producerLocation]
177  .second.getNode(COL_NAME_processorPlugin)
178  .getValue<std::string>(),
179  theXDAQContextConfigTree_.getBackNode(theConfigurationPath_)
180  .getNode(COL_NAME_appUID)
181  .getValue<std::string>(),
182  buffer.first,
183  bufferConfigurationList[producerLocation].first,
184  theXDAQContextConfigTree_,
185  theConfigurationPath_ + "/" + COL_NAME_bufferGroupLink + "/" +
186  buffer.first + "/" + COL_NAME_processorGroupLink + "/" +
187  bufferConfigurationList[producerLocation].first + "/" +
188  COL_NAME_processorLink)); //));
189 
190  if(!tmpCastCheck)
191  {
192  __CFG_SS__ << "Construction failed for producer '"
193  << bufferConfigurationList[producerLocation].first
194  << "!' Null pointer returned." << __E__;
195  __CFG_SS_THROW__;
196  }
197  __CFG_COUT__ << tmpCastCheck->getProcessorID() << __E__;
198 
199  {
200  __CFG_SS__;
201  dumpStatus((std::ostream*)&ss);
202  __CFG_COUT__ << ss.str() << __E__;
203  }
204  }
205  catch(const std::bad_cast& e)
206  {
207  __CFG_SS__ << "Failed to instantiate producer plugin named '"
208  << bufferConfigurationList[producerLocation].first
209  << "' of type '"
210  << bufferConfigurationList[producerLocation]
211  .second.getNode(COL_NAME_processorPlugin)
212  .getValue<std::string>()
213  << "' due to the following error: \n"
214  << e.what() << __E__;
215  __CFG_COUT_ERR__ << ss.str();
216  __CFG_SS_THROW__;
217  }
218  catch(const cet::exception& e)
219  {
220  __CFG_SS__ << "Failed to instantiate producer plugin named '"
221  << bufferConfigurationList[producerLocation].first
222  << "' of type '"
223  << bufferConfigurationList[producerLocation]
224  .second.getNode(COL_NAME_processorPlugin)
225  .getValue<std::string>()
226  << "' due to the following error: \n"
227  << e.what() << __E__;
228  __CFG_COUT_ERR__ << ss.str();
229  __CFG_SS_THROW__;
230  }
231  catch(const std::runtime_error& e)
232  {
233  __CFG_SS__ << "Failed to instantiate producer plugin named '"
234  << bufferConfigurationList[producerLocation].first
235  << "' of type '"
236  << bufferConfigurationList[producerLocation]
237  .second.getNode(COL_NAME_processorPlugin)
238  .getValue<std::string>()
239  << "' due to the following error: \n"
240  << e.what() << __E__;
241  __CFG_COUT_ERR__ << ss.str();
242  __CFG_SS_THROW__;
243  }
244  catch(...)
245  {
246  __CFG_SS__ << "Failed to instantiate producer plugin named '"
247  << bufferConfigurationList[producerLocation].first
248  << "' of type '"
249  << bufferConfigurationList[producerLocation]
250  .second.getNode(COL_NAME_processorPlugin)
251  .getValue<std::string>()
252  << "' due to an unknown error." << __E__;
253  try
254  {
255  throw;
256  } //one more try to printout extra info
257  catch(const std::exception& e)
258  {
259  ss << "Exception message: " << e.what();
260  }
261  catch(...)
262  {
263  }
264  __CFG_COUT_ERR__ << ss.str();
265  throw; // if we do not throw, it is hard to tell what is causing the
266  // problem..
267  //__CFG_SS_THROW__;
268  }
269  __CFG_COUT__ << bufferConfigurationList[producerLocation].first
270  << " has been created!" << __E__;
271  } // end producer creation loop
272 
273  for(auto& consumerLocation : consumersVectorLocation)
274  {
275  // __CFG_COUT__ << theConfigurationPath_ << __E__;
276  // __CFG_COUT__ << buffer.first << __E__;
277  __CFG_COUT__ << "Creating consumer... "
278  << bufferConfigurationList[consumerLocation].first << __E__;
279  // __CFG_COUT__ <<
280  // bufferConfigurationMap[consumer].getNode(COL_NAME_processorPlugin).getValue<std::string>()
281  //<< __E__;
282  // __CFG_COUT__ <<
283  // bufferConfigurationMap[consumer].getNode("LinkToProcessorTable") <<
284  // __E__;
285  // __CFG_COUT__ <<
286  // theXDAQContextConfigTree_.getBackNode(theConfigurationPath_) <<
287  // __E__;
288  // __CFG_COUT__ << "THIS DATA MANAGER POINTER: " << this <<
289  // __E__;
290  // __CFG_COUT__ << "PASSED" << __E__;
291  try
292  {
293  // buffers_[buffer.first].consumers_.push_back(std::shared_ptr<DataConsumer>(
294  DataConsumer* tmpCastCheck =
295  dynamic_cast<DataConsumer*>(makeDataProcessor(
296  bufferConfigurationList[consumerLocation]
297  .second.getNode(COL_NAME_processorPlugin)
298  .getValue<std::string>(),
299  theXDAQContextConfigTree_.getBackNode(theConfigurationPath_)
300  .getNode(COL_NAME_appUID)
301  .getValue<std::string>(),
302  buffer.first,
303  bufferConfigurationList[consumerLocation].first,
304  theXDAQContextConfigTree_,
305  theConfigurationPath_ + "/" + COL_NAME_bufferGroupLink + "/" +
306  buffer.first + "/" + COL_NAME_processorGroupLink + "/" +
307  bufferConfigurationList[consumerLocation].first + "/" +
308  COL_NAME_processorLink)); //));
309 
310  if(!tmpCastCheck)
311  {
312  __CFG_SS__ << "Construction failed for consumer '"
313  << bufferConfigurationList[consumerLocation].first
314  << "!' Null pointer returned." << __E__;
315  __CFG_SS_THROW__;
316  }
317 
318  {
319  __CFG_SS__;
320  dumpStatus((std::ostream*)&ss);
321  __CFG_COUT__ << ss.str() << __E__;
322  }
323  }
324  catch(const std::bad_cast& e)
325  {
326  __CFG_SS__ << "Failed to instantiate consumer plugin named '"
327  << bufferConfigurationList[consumerLocation].first
328  << "' of type '"
329  << bufferConfigurationList[consumerLocation]
330  .second.getNode(COL_NAME_processorPlugin)
331  .getValue<std::string>()
332  << "' due to the following error: \n"
333  << e.what() << __E__;
334  __CFG_COUT_ERR__ << ss.str();
335  __CFG_SS_THROW__;
336  }
337  catch(const cet::exception& e)
338  {
339  __CFG_SS__ << "Failed to instantiate consumer plugin named '"
340  << bufferConfigurationList[consumerLocation].first
341  << "' of type '"
342  << bufferConfigurationList[consumerLocation]
343  .second.getNode(COL_NAME_processorPlugin)
344  .getValue<std::string>()
345  << "' due to the following error: \n"
346  << e.what() << __E__;
347  __CFG_COUT_ERR__ << ss.str();
348  __CFG_SS_THROW__;
349  }
350  catch(const std::runtime_error& e)
351  {
352  __CFG_SS__ << "Failed to instantiate consumer plugin named '"
353  << bufferConfigurationList[consumerLocation].first
354  << "' of type '"
355  << bufferConfigurationList[consumerLocation]
356  .second.getNode(COL_NAME_processorPlugin)
357  .getValue<std::string>()
358  << "' due to the following error: \n"
359  << e.what() << __E__;
360  __CFG_COUT_ERR__ << ss.str();
361  __CFG_SS_THROW__;
362  }
363  catch(...)
364  {
365  __CFG_SS__ << "Failed to instantiate consumer plugin named '"
366  << bufferConfigurationList[consumerLocation].first
367  << "' of type '"
368  << bufferConfigurationList[consumerLocation]
369  .second.getNode(COL_NAME_processorPlugin)
370  .getValue<std::string>()
371  << "' due to an unknown error." << __E__;
372  try
373  {
374  throw;
375  } //one more try to printout extra info
376  catch(const std::exception& e)
377  {
378  ss << "Exception message: " << e.what();
379  }
380  catch(...)
381  {
382  }
383  __CFG_COUT_ERR__ << ss.str();
384  throw; // if we do not throw, it is hard to tell what is happening..
385  //__CFG_SS_THROW__;
386  }
387  __CFG_COUT__ << bufferConfigurationList[consumerLocation].first
388  << " has been created!" << __E__;
389  } // end consumer creation loop
390  }
391  }
393 } // end configure()
394 
395 //==============================================================================
396 void DataManager::halt(void)
397 {
398  const std::string transitionName = "Halting";
399 
400  __CFG_COUT__ << transitionName << " DataManager " << __E__;
401 
402  // since halting also occurs on errors, ignore more errors
403  try
404  {
405  stop();
406  }
407  catch(...)
408  {
409  __CFG_COUT_WARN__ << "An error occurred while halting the Data Manager, ignoring."
410  << __E__;
411  }
412 
413  stop();
414 
415  __CFG_COUT__ << transitionName << " DataManager stopped. Now destruct buffers..."
416  << __E__;
417 
418  DataManager::destroyBuffers(); // Stop all Buffers, deletes all pointers, and delete
419  // Buffer struct
420 } // end halt()
421 
422 //==============================================================================
423 void DataManager::pause(void)
424 {
425  const std::string transitionName = "Pausing";
426 
427  __CFG_COUT__ << transitionName << " DataManager " << __E__;
428 
429  DataManager::pauseAllBuffers();
430 } // end pause()
431 
432 //==============================================================================
433 void DataManager::resume(void)
434 {
435  const std::string transitionName = "Resuming";
436 
437  __CFG_COUT__ << transitionName << " DataManager " << __E__;
438 
439  DataManager::resumeAllBuffers();
440 } // end resume()
441 
442 //==============================================================================
443 void DataManager::start(std::string runNumber)
444 {
445  const std::string transitionName = "Starting";
446 
447  __CFG_COUT__ << transitionName << " DataManager " << __E__;
448 
449  DataManager::startAllBuffers(runNumber);
450 } // end start()
451 
452 //==============================================================================
453 void DataManager::stop()
454 {
455  const std::string transitionName = "Stopping";
456 
457  __CFG_COUT__ << transitionName << " DataManager " << __E__;
458 
459  DataManager::stopAllBuffers();
460 } // end stop()
461 
462 //==============================================================================
466 {
467  DataManager::stopAllBuffers();
468 
469  for(auto& bufferPair : buffers_)
470  {
471  // delete all producers/consumers
472  // then delete CircularBuffer
473  for(auto& producer : bufferPair.second.producers_)
474  delete producer;
475  bufferPair.second.producers_.clear();
476 
477  for(auto& consumer : bufferPair.second.consumers_)
478  delete consumer;
479  bufferPair.second.consumers_.clear();
480 
481  delete bufferPair.second.buffer_;
482  } // end delete buffer loop
483 
484  buffers_.clear();
485 } // end destroyBuffers()
486 
489 //{
490 // if (deleteBuffer(bufferUID))
491 // buffers_.erase(bufferUID);
492 //} //end eraseBuffer()
493 
497 //{
498 // auto it = buffers_.find(bufferUID);
499 // if (it != buffers_.end())
500 // {
501 // auto aBuffer = it->second;
502 // if (aBuffer.status_ == Running)
503 // stopBuffer(bufferUID);
504 //
510 // aBuffer.consumers_.clear();
511 // // for(auto& itp: aBuffer.producers_)
512 // // delete itp;
513 // aBuffer.producers_.clear();
514 //
515 // delete aBuffer.buffer_;
516 // return true;
517 // }
518 // return false;
519 //} //end deleteBuffer()
520 
529 //{
530 // __CFG_COUT__ << "Un-Registering consumer '" << consumerID <<
531 // "' from buffer '" << bufferID << "'..." << __E__;
532 //
533 // auto bufferIt = buffers_.find(bufferID);
534 // if(bufferIt == buffers_.end())
535 // {
536 // __CFG_SS__ << "While Un-Registering consumer '" << consumerID <<
537 // ",' buffer '" << bufferID << "' not found!" << __E__;
538 // __CFG_SS_THROW__;
539 // }
540 //
541 // //just destroy consumer, and it unregisters itself
542 // for (auto consumerIt = bufferIt->second.consumers_.begin();
543 // consumerIt != bufferIt->second.consumers_.end(); consumerIt++)
544 // {
545 // if((*consumerIt)->getProcessorID() == consumerID)
546 // {
547 // bufferIt->second.consumers_.erase(consumerIt);
548 // break;
549 // }
550 // }
551 // //DO NOT DO ANY STRING BASED UNREGISTERING.. leave it to end of DataManager halt
552 // //bufferIt->second.buffer_->unregisterConsumer(consumerID);
553 //
554 // __CFG_COUT__ << "Un-Registered consumer '" << consumerID <<
555 // "' from buffer '" << bufferID << ".'" << __E__;
556 // {__CFG_SS__; dumpStatus((std::ostream*)&ss); __CFG_COUT__ << ss.str() << __E__;}
595 //
611 //
612 // __CFG_SS__ << "While Un-Registering, consumer '" << consumerID <<
613 // "' not found!" << __E__;
614 // __CFG_SS_THROW__;
615 //} //end unregisterConsumer()
616 
620 //{
621 // __CFG_COUT__ << "Un-Registering producer '" << producerID <<
622 // "' from buffer '" << bufferID << "'..." << __E__;
623 //
624 // auto bufferIt = buffers_.find(bufferID);
625 // if(bufferIt == buffers_.end())
626 // {
627 // __CFG_SS__ << "While Un-Registering producer '" << producerID <<
628 // ",' buffer '" << bufferID << "' not found!" << __E__;
629 // __CFG_SS_THROW__;
630 // }
631 //
632 // //DO NOT DO ANY STRING BASED UNREGISTERING.. leave it to end of DataManager halt
633 // //bufferIt->second.unregisterProducer(producerID);
634 //
635 // __CFG_COUT__ << "Un-Registered producer '" << producerID <<
636 // "' from buffer '" << bufferID << ".'" << __E__;
637 // {__CFG_SS__; dumpStatus((std::ostream*)&ss); __CFG_COUT__ << ss.str() << __E__;}
638 //} //end unregisterProducer()
639 
640 //==============================================================================
641 void DataManager::unregisterFEProducer(const std::string& bufferID,
642  const std::string& feProducerID)
643 {
644  __CFG_COUT__ << "Un-Registering FE-producer '" << feProducerID << "' from buffer '"
645  << bufferID << "'..." << __E__;
646 
647  auto bufferIt = buffers_.find(bufferID);
648  if(bufferIt == buffers_.end())
649  {
650  __CFG_SS__ << "While Un-Registering FE-producer '" << feProducerID
651  << ",' buffer '" << bufferID << "' not found!" << __E__;
652  __CFG_SS_THROW__;
653  }
654 
655  // DO NOT DO ANY STRING BASED UNREGISTERING.. leave it to end of DataManager halt
656  // DO NOT DO bufferIt->second.unregisterProducer(producerID);
657 
658  // remove from producer vector
659  // just destroy consumer, and it unregisters itself
660  for(auto feProducerIt = bufferIt->second.producers_.begin();
661  feProducerIt != bufferIt->second.producers_.end();
662  feProducerIt++)
663  {
664  if((*feProducerIt)->getProcessorID() == feProducerID)
665  {
666  // do not delete pointer before erasing
667  // because FEVInterfacesManager will delete FEProducer instance
668  bufferIt->second.producers_.erase(feProducerIt);
669  break;
670  }
671  }
672 
673  __CFG_COUT__ << "Un-Registered FE-producer '" << feProducerID << "' from buffer '"
674  << bufferID << ".'" << __E__;
675  {
676  __CFG_SS__;
677  dumpStatus((std::ostream*)&ss);
678  __CFG_COUT__ << ss.str() << __E__;
679  }
680 
681 } // end unregisterFEProducer()
682 
683 //==============================================================================
689 void DataManager::registerProducer(const std::string& bufferUID,
690  DataProducerBase* producer)
691 {
692  __CFG_COUT__ << "Registering producer '" << producer->getProcessorID()
693  << "' to buffer '" << bufferUID << "'..." << __E__;
694 
695  auto bufferIt = buffers_.find(bufferUID);
696  if(bufferIt == buffers_.end())
697  {
698  __CFG_SS__ << "Can't find buffer UID '" + bufferUID << "' for producer '"
699  << producer->getProcessorID()
700  << ".' Make sure that your configuration is correct!" << __E__;
701 
702  ss << "\n\n Here is the list of buffers:" << __E__;
703  for(const auto& bufferPair : buffers_)
704  ss << bufferPair.first << __E__;
705  ss << "\n\n";
706 
707  __CFG_SS_THROW__;
708  }
709 
710  {
711  __CFG_SS__ << "Before!" << __E__;
712  dumpStatus((std::ostream*)&ss);
713  __CFG_COUT__ << ss.str() << __E__;
714  }
715 
716  __CFG_COUTV__(producer->getBufferSize());
717  bufferIt->second.buffer_->registerProducer(producer, producer->getBufferSize());
718  bufferIt->second.producers_.push_back(producer); // this is where ownership is taken!
719 
720  {
721  __CFG_SS__ << "After!" << __E__;
722  dumpStatus((std::ostream*)&ss);
723  __CFG_COUT__ << ss.str() << __E__;
724  }
725 }
726 
727 //==============================================================================
728 void DataManager::registerConsumer(const std::string& bufferUID, DataConsumer* consumer)
729 {
730  __CFG_COUT__ << "Registering consumer '" << consumer->getProcessorID()
731  << "' to buffer '" << bufferUID << "'..." << __E__;
732 
733  auto bufferIt = buffers_.find(bufferUID);
734  if(bufferIt == buffers_.end())
735  {
736  __CFG_SS__ << "Can't find buffer UID '" + bufferUID << "' for consumer '"
737  << consumer->getProcessorID()
738  << ".' Make sure that your configuration is correct!" << __E__;
739 
740  ss << "\n\n Here is the list of buffers:" << __E__;
741  for(const auto& bufferPair : buffers_)
742  ss << bufferPair.first << __E__;
743 
744  __CFG_SS_THROW__;
745  }
746 
747  {
748  __CFG_SS__ << "Before!" << __E__;
749  dumpStatus((std::ostream*)&ss);
750  __CFG_COUT__ << ss.str() << __E__;
751  }
752 
753  bufferIt->second.buffer_->registerConsumer(consumer);
754  bufferIt->second.consumers_.push_back(consumer); // this is where ownership is taken!
755 
756  {
757  __CFG_SS__ << "After!" << __E__;
758  dumpStatus((std::ostream*)&ss);
759  __CFG_COUT__ << ss.str() << __E__;
760  }
761 }
762 
763 //==============================================================================
765 {
766  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
767  configureBuffer(it->first);
768 }
769 
770 //==============================================================================
771 void DataManager::startAllBuffers(const std::string& runNumber)
772 {
773  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
774  startBuffer(it->first, runNumber);
775 }
776 
777 //==============================================================================
778 void DataManager::stopAllBuffers(void)
779 {
780  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
781  stopBuffer(it->first);
782 }
783 
784 //==============================================================================
785 void DataManager::resumeAllBuffers(void)
786 {
787  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
788  resumeBuffer(it->first);
789 }
790 
791 //==============================================================================
792 void DataManager::pauseAllBuffers(void)
793 {
794  for(auto it = buffers_.begin(); it != buffers_.end(); it++)
795  pauseBuffer(it->first);
796 }
797 
798 //==============================================================================
799 void DataManager::configureBuffer(const std::string& bufferUID)
800 {
801  __CFG_COUT__ << "Configuring... " << bufferUID << __E__;
802 
803  for(auto& it : buffers_[bufferUID].consumers_)
804  {
805  // use try..catch to make sure there is some identifying trail for errors
806  try
807  {
808  it->configure();
809  }
810  catch(...)
811  {
812  __CFG_COUT_WARN__ << "An error occurred while configuring consumer '"
813  << it->getProcessorID() << "'..." << __E__;
814  throw;
815  }
816  }
817 
818  for(auto& it : buffers_[bufferUID].producers_)
819  {
820  // use try..catch to make sure there is some identifying trail for errors
821  try
822  {
823  it->configure();
824  }
825  catch(...)
826  {
827  __CFG_COUT_WARN__ << "An error occurred while starting producer '"
828  << it->getProcessorID() << "'..." << __E__;
829  throw;
830  }
831  }
832 
833  buffers_[bufferUID].status_ = Initialized;
834 
835 } // end startBuffer()
836 
837 //==============================================================================
838 void DataManager::startBuffer(const std::string& bufferUID, std::string runNumber)
839 {
840  __CFG_COUT__ << "Starting... " << bufferUID << __E__;
841 
842  buffers_[bufferUID].buffer_->reset();
843  for(auto& it : buffers_[bufferUID].consumers_)
844  {
845  // use try..catch to make sure there is some identifying trail for errors
846  try
847  {
848  it->startProcessingData(runNumber);
849  }
850  catch(...)
851  {
852  __CFG_COUT_WARN__ << "An error occurred while starting consumer '"
853  << it->getProcessorID() << "'..." << __E__;
854  throw;
855  }
856  }
857 
858  for(auto& it : buffers_[bufferUID].producers_)
859  {
860  // use try..catch to make sure there is some identifying trail for errors
861  try
862  {
863  it->startProcessingData(runNumber);
864  }
865  catch(...)
866  {
867  __CFG_COUT_WARN__ << "An error occurred while starting producer '"
868  << it->getProcessorID() << "'..." << __E__;
869  throw;
870  }
871  }
872 
873  buffers_[bufferUID].status_ = Running;
874 
875 } // end startBuffer()
876 
877 //==============================================================================
878 void DataManager::stopBuffer(const std::string& bufferUID)
879 {
880  __CFG_COUT__ << "Stopping... " << bufferUID << __E__;
881 
882  __CFG_COUT__ << "Stopping producers..." << __E__;
883  for(auto& it : buffers_[bufferUID].producers_)
884  {
885  // use try..catch to make sure there is some identifying trail for errors
886  try
887  {
888  it->stopProcessingData();
889  }
890  catch(...)
891  {
892  __CFG_COUT_WARN__ << "An error occurred while stopping producer '"
893  << it->getProcessorID() << "'..." << __E__;
894  throw;
895  }
896  }
897 
898  // Wait until all buffers are flushed
899  unsigned int timeOut = 0;
900  const unsigned int ratio = 100;
901  const unsigned int sleepTime = 1000 * ratio;
902  unsigned int totalSleepTime =
903  sleepTime / ratio *
904  buffers_[bufferUID]
905  .buffer_->getTotalNumberOfSubBuffers(); // 1 milliseconds for each buffer!!!!
906  if(totalSleepTime < 5000000)
907  totalSleepTime = 5000000; // At least 5 seconds
908  while(!buffers_[bufferUID].buffer_->isEmpty())
909  {
910  usleep(sleepTime);
911  timeOut += sleepTime;
912  if(timeOut > totalSleepTime)
913  {
914  __CFG_COUT__ << "Couldn't flush all buffers! Timing out after "
915  << totalSleepTime / 1000000. << " seconds!" << __E__;
916  buffers_[bufferUID].buffer_->isEmpty();
917  break;
918  }
919  }
920  __CFG_COUT__ << "Stopping consumers, buffer MUST BE EMPTY. Is buffer empty? "
921  << (buffers_[bufferUID].buffer_->isEmpty() ? "yes" : "no") << __E__;
922 
923  for(auto& it : buffers_[bufferUID].consumers_)
924  {
925  // use try..catch to make sure there is some identifying trail for errors
926  try
927  {
928  it->stopProcessingData();
929  }
930  catch(...)
931  {
932  __CFG_COUT_WARN__ << "An error occurred while stopping consumer '"
933  << it->getProcessorID() << "'..." << __E__;
934  throw;
935  }
936  }
937 
938  buffers_[bufferUID].buffer_->reset();
939  buffers_[bufferUID].status_ = Initialized;
940 } // end stopBuffer()
941 
942 //==============================================================================
943 void DataManager::resumeBuffer(const std::string& bufferUID)
944 {
945  __CFG_COUT__ << "Resuming... " << bufferUID << __E__;
946 
947  for(auto& it : buffers_[bufferUID].consumers_)
948  it->resumeProcessingData();
949  for(auto& it : buffers_[bufferUID].producers_)
950  it->resumeProcessingData();
951 
952  buffers_[bufferUID].status_ = Running;
953 } // end resumeBuffer()
954 
955 //==============================================================================
956 void DataManager::pauseBuffer(const std::string& bufferUID)
957 {
958  __CFG_COUT__ << "Pausing... " << bufferUID << __E__;
959 
960  for(auto& it : buffers_[bufferUID].producers_)
961  it->pauseProcessingData();
962  // Wait until all buffers are flushed
963  unsigned int timeOut = 0;
964  const unsigned int sleepTime = 1000;
965  while(!buffers_[bufferUID].buffer_->isEmpty())
966  {
967  usleep(sleepTime);
968  timeOut += sleepTime;
969  if(timeOut >
970  sleepTime * buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers()) // 1
971  // milliseconds
972  // for each
973  // buffer!!!!
974  {
975  __CFG_COUT__ << "Couldn't flush all buffers! Timing out after "
976  << buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers() *
977  sleepTime / 1000000.
978  << " seconds!" << __E__;
979  break;
980  }
981  }
982  for(auto& it : buffers_[bufferUID].consumers_)
983  it->pauseProcessingData();
984  buffers_[bufferUID].status_ = Initialized;
985 } // end pauseBuffer()
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
void getValue(T &value) const
void unregisterFEProducer(const std::string &bufferID, const std::string &feProducerID)
Definition: DataManager.cc:641
void registerProducer(const std::string &bufferUID, DataProducerBase *producer)
owner of the producer object!
Definition: DataManager.cc:689
bool parentSupervisorHasFrontends_
Definition: DataManager.h:99
void configureAllBuffers(void)
Definition: DataManager.cc:764
void dumpStatus(std::ostream *out=(std::ostream *)&(std::cout)) const
Definition: DataManager.cc:36
void registerConsumer(const std::string &bufferUID, DataConsumer *consumer)
Definition: DataManager.cc:728
void destroyBuffers(void)
Definition: DataManager.cc:465
virtual void configure(void)
State Machine Methods.
Definition: DataManager.cc:65
const std::string & getProcessorID(void) const
Getters.
Definition: DataProcessor.h:31