2 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
4 #include "artdaq-core/Utilities/configureMessageFacility.hh"
5 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
6 #include "artdaq/DAQdata/Globals.hh"
7 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
8 #include "cetlib_except/exception.h"
9 #include "fhiclcpp/make_ParameterSet.h"
11 #include "artdaq-core/Utilities/ExceptionHandler.hh"
13 #include <boost/exception/all.hpp>
14 #include <boost/filesystem.hpp>
// Name of the configuration directory that configuringThread rebuilds under
// ARTDAQTableBase::ARTDAQ_FCL_PATH and then hands to DAQInterface's do_config.
#define FAKE_CONFIG_NAME "ots_config"
// Port computed as ARTDAQ_BASE_PORT + partition_ * ARTDAQ_PORTS_PER_PARTITION.
// Both values are read from the environment through __ENV__ and parsed with std::atoi,
// so `partition_` and `__ENV__` must be in scope wherever this macro is expanded.
// The whole expansion is parenthesized so that it behaves as a single value when it
// appears next to other operators (e.g. `2 * DAQINTERFACE_PORT`).
#define DAQINTERFACE_PORT (std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION"))))
// Signal dispositions that were in effect before this file installed its own handler,
// keyed by signal number; signal_handler restores from this map before re-sending a signal.
26 static std::unordered_map<int, struct sigaction> old_actions = std::unordered_map<int, struct sigaction>();
// Flag that the handler-installation code below sets to true.
27 static bool sighandler_init =
false;
// Handler installed for the signals listed in the installation code further down.
// Logs through TRACE_STREAMER, then restores a previously saved disposition from
// old_actions and re-sends a signal to this process only (kill(getpid(), ...)).
// NOTE(review): several lines of this function are not visible in this view (braces,
// the declaration of `set`, and whatever selects between the two re-send paths).
28 static void signal_handler(
int signum)
31 TRACE_STREAMER(TLVL_ERROR, &(
"ARTDAQsupervisor")[0], 0, 0, 0) <<
"A signal of type " << signum
32 <<
" was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
33 "then proceeding with default handlers!";
// With a NULL new-mask argument pthread_sigmask only reports the current mask into `set`;
// the second call then unblocks every signal contained in that mask.
39 pthread_sigmask(SIG_UNBLOCK, NULL, &set);
40 pthread_sigmask(SIG_UNBLOCK, &set, NULL);
// NOTE(review): this message is tagged "SharedMemoryManager" while the one above uses
// "ARTDAQsupervisor" - looks like a copy/paste leftover; confirm the intended category.
42 TRACE_STREAMER(TLVL_ERROR, &(
"SharedMemoryManager")[0], 0, 0, 0) <<
"Calling default signal handler";
// Path 1: put back the saved action for the caught signal and re-send that same signal.
45 sigaction(signum, &old_actions[signum], NULL);
46 kill(getpid(), signum);
// Path 2: put back the saved SIGINT action and send SIGINT instead of the caught signal.
52 sigaction(SIGINT, &old_actions[SIGINT], NULL);
53 kill(getpid(), SIGINT);
// Installs signal_handler for a fixed list of signals and remembers each previous action.
// NOTE(review): the enclosing function header is not visible in this view; the constructor
// calls init_sighandler(this), which is presumably this function - confirm.
// A static mutex serialises callers for the rest of the scope.
59 static std::mutex sighandler_mutex;
60 std::unique_lock<std::mutex> lk(sighandler_mutex);
64 sighandler_init =
true;
// Signals that receive the custom handler.
66 std::vector<int> signals = {
67 SIGINT, SIGILL, SIGABRT, SIGFPE, SIGSEGV, SIGPIPE, SIGALRM, SIGTERM, SIGUSR2, SIGHUP};
68 for(
auto signal : signals)
// Query the current action without changing it (NULL new-action argument).
70 struct sigaction old_action;
71 sigaction(signal, NULL, &old_action);
// A signal whose current handler is SIG_IGN does not get the custom handler.
75 if(old_action.sa_handler != SIG_IGN)
77 struct sigaction action;
78 action.sa_handler = signal_handler;
// Build a mask containing every signal in the list, to be blocked while the handler runs.
79 sigemptyset(&action.sa_mask);
80 for(
auto sigblk : signals)
82 sigaddset(&action.sa_mask, sigblk);
// NOTE(review): no assignment to action.sa_flags is visible here - confirm it is set before this call.
88 sigaction(signal, &action, NULL);
// Saved so that signal_handler can restore it later.
89 old_actions[signal] = old_action;
// Constructor: reads the "partition" supervisor property, installs the signal handlers and
// writes the DAQInterface settings file whose path comes from the DAQINTERFACE_SETTINGS
// environment variable. Each settings line has the form "<key>: <value>", where the value is
// a supervisor property and the second argument of getSupervisorProperty is its default.
// NOTE(review): part of the member-initializer list and the braces are not visible in this view.
96 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
98 , daqinterface_ptr_(NULL)
99 , partition_(getSupervisorProperty(
"partition", 0))
100 , daqinterface_state_(
"notrunning")
101 , runner_thread_(nullptr)
103 __SUP_COUT__ <<
"Constructor." << __E__;
106 init_sighandler(
this);
// Remove the Python-related variables from this process's environment.
109 unsetenv(
"PYTHONPATH");
110 unsetenv(
"PYTHONHOME");
// Open the settings file, discarding any previous contents (std::ios::trunc).
113 auto settings_file = __ENV__(
"DAQINTERFACE_SETTINGS");
114 std::ofstream o(settings_file, std::ios::trunc);
// Export the partition number and the per-partition log file path (last argument 1 = overwrite).
// NOTE(review): the path component is spelled "DAQInteface" - confirm it matches the directory on disk.
116 setenv(
"DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
117 auto logfileName = std::string(__ENV__(
"OTSDAQ_LOG_DIR")) +
"/DAQInteface/DAQInterface_partition" + std::to_string(partition_) +
".log";
118 setenv(
"DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
120 o <<
"log_directory: " << getSupervisorProperty(
"log_directory", std::string(__ENV__(
"OTSDAQ_LOG_DIR"))) << std::endl;
// The return value of mkdir is not checked, so a failure (including "already exists") is silent here.
123 const std::string record_directory = getSupervisorProperty(
"record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH +
"/run_records/");
124 mkdir(record_directory.c_str(), 0755);
125 o <<
"record_directory: " << record_directory << std::endl;
128 o <<
"package_hashes_to_save: " << getSupervisorProperty(
"package_hashes_to_save",
"[artdaq]") << std::endl;
130 o <<
"productsdir_for_bash_scripts: " << getSupervisorProperty(
"productsdir_for_bash_scripts", std::string(__ENV__(
"OTS_PRODUCTS"))) << std::endl;
// Note that the four timeout keys below are written with a space, not an underscore.
131 o <<
"boardreader timeout: " << getSupervisorProperty(
"boardreader_timeout", 30) << std::endl;
132 o <<
"eventbuilder timeout: " << getSupervisorProperty(
"eventbuilder_timeout", 30) << std::endl;
133 o <<
"datalogger timeout: " << getSupervisorProperty(
"datalogger_timeout", 30) << std::endl;
134 o <<
"dispatcher timeout: " << getSupervisorProperty(
"dispatcher_timeout", 30) << std::endl;
135 o <<
"max_fragment_size_bytes: " << getSupervisorProperty(
"max_fragment_size_bytes", 1048576) << std::endl;
136 o <<
"transfer_plugin_to_use: " << getSupervisorProperty(
"transfer_plugin_to_use",
"Autodetect") << std::endl;
// std::boolalpha stays set on a stream once applied, so bool values written to `o` after
// this point print as true/false even where the manipulator is not repeated.
137 o <<
"all_events_to_all_dispatchers: " << std::boolalpha << getSupervisorProperty(
"all_events_to_all_dispatchers",
true) << std::endl;
138 o <<
"data_directory_override: " << getSupervisorProperty(
"data_directory_override", std::string(__ENV__(
"ARTDAQ_OUTPUT_DIR"))) << std::endl;
139 o <<
"max_configurations_to_list: " << getSupervisorProperty(
"max_configurations_to_list", 10) << std::endl;
140 o <<
"disable_unique_rootfile_labels: " << getSupervisorProperty(
"disable_unique_rootfile_labels",
false) << std::endl;
141 o <<
"use_messageviewer: " << std::boolalpha << getSupervisorProperty(
"use_messageviewer",
false) << std::endl;
142 o <<
"fake_messagefacility: " << std::boolalpha << getSupervisorProperty(
"fake_messagefacility",
false) << std::endl;
143 o <<
"advanced_memory_usage: " << std::boolalpha << getSupervisorProperty(
"advanced_memory_usage",
false) << std::endl;
144 o <<
"disable_private_network_bookkeeping: " << std::boolalpha << getSupervisorProperty(
"disable_private_network_bookkeeping",
false) << std::endl;
147 __SUP_COUT__ <<
"Constructed." << __E__;
// Destructor: logs on entry and on exit.
// NOTE(review): whatever happens between the two log statements is not visible in this view.
151 ARTDAQSupervisor::~ARTDAQSupervisor(
void)
153 __SUP_COUT__ <<
"Destructor." << __E__;
155 __SUP_COUT__ <<
"Destructed." << __E__;
// Sets up the embedded Python side: configures the artdaq message facility, reads
// ARTDAQ_DAQINTERFACE_DIR from the environment, appends it to Python's sys.path, imports
// rc.control.daqinterface and calls its DAQInterface object with keyword arguments,
// storing the result in daqinterface_ptr_.
// daqinterface_mutex_ is held from the lock_guard below until the function returns.
// NOTE(review): the error-check conditions, the interpreter start-up call and the
// keyword-argument values are not visible in this view.
159 void ARTDAQSupervisor::init(
void)
163 __SUP_COUT__ <<
"Initializing..." << __E__;
165 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
168 artdaq::configureMessageFacility(
"ARTDAQSupervisor");
169 __SUP_COUT__ <<
"artdaq MF configured." << __E__;
// Without this directory the DAQInterface Python module cannot be located.
172 char* daqinterface_dir = getenv(
"ARTDAQ_DAQINTERFACE_DIR");
173 if(daqinterface_dir == NULL)
175 __SS__ <<
"ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
176 "means that DAQInterface has not been setup!"
182 __SUP_COUT__ <<
"Initializing Python" << __E__;
185 __SUP_COUT__ <<
"Adding DAQInterface directory to PYTHON_PATH" << __E__;
// PySys_GetObject returns a borrowed reference, so sysPath is not released here.
// PyList_Append takes its own reference to the item, hence the Py_DECREF of programName.
186 PyObject* sysPath = PySys_GetObject((
char*)
"path");
187 PyObject* programName = PyString_FromString(daqinterface_dir);
188 PyList_Append(sysPath, programName);
189 Py_DECREF(programName);
191 __SUP_COUT__ <<
"Creating Module name" << __E__;
192 PyObject* pName = PyString_FromString(
"rc.control.daqinterface");
195 __SUP_COUT__ <<
"Importing module" << __E__;
196 PyObject* pModule = PyImport_Import(pName);
202 __SS__ <<
"Failed to load rc.control.daqinterface" << __E__;
207 __SUP_COUT__ <<
"Loading python module dictionary" << __E__;
208 PyObject* pDict = PyModule_GetDict(pModule);
212 __SS__ <<
"Unable to load module dictionary" << __E__;
219 __SUP_COUT__ <<
"Getting DAQInterface object pointer" << __E__;
220 PyObject* di_obj_ptr = PyDict_GetItemString(pDict,
"DAQInterface");
222 __SUP_COUT__ <<
"Filling out DAQInterface args struct" << __E__;
// Empty positional-argument tuple; all arguments are passed by keyword.
223 PyObject* pArgs = PyTuple_New(0);
225 PyObject* kwargs = Py_BuildValue(
"{s:s, s:s, s:i, s:i, s:s, s:s}",
239 __SUP_COUT__ <<
"Calling DAQInterface Object Constructor" << __E__;
240 daqinterface_ptr_ = PyObject_Call(di_obj_ptr, pArgs, kwargs);
// NOTE(review): di_obj_ptr came from PyDict_GetItemString, which returns a borrowed
// reference; releasing it here drops a reference this code does not appear to own - confirm.
242 Py_DECREF(di_obj_ptr);
250 __SUP_COUT__ <<
"Initialized." << __E__;
// Tears down the DAQInterface object: when one exists, calls its do_recover method while
// holding daqinterface_mutex_, loops until daqinterface_state_ equals "stopped", then
// releases the Python object and nulls the pointer.
// NOTE(review): the loop body (state refresh / sleep) is only partly visible in this view.
254 void ARTDAQSupervisor::destroy(
void)
256 __SUP_COUT__ <<
"Destroying..." << __E__;
258 if(daqinterface_ptr_ != NULL)
260 __SUP_COUT__ <<
"Calling recover transition" << __E__;
261 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
// NOTE(review): neither pName nor the call's return value is released in the visible
// lines - confirm there is no reference leak and that a NULL result is acceptable here.
262 PyObject* pName = PyString_FromString(
"do_recover");
263 PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
265 __SUP_COUT__ <<
"Making sure that correct state has been reached" << __E__;
267 while(daqinterface_state_ !=
"stopped")
270 __SUP_COUT__ <<
"State is " << daqinterface_state_ <<
", waiting 1s and retrying..." << __E__;
// Py_XDECREF tolerates a NULL pointer; the member is cleared afterwards.
274 Py_XDECREF(daqinterface_ptr_);
275 daqinterface_ptr_ = NULL;
279 __SUP_COUT__ <<
"Destroyed." << __E__;
// FSM "Configuring" transition. On iteration 0 / sub-iteration 0 it resets the progress
// bookkeeping, loads the requested configuration table group and launches configuringThread
// as a detached thread. The remaining code polls that thread's error message and progress
// bar, and throws toolbox::fsm::exception::Exception when the thread reported an error or
// when the progress value has not changed for more than 600 seconds.
// NOTE(review): the branch structure (braces / else) is not visible in this view.
283 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference )
285 __SUP_COUT__ <<
"transitionConfiguring" << __E__;
288 if(RunControlStateMachine::getIterationIndex() == 0 && RunControlStateMachine::getSubIterationIndex() == 0)
// Start from a clean slate: no error, progress at 0, watchdog timestamp = now.
290 thread_error_message_ =
"";
291 thread_progress_bar_.resetProgressBar(0);
292 last_thread_progress_update_ = time(0);
295 SOAPUtilities::translate(theStateMachine_.getCurrentMessage()).getParameters().getValue(
"ConfigurationTableGroupName"),
296 TableGroupKey(SOAPUtilities::translate(theStateMachine_.getCurrentMessage()).getParameters().getValue(
"ConfigurationTableGroupKey")));
298 __SUP_COUT__ <<
"Configuration table group name: " << theGroup.first <<
" key: " << theGroup.second << __E__;
300 theConfigurationManager_->loadTableGroup(theGroup.first, theGroup.second,
true );
// The detached thread reports back through thread_progress_bar_ and thread_error_message_.
303 std::thread(&ARTDAQSupervisor::configuringThread,
this).detach();
305 __SUP_COUT__ <<
"Configuring thread started." << __E__;
307 RunControlStateMachine::indicateSubIterationWork();
311 std::string errorMessage;
// Copy the worker's error message while holding thread_mutex_.
313 std::lock_guard<std::mutex> lock(thread_mutex_);
314 errorMessage = thread_error_message_;
316 int progress = thread_progress_bar_.read();
317 __SUP_COUTV__(errorMessage);
318 __SUP_COUTV__(progress);
319 __SUP_COUTV__(thread_progress_bar_.isComplete());
// Watchdog: no error reported but more than 600 s since the last recorded progress change.
322 if(errorMessage ==
"" &&
323 time(0) - last_thread_progress_update_ > 600)
325 __SUP_SS__ <<
"There has been no update from the configuration thread for " << (time(0) - last_thread_progress_update_)
326 <<
" seconds, assuming something is wrong and giving up! "
327 <<
"Last progress received was " << progress << __E__;
328 errorMessage = ss.str();
// Any error (from the worker or from the watchdog) fails the transition.
331 if(errorMessage !=
"")
333 __SUP_SS__ <<
"Error was caught in configuring thread: " << errorMessage << __E__;
334 __SUP_COUT_ERR__ <<
"\n" << ss.str();
336 theStateMachine_.setErrorMessage(ss.str());
337 throw toolbox::fsm::exception::Exception(
"Transition Error" ,
339 "CoreSupervisorBase::transitionConfiguring" ,
// Worker still running: request another sub-iteration and, when the progress value moved,
// remember it and refresh the watchdog timestamp.
345 if(!thread_progress_bar_.isComplete())
347 RunControlStateMachine::indicateSubIterationWork();
349 if(last_thread_progress_read_ != progress)
351 last_thread_progress_read_ = progress;
352 last_thread_progress_update_ = time(0);
359 __SUP_COUT_INFO__ <<
"Complete configuring transition!" << __E__;
360 __SUP_COUTV__(getProcessInfo_());
// Worker for the Configuring transition (started detached by transitionConfiguring).
// Stages, with thread_progress_bar_.step() called between them:
//   1. extract the artdaq process/subsystem info from the configuration,
//   2. write boot.txt under ARTDAQTableBase::ARTDAQ_FCL_PATH,
//   3. rebuild the FAKE_CONFIG_NAME directory and symlink one .fcl file per process into it,
//   4. call setdaqcomps, do_boot and do_config on the DAQInterface object.
// The handlers of the function-try-block at the end log the failure and store a message in
// thread_error_message_ under thread_mutex_, which transitionConfiguring polls.
// NOTE(review): braces, several conditions and the throw statements are not visible in this view.
368 void ARTDAQSupervisor::configuringThread()
try
370 const std::string& uid =
371 theConfigurationManager_
372 ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
"/" + CorePropertySupervisorBase::getSupervisorUID() +
"/" +
"LinkToSupervisorTable")
375 __COUT__ <<
"Supervisor uid is " << uid <<
", getting supervisor table node" << __E__;
377 const std::string mfSubject_ = supervisorClassNoNamespace_ +
"-" + uid;
381 thread_progress_bar_.step();
// The progress bar is handed to the extraction routine as its last argument.
383 auto info = ARTDAQTableBase::extractARTDAQInfo(
387 CorePropertySupervisorBase::getSupervisorProperty<size_t>(
"max_fragment_size_bytes", ARTDAQTableBase::DEFAULT_MAX_FRAGMENT_SIZE),
388 CorePropertySupervisorBase::getSupervisorProperty<size_t>(
"routing_timeout_ms", ARTDAQTableBase::DEFAULT_ROUTING_TIMEOUT_MS),
389 CorePropertySupervisorBase::getSupervisorProperty<size_t>(
"routing_retry_count", ARTDAQTableBase::DEFAULT_ROUTING_RETRY_COUNT),
390 &thread_progress_bar_);
// A configuration without any BoardReader or without any EventBuilder is rejected.
393 if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
395 __GEN_SS__ <<
"There must be at least one enabled BoardReader!" << __E__;
399 if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
401 __GEN_SS__ <<
"There must be at least one enabled EventBuilder!" << __E__;
406 thread_progress_bar_.step();
408 __GEN_COUT__ <<
"Writing boot.txt" << __E__;
410 int debugLevel = theSupervisorNode.getNode(
"DAQInterfaceDebugLevel").getValue<
int>();
411 std::string setupScript = theSupervisorNode.getNode(
"DAQSetupScript").getValue();
// boot.txt is truncated and rewritten on every configure.
413 std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH +
"/boot.txt", std::ios::trunc);
414 o <<
"DAQ setup script: " << setupScript << std::endl;
415 o <<
"debug level: " << debugLevel << std::endl;
// Subsystem entries are only considered when more than one subsystem exists.
418 if(info.subsystems.size() > 1)
420 for(
auto& ss : info.subsystems)
424 o <<
"Subsystem id: " << ss.first << std::endl;
// A destination equal to 0 is not written.
425 if(ss.second.destination != 0)
427 o <<
"Subsystem destination: " << ss.second.destination << std::endl;
429 for(
auto& sss : ss.second.sources)
431 o <<
"Subsystem source: " << sss << std::endl;
// For each process type below: host and label are always written, the subsystem only
// when it differs from 1.
437 for(
auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
439 o <<
"EventBuilder host: " << builder.hostname << std::endl;
440 o <<
"EventBuilder label: " << builder.label << std::endl;
441 if(builder.subsystem != 1)
443 o <<
"EventBuilder subsystem: " << builder.subsystem << std::endl;
447 for(
auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
449 o <<
"DataLogger host: " << logger.hostname << std::endl;
450 o <<
"DataLogger label: " << logger.label << std::endl;
451 if(logger.subsystem != 1)
453 o <<
"DataLogger subsystem: " << logger.subsystem << std::endl;
457 for(
auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
459 o <<
"Dispatcher host: " << dispatcher.hostname << std::endl;
460 o <<
"Dispatcher label: " << dispatcher.label << std::endl;
461 if(dispatcher.subsystem != 1)
463 o <<
"Dispatcher subsystem: " << dispatcher.subsystem << std::endl;
467 for(
auto& rmaster : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingMaster])
469 o <<
"RoutingMaster host: " << rmaster.hostname << std::endl;
470 o <<
"RoutingMaster label: " << rmaster.label << std::endl;
471 if(rmaster.subsystem != 1)
473 o <<
"RoutingMaster subsystem: " << rmaster.subsystem << std::endl;
479 thread_progress_bar_.step();
481 __GEN_COUT__ <<
"Building configuration directory" << __E__;
// Remove any previous configuration directory (errors go into `ignored`) and recreate it.
// NOTE(review): ARTDAQ_FCL_PATH and FAKE_CONFIG_NAME are joined without a "/" here, unlike
// the "/boot.txt" path above - confirm ARTDAQ_FCL_PATH ends with a separator.
// The return values of mkdir and symlink below are not checked.
483 boost::system::error_code ignored;
484 boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME, ignored);
485 mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
// One symlink "<label>.fcl" per process, pointing at that process's flat FHiCL file.
487 for(
auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
489 symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label).c_str(),
490 (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME +
"/" + reader.label +
".fcl").c_str());
492 for(
auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
494 symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label).c_str(),
495 (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME +
"/" + builder.label +
".fcl").c_str());
497 for(
auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
499 symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label).c_str(),
500 (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME +
"/" + logger.label +
".fcl").c_str());
502 for(
auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
504 symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label).c_str(),
505 (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME +
"/" + dispatcher.label +
".fcl").c_str());
507 for(
auto& rmaster : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingMaster])
509 symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::RoutingMaster, rmaster.label).c_str(),
510 (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME +
"/" + rmaster.label +
".fcl").c_str());
513 thread_progress_bar_.step();
// From here on the DAQInterface object is used, so its mutex is held.
515 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
// Only the states "stopped" and "" (empty) are accepted as a starting point.
517 if(daqinterface_state_ !=
"stopped" && daqinterface_state_ !=
"")
519 __GEN_SS__ <<
"Cannot configure DAQInterface because it is in the wrong state"
520 <<
" (" << daqinterface_state_ <<
" != stopped)!" << __E__;
524 __GEN_COUT__ <<
"Calling setdaqcomps" << __E__;
525 __GEN_COUT__ <<
"Status before setdaqcomps: " << daqinterface_state_ << __E__;
526 PyObject* pName1 = PyString_FromString(
"setdaqcomps");
// Build { label : [hostname, "-1", subsystem-as-string] } for every BoardReader.
// PyList_SetItem steals the item references; PyDict_SetItem does not steal key or value.
// NOTE(review): no Py_DECREF of readerName / readerData / pName1 is visible - confirm.
528 PyObject* readerDict = PyDict_New();
529 for(
auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
531 PyObject* readerName = PyString_FromString(reader.label.c_str());
533 PyObject* readerData = PyList_New(3);
534 PyObject* readerHost = PyString_FromString(reader.hostname.c_str());
535 PyObject* readerPort = PyString_FromString(
"-1");
536 PyObject* readerSubsystem = PyString_FromString(std::to_string(reader.subsystem).c_str());
537 PyList_SetItem(readerData, 0, readerHost);
538 PyList_SetItem(readerData, 1, readerPort);
539 PyList_SetItem(readerData, 2, readerSubsystem);
540 PyDict_SetItem(readerDict, readerName, readerData);
542 PyObject* res1 = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName1, readerDict, NULL);
543 Py_DECREF(readerDict);
548 __GEN_SS__ <<
"Error calling setdaqcomps transition" << __E__;
552 __GEN_COUT__ <<
"Status after setdaqcomps: " << daqinterface_state_ << __E__;
554 thread_progress_bar_.step();
555 __GEN_COUT__ <<
"Calling do_boot" << __E__;
556 __GEN_COUT__ <<
"Status before boot: " << daqinterface_state_ << __E__;
// do_boot receives the path of the boot.txt written above.
557 PyObject* pName2 = PyString_FromString(
"do_boot");
558 PyObject* pStateArgs1 = PyString_FromString((ARTDAQTableBase::ARTDAQ_FCL_PATH +
"/boot.txt").c_str());
559 PyObject* res2 = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
564 __GEN_SS__ <<
"Error calling boot transition" << __E__;
// The boot is only considered successful when the reported state is "booted".
569 if(daqinterface_state_ !=
"booted")
571 __GEN_SS__ <<
"DAQInterface boot transition failed! "
572 <<
"Status after boot attempt: " << daqinterface_state_ << __E__;
575 __GEN_COUT__ <<
"Status after boot: " << daqinterface_state_ << __E__;
577 thread_progress_bar_.step();
578 __GEN_COUT__ <<
"Calling do_config" << __E__;
579 __GEN_COUT__ <<
"Status before config: " << daqinterface_state_ << __E__;
// do_config receives a one-element Python list holding FAKE_CONFIG_NAME.
580 PyObject* pName3 = PyString_FromString(
"do_config");
581 PyObject* pStateArgs2 = Py_BuildValue(
"[s]", FAKE_CONFIG_NAME);
582 PyObject* res3 = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName3, pStateArgs2, NULL);
587 __GEN_SS__ <<
"Error calling config transition" << __E__;
598 __GEN_COUT__ <<
"Status after config: " << daqinterface_state_ << __E__;
// Marking the bar complete is what lets transitionConfiguring finish.
599 thread_progress_bar_.complete();
600 __GEN_COUT__ <<
"Configured." << __E__;
// std::runtime_error: log it and publish the message for the polling transition.
603 catch(
const std::runtime_error& e)
605 __SS__ <<
"Error was caught while configuring: " << e.what() << __E__;
606 __COUT_ERR__ <<
"\n" << ss.str();
607 std::lock_guard<std::mutex> lock(thread_mutex_);
608 thread_error_message_ = ss.str();
// Unknown-error path: same reporting, plus artdaq::ExceptionHandler called with Rethrow::no.
612 __SS__ <<
"Unknown error was caught while configuring. Please checked the logs." << __E__;
613 __COUT_ERR__ <<
"\n" << ss.str();
615 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
617 std::lock_guard<std::mutex> lock(thread_mutex_);
618 thread_error_message_ = ss.str();
// FSM "Halting" transition: sends the "Shutdown" command to DAQInterface through do_command
// while holding daqinterface_mutex_.
// In the handlers, an error is only logged at info level when the provenance state is Failed
// or Halted; the other path stores the message with theStateMachine_.setErrorMessage and
// throws toolbox::fsm::exception::Exception.
// NOTE(review): braces, the result check and the else/catch-all lines are not visible in this view.
622 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference )
try
624 __SUP_COUT__ <<
"Halting..." << __E__;
625 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
627 __SUP_COUT__ <<
"Status before halt: " << daqinterface_state_ << __E__;
// NOTE(review): no Py_DECREF of pName / pArg / res is visible - confirm.
629 PyObject* pName = PyString_FromString(
"do_command");
630 PyObject* pArg = PyString_FromString(
"Shutdown");
631 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
636 __SS__ <<
"Error calling Shutdown transition" << __E__;
641 __SUP_COUT__ <<
"Status after halt: " << daqinterface_state_ << __E__;
642 __SUP_COUT__ <<
"Halted." << __E__;
644 catch(
const std::runtime_error& e)
646 const std::string transitionName =
"Halting";
// NOTE(review): the condition accepts both Failed and Halted, but the log text below only
// names FAILED_STATE_NAME - confirm the wording is intended.
648 if(theStateMachine_.getProvenanceStateName() == RunControlStateMachine::FAILED_STATE_NAME ||
649 theStateMachine_.getProvenanceStateName() == RunControlStateMachine::HALTED_STATE_NAME)
651 __SUP_COUT_INFO__ <<
"Error was caught while halting (but ignoring because "
652 "previous state was '"
653 << RunControlStateMachine::FAILED_STATE_NAME <<
"'): " << e.what() << __E__;
// Reporting path: record the message on the state machine and fail the transition.
657 __SUP_SS__ <<
"Error was caught while " << transitionName <<
": " << e.what() << __E__;
658 __SUP_COUT_ERR__ <<
"\n" << ss.str();
659 theStateMachine_.setErrorMessage(ss.str());
660 throw toolbox::fsm::exception::Exception(
"Transition Error" ,
662 "ARTDAQSupervisorBase::transition" + transitionName ,
// Unknown-error path: same structure as the handler above, without an exception message.
670 const std::string transitionName =
"Halting";
672 if(theStateMachine_.getProvenanceStateName() == RunControlStateMachine::FAILED_STATE_NAME ||
673 theStateMachine_.getProvenanceStateName() == RunControlStateMachine::HALTED_STATE_NAME)
675 __SUP_COUT_INFO__ <<
"Unknown error was caught while halting (but ignoring "
676 "because previous state was '"
677 << RunControlStateMachine::FAILED_STATE_NAME <<
"')." << __E__;
681 __SUP_SS__ <<
"Unknown error was caught while " << transitionName <<
". Please checked the logs." << __E__;
682 __SUP_COUT_ERR__ <<
"\n" << ss.str();
683 theStateMachine_.setErrorMessage(ss.str());
685 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
687 throw toolbox::fsm::exception::Exception(
"Transition Error" ,
689 "ARTDAQSupervisorBase::transition" + transitionName ,
// FSM "Initializing" transition: the visible body only logs entry and exit.
// The handlers build an error message; the unknown-error path also calls
// artdaq::ExceptionHandler with Rethrow::no.
// NOTE(review): any work between the two log lines and the throw statements in the
// handlers are not visible in this view.
697 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference )
try
699 __SUP_COUT__ <<
"Initializing..." << __E__;
701 __SUP_COUT__ <<
"Initialized." << __E__;
703 catch(
const std::runtime_error& e)
705 __SS__ <<
"Error was caught while Initializing: " << e.what() << __E__;
710 __SS__ <<
"Unknown error was caught while Initializing. Please checked the logs." << __E__;
711 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM "Pausing" transition: sends the "Pause" command to DAQInterface through do_command
// while holding daqinterface_mutex_, logging the state before and after.
// NOTE(review): the result check, the state refresh and the throw statements in the
// handlers are not visible in this view.
716 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference )
try
718 __SUP_COUT__ <<
"Pausing..." << __E__;
719 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
722 __SUP_COUT__ <<
"Status before pause: " << daqinterface_state_ << __E__;
// NOTE(review): no Py_DECREF of pName / pArg / res is visible - confirm.
724 PyObject* pName = PyString_FromString(
"do_command");
725 PyObject* pArg = PyString_FromString(
"Pause");
726 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
731 __SS__ <<
"Error calling Pause transition" << __E__;
736 __SUP_COUT__ <<
"Status after pause: " << daqinterface_state_ << __E__;
738 __SUP_COUT__ <<
"Paused." << __E__;
740 catch(
const std::runtime_error& e)
742 __SS__ <<
"Error was caught while Pausing: " << e.what() << __E__;
// Unknown-error path.
747 __SS__ <<
"Unknown error was caught while Pausing. Please checked the logs." << __E__;
748 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM "Resuming" transition: sends the "Resume" command to DAQInterface through do_command
// while holding daqinterface_mutex_, logging the state before and after.
// NOTE(review): the result check, the state refresh and the throw statements in the
// handlers are not visible in this view.
753 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference )
try
755 __SUP_COUT__ <<
"Resuming..." << __E__;
756 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
759 __SUP_COUT__ <<
"Status before resume: " << daqinterface_state_ << __E__;
// NOTE(review): no Py_DECREF of pName / pArg / res is visible - confirm.
760 PyObject* pName = PyString_FromString(
"do_command");
761 PyObject* pArg = PyString_FromString(
"Resume");
762 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
767 __SS__ <<
"Error calling Resume transition" << __E__;
771 __SUP_COUT__ <<
"Status after resume: " << daqinterface_state_ << __E__;
772 __SUP_COUT__ <<
"Resumed." << __E__;
774 catch(
const std::runtime_error& e)
776 __SS__ <<
"Error was caught while Resuming: " << e.what() << __E__;
// Unknown-error path.
781 __SS__ <<
"Unknown error was caught while Resuming. Please checked the logs." << __E__;
782 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM "Starting" transition: reads "RunNumber" from the current SOAP message, converts it to
// an int and calls DAQInterface's do_start_running with it while holding daqinterface_mutex_.
// A state other than "running" afterwards is treated as a failed start.
// NOTE(review): the result check, the state refresh and the throw statements are not
// visible in this view.
787 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference )
try
789 __SUP_COUT__ <<
"Starting..." << __E__;
791 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
793 __SUP_COUT__ <<
"Status before start: " << daqinterface_state_ << __E__;
794 auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage()).getParameters().getValue(
"RunNumber");
796 PyObject* pName = PyString_FromString(
"do_start_running");
// std::stoi throws std::invalid_argument / std::out_of_range on bad input; neither derives
// from std::runtime_error, so such a failure bypasses the first handler below.
797 int run_number = std::stoi(runNumber);
798 PyObject* pStateArgs = PyInt_FromLong(run_number);
799 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pStateArgs, NULL);
804 __SS__ <<
"Error calling start transition" << __E__;
808 __SUP_COUT__ <<
"Status after start: " << daqinterface_state_ << __E__;
809 if(daqinterface_state_ !=
"running")
811 __SS__ <<
"DAQInterface start transition failed!" << __E__;
816 __SUP_COUT__ <<
"Started." << __E__;
818 catch(
const std::runtime_error& e)
820 __SS__ <<
"Error was caught while Starting: " << e.what() << __E__;
// Unknown-error path.
825 __SS__ <<
"Unknown error was caught while Starting. Please checked the logs." << __E__;
826 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// FSM "Stopping" transition: calls DAQInterface's do_stop_running (no arguments) while
// holding daqinterface_mutex_, logging the state before and after.
// NOTE(review): the result check, the state refresh and the throw statements in the
// handlers are not visible in this view.
831 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference )
try
833 __SUP_COUT__ <<
"Stopping..." << __E__;
834 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
836 __SUP_COUT__ <<
"Status before stop: " << daqinterface_state_ << __E__;
// NOTE(review): no Py_DECREF of pName / res is visible - confirm.
837 PyObject* pName = PyString_FromString(
"do_stop_running");
838 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
843 __SS__ <<
"Error calling stop transition" << __E__;
847 __SUP_COUT__ <<
"Status after stop: " << daqinterface_state_ << __E__;
848 __SUP_COUT__ <<
"Stopped." << __E__;
850 catch(
const std::runtime_error& e)
852 __SS__ <<
"Error was caught while Stopping: " << e.what() << __E__;
// Unknown-error path.
857 __SS__ <<
"Unknown error was caught while Stopping. Please checked the logs." << __E__;
858 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
// Called on entry to the error state: calls DAQInterface's do_recover (no arguments) while
// holding daqinterface_mutex_, logging the state before and after.
// NOTE(review): the result check and the state refresh are not visible in this view.
863 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference )
865 __SUP_COUT__ <<
"Entering error recovery state" << __E__;
866 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
868 __SUP_COUT__ <<
"Status before error: " << daqinterface_state_ << __E__;
// NOTE(review): no Py_DECREF of pName / res is visible - confirm.
870 PyObject* pName = PyString_FromString(
"do_recover");
871 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
876 __SS__ <<
"Error calling recover transition" << __E__;
880 __SUP_COUT__ <<
"Status after error: " << daqinterface_state_ << __E__;
881 __SUP_COUT__ <<
"EnteringError DONE." << __E__;
// Refreshes daqinterface_state_ while holding daqinterface_mutex_.
// Without a DAQInterface object the state is set to the empty string; otherwise the
// object's state method is called with the argument "DAQInterface" and the returned Python
// string is copied into daqinterface_state_.
// NOTE(review): the early return, the result check and any reference release are not
// visible in this view.
886 void ots::ARTDAQSupervisor::getDAQState_()
889 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
891 if(daqinterface_ptr_ ==
nullptr)
893 daqinterface_state_ =
"";
897 PyObject* pName = PyString_FromString(
"state");
898 PyObject* pArg = PyString_FromString(
"DAQInterface");
899 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
904 __SS__ <<
"Error calling state function" << __E__;
// PyString_AsString returns a pointer into `res`; std::string copies it right away.
908 daqinterface_state_ = std::string(PyString_AsString(res));
// Returns the string produced by DAQInterface's artdaq_process_info method, called with the
// argument "DAQInterface" while holding daqinterface_mutex_.
// NOTE(review): what is returned when daqinterface_ptr_ is null, the result check and any
// reference release are not visible in this view.
913 std::string ots::ARTDAQSupervisor::getProcessInfo_(
void)
916 std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
918 if(daqinterface_ptr_ ==
nullptr)
923 PyObject* pName = PyString_FromString(
"artdaq_process_info");
924 PyObject* pArg = PyString_FromString(
"DAQInterface");
925 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
930 __SS__ <<
"Error calling artdaq_process_info function" << __E__;
// PyString_AsString returns a pointer into `res`; the std::string is a copy.
934 return std::string(PyString_AsString(res));
// Splits the process-info text into lines and parses each line that fully matches
//   <label> at <host>:<port> (subsystem <n>, rank <n>): <state>
// into a DAQInterfaceProcessInfo, which is appended to the output list. Only lines for
// which std::regex_match succeeds reach the parsing code.
// NOTE(review): the declaration of `match` and the return statement are not visible here.
939 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> ots::ARTDAQSupervisor::getAndParseProcessInfo_()
941 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
942 auto info = getProcessInfo_();
943 auto procs = tokenize_(info);
// Capture groups: 1 label, 2 host, 3 port, 4 subsystem, 5 rank, 6 state.
952 std::regex re(
"(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
954 for(
auto& proc : procs)
957 if(std::regex_match(proc, match, re))
// This local `info` has the same name as the string `info` declared above.
959 DAQInterfaceProcessInfo info;
961 info.label = match[1];
962 info.host = match[2];
963 info.port = std::stoi(match[3]);
964 info.subsystem = std::stoi(match[4]);
965 info.rank = std::stoi(match[5]);
966 info.state = match[6];
968 output.push_back(info);
// For every parsed process entry, builds a fhicl::ParameterSet selecting the "xmlrpc"
// commander plugin (id = port, server_url = host) and pairs the entry with the commander
// returned by artdaq::MakeCommanderPlugin.
// NOTE(review): the call that stores the pair in `output` and the return statement are not
// visible in this view.
975 std::list<std::pair<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
976 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
978 std::list<std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>> output;
979 auto infos = getAndParseProcessInfo_();
981 for(
auto& info : infos)
// NOTE(review): `cm` is local to one loop iteration but is passed to MakeCommanderPlugin;
// if the returned commander keeps a reference to it, that reference would outlive `cm` - confirm.
983 artdaq::Commandable cm;
984 fhicl::ParameterSet ps;
986 ps.put<std::string>(
"commanderPluginType",
"xmlrpc");
987 ps.put<
int>(
"id", info.port);
988 ps.put<std::string>(
"server_url", info.host);
// `info` is moved into the pair, so it must not be read again in this iteration.
991 std::make_pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>(std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
// Splits `input` at newline characters and collects the pieces in a list of strings.
// NOTE(review): the declaration and updates of `pos`, the else line and the return
// statement are not visible in this view.
998 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string
const& input)
1001 std::list<std::string> output;
1003 while(pos != std::string::npos && pos < input.size())
1005 auto newpos = input.find(
'\n', pos);
1006 if(newpos != std::string::npos)
// Newline found: take the characters from pos up to, but not including, the newline.
1008 output.emplace_back(input, pos, newpos - pos);
// Other path: take everything from pos to the end of the input.
1014 output.emplace_back(input, pos);
// Body of the runner thread. While runner_running_ is true and a DAQInterface object exists,
// it takes daqinterface_mutex_, remembers the current state and, in the states "running",
// "ready" or "booted", calls DAQInterface's check_proc_heartbeats. Every visible failure
// path (call error, cet::exception, std::exception, unknown error, unexpected state change)
// clears runner_running_ and builds an error message.
// NOTE(review): the try line, the result check, the state refresh, any sleep between
// iterations and what is done with the error messages are not visible in this view.
1023 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1025 TLOG(TLVL_TRACE) <<
"Runner thread starting";
1026 runner_running_ =
true;
1027 while(runner_running_)
1029 if(daqinterface_ptr_ != NULL)
1031 std::unique_lock<std::recursive_mutex> lk(daqinterface_mutex_);
// Remembered so that a state change during this iteration can be detected below.
1033 std::string state_before = daqinterface_state_;
1035 if(daqinterface_state_ ==
"running" || daqinterface_state_ ==
"ready" || daqinterface_state_ ==
"booted")
1039 TLOG(TLVL_TRACE) <<
"Calling DAQInterface::check_proc_heartbeats";
// NOTE(review): no Py_DECREF of pName / res is visible - confirm.
1040 PyObject* pName = PyString_FromString(
"check_proc_heartbeats");
1041 PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1042 TLOG(TLVL_TRACE) <<
"Done with DAQInterface::check_proc_heartbeats call";
1046 runner_running_ =
false;
1049 __SS__ <<
"Error calling check_proc_heartbeats function" << __E__;
1054 catch(cet::exception& ex)
1056 runner_running_ =
false;
1059 __SS__ <<
"An cet::exception occurred while calling "
1060 "check_proc_heartbeats function: "
1061 << ex.explain_self() << __E__;
1065 catch(std::exception& ex)
1067 runner_running_ =
false;
1070 __SS__ <<
"An std::exception occurred while calling "
1071 "check_proc_heartbeats function: "
1072 << ex.what() << __E__;
// Unknown-error path.
1078 runner_running_ =
false;
1081 __SS__ <<
"An unknown Error occurred while calling runner function" << __E__;
// A state that differs from the one seen at the top of the iteration stops the runner.
1087 if(daqinterface_state_ != state_before)
1089 runner_running_ =
false;
1091 __SS__ <<
"DAQInterface state unexpectedly changed from " << state_before <<
" to " << daqinterface_state_
1092 <<
". Check supervisor log file for more info!" << __E__;
1104 runner_running_ =
false;
1105 TLOG(TLVL_TRACE) <<
"Runner thread complete";
// Asks the runner loop to exit by clearing runner_running_, then joins the runner thread
// (when one exists and is joinable) and resets the owning pointer.
1109 void ots::ARTDAQSupervisor::stop_runner_()
1111 runner_running_ =
false;
1112 if(runner_thread_ && runner_thread_->joinable())
1114 runner_thread_->join();
1115 runner_thread_.reset(
nullptr);
// Creates a new std::thread running daqinterfaceRunner_ on this object and stores it in
// runner_thread_.
// NOTE(review): the line before the assignment and the end of the function are not visible
// in this view - confirm that a previously running thread is stopped before being replaced.
1120 void ots::ARTDAQSupervisor::start_runner_()
1123 runner_thread_ = std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_,
this);