tdaq-develop-2025-02-12
ARTDAQSupervisor.cc
1 
2 
3 #define TRACEMF_USE_VERBATIM 1 // for trace longer path filenames
4 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
5 
6 #include "artdaq-core/Utilities/configureMessageFacility.hh"
7 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
8 #include "artdaq/DAQdata/Globals.hh"
9 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
10 #include "cetlib_except/exception.h"
11 #include "fhiclcpp/make_ParameterSet.h"
12 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisorTRACEController.h"
13 
14 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
15 
16 #include <boost/exception/all.hpp>
17 #include <boost/filesystem.hpp>
18 
19 #include <signal.h>
20 #include <regex>
21 
22 using namespace ots;
23 
24 XDAQ_INSTANTIATOR_IMPL(ARTDAQSupervisor)
25 
26 #define FAKE_CONFIG_NAME "ots_config"
27 #define DAQINTERFACE_PORT \
28  std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + \
29  (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION")))
30 
31 static ARTDAQSupervisor* instance = nullptr;
32 static std::unordered_map<int, struct sigaction> old_actions =
33  std::unordered_map<int, struct sigaction>();
34 static bool sighandler_init = false;
35 static void signal_handler(int signum)
36 {
37  // Messagefacility may already be gone at this point, TRACE ONLY!
38 #if TRACE_REVNUM < 1459
39  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
40 #else
41  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
42 #endif
43  << "A signal of type " << signum
44  << " was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
45  "then proceeding with default handlers!";
46 
47  if(instance)
48  instance->destroy();
49 
50  sigset_t set;
51  pthread_sigmask(SIG_UNBLOCK, NULL, &set);
52  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
53 
54 #if TRACE_REVNUM < 1459
55  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0)
56 #else
57  TRACE_STREAMER(TLVL_ERROR, TLOG2("ARTDAQsupervisor", 0), 0)
58 #endif
59  << "Calling default signal handler";
60  if(signum != SIGUSR2)
61  {
62  sigaction(signum, &old_actions[signum], NULL);
63  kill(getpid(), signum); // Only send signal to self
64  }
65  else
66  {
67  // Send Interrupt signal if parsing SIGUSR2 (i.e. user-defined exception that
68  // should tear down ARTDAQ)
69  sigaction(SIGINT, &old_actions[SIGINT], NULL);
70  kill(getpid(), SIGINT); // Only send signal to self
71  }
72 }
73 
74 static void init_sighandler(ARTDAQSupervisor* inst)
75 {
76  static std::mutex sighandler_mutex;
77  std::unique_lock<std::mutex> lk(sighandler_mutex);
78 
79  if(!sighandler_init)
80  {
81  sighandler_init = true;
82  instance = inst;
83  std::vector<int> signals = {
84  SIGINT,
85  SIGILL,
86  SIGABRT,
87  SIGFPE,
88  SIGSEGV,
89  SIGPIPE,
90  SIGALRM,
91  SIGTERM,
92  SIGUSR2,
93  SIGHUP}; // SIGQUIT is used by art in normal operation
94  for(auto signal : signals)
95  {
96  struct sigaction old_action;
97  sigaction(signal, NULL, &old_action);
98 
99  // If the old handler wasn't SIG_IGN (it's a handler that just
100  // "ignore" the signal)
101  if(old_action.sa_handler != SIG_IGN)
102  {
103  struct sigaction action;
104  action.sa_handler = signal_handler;
105  sigemptyset(&action.sa_mask);
106  for(auto sigblk : signals)
107  {
108  sigaddset(&action.sa_mask, sigblk);
109  }
110  action.sa_flags = 0;
111 
112  // Replace the signal handler of SIGINT with the one described by
113  // new_action
114  sigaction(signal, &action, NULL);
115  old_actions[signal] = old_action;
116  }
117  }
118  }
119 }
120 
121 //==============================================================================
122 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
123  : CoreSupervisorBase(stub)
124  , daqinterface_ptr_(NULL)
125  , partition_(getSupervisorProperty("partition", 0))
126  , daqinterface_state_("notrunning")
127  , runner_thread_(nullptr)
128 {
129  __SUP_COUT__ << "Constructor." << __E__;
130 
131  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
132  init_sighandler(this);
133 
134  // Only use system Python
135  // unsetenv("PYTHONPATH");
136  // unsetenv("PYTHONHOME");
137 
138  // Write out settings file
139  auto settings_file = __ENV__("DAQINTERFACE_SETTINGS");
140  std::ofstream o(settings_file, std::ios::trunc);
141 
142  setenv("DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
143  auto logfileName = std::string(__ENV__("OTSDAQ_LOG_DIR")) +
144  "/DAQInteface/DAQInterface_partition" +
145  std::to_string(partition_) + ".log";
146  setenv("DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
147 
148  o << "log_directory: "
149  << getSupervisorProperty("log_directory", std::string(__ENV__("OTSDAQ_LOG_DIR")))
150  << std::endl;
151 
152  {
153  const std::string record_directory = getSupervisorProperty(
154  "record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH + "/run_records/");
155  mkdir(record_directory.c_str(), 0755);
156  o << "record_directory: " << record_directory << std::endl;
157  }
158 
159  o << "package_hashes_to_save: "
160  << getSupervisorProperty("package_hashes_to_save", "[artdaq]") << std::endl;
161  // Note that productsdir_for_bash_scripts is REQUIRED!
162  __SUP_COUT__ << "Use spack is " << getSupervisorProperty("use_spack", false)
163  << ", spack_root is "
164  << getSupervisorProperty("spack_root_for_bash_scripts", "NOT SET")
165  << ", productsdir is "
166  << getSupervisorProperty("productsdir_for_bash_scripts", "NOT SET")
167  << __E__;
168  if(getSupervisorProperty("use_spack", false))
169  {
170  o << "spack_root_for_bash_scripts: "
171  << getSupervisorProperty("spack_root_for_bash_scripts",
172  std::string(__ENV__("SPACK_ROOT")))
173  << std::endl;
174  }
175  else
176  {
177  o << "productsdir_for_bash_scripts: "
178  << getSupervisorProperty("productsdir_for_bash_scripts",
179  std::string(__ENV__("OTS_PRODUCTS")))
180  << std::endl;
181  }
182  o << "boardreader timeout: " << getSupervisorProperty("boardreader_timeout", 30)
183  << std::endl;
184  o << "eventbuilder timeout: " << getSupervisorProperty("eventbuilder_timeout", 30)
185  << std::endl;
186  o << "datalogger timeout: " << getSupervisorProperty("datalogger_timeout", 30)
187  << std::endl;
188  o << "dispatcher timeout: " << getSupervisorProperty("dispatcher_timeout", 30)
189  << std::endl;
190  // Only put max_fragment_size_bytes into DAQInterface settings file if advanced_memory_usage is disabled
191  if(!getSupervisorProperty("advanced_memory_usage", false))
192  {
193  o << "max_fragment_size_bytes: "
194  << getSupervisorProperty("max_fragment_size_bytes", 1048576) << std::endl;
195  }
196  o << "transfer_plugin_to_use: "
197  << getSupervisorProperty("transfer_plugin_to_use", "Autodetect") << std::endl;
198  o << "all_events_to_all_dispatchers: " << std::boolalpha
199  << getSupervisorProperty("all_events_to_all_dispatchers", true) << std::endl;
200  o << "data_directory_override: "
201  << getSupervisorProperty("data_directory_override",
202  std::string(__ENV__("ARTDAQ_OUTPUT_DIR")))
203  << std::endl;
204  o << "max_configurations_to_list: "
205  << getSupervisorProperty("max_configurations_to_list", 10) << std::endl;
206  o << "disable_unique_rootfile_labels: "
207  << getSupervisorProperty("disable_unique_rootfile_labels", false) << std::endl;
208  o << "use_messageviewer: " << std::boolalpha
209  << getSupervisorProperty("use_messageviewer", false) << std::endl;
210  o << "use_messagefacility: " << std::boolalpha
211  << getSupervisorProperty("use_messagefacility", true) << std::endl;
212  o << "fake_messagefacility: " << std::boolalpha
213  << getSupervisorProperty("fake_messagefacility", false) << std::endl;
214  o << "kill_existing_processes: " << std::boolalpha
215  << getSupervisorProperty("kill_existing_processes", true) << std::endl;
216  o << "advanced_memory_usage: " << std::boolalpha
217  << getSupervisorProperty("advanced_memory_usage", false) << std::endl;
218  o << "strict_fragment_id_mode: " << std::boolalpha
219  << getSupervisorProperty("strict_fragment_id_mode", false) << std::endl;
220  o << "disable_private_network_bookkeeping: " << std::boolalpha
221  << getSupervisorProperty("disable_private_network_bookkeeping", false) << std::endl;
222  o << "allowed_processors: " << getSupervisorProperty("allowed_processors", "0-255")
223  << std::
224  endl; // Note this sets a taskset for ALL processes, on all nodes (ex. "1,2,5-7")
225 
226  o.close();
227 
228  // destroy current TRACEController and instantiate ARTDAQSupervisorTRACEController
229  if(CorePropertySupervisorBase::theTRACEController_)
230  {
231  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
232  delete CorePropertySupervisorBase::
233  theTRACEController_; // destruct current TRACEController
234  CorePropertySupervisorBase::theTRACEController_ = nullptr;
235  }
236  CorePropertySupervisorBase::theTRACEController_ =
238  ((ARTDAQSupervisorTRACEController*)CorePropertySupervisorBase::theTRACEController_)
239  ->setSupervisorPtr(this);
240 
241  __SUP_COUT__ << "Constructed." << __E__;
242 } // end constructor()
243 
244 //==============================================================================
245 ARTDAQSupervisor::~ARTDAQSupervisor(void)
246 {
247  __SUP_COUT__ << "Destructor." << __E__;
248  destroy();
249  __SUP_COUT__ << "Destructed." << __E__;
250 } // end destructor()
251 
252 //==============================================================================
253 void ARTDAQSupervisor::destroy(void)
254 {
255  __SUP_COUT__ << "Destroying..." << __E__;
256 
257  if(daqinterface_ptr_ != NULL)
258  {
259  __SUP_COUT__ << "Calling recover transition" << __E__;
260  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
261  PyObject* pName = PyUnicode_FromString("do_recover");
262  /*PyObject* res =*/PyObject_CallMethodObjArgs(
263  daqinterface_ptr_, pName, NULL);
264 
265  __SUP_COUT__ << "Making sure that correct state has been reached" << __E__;
266  getDAQState_();
267  while(daqinterface_state_ != "stopped")
268  {
269  getDAQState_();
270  __SUP_COUT__ << "State is " << daqinterface_state_
271  << ", waiting 1s and retrying..." << __E__;
272  usleep(1000000);
273  }
274 
275  Py_XDECREF(daqinterface_ptr_);
276  daqinterface_ptr_ = NULL;
277  }
278 
279  Py_Finalize();
280 
281  // CorePropertySupervisorBase would destroy, but since it was created here, attempt to destroy
283  {
284  __SUP_COUT__ << "Destroying TRACE Controller..." << __E__;
287  }
288 
289  __SUP_COUT__ << "Destroyed." << __E__;
290 } // end destroy()
291 
292 //==============================================================================
293 void ARTDAQSupervisor::init(void)
294 {
295  stop_runner_();
296 
297  __SUP_COUT__ << "Initializing..." << __E__;
298  {
299  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
300 
301  // allSupervisorInfo_.init(getApplicationContext());
302  artdaq::configureMessageFacility("ARTDAQSupervisor");
303  __SUP_COUT__ << "artdaq MF configured." << __E__;
304 
305  // initialization
306  char* daqinterface_dir = getenv("ARTDAQ_DAQINTERFACE_DIR");
307  if(daqinterface_dir == NULL)
308  {
309  __SS__ << "ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
310  "means that DAQInterface has not been setup!"
311  << __E__;
312  __SUP_SS_THROW__;
313  }
314  else
315  {
316  __SUP_COUT__ << "Initializing Python" << __E__;
317  Py_Initialize();
318 
319  __SUP_COUT__ << "Adding DAQInterface directory to PYTHON_PATH" << __E__;
320  PyObject* sysPath = PySys_GetObject((char*)"path");
321  PyObject* programName = PyUnicode_FromString(daqinterface_dir);
322  PyList_Append(sysPath, programName);
323  Py_DECREF(programName);
324 
325  __SUP_COUT__ << "Creating Module name" << __E__;
326  PyObject* pName = PyUnicode_FromString("rc.control.daqinterface");
327  /* Error checking of pName left out */
328 
329  __SUP_COUT__ << "Importing module" << __E__;
330  PyObject* pModule = PyImport_Import(pName);
331  Py_DECREF(pName);
332 
333  if(pModule == NULL)
334  {
335  PyErr_Print();
336  __SS__ << "Failed to load rc.control.daqinterface" << __E__;
337  __SUP_SS_THROW__;
338  }
339  else
340  {
341  __SUP_COUT__ << "Loading python module dictionary" << __E__;
342  PyObject* pDict = PyModule_GetDict(pModule);
343  if(pDict == NULL)
344  {
345  PyErr_Print();
346  __SS__ << "Unable to load module dictionary" << __E__;
347  __SUP_SS_THROW__;
348  }
349  else
350  {
351  Py_DECREF(pModule);
352 
353  __SUP_COUT__ << "Getting DAQInterface object pointer" << __E__;
354  PyObject* di_obj_ptr = PyDict_GetItemString(pDict, "DAQInterface");
355 
356  __SUP_COUT__ << "Filling out DAQInterface args struct" << __E__;
357  PyObject* pArgs = PyTuple_New(0);
358 
359  PyObject* kwargs = Py_BuildValue("{s:s, s:s, s:i, s:i, s:s, s:s}",
360  "logpath",
361  ".daqint.log",
362  "name",
363  "DAQInterface",
364  "partition_number",
365  partition_,
366  "rpc_port",
367  DAQINTERFACE_PORT,
368  "rpc_host",
369  "localhost",
370  "control_host",
371  "localhost");
372 
373  __SUP_COUT__ << "Calling DAQInterface Object Constructor" << __E__;
374  daqinterface_ptr_ = PyObject_Call(di_obj_ptr, pArgs, kwargs);
375 
376  Py_DECREF(di_obj_ptr);
377  }
378  }
379  }
380 
381  getDAQState_();
382 
383  // { //attempt to cleanup old artdaq processes DOES NOT WORK because artdaq interface knows it hasn't started
384  // __SUP_COUT__ << "Attempting artdaq stale cleanup..." << __E__;
385  // std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
386  // getDAQState_();
387  // __SUP_COUT__ << "Status before cleanup: " << daqinterface_state_ << __E__;
388 
389  // PyObject* pName = PyUnicode_FromString("do_recover");
390  // PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
391 
392  // if(res == NULL)
393  // {
394  // PyErr_Print();
395  // __SS__ << "Error with clean up calling do_recover" << __E__;
396  // __SUP_SS_THROW__;
397  // }
398  // getDAQState_();
399  // __SUP_COUT__ << "Status after cleanup: " << daqinterface_state_ << __E__;
400  // __SUP_COUT__ << "cleanup DONE." << __E__;
401  // }
402  }
403  start_runner_();
404  __SUP_COUT__ << "Initialized." << __E__;
405 } // end init()
406 
407 //==============================================================================
408 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
409 {
410  __SUP_COUT__ << "transitionConfiguring" << __E__;
411 
412  // activate the configuration tree (the first iteration)
413  if(RunControlStateMachine::getIterationIndex() == 0 &&
414  RunControlStateMachine::getSubIterationIndex() == 0)
415  {
416  thread_error_message_ = "";
417  thread_progress_bar_.resetProgressBar(0);
418  last_thread_progress_update_ = time(0); // initialize timeout timer
419 
420  std::pair<std::string /*group name*/, TableGroupKey> theGroup(
421  SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
422  .getParameters()
423  .getValue("ConfigurationTableGroupName"),
424  TableGroupKey(SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
425  .getParameters()
426  .getValue("ConfigurationTableGroupKey")));
427 
428  __SUP_COUT__ << "Configuration table group name: " << theGroup.first
429  << " key: " << theGroup.second << __E__;
430 
431  try
432  {
433  // disable version tracking to accept untracked versions to be selected by the FSM transition source
434  theConfigurationManager_->loadTableGroup(
435  theGroup.first,
436  theGroup.second,
437  true /*doActivate*/,
438  0,
439  0,
440  0,
441  0,
442  0,
443  0,
444  false,
445  0,
446  0,
447  ConfigurationManager::LoadGroupType::ALL_TYPES,
448  true /*ignoreVersionTracking*/);
449  }
450  catch(const std::runtime_error& e)
451  {
452  __SS__ << "Error loading table group '" << theGroup.first << "("
453  << theGroup.second << ")! \n"
454  << e.what() << __E__;
455  __SUP_COUT_ERR__ << ss.str();
456  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
457 
458  //__SS_THROW_ONLY__;
459  theStateMachine_.setErrorMessage(ss.str());
460  throw toolbox::fsm::exception::Exception(
461  "Transition Error" /*name*/,
462  ss.str() /* message*/,
463  "ARTDAQSupervisor::transitionConfiguring" /*module*/,
464  __LINE__ /*line*/,
465  __FUNCTION__ /*function*/
466  );
467  }
468  catch(...)
469  {
470  __SS__ << "Unknown error loading table group '" << theGroup.first << "("
471  << theGroup.second << ")!" << __E__;
472  __SUP_COUT_ERR__ << ss.str();
473  // ExceptionHandler(ExceptionHandlerRethrow::no, ss.str());
474 
475  //__SS_THROW_ONLY__;
476  theStateMachine_.setErrorMessage(ss.str());
477  throw toolbox::fsm::exception::Exception(
478  "Transition Error" /*name*/,
479  ss.str() /* message*/,
480  "ARTDAQSupervisor::transitionConfiguring" /*module*/,
481  __LINE__ /*line*/,
482  __FUNCTION__ /*function*/
483  );
484  }
485 
486  // start configuring thread
487  std::thread(&ARTDAQSupervisor::configuringThread, this).detach();
488 
489  __SUP_COUT__ << "Configuring thread started." << __E__;
490 
491  RunControlStateMachine::
492  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
493  }
494  else // not first time
495  {
496  std::string errorMessage;
497  {
498  std::lock_guard<std::mutex> lock(
499  thread_mutex_); // lock out for remainder of scope
500  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
501  }
502  int progress = thread_progress_bar_.read();
503  __SUP_COUTV__(errorMessage);
504  __SUP_COUTV__(progress);
505  __SUP_COUTV__(thread_progress_bar_.isComplete());
506 
507  // check for done and error messages
508  if(errorMessage == "" && // if no update in 600 seconds, give up
509  time(0) - last_thread_progress_update_ > 600)
510  {
511  __SUP_SS__ << "There has been no update from the configuration thread for "
512  << (time(0) - last_thread_progress_update_)
513  << " seconds, assuming something is wrong and giving up! "
514  << "Last progress received was " << progress << __E__;
515  errorMessage = ss.str();
516  }
517 
518  if(errorMessage != "")
519  {
520  __SUP_SS__ << "Error was caught in configuring thread: " << errorMessage
521  << __E__;
522  __SUP_COUT_ERR__ << "\n" << ss.str();
523 
524  theStateMachine_.setErrorMessage(ss.str());
525  throw toolbox::fsm::exception::Exception(
526  "Transition Error" /*name*/,
527  ss.str() /* message*/,
528  "CoreSupervisorBase::transitionConfiguring" /*module*/,
529  __LINE__ /*line*/,
530  __FUNCTION__ /*function*/
531  );
532  }
533 
534  if(!thread_progress_bar_.isComplete())
535  {
536  RunControlStateMachine::
537  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
538 
539  if(last_thread_progress_read_ != progress)
540  {
541  last_thread_progress_read_ = progress;
542  last_thread_progress_update_ = time(0);
543  }
544 
545  sleep(1 /*seconds*/);
546  }
547  else
548  {
549  __SUP_COUT_INFO__ << "Complete configuring transition!" << __E__;
550  __SUP_COUTV__(getProcessInfo_());
551  }
552  }
553 
554  return;
555 } // end transitionConfiguring()
556 
557 //==============================================================================
558 void ARTDAQSupervisor::configuringThread()
559 try
560 {
561  std::string uid = theConfigurationManager_
562  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
563  "/" + CorePropertySupervisorBase::getSupervisorUID() +
564  "/" + "LinkToSupervisorTable")
565  .getValueAsString();
566 
567  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
568 
569  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
570 
571  ConfigurationTree theSupervisorNode = getSupervisorTableNode();
572 
573  thread_progress_bar_.step();
574 
575  set_thread_message_("ConfigGen");
576 
577  auto info = ARTDAQTableBase::extractARTDAQInfo(
578  theSupervisorNode,
579  false /*getStatusFalseNodes*/,
580  true /*doWriteFHiCL*/,
581  getSupervisorProperty("max_fragment_size_bytes", 8888),
582  getSupervisorProperty("routing_timeout_ms", 1999),
583  getSupervisorProperty("routing_retry_count", 12),
584  &thread_progress_bar_);
585 
586  // Check lists
587  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
588  {
589  __GEN_SS__ << "There must be at least one enabled BoardReader!" << __E__;
590  __GEN_SS_THROW__;
591  return;
592  }
593  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
594  {
595  __GEN_SS__ << "There must be at least one enabled EventBuilder!" << __E__;
596  __GEN_SS_THROW__;
597  return;
598  }
599 
600  thread_progress_bar_.step();
601  set_thread_message_("Writing boot.txt");
602 
603  __GEN_COUT__ << "Writing boot.txt" << __E__;
604 
605  int debugLevel = theSupervisorNode.getNode("DAQInterfaceDebugLevel").getValue<int>();
606  std::string setupScript = theSupervisorNode.getNode("DAQSetupScript").getValue();
607 
608  std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt", std::ios::trunc);
609  o << "DAQ setup script: " << setupScript << std::endl;
610  o << "debug level: " << debugLevel << std::endl;
611  o << std::endl;
612 
613  if(info.subsystems.size() > 1)
614  {
615  for(auto& ss : info.subsystems)
616  {
617  if(ss.first == 0)
618  continue;
619  o << "Subsystem id: " << ss.first << std::endl;
620  if(ss.second.destination != 0)
621  {
622  o << "Subsystem destination: " << ss.second.destination << std::endl;
623  }
624  for(auto& sss : ss.second.sources)
625  {
626  o << "Subsystem source: " << sss << std::endl;
627  }
628  if(ss.second.eventMode)
629  {
630  o << "Subsystem fragmentMode: False" << std::endl;
631  }
632  o << std::endl;
633  }
634  }
635 
636  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
637  {
638  o << "EventBuilder host: " << builder.hostname << std::endl;
639  o << "EventBuilder label: " << builder.label << std::endl;
640  label_to_proc_type_map_[builder.label] = "EventBuilder";
641  if(builder.subsystem != 1)
642  {
643  o << "EventBuilder subsystem: " << builder.subsystem << std::endl;
644  }
645  if(builder.allowed_processors != "")
646  {
647  o << "EventBuilder allowed_processors" << builder.allowed_processors
648  << std::endl;
649  }
650  o << std::endl;
651  }
652  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
653  {
654  o << "DataLogger host: " << logger.hostname << std::endl;
655  o << "DataLogger label: " << logger.label << std::endl;
656  label_to_proc_type_map_[logger.label] = "DataLogger";
657  if(logger.subsystem != 1)
658  {
659  o << "DataLogger subsystem: " << logger.subsystem << std::endl;
660  }
661  if(logger.allowed_processors != "")
662  {
663  o << "DataLogger allowed_processors" << logger.allowed_processors
664  << std::endl;
665  }
666  o << std::endl;
667  }
668  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
669  {
670  o << "Dispatcher host: " << dispatcher.hostname << std::endl;
671  o << "Dispatcher label: " << dispatcher.label << std::endl;
672  o << "Dispatcher port: " << dispatcher.port << std::endl;
673  label_to_proc_type_map_[dispatcher.label] = "Dispatcher";
674  if(dispatcher.subsystem != 1)
675  {
676  o << "Dispatcher subsystem: " << dispatcher.subsystem << std::endl;
677  }
678  if(dispatcher.allowed_processors != "")
679  {
680  o << "Dispatcher allowed_processors" << dispatcher.allowed_processors
681  << std::endl;
682  }
683  o << std::endl;
684  }
685  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
686  {
687  o << "RoutingManager host: " << rmanager.hostname << std::endl;
688  o << "RoutingManager label: " << rmanager.label << std::endl;
689  label_to_proc_type_map_[rmanager.label] = "RoutingManager";
690  if(rmanager.subsystem != 1)
691  {
692  o << "RoutingManager subsystem: " << rmanager.subsystem << std::endl;
693  }
694  if(rmanager.allowed_processors != "")
695  {
696  o << "RoutingManager allowed_processors" << rmanager.allowed_processors
697  << std::endl;
698  }
699  o << std::endl;
700  }
701  o.close();
702 
703  thread_progress_bar_.step();
704  set_thread_message_("Writing Fhicl Files");
705 
706  __GEN_COUT__ << "Building configuration directory" << __E__;
707 
708  boost::system::error_code ignored;
709  boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME,
710  ignored);
711  mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
712 
713  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
714  {
715  symlink(ARTDAQTableBase::getFlatFHICLFilename(
716  ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label)
717  .c_str(),
718  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
719  reader.label + ".fcl")
720  .c_str());
721  }
722  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
723  {
724  symlink(ARTDAQTableBase::getFlatFHICLFilename(
725  ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label)
726  .c_str(),
727  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
728  builder.label + ".fcl")
729  .c_str());
730  }
731  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
732  {
733  symlink(ARTDAQTableBase::getFlatFHICLFilename(
734  ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label)
735  .c_str(),
736  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
737  logger.label + ".fcl")
738  .c_str());
739  }
740  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
741  {
742  symlink(ARTDAQTableBase::getFlatFHICLFilename(
743  ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label)
744  .c_str(),
745  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
746  dispatcher.label + ".fcl")
747  .c_str());
748  }
749  for(auto& rmanager : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingManager])
750  {
751  symlink(ARTDAQTableBase::getFlatFHICLFilename(
752  ARTDAQTableBase::ARTDAQAppType::RoutingManager, rmanager.label)
753  .c_str(),
754  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" +
755  rmanager.label + ".fcl")
756  .c_str());
757  }
758 
759  thread_progress_bar_.step();
760 
761  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
762  getDAQState_();
763  if(daqinterface_state_ != "stopped" && daqinterface_state_ != "")
764  {
765  __GEN_SS__ << "Cannot configure DAQInterface because it is in the wrong state"
766  << " (" << daqinterface_state_ << " != stopped)!" << __E__;
767  __GEN_SS_THROW__
768  }
769 
770  set_thread_message_("Calling setdaqcomps");
771  __GEN_COUT__ << "Calling setdaqcomps" << __E__;
772  __GEN_COUT__ << "Status before setdaqcomps: " << daqinterface_state_ << __E__;
773  PyObject* pName1 = PyUnicode_FromString("setdaqcomps");
774 
775  PyObject* readerDict = PyDict_New();
776  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
777  {
778  label_to_proc_type_map_[reader.label] = "BoardReader";
779  PyObject* readerName = PyUnicode_FromString(reader.label.c_str());
780 
781  int list_size = reader.allowed_processors != "" ? 4 : 3;
782 
783  PyObject* readerData = PyList_New(list_size);
784  PyObject* readerHost = PyUnicode_FromString(reader.hostname.c_str());
785  PyObject* readerPort = PyUnicode_FromString("-1");
786  PyObject* readerSubsystem =
787  PyUnicode_FromString(std::to_string(reader.subsystem).c_str());
788  PyList_SetItem(readerData, 0, readerHost);
789  PyList_SetItem(readerData, 1, readerPort);
790  PyList_SetItem(readerData, 2, readerSubsystem);
791  if(reader.allowed_processors != "")
792  {
793  PyObject* readerAllowedProcessors =
794  PyUnicode_FromString(reader.allowed_processors.c_str());
795  PyList_SetItem(readerData, 3, readerAllowedProcessors);
796  }
797  PyDict_SetItem(readerDict, readerName, readerData);
798  }
799  PyObject* res1 =
800  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName1, readerDict, NULL);
801  Py_DECREF(readerDict);
802 
803  if(res1 == NULL)
804  {
805  PyErr_Print();
806  __GEN_SS__ << "Error calling setdaqcomps transition" << __E__;
807  __GEN_SS_THROW__;
808  }
809  getDAQState_();
810  __GEN_COUT__ << "Status after setdaqcomps: " << daqinterface_state_ << __E__;
811 
812  thread_progress_bar_.step();
813  set_thread_message_("Calling do_boot");
814  __GEN_COUT__ << "Calling do_boot" << __E__;
815  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
816  PyObject* pName2 = PyUnicode_FromString("do_boot");
817  PyObject* pStateArgs1 =
818  PyUnicode_FromString((ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt").c_str());
819  PyObject* res2 =
820  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
821 
822  if(res2 == NULL)
823  {
824  PyErr_Print();
825  __GEN_COUT__ << "Error on first boost attempt, recovering and retrying" << __E__;
826 
827  PyObject* pName = PyUnicode_FromString("do_recover");
828  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
829 
830  if(res == NULL)
831  {
832  PyErr_Print();
833  __GEN_SS__ << "Error calling recover transition!!!!" << __E__;
834  __GEN_SS_THROW__;
835  }
836 
837  thread_progress_bar_.step();
838  set_thread_message_("Calling do_boot (retry)");
839  __GEN_COUT__ << "Calling do_boot again" << __E__;
840  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
841  PyObject* res3 =
842  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
843 
844  if(res3 == NULL)
845  {
846  PyErr_Print();
847  __GEN_SS__ << "Error calling boot transition (2nd try)" << __E__;
848  __GEN_SS_THROW__;
849  }
850  }
851 
852  getDAQState_();
853  if(daqinterface_state_ != "booted")
854  {
855  __GEN_SS__ << "DAQInterface boot transition failed! "
856  << "Status after boot attempt: " << daqinterface_state_ << __E__;
857  __GEN_SS_THROW__;
858  }
859  __GEN_COUT__ << "Status after boot: " << daqinterface_state_ << __E__;
860 
861  thread_progress_bar_.step();
862  set_thread_message_("Calling do_config");
863  __GEN_COUT__ << "Calling do_config" << __E__;
864  __GEN_COUT__ << "Status before config: " << daqinterface_state_ << __E__;
865  PyObject* pName3 = PyUnicode_FromString("do_config");
866  PyObject* pStateArgs2 = Py_BuildValue("[s]", FAKE_CONFIG_NAME);
867  PyObject* res3 =
868  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName3, pStateArgs2, NULL);
869 
870  if(res3 == NULL)
871  {
872  PyErr_Print();
873  __GEN_SS__ << "Error calling config transition" << __E__;
874  __GEN_SS_THROW__;
875  }
876  getDAQState_();
877  if(daqinterface_state_ != "ready")
878  {
879  __GEN_SS__ << "DAQInterface config transition failed!" << __E__
880  << "Supervisor state: \"" << daqinterface_state_ << "\" != \"ready\" "
881  << __E__;
882  __GEN_SS_THROW__;
883  }
884  __GEN_COUT__ << "Status after config: " << daqinterface_state_ << __E__;
885  thread_progress_bar_.complete();
886  set_thread_message_("Configured");
887  __GEN_COUT__ << "Configured." << __E__;
888 
889 } // end configuringThread()
890 catch(const std::runtime_error& e)
891 {
892  set_thread_message_("ERROR");
893  __SS__ << "Error was caught while configuring: " << e.what() << __E__;
894  __COUT_ERR__ << "\n" << ss.str();
895  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
896  thread_error_message_ = ss.str();
897 }
898 catch(...)
899 {
900  set_thread_message_("ERROR");
901  __SS__ << "Unknown error was caught while configuring. Please checked the logs."
902  << __E__;
903  __COUT_ERR__ << "\n" << ss.str();
904 
905  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
906 
907  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
908  thread_error_message_ = ss.str();
909 } // end configuringThread() error handling
910 
911 //==============================================================================
912 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference /*event*/)
913 try
914 {
915  set_thread_message_("Halting");
916  __SUP_COUT__ << "Halting..." << __E__;
917  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
918  getDAQState_();
919  __SUP_COUT__ << "Status before halt: " << daqinterface_state_ << __E__;
920 
921  if(daqinterface_state_ == "running")
922  {
923  // First stop before halting
924  PyObject* pName = PyUnicode_FromString("do_stop_running");
925  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
926 
927  if(res == NULL)
928  {
929  PyErr_Print();
930  __SS__ << "Error calling DAQ Interface stop transition." << __E__;
931  __SUP_SS_THROW__;
932  }
933  }
934 
935  PyObject* pName = PyUnicode_FromString("do_command");
936  PyObject* pArg = PyUnicode_FromString("Shutdown");
937  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
938 
939  if(res == NULL)
940  {
941  PyErr_Print();
942  __SS__ << "Error calling DAQ Interface halt transition." << __E__;
943  __SUP_SS_THROW__;
944  }
945 
946  getDAQState_();
947  __SUP_COUT__ << "Status after halt: " << daqinterface_state_ << __E__;
948  __SUP_COUT__ << "Halted." << __E__;
949  set_thread_message_("Halted");
950 } // end transitionHalting()
951 catch(const std::runtime_error& e)
952 {
953  const std::string transitionName = "Halting";
954  // if halting from Failed state, then ignore errors
955  if(theStateMachine_.getProvenanceStateName() ==
956  RunControlStateMachine::FAILED_STATE_NAME ||
957  theStateMachine_.getProvenanceStateName() ==
958  RunControlStateMachine::HALTED_STATE_NAME)
959  {
960  __SUP_COUT_INFO__ << "Error was caught while halting (but ignoring because "
961  "previous state was '"
962  << RunControlStateMachine::FAILED_STATE_NAME
963  << "'): " << e.what() << __E__;
964  }
965  else // if not previously in Failed state, then fail
966  {
967  __SUP_SS__ << "Error was caught while " << transitionName << ": " << e.what()
968  << __E__;
969  __SUP_COUT_ERR__ << "\n" << ss.str();
970  theStateMachine_.setErrorMessage(ss.str());
971  throw toolbox::fsm::exception::Exception(
972  "Transition Error" /*name*/,
973  ss.str() /* message*/,
974  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
975  __LINE__ /*line*/,
976  __FUNCTION__ /*function*/
977  );
978  }
979 } // end transitionHalting() std::runtime_error exception handling
980 catch(...)
981 {
982  const std::string transitionName = "Halting";
983  // if halting from Failed state, then ignore errors
984  if(theStateMachine_.getProvenanceStateName() ==
985  RunControlStateMachine::FAILED_STATE_NAME ||
986  theStateMachine_.getProvenanceStateName() ==
987  RunControlStateMachine::HALTED_STATE_NAME)
988  {
989  __SUP_COUT_INFO__ << "Unknown error was caught while halting (but ignoring "
990  "because previous state was '"
991  << RunControlStateMachine::FAILED_STATE_NAME << "')." << __E__;
992  }
993  else // if not previously in Failed state, then fail
994  {
995  __SUP_SS__ << "Unknown error was caught while " << transitionName
996  << ". Please checked the logs." << __E__;
997  __SUP_COUT_ERR__ << "\n" << ss.str();
998  theStateMachine_.setErrorMessage(ss.str());
999 
1000  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1001 
1002  throw toolbox::fsm::exception::Exception(
1003  "Transition Error" /*name*/,
1004  ss.str() /* message*/,
1005  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
1006  __LINE__ /*line*/,
1007  __FUNCTION__ /*function*/
1008  );
1009  }
1010 } // end transitionHalting() exception handling
1011 
1012 //==============================================================================
1013 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference /*event*/)
1014 try
1015 {
1016  set_thread_message_("Initializing");
1017  __SUP_COUT__ << "Initializing..." << __E__;
1018  init();
1019  __SUP_COUT__ << "Initialized." << __E__;
1020  set_thread_message_("Initialized");
1021 } // end transitionInitializing()
1022 catch(const std::runtime_error& e)
1023 {
1024  __SS__ << "Error was caught while Initializing: " << e.what() << __E__;
1025  __SS_THROW__;
1026 }
1027 catch(...)
1028 {
1029  __SS__ << "Unknown error was caught while Initializing. Please checked the logs."
1030  << __E__;
1031  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1032  __SS_THROW__;
1033 } // end transitionInitializing() error handling
1034 
1035 //==============================================================================
1036 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference /*event*/)
1037 try
1038 {
1039  set_thread_message_("Pausing");
1040  __SUP_COUT__ << "Pausing..." << __E__;
1041  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1042 
1043  getDAQState_();
1044  __SUP_COUT__ << "Status before pause: " << daqinterface_state_ << __E__;
1045 
1046  PyObject* pName = PyUnicode_FromString("do_command");
1047  PyObject* pArg = PyUnicode_FromString("Pause");
1048  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1049 
1050  if(res == NULL)
1051  {
1052  PyErr_Print();
1053  __SS__ << "Error calling DAQ Interface Pause transition." << __E__;
1054  __SUP_SS_THROW__;
1055  }
1056 
1057  getDAQState_();
1058  __SUP_COUT__ << "Status after pause: " << daqinterface_state_ << __E__;
1059 
1060  __SUP_COUT__ << "Paused." << __E__;
1061  set_thread_message_("Paused");
1062 } // end transitionPausing()
1063 catch(const std::runtime_error& e)
1064 {
1065  __SS__ << "Error was caught while Pausing: " << e.what() << __E__;
1066  __SS_THROW__;
1067 }
1068 catch(...)
1069 {
1070  __SS__ << "Unknown error was caught while Pausing. Please checked the logs." << __E__;
1071  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1072  __SS_THROW__;
1073 } // end transitionPausing() error handling
1074 
1075 //==============================================================================
1076 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference /*event*/)
1077 try
1078 {
1079  set_thread_message_("Resuming");
1080  __SUP_COUT__ << "Resuming..." << __E__;
1081  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1082 
1083  getDAQState_();
1084  __SUP_COUT__ << "Status before resume: " << daqinterface_state_ << __E__;
1085  PyObject* pName = PyUnicode_FromString("do_command");
1086  PyObject* pArg = PyUnicode_FromString("Resume");
1087  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1088 
1089  if(res == NULL)
1090  {
1091  PyErr_Print();
1092  __SS__ << "Error calling DAQ Interface Resume transition." << __E__;
1093  __SUP_SS_THROW__;
1094  }
1095  getDAQState_();
1096  __SUP_COUT__ << "Status after resume: " << daqinterface_state_ << __E__;
1097  __SUP_COUT__ << "Resumed." << __E__;
1098  set_thread_message_("Resumed");
1099 } // end transitionResuming()
1100 catch(const std::runtime_error& e)
1101 {
1102  __SS__ << "Error was caught while Resuming: " << e.what() << __E__;
1103  __SS_THROW__;
1104 }
1105 catch(...)
1106 {
1107  __SS__ << "Unknown error was caught while Resuming. Please checked the logs."
1108  << __E__;
1109  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1110  __SS_THROW__;
1111 } // end transitionResuming() error handling
1112 
1113 //==============================================================================
1114 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference /*event*/)
1115 try
1116 {
1117  __SUP_COUT__ << "transitionStarting" << __E__;
1118 
1119  // first time launch thread because artdaq Supervisor may take a while
1120  if(RunControlStateMachine::getIterationIndex() == 0 &&
1121  RunControlStateMachine::getSubIterationIndex() == 0)
1122  {
1123  thread_error_message_ = "";
1124  thread_progress_bar_.resetProgressBar(0);
1125  last_thread_progress_update_ = time(0); // initialize timeout timer
1126 
1127  // start configuring thread
1128  std::thread(&ARTDAQSupervisor::startingThread, this).detach();
1129 
1130  __SUP_COUT__ << "Starting thread started." << __E__;
1131 
1132  RunControlStateMachine::
1133  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1134  }
1135  else // not first time
1136  {
1137  std::string errorMessage;
1138  {
1139  std::lock_guard<std::mutex> lock(
1140  thread_mutex_); // lock out for remainder of scope
1141  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
1142  }
1143  int progress = thread_progress_bar_.read();
1144  __SUP_COUTV__(errorMessage);
1145  __SUP_COUTV__(progress);
1146  __SUP_COUTV__(thread_progress_bar_.isComplete());
1147 
1148  // check for done and error messages
1149  if(errorMessage == "" && // if no update in 600 seconds, give up
1150  time(0) - last_thread_progress_update_ > 600)
1151  {
1152  __SUP_SS__ << "There has been no update from the start thread for "
1153  << (time(0) - last_thread_progress_update_)
1154  << " seconds, assuming something is wrong and giving up! "
1155  << "Last progress received was " << progress << __E__;
1156  errorMessage = ss.str();
1157  }
1158 
1159  if(errorMessage != "")
1160  {
1161  __SUP_SS__ << "Error was caught in starting thread: " << errorMessage
1162  << __E__;
1163  __SUP_COUT_ERR__ << "\n" << ss.str();
1164 
1165  theStateMachine_.setErrorMessage(ss.str());
1166  throw toolbox::fsm::exception::Exception(
1167  "Transition Error" /*name*/,
1168  ss.str() /* message*/,
1169  "CoreSupervisorBase::transitionStarting" /*module*/,
1170  __LINE__ /*line*/,
1171  __FUNCTION__ /*function*/
1172  );
1173  }
1174 
1175  if(!thread_progress_bar_.isComplete())
1176  {
1177  RunControlStateMachine::
1178  indicateIterationWork(); // use Iteration to allow other steps to complete in the system
1179 
1180  if(last_thread_progress_read_ != progress)
1181  {
1182  last_thread_progress_read_ = progress;
1183  last_thread_progress_update_ = time(0);
1184  }
1185 
1186  sleep(1 /*seconds*/);
1187  }
1188  else
1189  {
1190  __SUP_COUT_INFO__ << "Complete starting transition!" << __E__;
1191  __SUP_COUTV__(getProcessInfo_());
1192  }
1193  }
1194 
1195  return;
1196 
1197 } // end transitionStarting()
1198 catch(const std::runtime_error& e)
1199 {
1200  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1201  __SS_THROW__;
1202 }
1203 catch(...)
1204 {
1205  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1206  << __E__;
1207  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1208  __SS_THROW__;
1209 } // end transitionStarting() error handling
1210 
1211 //==============================================================================
1212 void ARTDAQSupervisor::startingThread()
1213 try
1214 {
1215  std::string uid = theConfigurationManager_
1216  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME +
1217  "/" + CorePropertySupervisorBase::getSupervisorUID() +
1218  "/" + "LinkToSupervisorTable")
1219  .getValueAsString();
1220 
1221  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
1222  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
1223  __GEN_COUT__ << "Starting..." << __E__;
1224  set_thread_message_("Starting");
1225 
1226  thread_progress_bar_.step();
1227  stop_runner_();
1228  {
1229  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1230  getDAQState_();
1231  __GEN_COUT__ << "Status before start: " << daqinterface_state_ << __E__;
1232  auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage())
1233  .getParameters()
1234  .getValue("RunNumber");
1235 
1236  thread_progress_bar_.step();
1237 
1238  PyObject* pName = PyUnicode_FromString("do_start_running");
1239  int run_number = std::stoi(runNumber);
1240  PyObject* pStateArgs = PyLong_FromLong(run_number);
1241  PyObject* res =
1242  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pStateArgs, NULL);
1243 
1244  thread_progress_bar_.step();
1245 
1246  if(res == NULL)
1247  {
1248  PyErr_Print();
1249  __SS__ << "Error calling start transition" << __E__;
1250  __GEN_SS_THROW__;
1251  }
1252  getDAQState_();
1253 
1254  thread_progress_bar_.step();
1255 
1256  __GEN_COUT__ << "Status after start: " << daqinterface_state_ << __E__;
1257  if(daqinterface_state_ != "running")
1258  {
1259  __SS__ << "DAQInterface start transition failed!" << __E__;
1260  __GEN_SS_THROW__;
1261  }
1262 
1263  thread_progress_bar_.step();
1264  }
1265  start_runner_();
1266  set_thread_message_("Started");
1267  thread_progress_bar_.step();
1268 
1269  __GEN_COUT__ << "Started." << __E__;
1270  thread_progress_bar_.complete();
1271 
1272 } // end startingThread()
1273 catch(const std::runtime_error& e)
1274 {
1275  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
1276  __COUT_ERR__ << "\n" << ss.str();
1277  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1278  thread_error_message_ = ss.str();
1279 }
1280 catch(...)
1281 {
1282  __SS__ << "Unknown error was caught while Starting. Please checked the logs."
1283  << __E__;
1284  __COUT_ERR__ << "\n" << ss.str();
1285 
1286  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1287 
1288  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
1289  thread_error_message_ = ss.str();
1290 } // end startingThread() error handling
1291 
1292 //==============================================================================
1293 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference /*event*/)
1294 try
1295 {
1296  __SUP_COUT__ << "Stopping..." << __E__;
1297  set_thread_message_("Stopping");
1298  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1299  getDAQState_();
1300  __SUP_COUT__ << "Status before stop: " << daqinterface_state_ << __E__;
1301  PyObject* pName = PyUnicode_FromString("do_stop_running");
1302  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1303 
1304  if(res == NULL)
1305  {
1306  PyErr_Print();
1307  __SS__ << "Error calling DAQ Interface stop transition." << __E__;
1308  __SUP_SS_THROW__;
1309  }
1310  getDAQState_();
1311  __SUP_COUT__ << "Status after stop: " << daqinterface_state_ << __E__;
1312  __SUP_COUT__ << "Stopped." << __E__;
1313  set_thread_message_("Stopped");
1314 } // end transitionStopping()
1315 catch(const std::runtime_error& e)
1316 {
1317  __SS__ << "Error was caught while Stopping: " << e.what() << __E__;
1318  __SS_THROW__;
1319 }
1320 catch(...)
1321 {
1322  __SS__ << "Unknown error was caught while Stopping. Please checked the logs."
1323  << __E__;
1324  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
1325  __SS_THROW__;
1326 } // end transitionStopping() error handling
1327 
1328 //==============================================================================
1329 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference /*event*/)
1330 {
1331  __SUP_COUT__ << "Entering error recovery state" << __E__;
1332  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1333  getDAQState_();
1334  __SUP_COUT__ << "Status before error: " << daqinterface_state_ << __E__;
1335 
1336  PyObject* pName = PyUnicode_FromString("do_recover");
1337  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1338 
1339  if(res == NULL)
1340  {
1341  PyErr_Print();
1342  __SS__ << "Error calling DAQ Interface recover transition." << __E__;
1343  __SUP_SS_THROW__;
1344  }
1345  getDAQState_();
1346  __SUP_COUT__ << "Status after error: " << daqinterface_state_ << __E__;
1347  __SUP_COUT__ << "EnteringError DONE." << __E__;
1348 
1349 } // end enteringError()
1350 
1351 std::vector<SupervisorInfo::SubappInfo> ots::ARTDAQSupervisor::getSubappInfo(void)
1352 {
1353  auto apps = getAndParseProcessInfo_();
1354  std::vector<SupervisorInfo::SubappInfo> output;
1355  for(auto& app : apps)
1356  {
1358 
1359  info.name = app.label;
1360  info.detail = "Rank " + std::to_string(app.rank) + ", subsystem " +
1361  std::to_string(app.subsystem);
1362  info.lastStatusTime = time(0);
1363  info.progress = 100;
1364  info.status = artdaqStateToOtsState(app.state);
1365  info.url = "http://" + app.host + ":" + std::to_string(app.port) + "/RPC2";
1366  info.class_name = "ARTDAQ " + labelToProcType_(app.label);
1367 
1368  output.push_back(info);
1369  }
1370  return output;
1371 }
1372 
1373 //==============================================================================
1374 void ots::ARTDAQSupervisor::getDAQState_()
1375 {
1376  //__SUP_COUT__ << "Getting DAQInterface state" << __E__;
1377  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1378 
1379  if(daqinterface_ptr_ == nullptr)
1380  {
1381  daqinterface_state_ = "";
1382  return;
1383  }
1384 
1385  PyObject* pName = PyUnicode_FromString("state");
1386  PyObject* pArg = PyUnicode_FromString("DAQInterface");
1387  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
1388 
1389  if(res == NULL)
1390  {
1391  PyErr_Print();
1392  __SS__ << "Error calling state function" << __E__;
1393  __SUP_SS_THROW__;
1394  return;
1395  }
1396  daqinterface_state_ = std::string(PyUnicode_AsUTF8(res));
1397  //__SUP_COUT__ << "getDAQState_ DONE: state=" << result << __E__;
1398 } // end getDAQState_()
1399 
1400 //==============================================================================
1401 std::string ots::ARTDAQSupervisor::getProcessInfo_(void)
1402 {
1403  //__SUP_COUT__ << "Getting DAQInterface state" << __E__;
1404  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
1405 
1406  if(daqinterface_ptr_ == nullptr)
1407  {
1408  return "";
1409  }
1410 
1411  PyObject* pName = PyUnicode_FromString("artdaq_process_info");
1412  PyObject* pArg = PyUnicode_FromString("DAQInterface");
1413  PyObject* pArg2 = PyBool_FromLong(true);
1414  PyObject* res =
1415  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, pArg2, NULL);
1416 
1417  if(res == NULL)
1418  {
1419  PyErr_Print();
1420  __SS__ << "Error calling artdaq_process_info function" << __E__;
1421  __SUP_SS_THROW__;
1422  return "";
1423  }
1424  return std::string(PyUnicode_AsUTF8(res));
1425  //__SUP_COUT__ << "getDAQState_ DONE: state=" << result << __E__;
1426 } // end getProcessInfo_()
1427 
1428 std::string ots::ARTDAQSupervisor::artdaqStateToOtsState(std::string state)
1429 {
1430  if(state == "nonexistant")
1431  return RunControlStateMachine::INITIAL_STATE_NAME;
1432  if(state == "Ready")
1433  return "Configured";
1434  if(state == "Running")
1435  return RunControlStateMachine::RUNNING_STATE_NAME;
1436  if(state == "Paused")
1437  return RunControlStateMachine::PAUSED_STATE_NAME;
1438  if(state == "Stopped")
1439  return RunControlStateMachine::HALTED_STATE_NAME;
1440 
1441  TLOG(TLVL_WARNING) << "Unrecognized state name " << state;
1442  return RunControlStateMachine::FAILED_STATE_NAME;
1443 }
1444 
1445 std::string ots::ARTDAQSupervisor::labelToProcType_(std::string label)
1446 {
1447  if(label_to_proc_type_map_.count(label))
1448  {
1449  return label_to_proc_type_map_[label];
1450  }
1451  return "UNKNOWN";
1452 }
1453 
1454 //==============================================================================
1455 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo>
1456 ots::ARTDAQSupervisor::getAndParseProcessInfo_()
1457 {
1458  std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
1459  auto info = getProcessInfo_();
1460  auto procs = tokenize_(info);
1461 
1462  // 0: Whole string
1463  // 1: Process Label
1464  // 2: Process host
1465  // 3: Process port
1466  // 4: Process subsystem
1467  // 5: Process Rank
1468  // 6: Process state
1469  std::regex re("(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
1470 
1471  for(auto& proc : procs)
1472  {
1473  std::smatch match;
1474  if(std::regex_match(proc, match, re))
1475  {
1476  DAQInterfaceProcessInfo info;
1477 
1478  info.label = match[1];
1479  info.host = match[2];
1480  info.port = std::stoi(match[3]);
1481  info.subsystem = std::stoi(match[4]);
1482  info.rank = std::stoi(match[5]);
1483  info.state = match[6];
1484 
1485  output.push_back(info);
1486  }
1487  }
1488  return output;
1489 } // end getAndParseProcessInfo_()
1490 
1491 //==============================================================================
1493  std::unique_ptr<artdaq::CommanderInterface>>>
1494 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
1495 {
1496  std::list<
1497  std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
1498  output;
1499  auto infos = getAndParseProcessInfo_();
1500 
1501  for(auto& info : infos)
1502  {
1503  artdaq::Commandable cm;
1504  fhicl::ParameterSet ps;
1505 
1506  ps.put<std::string>("commanderPluginType", "xmlrpc");
1507  ps.put<int>("id", info.port);
1508  ps.put<std::string>("server_url", info.host);
1509 
1510  output.emplace_back(std::make_pair<DAQInterfaceProcessInfo,
1511  std::unique_ptr<artdaq::CommanderInterface>>(
1512  std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
1513  }
1514 
1515  return output;
1516 } // end makeCommandersFromProcessInfo()
1517 
1518 //==============================================================================
1519 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string const& input)
1520 {
1521  size_t pos = 0;
1522  std::list<std::string> output;
1523 
1524  while(pos != std::string::npos && pos < input.size())
1525  {
1526  auto newpos = input.find('\n', pos);
1527  if(newpos != std::string::npos)
1528  {
1529  output.emplace_back(input, pos, newpos - pos);
1530  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1531  pos = newpos + 1;
1532  }
1533  else
1534  {
1535  output.emplace_back(input, pos);
1536  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1537  pos = newpos;
1538  }
1539  }
1540  return output;
1541 } // end tokenize_()
1542 
1543 //==============================================================================
1544 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1545 {
1546  TLOG(TLVL_TRACE) << "Runner thread starting";
1547  runner_running_ = true;
1548  while(runner_running_)
1549  {
1550  if(daqinterface_ptr_ != NULL)
1551  {
1552  std::unique_lock<std::recursive_mutex> lk(daqinterface_mutex_);
1553  getDAQState_();
1554  std::string state_before = daqinterface_state_;
1555 
1556  if(daqinterface_state_ == "running" || daqinterface_state_ == "ready" ||
1557  daqinterface_state_ == "booted")
1558  {
1559  try
1560  {
1561  TLOG(TLVL_TRACE) << "Calling DAQInterface::check_proc_heartbeats";
1562  PyObject* pName = PyUnicode_FromString("check_proc_heartbeats");
1563  PyObject* res =
1564  PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1565  TLOG(TLVL_TRACE)
1566  << "Done with DAQInterface::check_proc_heartbeats call";
1567 
1568  if(res == NULL)
1569  {
1570  runner_running_ = false;
1571  PyErr_Print();
1572  __SS__ << "Error calling check_proc_heartbeats function" << __E__;
1573  __SUP_SS_THROW__;
1574  break;
1575  }
1576  }
1577  catch(cet::exception& ex)
1578  {
1579  runner_running_ = false;
1580  PyErr_Print();
1581  __SS__ << "An cet::exception occurred while calling "
1582  "check_proc_heartbeats function: "
1583  << ex.explain_self() << __E__;
1584  __SUP_SS_THROW__;
1585  break;
1586  }
1587  catch(std::exception& ex)
1588  {
1589  runner_running_ = false;
1590  PyErr_Print();
1591  __SS__ << "An std::exception occurred while calling "
1592  "check_proc_heartbeats function: "
1593  << ex.what() << __E__;
1594  __SUP_SS_THROW__;
1595  break;
1596  }
1597  catch(...)
1598  {
1599  runner_running_ = false;
1600  PyErr_Print();
1601  __SS__ << "An unknown Error occurred while calling runner function"
1602  << __E__;
1603  __SUP_SS_THROW__;
1604  break;
1605  }
1606 
1607  lk.unlock();
1608  getDAQState_();
1609  if(daqinterface_state_ != state_before)
1610  {
1611  runner_running_ = false;
1612  lk.unlock();
1613  __SS__ << "DAQInterface state unexpectedly changed from "
1614  << state_before << " to " << daqinterface_state_
1615  << ". Check supervisor log file for more info!" << __E__;
1616  __SUP_SS_THROW__;
1617  break;
1618  }
1619  }
1620  }
1621  else
1622  {
1623  break;
1624  }
1625  usleep(1000000);
1626  }
1627  runner_running_ = false;
1628  TLOG(TLVL_TRACE) << "Runner thread complete";
1629 } // end daqinterfaceRunner_()
1630 
1631 //==============================================================================
1632 void ots::ARTDAQSupervisor::stop_runner_()
1633 {
1634  runner_running_ = false;
1635  if(runner_thread_ && runner_thread_->joinable())
1636  {
1637  runner_thread_->join();
1638  runner_thread_.reset(nullptr);
1639  }
1640 } // end stop_runner_()
1641 
1642 //==============================================================================
1643 void ots::ARTDAQSupervisor::start_runner_()
1644 {
1645  stop_runner_();
1646  runner_thread_ =
1647  std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_, this);
1648 } // end start_runner_()
virtual void transitionHalting(toolbox::Event::Reference event) override
virtual void transitionInitializing(toolbox::Event::Reference event) override
void loadTableGroup(const std::string &tableGroupName, const TableGroupKey &tableGroupKey, bool doActivate=false, std::map< std::string, TableVersion > *groupMembers=0, ProgressBar *progressBar=0, std::string *accumulateWarnings=0, std::string *groupComment=0, std::string *groupAuthor=0, std::string *groupCreateTime=0, bool doNotLoadMember=false, std::string *groupTypeString=0, std::map< std::string, std::string > *groupAliases=0, ConfigurationManager::LoadGroupType onlyLoadIfBackboneOrContext=ConfigurationManager::LoadGroupType::ALL_TYPES, bool ignoreVersionTracking=false)
ConfigurationTree getNode(const std::string &nodeString, bool doNotThrowOnBrokenUIDLinks=false) const
"root/parent/parent/"
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
const std::string & getValueAsString(bool returnLinkTableValue=false) const
void getValue(T &value) const
ITRACEController * theTRACEController_
only define for an app that receives a command
bool isComplete()
get functions
Definition: ProgressBar.cc:89
void step()
thread safe
Definition: ProgressBar.cc:75
int read()
if stepsToComplete==0, then define any progress as 50%, thread safe
Definition: ProgressBar.cc:121
void complete()
declare complete, thread safe
Definition: ProgressBar.cc:96
void INIT_MF(const char *name)
std::string name
Also key in map.