otsdaq  v2_05_02_indev
ARTDAQSupervisor.cc
1 
2 #include "otsdaq/ARTDAQSupervisor/ARTDAQSupervisor.hh"
3 
4 #include "artdaq-core/Utilities/configureMessageFacility.hh"
5 #include "artdaq/BuildInfo/GetPackageBuildInfo.hh"
6 #include "artdaq/DAQdata/Globals.hh"
7 #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
8 #include "cetlib_except/exception.h"
9 #include "fhiclcpp/make_ParameterSet.h"
10 
11 #include "artdaq-core/Utilities/ExceptionHandler.hh" /*for artdaq::ExceptionHandler*/
12 
13 #include <boost/exception/all.hpp>
14 #include <boost/filesystem.hpp>
15 
16 #include <signal.h>
17 
18 using namespace ots;
19 
20 XDAQ_INSTANTIATOR_IMPL(ARTDAQSupervisor)
21 
22 #define FAKE_CONFIG_NAME "ots_config"
23 #define DAQINTERFACE_PORT std::atoi(__ENV__("ARTDAQ_BASE_PORT")) + (partition_ * std::atoi(__ENV__("ARTDAQ_PORTS_PER_PARTITION")))
24 
25 static ARTDAQSupervisor* instance = nullptr;
26 static std::unordered_map<int, struct sigaction> old_actions = std::unordered_map<int, struct sigaction>();
27 static bool sighandler_init = false;
28 static void signal_handler(int signum)
29 {
30  // Messagefacility may already be gone at this point, TRACE ONLY!
31  TRACE_STREAMER(TLVL_ERROR, &("ARTDAQsupervisor")[0], 0, 0, 0) << "A signal of type " << signum
32  << " was caught by ARTDAQSupervisor. Shutting down DAQInterface, "
33  "then proceeding with default handlers!";
34 
35  if(instance)
36  instance->destroy();
37 
38  sigset_t set;
39  pthread_sigmask(SIG_UNBLOCK, NULL, &set);
40  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
41 
42  TRACE_STREAMER(TLVL_ERROR, &("SharedMemoryManager")[0], 0, 0, 0) << "Calling default signal handler";
43  if(signum != SIGUSR2)
44  {
45  sigaction(signum, &old_actions[signum], NULL);
46  kill(getpid(), signum); // Only send signal to self
47  }
48  else
49  {
50  // Send Interrupt signal if parsing SIGUSR2 (i.e. user-defined exception that
51  // should tear down ARTDAQ)
52  sigaction(SIGINT, &old_actions[SIGINT], NULL);
53  kill(getpid(), SIGINT); // Only send signal to self
54  }
55 }
56 
57 static void init_sighandler(ARTDAQSupervisor* inst)
58 {
59  static std::mutex sighandler_mutex;
60  std::unique_lock<std::mutex> lk(sighandler_mutex);
61 
62  if(!sighandler_init)
63  {
64  sighandler_init = true;
65  instance = inst;
66  std::vector<int> signals = {
67  SIGINT, SIGILL, SIGABRT, SIGFPE, SIGSEGV, SIGPIPE, SIGALRM, SIGTERM, SIGUSR2, SIGHUP}; // SIGQUIT is used by art in normal operation
68  for(auto signal : signals)
69  {
70  struct sigaction old_action;
71  sigaction(signal, NULL, &old_action);
72 
73  // If the old handler wasn't SIG_IGN (it's a handler that just
74  // "ignore" the signal)
75  if(old_action.sa_handler != SIG_IGN)
76  {
77  struct sigaction action;
78  action.sa_handler = signal_handler;
79  sigemptyset(&action.sa_mask);
80  for(auto sigblk : signals)
81  {
82  sigaddset(&action.sa_mask, sigblk);
83  }
84  action.sa_flags = 0;
85 
86  // Replace the signal handler of SIGINT with the one described by
87  // new_action
88  sigaction(signal, &action, NULL);
89  old_actions[signal] = old_action;
90  }
91  }
92  }
93 }
94 
95 //==============================================================================
96 ARTDAQSupervisor::ARTDAQSupervisor(xdaq::ApplicationStub* stub)
97  : CoreSupervisorBase(stub)
98  , daqinterface_ptr_(NULL)
99  , partition_(getSupervisorProperty("partition", 0))
100  , daqinterface_state_("notrunning")
101  , runner_thread_(nullptr)
102 {
103  __SUP_COUT__ << "Constructor." << __E__;
104 
105  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
106  init_sighandler(this);
107 
108  // Only use system Python
109  unsetenv("PYTHONPATH");
110  unsetenv("PYTHONHOME");
111 
112  // Write out settings file
113  auto settings_file = __ENV__("DAQINTERFACE_SETTINGS");
114  std::ofstream o(settings_file, std::ios::trunc);
115 
116  setenv("DAQINTERFACE_PARTITION_NUMBER", std::to_string(partition_).c_str(), 1);
117  auto logfileName = std::string(__ENV__("OTSDAQ_LOG_DIR")) + "/DAQInteface/DAQInterface_partition" + std::to_string(partition_) + ".log";
118  setenv("DAQINTERFACE_LOGFILE", logfileName.c_str(), 1);
119 
120  o << "log_directory: " << getSupervisorProperty("log_directory", std::string(__ENV__("OTSDAQ_LOG_DIR"))) << std::endl;
121 
122  {
123  const std::string record_directory = getSupervisorProperty("record_directory", ARTDAQTableBase::ARTDAQ_FCL_PATH + "/run_records/");
124  mkdir(record_directory.c_str(), 0755);
125  o << "record_directory: " << record_directory << std::endl;
126  }
127 
128  o << "package_hashes_to_save: " << getSupervisorProperty("package_hashes_to_save", "[artdaq]") << std::endl;
129  // Note that productsdir_for_bash_scripts is REQUIRED!
130  o << "productsdir_for_bash_scripts: " << getSupervisorProperty("productsdir_for_bash_scripts", std::string(__ENV__("OTS_PRODUCTS"))) << std::endl;
131  o << "boardreader timeout: " << getSupervisorProperty("boardreader_timeout", 30) << std::endl;
132  o << "eventbuilder timeout: " << getSupervisorProperty("eventbuilder_timeout", 30) << std::endl;
133  o << "datalogger timeout: " << getSupervisorProperty("datalogger_timeout", 30) << std::endl;
134  o << "dispatcher timeout: " << getSupervisorProperty("dispatcher_timeout", 30) << std::endl;
135  o << "max_fragment_size_bytes: " << getSupervisorProperty("max_fragment_size_bytes", 1048576) << std::endl;
136  o << "transfer_plugin_to_use: " << getSupervisorProperty("transfer_plugin_to_use", "Autodetect") << std::endl;
137  o << "all_events_to_all_dispatchers: " << std::boolalpha << getSupervisorProperty("all_events_to_all_dispatchers", true) << std::endl;
138  o << "data_directory_override: " << getSupervisorProperty("data_directory_override", std::string(__ENV__("ARTDAQ_OUTPUT_DIR"))) << std::endl;
139  o << "max_configurations_to_list: " << getSupervisorProperty("max_configurations_to_list", 10) << std::endl;
140  o << "disable_unique_rootfile_labels: " << getSupervisorProperty("disable_unique_rootfile_labels", false) << std::endl;
141  o << "use_messageviewer: " << std::boolalpha << getSupervisorProperty("use_messageviewer", false) << std::endl;
142  o << "fake_messagefacility: " << std::boolalpha << getSupervisorProperty("fake_messagefacility", false) << std::endl;
143  o << "advanced_memory_usage: " << std::boolalpha << getSupervisorProperty("advanced_memory_usage", false) << std::endl;
144  o << "disable_private_network_bookkeeping: " << std::boolalpha << getSupervisorProperty("disable_private_network_bookkeeping", false) << std::endl;
145 
146  o.close();
147  __SUP_COUT__ << "Constructed." << __E__;
148 } // end constructor()
149 
150 //==============================================================================
151 ARTDAQSupervisor::~ARTDAQSupervisor(void)
152 {
153  __SUP_COUT__ << "Destructor." << __E__;
154  destroy();
155  __SUP_COUT__ << "Destructed." << __E__;
156 } // end destructor()
157 
158 //==============================================================================
159 void ARTDAQSupervisor::init(void)
160 {
161  stop_runner_();
162 
163  __SUP_COUT__ << "Initializing..." << __E__;
164  {
165  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
166 
167  // allSupervisorInfo_.init(getApplicationContext());
168  artdaq::configureMessageFacility("ARTDAQSupervisor");
169  __SUP_COUT__ << "artdaq MF configured." << __E__;
170 
171  // initialization
172  char* daqinterface_dir = getenv("ARTDAQ_DAQINTERFACE_DIR");
173  if(daqinterface_dir == NULL)
174  {
175  __SS__ << "ARTDAQ_DAQINTERFACE_DIR environment variable not set! This "
176  "means that DAQInterface has not been setup!"
177  << __E__;
178  __SUP_SS_THROW__;
179  }
180  else
181  {
182  __SUP_COUT__ << "Initializing Python" << __E__;
183  Py_Initialize();
184 
185  __SUP_COUT__ << "Adding DAQInterface directory to PYTHON_PATH" << __E__;
186  PyObject* sysPath = PySys_GetObject((char*)"path");
187  PyObject* programName = PyString_FromString(daqinterface_dir);
188  PyList_Append(sysPath, programName);
189  Py_DECREF(programName);
190 
191  __SUP_COUT__ << "Creating Module name" << __E__;
192  PyObject* pName = PyString_FromString("rc.control.daqinterface");
193  /* Error checking of pName left out */
194 
195  __SUP_COUT__ << "Importing module" << __E__;
196  PyObject* pModule = PyImport_Import(pName);
197  Py_DECREF(pName);
198 
199  if(pModule == NULL)
200  {
201  PyErr_Print();
202  __SS__ << "Failed to load rc.control.daqinterface" << __E__;
203  __SUP_SS_THROW__;
204  }
205  else
206  {
207  __SUP_COUT__ << "Loading python module dictionary" << __E__;
208  PyObject* pDict = PyModule_GetDict(pModule);
209  if(pDict == NULL)
210  {
211  PyErr_Print();
212  __SS__ << "Unable to load module dictionary" << __E__;
213  __SUP_SS_THROW__;
214  }
215  else
216  {
217  Py_DECREF(pModule);
218 
219  __SUP_COUT__ << "Getting DAQInterface object pointer" << __E__;
220  PyObject* di_obj_ptr = PyDict_GetItemString(pDict, "DAQInterface");
221 
222  __SUP_COUT__ << "Filling out DAQInterface args struct" << __E__;
223  PyObject* pArgs = PyTuple_New(0);
224 
225  PyObject* kwargs = Py_BuildValue("{s:s, s:s, s:i, s:i, s:s, s:s}",
226  "logpath",
227  ".daqint.log",
228  "name",
229  "DAQInterface",
230  "partition_number",
231  partition_,
232  "rpc_port",
233  DAQINTERFACE_PORT,
234  "rpc_host",
235  "localhost",
236  "control_host",
237  "localhost");
238 
239  __SUP_COUT__ << "Calling DAQInterface Object Constructor" << __E__;
240  daqinterface_ptr_ = PyObject_Call(di_obj_ptr, pArgs, kwargs);
241 
242  Py_DECREF(di_obj_ptr);
243  }
244  }
245  }
246 
247  getDAQState_();
248  }
249  start_runner_();
250  __SUP_COUT__ << "Initialized." << __E__;
251 } // end init()
252 
253 //==============================================================================
254 void ARTDAQSupervisor::destroy(void)
255 {
256  __SUP_COUT__ << "Destroying..." << __E__;
257 
258  if(daqinterface_ptr_ != NULL)
259  {
260  __SUP_COUT__ << "Calling recover transition" << __E__;
261  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
262  PyObject* pName = PyString_FromString("do_recover");
263  /*PyObject* res =*/ PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
264 
265  __SUP_COUT__ << "Making sure that correct state has been reached" << __E__;
266  getDAQState_();
267  while(daqinterface_state_ != "stopped")
268  {
269  getDAQState_();
270  __SUP_COUT__ << "State is " << daqinterface_state_ << ", waiting 1s and retrying..." << __E__;
271  usleep(1000000);
272  }
273 
274  Py_XDECREF(daqinterface_ptr_);
275  daqinterface_ptr_ = NULL;
276  }
277 
278  Py_Finalize();
279  __SUP_COUT__ << "Destroyed." << __E__;
280 } // end destroy()
281 
282 //==============================================================================
283 void ARTDAQSupervisor::transitionConfiguring(toolbox::Event::Reference /*event*/)
284 {
285  __SUP_COUT__ << "transitionConfiguring" << __E__;
286 
287  // activate the configuration tree (the first iteration)
288  if(RunControlStateMachine::getIterationIndex() == 0 && RunControlStateMachine::getSubIterationIndex() == 0)
289  {
290  thread_error_message_ = "";
291  thread_progress_bar_.resetProgressBar(0);
292  last_thread_progress_update_ = time(0); // initialize timeout timer
293 
294  std::pair<std::string /*group name*/, TableGroupKey> theGroup(
295  SOAPUtilities::translate(theStateMachine_.getCurrentMessage()).getParameters().getValue("ConfigurationTableGroupName"),
296  TableGroupKey(SOAPUtilities::translate(theStateMachine_.getCurrentMessage()).getParameters().getValue("ConfigurationTableGroupKey")));
297 
298  __SUP_COUT__ << "Configuration table group name: " << theGroup.first << " key: " << theGroup.second << __E__;
299 
300  theConfigurationManager_->loadTableGroup(theGroup.first, theGroup.second, true /*doActivate*/);
301 
302  // start configuring thread
303  std::thread(&ARTDAQSupervisor::configuringThread, this).detach();
304 
305  __SUP_COUT__ << "Configuring thread started." << __E__;
306 
307  RunControlStateMachine::indicateSubIterationWork();
308  }
309  else // not first time
310  {
311  std::string errorMessage;
312  {
313  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
314  errorMessage = thread_error_message_; // theStateMachine_.getErrorMessage();
315  }
316  int progress = thread_progress_bar_.read();
317  __SUP_COUTV__(errorMessage);
318  __SUP_COUTV__(progress);
319  __SUP_COUTV__(thread_progress_bar_.isComplete());
320 
321  // check for done and error messages
322  if(errorMessage == "" && // if no update in 600 seconds, give up
323  time(0) - last_thread_progress_update_ > 600)
324  {
325  __SUP_SS__ << "There has been no update from the configuration thread for " << (time(0) - last_thread_progress_update_)
326  << " seconds, assuming something is wrong and giving up! "
327  << "Last progress received was " << progress << __E__;
328  errorMessage = ss.str();
329  }
330 
331  if(errorMessage != "")
332  {
333  __SUP_SS__ << "Error was caught in configuring thread: " << errorMessage << __E__;
334  __SUP_COUT_ERR__ << "\n" << ss.str();
335 
336  theStateMachine_.setErrorMessage(ss.str());
337  throw toolbox::fsm::exception::Exception("Transition Error" /*name*/,
338  ss.str() /* message*/,
339  "CoreSupervisorBase::transitionConfiguring" /*module*/,
340  __LINE__ /*line*/,
341  __FUNCTION__ /*function*/
342  );
343  }
344 
345  if(!thread_progress_bar_.isComplete())
346  {
347  RunControlStateMachine::indicateSubIterationWork();
348 
349  if(last_thread_progress_read_ != progress)
350  {
351  last_thread_progress_read_ = progress;
352  last_thread_progress_update_ = time(0);
353  }
354 
355  sleep(1 /*seconds*/);
356  }
357  else
358  {
359  __SUP_COUT_INFO__ << "Complete configuring transition!" << __E__;
360  __SUP_COUTV__(getProcessInfo_());
361  }
362  }
363 
364  return;
365 } // end transitionConfiguring()
366 
367 //==============================================================================
368 void ARTDAQSupervisor::configuringThread() try
369 {
370  const std::string& uid =
371  theConfigurationManager_
372  ->getNode(ConfigurationManager::XDAQ_APPLICATION_TABLE_NAME + "/" + CorePropertySupervisorBase::getSupervisorUID() + "/" + "LinkToSupervisorTable")
373  .getValueAsString();
374 
375  __COUT__ << "Supervisor uid is " << uid << ", getting supervisor table node" << __E__;
376 
377  const std::string mfSubject_ = supervisorClassNoNamespace_ + "-" + uid;
378 
379  ConfigurationTree theSupervisorNode = getSupervisorTableNode();
380 
381  thread_progress_bar_.step();
382 
383  auto info = ARTDAQTableBase::extractARTDAQInfo(
384  theSupervisorNode,
385  false /*getStatusFalseNodes*/,
386  true /*doWriteFHiCL*/,
387  CorePropertySupervisorBase::getSupervisorProperty<size_t>("max_fragment_size_bytes", ARTDAQTableBase::DEFAULT_MAX_FRAGMENT_SIZE),
388  CorePropertySupervisorBase::getSupervisorProperty<size_t>("routing_timeout_ms", ARTDAQTableBase::DEFAULT_ROUTING_TIMEOUT_MS),
389  CorePropertySupervisorBase::getSupervisorProperty<size_t>("routing_retry_count", ARTDAQTableBase::DEFAULT_ROUTING_RETRY_COUNT),
390  &thread_progress_bar_);
391 
392  // Check lists
393  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::BoardReader) == 0)
394  {
395  __GEN_SS__ << "There must be at least one enabled BoardReader!" << __E__;
396  __GEN_SS_THROW__;
397  return;
398  }
399  if(info.processes.count(ARTDAQTableBase::ARTDAQAppType::EventBuilder) == 0)
400  {
401  __GEN_SS__ << "There must be at least one enabled EventBuilder!" << __E__;
402  __GEN_SS_THROW__;
403  return;
404  }
405 
406  thread_progress_bar_.step();
407 
408  __GEN_COUT__ << "Writing boot.txt" << __E__;
409 
410  int debugLevel = theSupervisorNode.getNode("DAQInterfaceDebugLevel").getValue<int>();
411  std::string setupScript = theSupervisorNode.getNode("DAQSetupScript").getValue();
412 
413  std::ofstream o(ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt", std::ios::trunc);
414  o << "DAQ setup script: " << setupScript << std::endl;
415  o << "debug level: " << debugLevel << std::endl;
416  o << std::endl;
417 
418  if(info.subsystems.size() > 1)
419  {
420  for(auto& ss : info.subsystems)
421  {
422  if(ss.first == 0)
423  continue;
424  o << "Subsystem id: " << ss.first << std::endl;
425  if(ss.second.destination != 0)
426  {
427  o << "Subsystem destination: " << ss.second.destination << std::endl;
428  }
429  for(auto& sss : ss.second.sources)
430  {
431  o << "Subsystem source: " << sss << std::endl;
432  }
433  o << std::endl;
434  }
435  }
436 
437  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
438  {
439  o << "EventBuilder host: " << builder.hostname << std::endl;
440  o << "EventBuilder label: " << builder.label << std::endl;
441  if(builder.subsystem != 1)
442  {
443  o << "EventBuilder subsystem: " << builder.subsystem << std::endl;
444  }
445  o << std::endl;
446  }
447  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
448  {
449  o << "DataLogger host: " << logger.hostname << std::endl;
450  o << "DataLogger label: " << logger.label << std::endl;
451  if(logger.subsystem != 1)
452  {
453  o << "DataLogger subsystem: " << logger.subsystem << std::endl;
454  }
455  o << std::endl;
456  }
457  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
458  {
459  o << "Dispatcher host: " << dispatcher.hostname << std::endl;
460  o << "Dispatcher label: " << dispatcher.label << std::endl;
461  if(dispatcher.subsystem != 1)
462  {
463  o << "Dispatcher subsystem: " << dispatcher.subsystem << std::endl;
464  }
465  o << std::endl;
466  }
467  for(auto& rmaster : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingMaster])
468  {
469  o << "RoutingMaster host: " << rmaster.hostname << std::endl;
470  o << "RoutingMaster label: " << rmaster.label << std::endl;
471  if(rmaster.subsystem != 1)
472  {
473  o << "RoutingMaster subsystem: " << rmaster.subsystem << std::endl;
474  }
475  o << std::endl;
476  }
477  o.close();
478 
479  thread_progress_bar_.step();
480 
481  __GEN_COUT__ << "Building configuration directory" << __E__;
482 
483  boost::system::error_code ignored;
484  boost::filesystem::remove_all(ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME, ignored);
485  mkdir((ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME).c_str(), 0755);
486 
487  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
488  {
489  symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::BoardReader, reader.label).c_str(),
490  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" + reader.label + ".fcl").c_str());
491  }
492  for(auto& builder : info.processes[ARTDAQTableBase::ARTDAQAppType::EventBuilder])
493  {
494  symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::EventBuilder, builder.label).c_str(),
495  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" + builder.label + ".fcl").c_str());
496  }
497  for(auto& logger : info.processes[ARTDAQTableBase::ARTDAQAppType::DataLogger])
498  {
499  symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::DataLogger, logger.label).c_str(),
500  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" + logger.label + ".fcl").c_str());
501  }
502  for(auto& dispatcher : info.processes[ARTDAQTableBase::ARTDAQAppType::Dispatcher])
503  {
504  symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::Dispatcher, dispatcher.label).c_str(),
505  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" + dispatcher.label + ".fcl").c_str());
506  }
507  for(auto& rmaster : info.processes[ARTDAQTableBase::ARTDAQAppType::RoutingMaster])
508  {
509  symlink(ARTDAQTableBase::getFlatFHICLFilename(ARTDAQTableBase::ARTDAQAppType::RoutingMaster, rmaster.label).c_str(),
510  (ARTDAQTableBase::ARTDAQ_FCL_PATH + FAKE_CONFIG_NAME + "/" + rmaster.label + ".fcl").c_str());
511  }
512 
513  thread_progress_bar_.step();
514 
515  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
516  getDAQState_();
517  if(daqinterface_state_ != "stopped" && daqinterface_state_ != "")
518  {
519  __GEN_SS__ << "Cannot configure DAQInterface because it is in the wrong state"
520  << " (" << daqinterface_state_ << " != stopped)!" << __E__;
521  __GEN_SS_THROW__
522  }
523 
524  __GEN_COUT__ << "Calling setdaqcomps" << __E__;
525  __GEN_COUT__ << "Status before setdaqcomps: " << daqinterface_state_ << __E__;
526  PyObject* pName1 = PyString_FromString("setdaqcomps");
527 
528  PyObject* readerDict = PyDict_New();
529  for(auto& reader : info.processes[ARTDAQTableBase::ARTDAQAppType::BoardReader])
530  {
531  PyObject* readerName = PyString_FromString(reader.label.c_str());
532 
533  PyObject* readerData = PyList_New(3);
534  PyObject* readerHost = PyString_FromString(reader.hostname.c_str());
535  PyObject* readerPort = PyString_FromString("-1");
536  PyObject* readerSubsystem = PyString_FromString(std::to_string(reader.subsystem).c_str());
537  PyList_SetItem(readerData, 0, readerHost);
538  PyList_SetItem(readerData, 1, readerPort);
539  PyList_SetItem(readerData, 2, readerSubsystem);
540  PyDict_SetItem(readerDict, readerName, readerData);
541  }
542  PyObject* res1 = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName1, readerDict, NULL);
543  Py_DECREF(readerDict);
544 
545  if(res1 == NULL)
546  {
547  PyErr_Print();
548  __GEN_SS__ << "Error calling setdaqcomps transition" << __E__;
549  __GEN_SS_THROW__;
550  }
551  getDAQState_();
552  __GEN_COUT__ << "Status after setdaqcomps: " << daqinterface_state_ << __E__;
553 
554  thread_progress_bar_.step();
555  __GEN_COUT__ << "Calling do_boot" << __E__;
556  __GEN_COUT__ << "Status before boot: " << daqinterface_state_ << __E__;
557  PyObject* pName2 = PyString_FromString("do_boot");
558  PyObject* pStateArgs1 = PyString_FromString((ARTDAQTableBase::ARTDAQ_FCL_PATH + "/boot.txt").c_str());
559  PyObject* res2 = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName2, pStateArgs1, NULL);
560 
561  if(res2 == NULL)
562  {
563  PyErr_Print();
564  __GEN_SS__ << "Error calling boot transition" << __E__;
565  __GEN_SS_THROW__;
566  }
567 
568  getDAQState_();
569  if(daqinterface_state_ != "booted")
570  {
571  __GEN_SS__ << "DAQInterface boot transition failed! "
572  << "Status after boot attempt: " << daqinterface_state_ << __E__;
573  __GEN_SS_THROW__;
574  }
575  __GEN_COUT__ << "Status after boot: " << daqinterface_state_ << __E__;
576 
577  thread_progress_bar_.step();
578  __GEN_COUT__ << "Calling do_config" << __E__;
579  __GEN_COUT__ << "Status before config: " << daqinterface_state_ << __E__;
580  PyObject* pName3 = PyString_FromString("do_config");
581  PyObject* pStateArgs2 = Py_BuildValue("[s]", FAKE_CONFIG_NAME);
582  PyObject* res3 = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName3, pStateArgs2, NULL);
583 
584  if(res3 == NULL)
585  {
586  PyErr_Print();
587  __GEN_SS__ << "Error calling config transition" << __E__;
588  __GEN_SS_THROW__;
589  }
590  getDAQState_();
591  // if(daqinterface_state_ != "ready")
592  // {
593  // __GEN_SS__ << "DAQInterface config transition failed!" << __E__
594  // << "Supervisor state: "<< daqinterface_state_ <<
595  // __E__;
596  // __GEN_SS_THROW__;
597  // }
598  __GEN_COUT__ << "Status after config: " << daqinterface_state_ << __E__;
599  thread_progress_bar_.complete();
600  __GEN_COUT__ << "Configured." << __E__;
601 
602 } // end configuringThread()
603 catch(const std::runtime_error& e)
604 {
605  __SS__ << "Error was caught while configuring: " << e.what() << __E__;
606  __COUT_ERR__ << "\n" << ss.str();
607  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
608  thread_error_message_ = ss.str();
609 }
610 catch(...)
611 {
612  __SS__ << "Unknown error was caught while configuring. Please checked the logs." << __E__;
613  __COUT_ERR__ << "\n" << ss.str();
614 
615  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
616 
617  std::lock_guard<std::mutex> lock(thread_mutex_); // lock out for remainder of scope
618  thread_error_message_ = ss.str();
619 } // end configuringThread() error handling
620 
621 //==============================================================================
622 void ARTDAQSupervisor::transitionHalting(toolbox::Event::Reference /*event*/) try
623 {
624  __SUP_COUT__ << "Halting..." << __E__;
625  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
626  getDAQState_();
627  __SUP_COUT__ << "Status before halt: " << daqinterface_state_ << __E__;
628 
629  PyObject* pName = PyString_FromString("do_command");
630  PyObject* pArg = PyString_FromString("Shutdown");
631  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
632 
633  if(res == NULL)
634  {
635  PyErr_Print();
636  __SS__ << "Error calling Shutdown transition" << __E__;
637  __SUP_SS_THROW__;
638  }
639 
640  getDAQState_();
641  __SUP_COUT__ << "Status after halt: " << daqinterface_state_ << __E__;
642  __SUP_COUT__ << "Halted." << __E__;
643 } // end transitionHalting()
644 catch(const std::runtime_error& e)
645 {
646  const std::string transitionName = "Halting";
647  // if halting from Failed state, then ignore errors
648  if(theStateMachine_.getProvenanceStateName() == RunControlStateMachine::FAILED_STATE_NAME ||
649  theStateMachine_.getProvenanceStateName() == RunControlStateMachine::HALTED_STATE_NAME)
650  {
651  __SUP_COUT_INFO__ << "Error was caught while halting (but ignoring because "
652  "previous state was '"
653  << RunControlStateMachine::FAILED_STATE_NAME << "'): " << e.what() << __E__;
654  }
655  else // if not previously in Failed state, then fail
656  {
657  __SUP_SS__ << "Error was caught while " << transitionName << ": " << e.what() << __E__;
658  __SUP_COUT_ERR__ << "\n" << ss.str();
659  theStateMachine_.setErrorMessage(ss.str());
660  throw toolbox::fsm::exception::Exception("Transition Error" /*name*/,
661  ss.str() /* message*/,
662  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
663  __LINE__ /*line*/,
664  __FUNCTION__ /*function*/
665  );
666  }
667 } // end transitionHalting() std::runtime_error exception handling
668 catch(...)
669 {
670  const std::string transitionName = "Halting";
671  // if halting from Failed state, then ignore errors
672  if(theStateMachine_.getProvenanceStateName() == RunControlStateMachine::FAILED_STATE_NAME ||
673  theStateMachine_.getProvenanceStateName() == RunControlStateMachine::HALTED_STATE_NAME)
674  {
675  __SUP_COUT_INFO__ << "Unknown error was caught while halting (but ignoring "
676  "because previous state was '"
677  << RunControlStateMachine::FAILED_STATE_NAME << "')." << __E__;
678  }
679  else // if not previously in Failed state, then fail
680  {
681  __SUP_SS__ << "Unknown error was caught while " << transitionName << ". Please checked the logs." << __E__;
682  __SUP_COUT_ERR__ << "\n" << ss.str();
683  theStateMachine_.setErrorMessage(ss.str());
684 
685  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
686 
687  throw toolbox::fsm::exception::Exception("Transition Error" /*name*/,
688  ss.str() /* message*/,
689  "ARTDAQSupervisorBase::transition" + transitionName /*module*/,
690  __LINE__ /*line*/,
691  __FUNCTION__ /*function*/
692  );
693  }
694 } // end transitionHalting() exception handling
695 
696 //==============================================================================
697 void ARTDAQSupervisor::transitionInitializing(toolbox::Event::Reference /*event*/) try
698 {
699  __SUP_COUT__ << "Initializing..." << __E__;
700  init();
701  __SUP_COUT__ << "Initialized." << __E__;
702 } // end transitionInitializing()
703 catch(const std::runtime_error& e)
704 {
705  __SS__ << "Error was caught while Initializing: " << e.what() << __E__;
706  __SS_THROW__;
707 }
708 catch(...)
709 {
710  __SS__ << "Unknown error was caught while Initializing. Please checked the logs." << __E__;
711  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
712  __SS_THROW__;
713 } // end transitionInitializing() error handling
714 
715 //==============================================================================
716 void ARTDAQSupervisor::transitionPausing(toolbox::Event::Reference /*event*/) try
717 {
718  __SUP_COUT__ << "Pausing..." << __E__;
719  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
720 
721  getDAQState_();
722  __SUP_COUT__ << "Status before pause: " << daqinterface_state_ << __E__;
723 
724  PyObject* pName = PyString_FromString("do_command");
725  PyObject* pArg = PyString_FromString("Pause");
726  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
727 
728  if(res == NULL)
729  {
730  PyErr_Print();
731  __SS__ << "Error calling Pause transition" << __E__;
732  __SUP_SS_THROW__;
733  }
734 
735  getDAQState_();
736  __SUP_COUT__ << "Status after pause: " << daqinterface_state_ << __E__;
737 
738  __SUP_COUT__ << "Paused." << __E__;
739 } // end transitionPausing()
740 catch(const std::runtime_error& e)
741 {
742  __SS__ << "Error was caught while Pausing: " << e.what() << __E__;
743  __SS_THROW__;
744 }
745 catch(...)
746 {
747  __SS__ << "Unknown error was caught while Pausing. Please checked the logs." << __E__;
748  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
749  __SS_THROW__;
750 } // end transitionPausing() error handling
751 
752 //==============================================================================
753 void ARTDAQSupervisor::transitionResuming(toolbox::Event::Reference /*event*/) try
754 {
755  __SUP_COUT__ << "Resuming..." << __E__;
756  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
757 
758  getDAQState_();
759  __SUP_COUT__ << "Status before resume: " << daqinterface_state_ << __E__;
760  PyObject* pName = PyString_FromString("do_command");
761  PyObject* pArg = PyString_FromString("Resume");
762  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
763 
764  if(res == NULL)
765  {
766  PyErr_Print();
767  __SS__ << "Error calling Resume transition" << __E__;
768  __SUP_SS_THROW__;
769  }
770  getDAQState_();
771  __SUP_COUT__ << "Status after resume: " << daqinterface_state_ << __E__;
772  __SUP_COUT__ << "Resumed." << __E__;
773 } // end transitionResuming()
774 catch(const std::runtime_error& e)
775 {
776  __SS__ << "Error was caught while Resuming: " << e.what() << __E__;
777  __SS_THROW__;
778 }
779 catch(...)
780 {
781  __SS__ << "Unknown error was caught while Resuming. Please checked the logs." << __E__;
782  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
783  __SS_THROW__;
784 } // end transitionResuming() error handling
785 
786 //==============================================================================
787 void ARTDAQSupervisor::transitionStarting(toolbox::Event::Reference /*event*/) try
788 {
789  __SUP_COUT__ << "Starting..." << __E__;
790  {
791  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
792  getDAQState_();
793  __SUP_COUT__ << "Status before start: " << daqinterface_state_ << __E__;
794  auto runNumber = SOAPUtilities::translate(theStateMachine_.getCurrentMessage()).getParameters().getValue("RunNumber");
795 
796  PyObject* pName = PyString_FromString("do_start_running");
797  int run_number = std::stoi(runNumber);
798  PyObject* pStateArgs = PyInt_FromLong(run_number);
799  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pStateArgs, NULL);
800 
801  if(res == NULL)
802  {
803  PyErr_Print();
804  __SS__ << "Error calling start transition" << __E__;
805  __SUP_SS_THROW__;
806  }
807  getDAQState_();
808  __SUP_COUT__ << "Status after start: " << daqinterface_state_ << __E__;
809  if(daqinterface_state_ != "running")
810  {
811  __SS__ << "DAQInterface start transition failed!" << __E__;
812  __SUP_SS_THROW__;
813  }
814  }
815  start_runner_();
816  __SUP_COUT__ << "Started." << __E__;
817 } // end transitionStarting()
818 catch(const std::runtime_error& e)
819 {
820  __SS__ << "Error was caught while Starting: " << e.what() << __E__;
821  __SS_THROW__;
822 }
823 catch(...)
824 {
825  __SS__ << "Unknown error was caught while Starting. Please checked the logs." << __E__;
826  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
827  __SS_THROW__;
828 } // end transitionStarting() error handling
829 
830 //==============================================================================
831 void ARTDAQSupervisor::transitionStopping(toolbox::Event::Reference /*event*/) try
832 {
833  __SUP_COUT__ << "Stopping..." << __E__;
834  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
835  getDAQState_();
836  __SUP_COUT__ << "Status before stop: " << daqinterface_state_ << __E__;
837  PyObject* pName = PyString_FromString("do_stop_running");
838  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
839 
840  if(res == NULL)
841  {
842  PyErr_Print();
843  __SS__ << "Error calling stop transition" << __E__;
844  __SUP_SS_THROW__;
845  }
846  getDAQState_();
847  __SUP_COUT__ << "Status after stop: " << daqinterface_state_ << __E__;
848  __SUP_COUT__ << "Stopped." << __E__;
849 } // end transitionStopping()
850 catch(const std::runtime_error& e)
851 {
852  __SS__ << "Error was caught while Stopping: " << e.what() << __E__;
853  __SS_THROW__;
854 }
855 catch(...)
856 {
857  __SS__ << "Unknown error was caught while Stopping. Please checked the logs." << __E__;
858  artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, ss.str());
859  __SS_THROW__;
860 } // end transitionStopping() error handling
861 
862 //==============================================================================
863 void ots::ARTDAQSupervisor::enteringError(toolbox::Event::Reference /*event*/)
864 {
865  __SUP_COUT__ << "Entering error recovery state" << __E__;
866  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
867  getDAQState_();
868  __SUP_COUT__ << "Status before error: " << daqinterface_state_ << __E__;
869 
870  PyObject* pName = PyString_FromString("do_recover");
871  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
872 
873  if(res == NULL)
874  {
875  PyErr_Print();
876  __SS__ << "Error calling recover transition" << __E__;
877  __SUP_SS_THROW__;
878  }
879  getDAQState_();
880  __SUP_COUT__ << "Status after error: " << daqinterface_state_ << __E__;
881  __SUP_COUT__ << "EnteringError DONE." << __E__;
882 
883 } // end enteringError()
884 
885 //==============================================================================
886 void ots::ARTDAQSupervisor::getDAQState_()
887 {
888  //__SUP_COUT__ << "Getting DAQInterface state" << __E__;
889  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
890 
891  if(daqinterface_ptr_ == nullptr)
892  {
893  daqinterface_state_ = "";
894  return;
895  }
896 
897  PyObject* pName = PyString_FromString("state");
898  PyObject* pArg = PyString_FromString("DAQInterface");
899  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
900 
901  if(res == NULL)
902  {
903  PyErr_Print();
904  __SS__ << "Error calling state function" << __E__;
905  __SUP_SS_THROW__;
906  return;
907  }
908  daqinterface_state_ = std::string(PyString_AsString(res));
909  //__SUP_COUT__ << "getDAQState_ DONE: state=" << result << __E__;
910 } // end getDAQState_()
911 
912 //==============================================================================
913 std::string ots::ARTDAQSupervisor::getProcessInfo_(void)
914 {
915  //__SUP_COUT__ << "Getting DAQInterface state" << __E__;
916  std::lock_guard<std::recursive_mutex> lk(daqinterface_mutex_);
917 
918  if(daqinterface_ptr_ == nullptr)
919  {
920  return "";
921  }
922 
923  PyObject* pName = PyString_FromString("artdaq_process_info");
924  PyObject* pArg = PyString_FromString("DAQInterface");
925  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, pArg, NULL);
926 
927  if(res == NULL)
928  {
929  PyErr_Print();
930  __SS__ << "Error calling artdaq_process_info function" << __E__;
931  __SUP_SS_THROW__;
932  return "";
933  }
934  return std::string(PyString_AsString(res));
935  //__SUP_COUT__ << "getDAQState_ DONE: state=" << result << __E__;
936 } // end getProcessInfo_()
937 
938 //==============================================================================
939 std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> ots::ARTDAQSupervisor::getAndParseProcessInfo_()
940 {
941  std::list<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo> output;
942  auto info = getProcessInfo_();
943  auto procs = tokenize_(info);
944 
945  // 0: Whole string
946  // 1: Process Label
947  // 2: Process host
948  // 3: Process port
949  // 4: Process subsystem
950  // 5: Process Rank
951  // 6: Process state
952  std::regex re("(.*?) at ([^:]*):(\\d+) \\(subsystem (\\d+), rank (\\d+)\\): (.*)");
953 
954  for(auto& proc : procs)
955  {
956  std::smatch match;
957  if(std::regex_match(proc, match, re))
958  {
959  DAQInterfaceProcessInfo info;
960 
961  info.label = match[1];
962  info.host = match[2];
963  info.port = std::stoi(match[3]);
964  info.subsystem = std::stoi(match[4]);
965  info.rank = std::stoi(match[5]);
966  info.state = match[6];
967 
968  output.push_back(info);
969  }
970  }
971  return output;
972 } // end getAndParseProcessInfo_()
973 
974 //==============================================================================
975 std::list<std::pair<ots::ARTDAQSupervisor::DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>>
976 ots::ARTDAQSupervisor::makeCommandersFromProcessInfo()
977 {
978  std::list<std::pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>> output;
979  auto infos = getAndParseProcessInfo_();
980 
981  for(auto& info : infos)
982  {
983  artdaq::Commandable cm;
984  fhicl::ParameterSet ps;
985 
986  ps.put<std::string>("commanderPluginType", "xmlrpc");
987  ps.put<int>("id", info.port);
988  ps.put<std::string>("server_url", info.host);
989 
990  output.emplace_back(
991  std::make_pair<DAQInterfaceProcessInfo, std::unique_ptr<artdaq::CommanderInterface>>(std::move(info), artdaq::MakeCommanderPlugin(ps, cm)));
992  }
993 
994  return output;
995 } // end makeCommandersFromProcessInfo()
996 
997 //==============================================================================
998 std::list<std::string> ots::ARTDAQSupervisor::tokenize_(std::string const& input)
999 {
1000  size_t pos = 0;
1001  std::list<std::string> output;
1002 
1003  while(pos != std::string::npos && pos < input.size())
1004  {
1005  auto newpos = input.find('\n', pos);
1006  if(newpos != std::string::npos)
1007  {
1008  output.emplace_back(input, pos, newpos - pos);
1009  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1010  pos = newpos + 1;
1011  }
1012  else
1013  {
1014  output.emplace_back(input, pos);
1015  // TLOG(TLVL_TRACE) << "tokenize_: " << output.back();
1016  pos = newpos;
1017  }
1018  }
1019  return output;
1020 } // end tokenize_()
1021 
1022 //==============================================================================
1023 void ots::ARTDAQSupervisor::daqinterfaceRunner_()
1024 {
1025  TLOG(TLVL_TRACE) << "Runner thread starting";
1026  runner_running_ = true;
1027  while(runner_running_)
1028  {
1029  if(daqinterface_ptr_ != NULL)
1030  {
1031  std::unique_lock<std::recursive_mutex> lk(daqinterface_mutex_);
1032  getDAQState_();
1033  std::string state_before = daqinterface_state_;
1034 
1035  if(daqinterface_state_ == "running" || daqinterface_state_ == "ready" || daqinterface_state_ == "booted")
1036  {
1037  try
1038  {
1039  TLOG(TLVL_TRACE) << "Calling DAQInterface::check_proc_heartbeats";
1040  PyObject* pName = PyString_FromString("check_proc_heartbeats");
1041  PyObject* res = PyObject_CallMethodObjArgs(daqinterface_ptr_, pName, NULL);
1042  TLOG(TLVL_TRACE) << "Done with DAQInterface::check_proc_heartbeats call";
1043 
1044  if(res == NULL)
1045  {
1046  runner_running_ = false;
1047  lk.unlock();
1048  PyErr_Print();
1049  __SS__ << "Error calling check_proc_heartbeats function" << __E__;
1050  __SUP_SS_THROW__;
1051  break;
1052  }
1053  }
1054  catch(cet::exception& ex)
1055  {
1056  runner_running_ = false;
1057  lk.unlock();
1058  PyErr_Print();
1059  __SS__ << "An cet::exception occurred while calling "
1060  "check_proc_heartbeats function: "
1061  << ex.explain_self() << __E__;
1062  __SUP_SS_THROW__;
1063  break;
1064  }
1065  catch(std::exception& ex)
1066  {
1067  runner_running_ = false;
1068  lk.unlock();
1069  PyErr_Print();
1070  __SS__ << "An std::exception occurred while calling "
1071  "check_proc_heartbeats function: "
1072  << ex.what() << __E__;
1073  __SUP_SS_THROW__;
1074  break;
1075  }
1076  catch(...)
1077  {
1078  runner_running_ = false;
1079  lk.unlock();
1080  PyErr_Print();
1081  __SS__ << "An unknown Error occurred while calling runner function" << __E__;
1082  __SUP_SS_THROW__;
1083  break;
1084  }
1085 
1086  getDAQState_();
1087  if(daqinterface_state_ != state_before)
1088  {
1089  runner_running_ = false;
1090  lk.unlock();
1091  __SS__ << "DAQInterface state unexpectedly changed from " << state_before << " to " << daqinterface_state_
1092  << ". Check supervisor log file for more info!" << __E__;
1093  __SUP_SS_THROW__;
1094  break;
1095  }
1096  }
1097  }
1098  else
1099  {
1100  break;
1101  }
1102  usleep(1000000);
1103  }
1104  runner_running_ = false;
1105  TLOG(TLVL_TRACE) << "Runner thread complete";
1106 } // end daqinterfaceRunner_()
1107 
1108 //==============================================================================
1109 void ots::ARTDAQSupervisor::stop_runner_()
1110 {
1111  runner_running_ = false;
1112  if(runner_thread_ && runner_thread_->joinable())
1113  {
1114  runner_thread_->join();
1115  runner_thread_.reset(nullptr);
1116  }
1117 } // end stop_runner_()
1118 
1119 //==============================================================================
1120 void ots::ARTDAQSupervisor::start_runner_()
1121 {
1122  stop_runner_();
1123  runner_thread_ = std::make_unique<std::thread>(&ots::ARTDAQSupervisor::daqinterfaceRunner_, this);
1124 } // end start_runner_()