1 #include "otsdaq/DataManager/DataManager.h"
2 #include "otsdaq/ConfigurationInterface/ConfigurationManager.h"
3 #include "otsdaq/DataManager/CircularBuffer.h"
4 #include "otsdaq/DataManager/DataConsumer.h"
5 #include "otsdaq/DataManager/DataProducerBase.h"
6 #include "otsdaq/Macros/CoutMacros.h"
7 #include "otsdaq/MessageFacility/MessageFacility.h"
8 #include "otsdaq/PluginMakers/MakeDataProcessor.h"
18 DataManager::DataManager(
const ConfigurationTree& theXDAQContextConfigTree,
const std::string& supervisorConfigurationPath)
19 :
Configurable(theXDAQContextConfigTree, supervisorConfigurationPath)
21 , parentSupervisorHasFrontends_(false)
23 __CFG_COUT__ <<
"Constructed." << __E__;
28 DataManager::~DataManager(
void)
30 __CFG_COUT__ <<
"Destructor." << __E__;
31 DataManager::destroyBuffers();
32 __CFG_COUT__ <<
"Destructed." << __E__;
36 void DataManager::dumpStatus(std::ostream* out)
const
38 *out <<
"Buffer count: " << buffers_.size() << __E__;
39 for(
auto& bufferPair : buffers_)
42 <<
"Buffer '" << bufferPair.first <<
"' status=" << bufferPair.second.status_ <<
" producers=" << bufferPair.second.producers_.size()
43 <<
" consumers=" << bufferPair.second.consumers_.size() << __E__;
46 <<
"Producers:" << __E__;
47 for(
auto& producer : bufferPair.second.producers_)
49 *out <<
"\t\t\t" << producer->getProcessorID() <<
" [" << bufferPair.second.buffer_->getProducerBufferSize(producer->getProcessorID()) <<
"]"
53 <<
"Consumers:" << __E__;
54 for(
auto& consumer : bufferPair.second.consumers_)
56 *out <<
"\t\t\t" << consumer->getProcessorID() << __E__;
62 void DataManager::configure(
void)
64 const std::string transitionName =
"Configuring";
66 const std::string COL_NAME_bufferGroupLink =
"LinkToDataBufferTable";
67 const std::string COL_NAME_processorGroupLink =
"LinkToDataProcessorTable";
68 const std::string COL_NAME_processorType =
"ProcessorType";
69 const std::string COL_NAME_processorPlugin =
"ProcessorPluginName";
70 const std::string COL_NAME_processorLink =
"LinkToProcessorTable";
71 const std::string COL_NAME_appUID =
"ApplicationUID";
73 __CFG_COUT__ << transitionName <<
" DataManager" << __E__;
74 __CFG_COUT__ <<
"Path: " << theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink << __E__;
79 for(
const auto& buffer :
80 theXDAQContextConfigTree_.getNode(theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink).getChildren())
82 __CFG_COUT__ <<
"Data Buffer Name: " << buffer.first << __E__;
83 if(buffer.second.getNode(TableViewColumnInfo::COL_NAME_STATUS).getValue<
bool>())
85 std::vector<unsigned int> producersVectorLocation;
86 std::vector<unsigned int> consumersVectorLocation;
87 auto bufferConfigurationList = buffer.second.getNode(COL_NAME_processorGroupLink).getChildren();
88 unsigned int location = 0;
89 for(
const auto& bufferConfiguration : bufferConfigurationList)
91 __CFG_COUT__ <<
"Processor id: " << bufferConfiguration.first << __E__;
92 if(bufferConfiguration.second.getNode(TableViewColumnInfo::COL_NAME_STATUS).getValue<
bool>())
94 if(bufferConfiguration.second.getNode(COL_NAME_processorType).getValue<std::string>() ==
"Producer")
96 producersVectorLocation.push_back(location);
98 else if(bufferConfiguration.second.getNode(COL_NAME_processorType).getValue<std::string>() ==
"Consumer")
100 consumersVectorLocation.push_back(location);
104 __CFG_SS__ <<
"Node ProcessorType in " << bufferConfiguration.first <<
" of type "
105 << bufferConfiguration.second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
106 <<
" is invalid. The only accepted types are Producer "
109 __CFG_MOUT_ERR__ << ss.str();
117 if(!parentSupervisorHasFrontends_ && producersVectorLocation.size() == 0)
119 __CFG_SS__ <<
"Node Data Buffer " << buffer.first <<
" has " << producersVectorLocation.size() <<
" Producers"
120 <<
" and " << consumersVectorLocation.size() <<
" Consumers"
121 <<
" there must be at least 1 Producer " <<
122 "for the buffer!" << __E__;
123 __CFG_MOUT_ERR__ << ss.str();
127 if(parentSupervisorHasFrontends_)
128 __CFG_COUT__ <<
"Parent supervisor has front-ends, so FE-producers may "
129 <<
"be instantiated in the configure steps of the FESupervisor." << __E__;
131 configureBuffer<std::string, std::map<std::string, std::string> >(buffer.first);
133 for(
auto& producerLocation : producersVectorLocation)
137 __CFG_COUT__ <<
"Creating producer... " << bufferConfigurationList[producerLocation].first << __E__;
152 makeDataProcessor(bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>(),
153 theXDAQContextConfigTree_.getBackNode(theConfigurationPath_).getNode(COL_NAME_appUID).getValue<std::string>(),
155 bufferConfigurationList[producerLocation].first,
156 theXDAQContextConfigTree_,
157 theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink +
"/" + buffer.first +
"/" + COL_NAME_processorGroupLink +
158 "/" + bufferConfigurationList[producerLocation].first +
"/" + COL_NAME_processorLink));
162 __CFG_SS__ <<
"Construction failed for producer '" << bufferConfigurationList[producerLocation].first <<
"!' Null pointer returned."
166 __CFG_COUT__ << tmpCastCheck->getProcessorID() << __E__;
170 dumpStatus((std::ostream*)&ss);
171 __CFG_COUT__ << ss.str() << __E__;
174 catch(
const std::bad_cast& e)
176 __CFG_SS__ <<
"Failed to instantiate producer plugin named '" << bufferConfigurationList[producerLocation].first <<
"' of type '"
177 << bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
178 <<
"' due to the following error: \n"
179 << e.what() << __E__;
180 __CFG_MOUT_ERR__ << ss.str();
183 catch(
const cet::exception& e)
185 __CFG_SS__ <<
"Failed to instantiate producer plugin named '" << bufferConfigurationList[producerLocation].first <<
"' of type '"
186 << bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
187 <<
"' due to the following error: \n"
188 << e.what() << __E__;
189 __CFG_MOUT_ERR__ << ss.str();
192 catch(
const std::runtime_error& e)
194 __CFG_SS__ <<
"Failed to instantiate producer plugin named '" << bufferConfigurationList[producerLocation].first <<
"' of type '"
195 << bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
196 <<
"' due to the following error: \n"
197 << e.what() << __E__;
198 __CFG_MOUT_ERR__ << ss.str();
203 __CFG_SS__ <<
"Failed to instantiate producer plugin named '" << bufferConfigurationList[producerLocation].first <<
"' of type '"
204 << bufferConfigurationList[producerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
205 <<
"' due to an unknown error." << __E__;
206 __CFG_MOUT_ERR__ << ss.str();
211 __CFG_COUT__ << bufferConfigurationList[producerLocation].first <<
" has been created!" << __E__;
214 for(
auto& consumerLocation : consumersVectorLocation)
218 __CFG_COUT__ <<
"Creating consumer... " << bufferConfigurationList[consumerLocation].first << __E__;
235 makeDataProcessor(bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>(),
236 theXDAQContextConfigTree_.getBackNode(theConfigurationPath_).getNode(COL_NAME_appUID).getValue<std::string>(),
238 bufferConfigurationList[consumerLocation].first,
239 theXDAQContextConfigTree_,
240 theConfigurationPath_ +
"/" + COL_NAME_bufferGroupLink +
"/" + buffer.first +
"/" + COL_NAME_processorGroupLink +
241 "/" + bufferConfigurationList[consumerLocation].first +
"/" + COL_NAME_processorLink));
245 __CFG_SS__ <<
"Construction failed for consumer '" << bufferConfigurationList[consumerLocation].first <<
"!' Null pointer returned."
252 dumpStatus((std::ostream*)&ss);
253 __CFG_COUT__ << ss.str() << __E__;
256 catch(
const std::bad_cast& e)
258 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '" << bufferConfigurationList[consumerLocation].first <<
"' of type '"
259 << bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
260 <<
"' due to the following error: \n"
261 << e.what() << __E__;
262 __CFG_MOUT_ERR__ << ss.str();
265 catch(
const cet::exception& e)
267 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '" << bufferConfigurationList[consumerLocation].first <<
"' of type '"
268 << bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
269 <<
"' due to the following error: \n"
270 << e.what() << __E__;
271 __CFG_MOUT_ERR__ << ss.str();
274 catch(
const std::runtime_error& e)
276 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '" << bufferConfigurationList[consumerLocation].first <<
"' of type '"
277 << bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
278 <<
"' due to the following error: \n"
279 << e.what() << __E__;
280 __CFG_MOUT_ERR__ << ss.str();
285 __CFG_SS__ <<
"Failed to instantiate consumer plugin named '" << bufferConfigurationList[consumerLocation].first <<
"' of type '"
286 << bufferConfigurationList[consumerLocation].second.getNode(COL_NAME_processorPlugin).getValue<std::string>()
287 <<
"' due to an unknown error." << __E__;
288 __CFG_MOUT_ERR__ << ss.str();
292 __CFG_COUT__ << bufferConfigurationList[consumerLocation].first <<
" has been created!" << __E__;
299 void DataManager::halt(
void)
301 const std::string transitionName =
"Halting";
303 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
312 __CFG_COUT_WARN__ <<
"An error occurred while halting the Data Manager, ignoring." << __E__;
317 __CFG_COUT__ << transitionName <<
" DataManager stopped. Now destruct buffers..." << __E__;
319 DataManager::destroyBuffers();
324 void DataManager::pause(
void)
326 const std::string transitionName =
"Pausing";
328 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
330 DataManager::pauseAllBuffers();
334 void DataManager::resume(
void)
336 const std::string transitionName =
"Resuming";
338 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
340 DataManager::resumeAllBuffers();
344 void DataManager::start(std::string runNumber)
346 const std::string transitionName =
"Starting";
348 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
354 void DataManager::stop()
356 const std::string transitionName =
"Stopping";
358 __CFG_COUT__ << transitionName <<
" DataManager " << __E__;
360 DataManager::stopAllBuffers();
366 void DataManager::destroyBuffers(
void)
368 DataManager::stopAllBuffers();
370 for(
auto& bufferPair : buffers_)
374 for(
auto& producer : bufferPair.second.producers_)
376 bufferPair.second.producers_.clear();
378 for(
auto& consumer : bufferPair.second.consumers_)
380 bufferPair.second.consumers_.clear();
382 delete bufferPair.second.buffer_;
544 __CFG_COUT__ <<
"Un-Registering FE-producer '" << feProducerID <<
"' from buffer '" << bufferID <<
"'..." << __E__;
546 auto bufferIt = buffers_.find(bufferID);
547 if(bufferIt == buffers_.end())
549 __CFG_SS__ <<
"While Un-Registering FE-producer '" << feProducerID <<
",' buffer '" << bufferID <<
"' not found!" << __E__;
558 for(
auto feProducerIt = bufferIt->second.producers_.begin(); feProducerIt != bufferIt->second.producers_.end(); feProducerIt++)
560 if((*feProducerIt)->getProcessorID() == feProducerID)
564 bufferIt->second.producers_.erase(feProducerIt);
569 __CFG_COUT__ <<
"Un-Registered FE-producer '" << feProducerID <<
"' from buffer '" << bufferID <<
".'" << __E__;
572 dumpStatus((std::ostream*)&ss);
573 __CFG_COUT__ << ss.str() << __E__;
584 void DataManager::registerProducer(
const std::string& bufferUID,
DataProducerBase* producer)
586 __CFG_COUT__ <<
"Registering producer '" << producer->getProcessorID() <<
"' to buffer '" << bufferUID <<
"'..." << __E__;
588 auto bufferIt = buffers_.find(bufferUID);
589 if(bufferIt == buffers_.end())
591 __CFG_SS__ <<
"Can't find buffer UID '" + bufferUID <<
"' for producer '" << producer->getProcessorID()
592 <<
".' Make sure that your configuration is correct!" << __E__;
594 ss <<
"\n\n Here is the list of buffers:" << __E__;
595 for(
const auto bufferPair : buffers_)
596 ss << bufferPair.first << __E__;
603 __CFG_SS__ <<
"Before!" << __E__;
604 dumpStatus((std::ostream*)&ss);
605 __CFG_COUT__ << ss.str() << __E__;
608 __CFG_COUTV__(producer->getBufferSize());
609 bufferIt->second.buffer_->registerProducer(producer, producer->getBufferSize());
610 bufferIt->second.producers_.push_back(producer);
613 __CFG_SS__ <<
"After!" << __E__;
614 dumpStatus((std::ostream*)&ss);
615 __CFG_COUT__ << ss.str() << __E__;
620 void DataManager::registerConsumer(
const std::string& bufferUID,
DataConsumer* consumer)
622 __CFG_COUT__ <<
"Registering consumer '" << consumer->getProcessorID() <<
"' to buffer '" << bufferUID <<
"'..." << __E__;
624 auto bufferIt = buffers_.find(bufferUID);
625 if(bufferIt == buffers_.end())
627 __CFG_SS__ <<
"Can't find buffer UID '" + bufferUID <<
"' for consumer '" << consumer->getProcessorID()
628 <<
".' Make sure that your configuration is correct!" << __E__;
630 ss <<
"\n\n Here is the list of buffers:" << __E__;
631 for(
const auto bufferPair : buffers_)
632 ss << bufferPair.first << __E__;
638 __CFG_SS__ <<
"Before!" << __E__;
639 dumpStatus((std::ostream*)&ss);
640 __CFG_COUT__ << ss.str() << __E__;
643 bufferIt->second.buffer_->registerConsumer(consumer);
644 bufferIt->second.consumers_.push_back(consumer);
647 __CFG_SS__ <<
"After!" << __E__;
648 dumpStatus((std::ostream*)&ss);
649 __CFG_COUT__ << ss.str() << __E__;
656 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
657 startBuffer(it->first, runNumber);
661 void DataManager::stopAllBuffers(
void)
663 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
664 stopBuffer(it->first);
668 void DataManager::resumeAllBuffers(
void)
670 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
671 resumeBuffer(it->first);
675 void DataManager::pauseAllBuffers(
void)
677 for(
auto it = buffers_.begin(); it != buffers_.end(); it++)
678 pauseBuffer(it->first);
682 void DataManager::startBuffer(
const std::string& bufferUID, std::string runNumber)
684 __CFG_COUT__ <<
"Starting... " << bufferUID << __E__;
686 buffers_[bufferUID].buffer_->reset();
687 for(
auto& it : buffers_[bufferUID].consumers_)
692 it->startProcessingData(runNumber);
696 __CFG_COUT_WARN__ <<
"An error occurred while starting consumer '" << it->getProcessorID() <<
"'..." << __E__;
701 for(
auto& it : buffers_[bufferUID].producers_)
706 it->startProcessingData(runNumber);
710 __CFG_COUT_WARN__ <<
"An error occurred while starting producer '" << it->getProcessorID() <<
"'..." << __E__;
715 buffers_[bufferUID].status_ = Running;
720 void DataManager::stopBuffer(
const std::string& bufferUID)
722 __CFG_COUT__ <<
"Stopping... " << bufferUID << __E__;
724 __CFG_COUT__ <<
"Stopping producers..." << __E__;
725 for(
auto& it : buffers_[bufferUID].producers_)
730 it->stopProcessingData();
734 __CFG_COUT_WARN__ <<
"An error occurred while stopping producer '" << it->getProcessorID() <<
"'..." << __E__;
740 unsigned int timeOut = 0;
741 const unsigned int ratio = 100;
742 const unsigned int sleepTime = 1000 * ratio;
743 unsigned int totalSleepTime = sleepTime / ratio * buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers();
744 if(totalSleepTime < 5000000)
745 totalSleepTime = 5000000;
746 while(!buffers_[bufferUID].buffer_->isEmpty())
749 timeOut += sleepTime;
750 if(timeOut > totalSleepTime)
752 __CFG_COUT__ <<
"Couldn't flush all buffers! Timing out after " << totalSleepTime / 1000000. <<
" seconds!" << __E__;
753 buffers_[bufferUID].buffer_->isEmpty();
757 __CFG_COUT__ <<
"Stopping consumers, buffer MUST BE EMPTY. Is buffer empty? " << (buffers_[bufferUID].buffer_->isEmpty() ?
"yes" :
"no") << __E__;
759 for(
auto& it : buffers_[bufferUID].consumers_)
764 it->stopProcessingData();
768 __CFG_COUT_WARN__ <<
"An error occurred while stopping consumer '" << it->getProcessorID() <<
"'..." << __E__;
773 buffers_[bufferUID].buffer_->reset();
774 buffers_[bufferUID].status_ = Initialized;
778 void DataManager::resumeBuffer(
const std::string& bufferUID)
780 __CFG_COUT__ <<
"Resuming... " << bufferUID << __E__;
782 for(
auto& it : buffers_[bufferUID].consumers_)
783 it->resumeProcessingData();
784 for(
auto& it : buffers_[bufferUID].producers_)
785 it->resumeProcessingData();
787 buffers_[bufferUID].status_ = Running;
791 void DataManager::pauseBuffer(
const std::string& bufferUID)
793 __CFG_COUT__ <<
"Pausing... " << bufferUID << __E__;
795 for(
auto& it : buffers_[bufferUID].producers_)
796 it->pauseProcessingData();
798 unsigned int timeOut = 0;
799 const unsigned int sleepTime = 1000;
800 while(!buffers_[bufferUID].buffer_->isEmpty())
803 timeOut += sleepTime;
804 if(timeOut > sleepTime * buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers())
809 __CFG_COUT__ <<
"Couldn't flush all buffers! Timing out after " << buffers_[bufferUID].buffer_->getTotalNumberOfSubBuffers() * sleepTime / 1000000.
810 <<
" seconds!" << __E__;
814 for(
auto& it : buffers_[bufferUID].consumers_)
815 it->pauseProcessingData();
816 buffers_[bufferUID].status_ = Initialized;
void unregisterFEProducer(const std::string &bufferID, const std::string &feProducerID)
void startAllBuffers(const std::string &runNumber)