1 #include "otsdaq/DataManager/DataManager.h"
2 #include "otsdaq/ConfigurationInterface/ConfigurationManager.h"
3 #include "otsdaq/DataManager/CircularBuffer.h"
4 #include "otsdaq/DataManager/DataConsumer.h"
5 #include "otsdaq/DataManager/DataProducerBase.h"
6 #include "otsdaq/DataManager/MakeDataProcessor.h"
7 #include "otsdaq/Macros/CoutMacros.h"
8 #include "otsdaq/MessageFacility/MessageFacility.h"
18 DataManager::DataManager(
const ConfigurationTree& theXDAQContextConfigTree,
const std::string& supervisorConfigurationPath)
19 :
Configurable(theXDAQContextConfigTree, supervisorConfigurationPath)
21 , parentSupervisorHasFrontends_(false)
23 __CFG_COUT__ <<
"Constructed." << __E__;
28 DataManager::~DataManager(
void)
30 __CFG_COUT__ <<
"Destructor." << __E__;
32 __CFG_COUT__ <<
"Destructed." << __E__;
38 *out <<
"Buffer count: " << buffers_.size() << __E__;
39 for(
auto& bufferPair : buffers_)
42 <<
"Buffer '" << bufferPair.first <<
"' status=" << bufferPair.second.status_
43 <<
" producers=" << bufferPair.second.producers_.size()
44 <<
" consumers=" << bufferPair.second.consumers_.size() << __E__;
47 <<
"Producers:" << __E__;
48 for(
auto& producer : bufferPair.second.producers_)
50 *out <<
"\t\t\t" << producer->getProcessorID() <<
" ["
51 << bufferPair.second.buffer_->getProducerBufferSize(
52 producer->getProcessorID())
56 <<
"Consumers:" << __E__;
57 for(
auto& consumer : bufferPair.second.consumers_)
59 *out <<
"\t\t\t" << consumer->getProcessorID() << __E__;
67 const std::string transitionName =
"Configuring";
69 const std::string COL_NAME_bufferGroupLink =
"LinkToDataBufferTable";
70 const std::string COL_NAME_processorGroupLink =
"LinkToDataProcessorTable";
71 const std::string COL_NAME_processorType =
"ProcessorType";
72 const std::string COL_NAME_processorPlugin =
"ProcessorPluginName";
73 const std::string COL_NAME_processorLink =
"LinkToProcessorTable";
74 const std::string COL_NAME_appUID =
"ApplicationUID";
76 __CFG_COUT__ << transitionName <<
" DataManager" << __E__;
77 __CFG_COUT__ <<
"Path: " << theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink
83 for(
const auto& buffer :
84 theXDAQContextConfigTree_
85 .getNode(theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink)
88 __CFG_COUT__ <<
"Data Buffer Name: " << buffer.first << __E__;
89 if(buffer.second.getNode(TableViewColumnInfo::COL_NAME_STATUS).getValue<
bool>())
91 std::vector<unsigned int> producersVectorLocation;
92 std::vector<unsigned int> consumersVectorLocation;
93 auto bufferConfigurationList =
94 buffer.second.getNode(COL_NAME_processorGroupLink)
96 unsigned int location = 0;
97 for(
const auto& bufferConfiguration : bufferConfigurationList)
99 __CFG_COUT__ <<
"Processor id: " << bufferConfiguration.first << __E__;
100 if(bufferConfiguration.second
101 .getNode(TableViewColumnInfo::COL_NAME_STATUS)
104 if(bufferConfiguration.second.getNode(COL_NAME_processorType)
105 .getValue<std::string>() ==
"Producer")
107 producersVectorLocation.push_back(location);
109 else if(bufferConfiguration.second.getNode(COL_NAME_processorType)
110 .getValue<std::string>() ==
"Consumer")
112 consumersVectorLocation.push_back(location);
116 __CFG_SS__ <<
"Node ProcessorType in "
117 << bufferConfiguration.first <<
" of type "
118 << bufferConfiguration.second
119 .getNode(COL_NAME_processorPlugin)
120 .getValue<std::string>()
121 <<
" is invalid. The only accepted types are Producer "
124 __CFG_COUT_ERR__ << ss.str();
133 producersVectorLocation.size() ==
136 __CFG_SS__ <<
"Node Data Buffer " << buffer.first <<
" has "
137 << producersVectorLocation.size() <<
" Producers"
138 <<
" and " << consumersVectorLocation.size() <<
" Consumers"
139 <<
" there must be at least 1 Producer "
141 "for the buffer!" << __E__;
142 __CFG_COUT_ERR__ << ss.str();
148 <<
"Parent supervisor has front-ends, so FE-producers may "
149 <<
"be instantiated in the configure steps of the FESupervisor."
152 configureBuffer<std::string, std::map<std::string, std::string> >(
155 for(
auto& producerLocation : producersVectorLocation)
159 __CFG_COUT__ <<
"Creating producer... "
160 << bufferConfigurationList[producerLocation].first << __E__;
176 bufferConfigurationList[producerLocation]
177 .second.getNode(COL_NAME_processorPlugin)
178 .getValue<std::string>(),
179 theXDAQContextConfigTree_.getBackNode(theConfigurationPath_)
183 bufferConfigurationList[producerLocation].first,
184 theXDAQContextConfigTree_,
185 theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink +
"/" +
186 buffer.first +
"/" + COL_NAME_processorGroupLink +
"/" +
187 bufferConfigurationList[producerLocation].first +
"/" +
188 COL_NAME_processorLink));
192 __CFG_SS__ <<
"Construction failed for producer '"
193 << bufferConfigurationList[producerLocation].first
194 <<
"!' Null pointer returned." << __E__;
202 __CFG_COUT__ << ss.str() << __E__;
205 catch(
const std::bad_cast& e)
207 __CFG_SS__ <<
"Failed to instantiate producer plugin named '"
208 << bufferConfigurationList[producerLocation].first
210 << bufferConfigurationList[producerLocation]
211 .second.getNode(COL_NAME_processorPlugin)
212 .getValue<std::string>()
213 <<
"' due to the following error: \n"
214 << e.what() << __E__;
215 __CFG_COUT_ERR__ << ss.str();
218 catch(
const cet::exception& e)
220 __CFG_SS__ <<
"Failed to instantiate producer plugin named '"
221 << bufferConfigurationList[producerLocation].first
223 << bufferConfigurationList[producerLocation]
224 .second.getNode(COL_NAME_processorPlugin)
225 .getValue<std::string>()
226 <<
"' due to the following error: \n"
227 << e.what() << __E__;
228 __CFG_COUT_ERR__ << ss.str();
231 catch(
const std::runtime_error& e)
233 __CFG_SS__ <<
"Failed to instantiate producer plugin named '"
234 << bufferConfigurationList[producerLocation].first
236 << bufferConfigurationList[producerLocation]
237 .second.getNode(COL_NAME_processorPlugin)
238 .getValue<std::string>()
239 <<
"' due to the following error: \n"
240 << e.what() << __E__;
241 __CFG_COUT_ERR__ << ss.str();
246 __CFG_SS__ <<
"Failed to instantiate producer plugin named '"
247 << bufferConfigurationList[producerLocation].first
249 << bufferConfigurationList[producerLocation]
250 .second.getNode(COL_NAME_processorPlugin)
251 .getValue<std::string>()
252 <<
"' due to an unknown error." << __E__;
257 catch(
const std::exception& e)
259 ss <<
"Exception message: " << e.what();
264 __CFG_COUT_ERR__ << ss.str();
269 __CFG_COUT__ << bufferConfigurationList[producerLocation].first
270 <<
" has been created!" << __E__;
273 for(
auto& consumerLocation : consumersVectorLocation)
277 __CFG_COUT__ <<
"Creating consumer... "
278 << bufferConfigurationList[consumerLocation].first << __E__;
296 bufferConfigurationList[consumerLocation]
297 .second.getNode(COL_NAME_processorPlugin)
298 .getValue<std::string>(),
299 theXDAQContextConfigTree_.getBackNode(theConfigurationPath_)
303 bufferConfigurationList[consumerLocation].first,
304 theXDAQContextConfigTree_,
305 theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink +
"/" +
306 buffer.first +
"/" + COL_NAME_processorGroupLink +
"/" +
307 bufferConfigurationList[consumerLocation].first +
"/" +
308 COL_NAME_processorLink));
312 __CFG_SS__ <<
"Construction failed for consumer '"
313 << bufferConfigurationList[consumerLocation].first
314 <<
"!' Null pointer returned." << __E__;
321 __CFG_COUT__ << ss.str() << __E__;
324 catch(
const std::bad_cast& e)
326 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '"
327 << bufferConfigurationList[consumerLocation].first
329 << bufferConfigurationList[consumerLocation]
330 .second.getNode(COL_NAME_processorPlugin)
331 .getValue<std::string>()
332 <<
"' due to the following error: \n"
333 << e.what() << __E__;
334 __CFG_COUT_ERR__ << ss.str();
337 catch(
const cet::exception& e)
339 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '"
340 << bufferConfigurationList[consumerLocation].first
342 << bufferConfigurationList[consumerLocation]
343 .second.getNode(COL_NAME_processorPlugin)
344 .getValue<std::string>()
345 <<
"' due to the following error: \n"
346 << e.what() << __E__;
347 __CFG_COUT_ERR__ << ss.str();
350 catch(
const std::runtime_error& e)
352 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '"
353 << bufferConfigurationList[consumerLocation].first
355 << bufferConfigurationList[consumerLocation]
356 .second.getNode(COL_NAME_processorPlugin)
357 .getValue<std::string>()
358 <<
"' due to the following error: \n"
359 << e.what() << __E__;
360 __CFG_COUT_ERR__ << ss.str();
365 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '"
366 << bufferConfigurationList[consumerLocation].first
368 << bufferConfigurationList[consumerLocation]
369 .second.getNode(COL_NAME_processorPlugin)
370 .getValue<std::string>()
371 <<
"' due to an unknown error." << __E__;
376 catch(
const std::exception& e)
378 ss <<
"Exception message: " << e.what();
383 __CFG_COUT_ERR__ << ss.str();
387 __CFG_COUT__ << bufferConfigurationList[consumerLocation].first
388 <<
" has been created!" << __E__;
396 void DataManager::halt(
void)
398 const std::string transitionName =
"Halting";
400 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
409 __CFG_COUT_WARN__ <<
"An error occurred while halting the Data Manager, ignoring."
415 __CFG_COUT__ << transitionName <<
" DataManager stopped. Now destruct buffers..."
423 void DataManager::pause(
void)
425 const std::string transitionName =
"Pausing";
427 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
429 DataManager::pauseAllBuffers();
433 void DataManager::resume(
void)
435 const std::string transitionName =
"Resuming";
437 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
439 DataManager::resumeAllBuffers();
443 void DataManager::start(std::string runNumber)
445 const std::string transitionName =
"Starting";
447 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
449 DataManager::startAllBuffers(runNumber);
453 void DataManager::stop()
455 const std::string transitionName =
"Stopping";
457 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
459 DataManager::stopAllBuffers();
467 DataManager::stopAllBuffers();
469 for(
auto& bufferPair : buffers_)
473 for(
auto& producer : bufferPair.second.producers_)
475 bufferPair.second.producers_.clear();
477 for(
auto& consumer : bufferPair.second.consumers_)
479 bufferPair.second.consumers_.clear();
481 delete bufferPair.second.buffer_;
642 const std::string& feProducerID)
644 __CFG_COUT__ <<
"Un-Registering FE-producer '" << feProducerID <<
"' from buffer '"
645 << bufferID <<
"'..." << __E__;
647 auto bufferIt = buffers_.find(bufferID);
648 if(bufferIt == buffers_.end())
650 __CFG_SS__ <<
"While Un-Registering FE-producer '" << feProducerID
651 <<
",' buffer '" << bufferID <<
"' not found!" << __E__;
660 for(
auto feProducerIt = bufferIt->second.producers_.begin();
661 feProducerIt != bufferIt->second.producers_.end();
664 if((*feProducerIt)->getProcessorID() == feProducerID)
668 bufferIt->second.producers_.erase(feProducerIt);
673 __CFG_COUT__ <<
"Un-Registered FE-producer '" << feProducerID <<
"' from buffer '"
674 << bufferID <<
".'" << __E__;
678 __CFG_COUT__ << ss.str() << __E__;
692 __CFG_COUT__ <<
"Registering producer '" << producer->
getProcessorID()
693 <<
"' to buffer '" << bufferUID <<
"'..." << __E__;
695 auto bufferIt = buffers_.find(bufferUID);
696 if(bufferIt == buffers_.end())
698 __CFG_SS__ <<
"Can't find buffer UID '" + bufferUID <<
"' for producer '"
700 <<
".' Make sure that your configuration is correct!" << __E__;
702 ss <<
"\n\n Here is the list of buffers:" << __E__;
703 for(
const auto& bufferPair : buffers_)
704 ss << bufferPair.first << __E__;
711 __CFG_SS__ <<
"Before!" << __E__;
713 __CFG_COUT__ << ss.str() << __E__;
716 __CFG_COUTV__(producer->getBufferSize());
717 bufferIt->second.buffer_->registerProducer(producer, producer->getBufferSize());
718 bufferIt->second.producers_.push_back(producer);
721 __CFG_SS__ <<
"After!" << __E__;
723 __CFG_COUT__ << ss.str() << __E__;
730 __CFG_COUT__ <<
"Registering consumer '" << consumer->
getProcessorID()
731 <<
"' to buffer '" << bufferUID <<
"'..." << __E__;
733 auto bufferIt = buffers_.find(bufferUID);
734 if(bufferIt == buffers_.end())
736 __CFG_SS__ <<
"Can't find buffer UID '" + bufferUID <<
"' for consumer '"
738 <<
".' Make sure that your configuration is correct!" << __E__;
740 ss <<
"\n\n Here is the list of buffers:" << __E__;
741 for(
const auto& bufferPair : buffers_)
742 ss << bufferPair.first << __E__;
748 __CFG_SS__ <<
"Before!" << __E__;
750 __CFG_COUT__ << ss.str() << __E__;
753 bufferIt->second.buffer_->registerConsumer(consumer);
754 bufferIt->second.consumers_.push_back(consumer);
757 __CFG_SS__ <<
"After!" << __E__;
759 __CFG_COUT__ << ss.str() << __E__;
766 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
767 configureBuffer(it->first);
771 void DataManager::startAllBuffers(
const std::string& runNumber)
773 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
774 startBuffer(it->first, runNumber);
778 void DataManager::stopAllBuffers(
void)
780 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
781 stopBuffer(it->first);
785 void DataManager::resumeAllBuffers(
void)
787 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
788 resumeBuffer(it->first);
792 void DataManager::pauseAllBuffers(
void)
794 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
795 pauseBuffer(it->first);
799 void DataManager::configureBuffer(
const std::string& bufferUID)
801 __CFG_COUT__ <<
"Configuring... " << bufferUID << __E__;
803 for(
auto& it : buffers_[bufferUID].consumers_)
812 __CFG_COUT_WARN__ <<
"An error occurred while configuring consumer '"
813 << it->getProcessorID() <<
"'..." << __E__;
818 for(
auto& it : buffers_[bufferUID].producers_)
827 __CFG_COUT_WARN__ <<
"An error occurred while starting producer '"
828 << it->getProcessorID() <<
"'..." << __E__;
833 buffers_[bufferUID].status_ = Initialized;
838 void DataManager::startBuffer(
const std::string& bufferUID, std::string runNumber)
840 __CFG_COUT__ <<
"Starting... " << bufferUID << __E__;
842 buffers_[bufferUID].buffer_->reset();
843 for(
auto& it : buffers_[bufferUID].consumers_)
848 it->startProcessingData(runNumber);
852 __CFG_COUT_WARN__ <<
"An error occurred while starting consumer '"
853 << it->getProcessorID() <<
"'..." << __E__;
858 for(
auto& it : buffers_[bufferUID].producers_)
863 it->startProcessingData(runNumber);
867 __CFG_COUT_WARN__ <<
"An error occurred while starting producer '"
868 << it->getProcessorID() <<
"'..." << __E__;
873 buffers_[bufferUID].status_ = Running;
878 void DataManager::stopBuffer(
const std::string& bufferUID)
880 __CFG_COUT__ <<
"Stopping... " << bufferUID << __E__;
882 __CFG_COUT__ <<
"Stopping producers..." << __E__;
883 for(
auto& it : buffers_[bufferUID].producers_)
888 it->stopProcessingData();
892 __CFG_COUT_WARN__ <<
"An error occurred while stopping producer '"
893 << it->getProcessorID() <<
"'..." << __E__;
899 unsigned int timeOut = 0;
900 const unsigned int ratio = 100;
901 const unsigned int sleepTime = 1000 * ratio;
902 unsigned int totalSleepTime =
905 .buffer_->getTotalNumberOfSubBuffers();
906 if(totalSleepTime < 5000000)
907 totalSleepTime = 5000000;
908 while(!buffers_[bufferUID].buffer_->isEmpty())
911 timeOut += sleepTime;
912 if(timeOut > totalSleepTime)
914 __CFG_COUT__ <<
"Couldn't flush all buffers! Timing out after "
915 << totalSleepTime / 1000000. <<
" seconds!" << __E__;
916 buffers_[bufferUID].buffer_->isEmpty();
920 __CFG_COUT__ <<
"Stopping consumers, buffer MUST BE EMPTY. Is buffer empty? "
921 << (buffers_[bufferUID].buffer_->isEmpty() ?
"yes" :
"no") << __E__;
923 for(
auto& it : buffers_[bufferUID].consumers_)
928 it->stopProcessingData();
932 __CFG_COUT_WARN__ <<
"An error occurred while stopping consumer '"
933 << it->getProcessorID() <<
"'..." << __E__;
938 buffers_[bufferUID].buffer_->reset();
939 buffers_[bufferUID].status_ = Initialized;
943 void DataManager::resumeBuffer(
const std::string& bufferUID)
945 __CFG_COUT__ <<
"Resuming... " << bufferUID << __E__;
947 for(
auto& it : buffers_[bufferUID].consumers_)
948 it->resumeProcessingData();
949 for(
auto& it : buffers_[bufferUID].producers_)
950 it->resumeProcessingData();
952 buffers_[bufferUID].status_ = Running;
956 void DataManager::pauseBuffer(
const std::string& bufferUID)
958 __CFG_COUT__ <<
"Pausing... " << bufferUID << __E__;
960 for(
auto& it : buffers_[bufferUID].producers_)
961 it->pauseProcessingData();
963 unsigned int timeOut = 0;
964 const unsigned int sleepTime = 1000;
965 while(!buffers_[bufferUID].buffer_->isEmpty())
968 timeOut += sleepTime;
970 sleepTime * buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers())
975 __CFG_COUT__ <<
"Couldn't flush all buffers! Timing out after "
976 << buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers() *
978 <<
" seconds!" << __E__;
982 for(
auto& it : buffers_[bufferUID].consumers_)
983 it->pauseProcessingData();
984 buffers_[bufferUID].status_ = Initialized;
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
void getValue(T &value) const
void unregisterFEProducer(const std::string &bufferID, const std::string &feProducerID)
void registerProducer(const std::string &bufferUID, DataProducerBase *producer)
owner of the producer object!
bool parentSupervisorHasFrontends_
void configureAllBuffers(void)
void dumpStatus(std::ostream *out=(std::ostream *)&(std::cout)) const
void registerConsumer(const std::string &bufferUID, DataConsumer *consumer)
void destroyBuffers(void)
virtual void configure(void)
State Machine Methods.
const std::string & getProcessorID(void) const
Getters.