artdaq_mpich_plugin  v1_00_08
builder.cc
1 #define TRACE_NAME "builder"
2 
3 #include "art/Framework/Art/artapp.h"
4 #include "artdaq-core/Generators/FragmentGenerator.hh"
5 #include "artdaq-core/Data/Fragment.hh"
6 #include "artdaq-core/Generators/makeFragmentGenerator.hh"
7 #include "MPIProg.hh"
8 #include "artdaq/DAQrate/DataSenderManager.hh"
9 #include "artdaq/DAQrate/DataReceiverManager.hh"
10 #include "artdaq-core/Core/SimpleMemoryReader.hh"
12 
13 #include <boost/program_options.hpp>
14 #include "fhiclcpp/make_ParameterSet.h"
15 namespace bpo = boost::program_options;
16 
17 #include <algorithm>
18 #include <cmath>
19 #include <cstdlib>
20 
21 extern "C"
22 {
23 #include <unistd.h>
24 }
25 
26 #include <iostream>
27 #include <memory>
28 #include <utility>
29 
30 extern "C"
31 {
32 #include <sys/time.h>
33 #include <sys/resource.h>
34 }
35 
39 class Builder : public MPIProg
40 {
41 public:
49  Builder(int argc, char* argv[], fhicl::ParameterSet pset, int key);
50 
54  void go();
55 
59  void sink();
60 
64  void detector();
65 
66 private:
67  enum class Role : int
68  {
69  DETECTOR,
70  SINK
71  };
72 
73  void printHost(const std::string& functionName) const;
74 
75  fhicl::ParameterSet daq_pset_;
76  bool const want_sink_;
77  bool const want_periodic_sync_;
78  MPI_Comm local_group_comm_;
79  Role builder_role_;
80 };
81 
82 Builder::Builder(int argc, char* argv[], fhicl::ParameterSet pset, int key) :
83  MPIProg(argc, argv)
84  , daq_pset_(pset)
85  , want_sink_(daq_pset_.get<bool>("want_sink", true))
86  , want_periodic_sync_(daq_pset_.get<bool>("want_periodic_sync", false))
87  , local_group_comm_()
88 {
89  std::vector<std::string> detectors;
90  daq_pset_.get_if_present("detectors", detectors);
91  if (static_cast<size_t>(my_rank) >= detectors.size())
92  {
93  builder_role_ = Role::SINK;
94  }
95  else
96  {
97  builder_role_ = Role::DETECTOR;
98  }
99  std::string type(pset.get<std::string>("transfer_plugin_type", "Shmem"));
100 
101  int senders = pset.get<int>("num_senders");
102  int receivers = pset.get<int>("num_receivers");
103  int buffer_count = pset.get<int>("buffer_count", 10);
104  int max_payload_size = pset.get<size_t>("fragment_size", 0x100000);
105 
106  std::string hostmap = "";
107  if (pset.has_key("hostmap"))
108  {
109  hostmap = " host_map: @local::hostmap";
110  }
111 
112  std::stringstream ss;
113  ss << pset.to_string();
114  ss << " sources: {";
115  for (int ii = 0; ii < senders; ++ii)
116  {
117  ss << "s" << ii << ": { transferPluginType: " << type << " source_rank: " << ii << " max_fragment_size_words: " << max_payload_size << " buffer_count: " << buffer_count << " shm_key_offset: " << std::to_string(key) << hostmap << "}";
118  }
119  ss << "} destinations: {";
120  for (int jj = senders; jj < senders + receivers; ++jj)
121  {
122  ss << "d" << jj << ": { transferPluginType: " << type << " destination_rank: " << jj << " max_fragment_size_words: " << max_payload_size << " buffer_count: " << buffer_count << " shm_key_offset: " << std::to_string(key) << hostmap << "}";
123  }
124  ss << "}";
125 
126  daq_pset_ = fhicl::ParameterSet();
127  make_ParameterSet(ss.str(), daq_pset_);
128 
129 
130 }
131 
133 {
134  //volatile bool loopForever = true;
135  //while(loopForever)
136  //{
137  // usleep(1000000);
138  //}
139 
140 
141  MPI_Barrier(MPI_COMM_WORLD);
142  //std::cout << "daq_pset_: " << daq_pset_.to_string() << std::endl << "conf_.makeParameterSet(): " << conf_.makeParameterSet().to_string() << std::endl;
143  MPI_Comm_split(MPI_COMM_WORLD, static_cast<int>(builder_role_), 0, &local_group_comm_);
144  switch (builder_role_)
145  {
146  case Role::SINK:
147  if (want_sink_)
148  {
149  sink();
150  }
151  else
152  {
153  std::string
154  msg("WARNING: a sink was instantiated despite want_sink being false:\n"
155  "set nsinks to 0 in invocation of daqrate?\n");
156  std::cerr << msg;
157  MPI_Barrier(MPI_COMM_WORLD);
158  }
159  break;
160  case Role::DETECTOR:
161  detector();
162  break;
163  default:
164  throw "No such node type";
165  }
166 }
167 
169 {
170  printHost("detector");
171  int detector_rank;
172  // Should be zero-based, detectors only.
173  MPI_Comm_rank(local_group_comm_, &detector_rank);
174  assert(!(detector_rank < 0));
175  std::ostringstream det_ps_name_loc;
176  std::vector<std::string> detectors;
177  bool detectors_present = daq_pset_.get_if_present("detectors", detectors);
178  size_t detectors_size = detectors.size();
179  if (!(detectors_present && detectors_size))
180  {
181  throw cet::exception("Configuration")
182  << "Unable to find required sequence of detector "
183  << "parameter set names, \"detectors\".";
184  }
185  fhicl::ParameterSet det_ps =
186  daq_pset_.get<fhicl::ParameterSet>(((detectors_size > static_cast<size_t>(detector_rank)) ? detectors[detector_rank] : detectors[0]));
187  std::unique_ptr<artdaq::FragmentGenerator> const
189  (det_ps.get<std::string>("generator"),
190  det_ps));
191  { // Block to handle lifetime of h, below.
192  artdaq::DataSenderManager h(daq_pset_);
193  MPI_Barrier(local_group_comm_);
194  // not using the run time method
195  // TimedLoop tl(conf_.run_time_);
196  size_t fragments_per_source = -1;
197  daq_pset_.get_if_present("fragments_per_source", fragments_per_source);
198  artdaq::FragmentPtrs frags;
199  size_t fragments_sent = 0;
200  while (fragments_sent < fragments_per_source && gen->getNext(frags))
201  {
202  if (!fragments_sent)
203  {
204  // Get the detectors lined up first time before we start the
205  // firehoses.
206  MPI_Barrier(local_group_comm_);
207  }
208  for (auto& fragPtr : frags)
209  {
210  std::cout << "Program::detector: Sending fragment " << fragments_sent + 1 << " of " << fragments_per_source << std::endl;
211  TLOG(TLVL_DEBUG) << "Program::detector: Sending fragment " << fragments_sent + 1 << " of " << fragments_per_source ;
212  auto sequence_id = fragPtr->sequenceID();
213  h.sendFragment(std::move(*fragPtr));
214 ;
215  if (h.GetSentSequenceIDCount(sequence_id) == gen->fragmentIDs().size())
216  {
217  h.RemoveRoutingTableEntry(sequence_id);
218  }
219 
220  if (++fragments_sent == fragments_per_source) { break; }
221  if (want_periodic_sync_ && (fragments_sent % 100) == 0)
222  {
223  // Don't get too far out of sync.
224  MPI_Barrier(local_group_comm_);
225  }
226  }
227  frags.clear();
228  }
229  TLOG(TLVL_DEBUG) << "detector waiting " << my_rank ;
230  }
231  TLOG(TLVL_DEBUG) << "detector done " << my_rank ;
232  MPI_Comm_free(&local_group_comm_);
233  MPI_Barrier(MPI_COMM_WORLD);
234 }
235 
237 {
238  printHost("sink");
239  {
240  usleep(1000 * my_rank);
241  // This scope exists to control the lifetime of 'events'
242  auto events = std::make_shared<artdaq::SharedMemoryEventManager>(daq_pset_, daq_pset_);
243  events->startRun(daq_pset_.get<int>("run_number", 100));
244  { // Block to handle scope of h, below.
245  artdaq::DataReceiverManager h(daq_pset_, events);
246  h.start_threads();
247  while (h.running_sources().size() > 0)
248  {
249  usleep(10000);
250  }
251  }
252 
253  TLOG(TLVL_DEBUG) << "All detectors are done, Sending endOfData Fragment" ;
254  // Make the reader application finish, and capture its return
255  // status.
256  bool endSucceeded = false;
257  endSucceeded = events->endOfData();
258  if (endSucceeded)
259  {
260  TLOG(TLVL_DEBUG) << "Sink: reader is done" ;
261  }
262  else
263  {
264  TLOG(TLVL_DEBUG) << "Sink: reader failed to complete because the "
265  << "endOfData marker could not be pushed onto the queue."
266  ;
267  }
268  } // end of lifetime of 'events'
269  TLOG(TLVL_DEBUG) << "Sink done " << my_rank ;
270  MPI_Barrier(MPI_COMM_WORLD);
271 }
272 
273 void Builder::printHost(const std::string& functionName) const
274 {
275  char* doPrint = getenv("PRINT_HOST");
276  if (doPrint == 0) { return; }
277  const int ARRSIZE = 80;
278  char hostname[ARRSIZE];
279  std::string hostString;
280  if (!gethostname(hostname, ARRSIZE))
281  {
282  hostString = hostname;
283  }
284  else
285  {
286  hostString = "unknown";
287  }
288  TLOG(TLVL_DEBUG) << "Running " << functionName
289  << " on host " << hostString
290  << " with rank " << my_rank << "."
291  ;
292 }
293 
294 void printUsage()
295 {
296  int myid = 0;
297  struct rusage usage;
298  getrusage(RUSAGE_SELF, &usage);
299  std::cout << myid << ":"
300  << " user=" << artdaq::TimeUtils::convertUnixTimeToSeconds(usage.ru_utime)
301  << " sys=" << artdaq::TimeUtils::convertUnixTimeToSeconds(usage.ru_stime)
302  << std::endl;
303 }
304 
305 int main(int argc, char* argv[])
306 {
308 
309  std::ostringstream descstr;
310  descstr << argv[0]
311  << " <-c <config-file>> <other-options> [<source-file>]+";
312  bpo::options_description desc(descstr.str());
313  desc.add_options()
314  ("config,c", bpo::value<std::string>(), "Configuration file.")
315  ("key,k", bpo::value<int>(), "Shared Memory Key")
316  ("help,h", "produce help message");
317  bpo::variables_map vm;
318  try {
319  bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
320  bpo::notify(vm);
321  }
322  catch (bpo::error const & e) {
323  std::cerr << "Exception from command line processing in " << argv[0]
324  << ": " << e.what() << "\n";
325  return -1;
326  }
327  if (vm.count("help")) {
328  std::cout << desc << std::endl;
329  return 1;
330  }
331  if (!vm.count("config")) {
332  std::cerr << "Exception from command line processing in " << argv[0]
333  << ": no configuration file given.\n"
334  << "For usage and an options list, please do '"
335  << argv[0] << " --help"
336  << "'.\n";
337  return 2;
338  }
339  int key = 0;
340  if (vm.count("key"))
341  {
342  key = vm["key"].as<int>();
343  }
344  fhicl::ParameterSet pset;
345  if (getenv("FHICL_FILE_PATH") == nullptr) {
346  std::cerr
347  << "INFO: environment variable FHICL_FILE_PATH was not set. Using \".\"\n";
348  setenv("FHICL_FILE_PATH", ".", 0);
349  }
350  cet::filepath_lookup_after1 lookup_policy("FHICL_FILE_PATH");
351  fhicl::make_ParameterSet(vm["config"].as<std::string>(), lookup_policy, pset);
352 
353  int rc = 1;
354  try
355  {
356  Builder p(argc, argv, pset,key);
357  std::cerr << "Started process " << my_rank << " of " << p.procs_ << ".\n";
358  p.go();
359  rc = 0;
360  }
361  catch (std::string& x)
362  {
363  std::cerr << "Exception (type string) caught in driver: "
364  << x
365  << '\n';
366  return 1;
367  }
368  catch (char const* m)
369  {
370  std::cerr << "Exception (type char const*) caught in driver: ";
371  if (m)
372  {
373  std::cerr << m;
374  }
375  else
376  {
377  std::cerr << "[the value was a null pointer, so no message is available]";
378  }
379  std::cerr << '\n';
380  }
381  return rc;
382 }
std::set< int > running_sources() const
void go()
Start the Builder application, using the type configuration to select which method to run...
Definition: builder.cc:132
A wrapper for an MPI program. Similar to MPISentry.
Definition: MPIProg.hh:10
virtual std::vector< Fragment::fragment_id_t > fragmentIDs()=0
void detector()
Generate data, and send it using DataSenderManager.
Definition: builder.cc:168
std::pair< int, TransferInterface::CopyStatus > sendFragment(Fragment &&frag)
double convertUnixTimeToSeconds(time_t inputUnixTime)
std::unique_ptr< FragmentGenerator > makeFragmentGenerator(std::string const &generator_plugin_spec, fhicl::ParameterSet const &ps)
void configureMessageFacility(char const *progname, bool useConsole=true, bool printDebug=false)
The Builder class runs the builder test.
Definition: builder.cc:39
std::list< FragmentPtr > FragmentPtrs
void sink()
Receive data from source via DataReceiverManager, send it to the EventStore (and art, if configured)
Definition: builder.cc:236
Builder(int argc, char *argv[], fhicl::ParameterSet pset, int key)
Builder Constructor.
Definition: builder.cc:82