artdaq_mpich_plugin  v1_00_08
routing_master_t.cc
#define TRACE_NAME "routing_master_t"
#include "MPIProg.hh"
#include "artdaq/DAQdata/Globals.hh"                    // NOTE(review): restored — dropped by extraction (orig. line 4); presumably provides my_rank/app_name/TLOG — confirm
#include "artdaq/TransferPlugins/detail/TCPConnect.hh"  // NOTE(review): restored — dropped by extraction (orig. line 5); presumably provides ResolveHost() — confirm
#include "artdaq/DAQrate/detail/RoutingPacket.hh"
#include "artdaq/Application/RoutingMasterApp.hh"
#include "artdaq/Application/RoutingMasterCore.hh"

#include "cetlib/filepath_maker.h"
#include "fhiclcpp/ParameterSet.h"
#include "fhiclcpp/make_ParameterSet.h"

#include <boost/program_options.hpp>

#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>

extern "C"
{
#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>
}

#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>

namespace bpo = boost::program_options;
38 
42 class RoutingMasterTest : public MPIProg
43 {
44 public:
59  RoutingMasterTest(int argc, char* argv[]);
60 
64  void go();
65 
69  void generate_tokens();
70 
74  void routing_master();
75 
79  void table_receiver();
80 
87  fhicl::ParameterSet getPset(int argc, char* argv[]) const;
88 
89 private:
90  enum class TestRole_t : int
91  {
92  TOKEN_GEN = 0,
93  ROUTING_MASTER = 1,
94  TABLE_RECEIVER = 2
95  };
96 
97  void printHost(const std::string& functionName) const;
98 
99  fhicl::ParameterSet const pset_;
100  fhicl::ParameterSet const daq_pset_;
101  TestRole_t role_;
102 
103  std::string routing_master_address_;
104  std::string multicast_address_;
105  int token_port_;
106  int table_port_;
107  int ack_port_;
108  std::vector<int> eb_ranks_;
109  int token_count_;
110  size_t token_interval_us_;
111  size_t run_number_;
112 };
113 
114 RoutingMasterTest::RoutingMasterTest(int argc, char* argv[]) :
115  MPIProg(argc, argv)
116  , pset_(getPset(argc, argv))
117  , daq_pset_(pset_.get<fhicl::ParameterSet>("daq"))
118  , routing_master_address_(daq_pset_.get<std::string>("routing_master_hostname", "localhost"))
119  , multicast_address_(daq_pset_.get<std::string>("table_update_address", "227.128.12.28"))
120  , token_port_(daq_pset_.get<int>("routing_token_port", 35555))
121  , table_port_(daq_pset_.get<int>("table_update_port", 35556))
122  , ack_port_(daq_pset_.get<int>("table_acknowledge_port", 35557))
123  , token_count_(pset_.get<int>("token_count", 1000))
124  , token_interval_us_(pset_.get<size_t>("token_interval_us", 5000))
125  , run_number_(pset_.get<size_t>("run_number"))
126 {
127  assert(!(my_rank < 0));
128  switch (my_rank)
129  {
130  case 0:
131  role_ = TestRole_t::TOKEN_GEN;
132  break;
133  case 1:
134  role_ = TestRole_t::ROUTING_MASTER;
135  break;
136  default:
137  role_ = TestRole_t::TABLE_RECEIVER;
138  break;
139  }
140  auto policy_pset = daq_pset_.get<fhicl::ParameterSet>("policy");
141  eb_ranks_ = policy_pset.get<std::vector<int>>("receiver_ranks");
142 }
143 
144 fhicl::ParameterSet RoutingMasterTest::getPset(int argc, char* argv[]) const
145 {
146  std::ostringstream descstr;
147  descstr << "-- <-c <config-file>>";
148  bpo::options_description desc(descstr.str());
149  desc.add_options()
150  ("config,c", bpo::value<std::string>(), "Configuration file.");
151  bpo::variables_map vm;
152  try
153  {
154  bpo::store(bpo::command_line_parser(argc, argv).
155  options(desc).allow_unregistered().run(), vm);
156  bpo::notify(vm);
157  }
158  catch (bpo::error const& e)
159  {
160  std::cerr << "Exception from command line processing in Config::getArtPset: " << e.what() << "\n";
161  throw "cmdline parsing error.";
162  }
163  if (!vm.count("config"))
164  {
165  std::cerr << "Expected \"-- -c <config-file>\" fhicl file specification.\n";
166  throw "cmdline parsing error.";
167  }
168  fhicl::ParameterSet pset;
169  cet::filepath_lookup lookup_policy("FHICL_FILE_PATH");
170  fhicl::make_ParameterSet(vm["config"].as<std::string>(), lookup_policy, pset);
171 
172  return pset;
173 }
174 
176 {
177  TLOG(TLVL_INFO) << "Entering MPI_Barrier";
178  MPI_Barrier(MPI_COMM_WORLD);
179  TLOG(TLVL_INFO) << "Done with Barrier";
180  //std::cout << "daq_pset_: " << daq_pset_.to_string() << std::endl << "conf_.makeParameterSet(): " << conf_.makeParameterSet().to_string() << std::endl;
181 
182  switch (role_)
183  {
184  case TestRole_t::TABLE_RECEIVER:
185  table_receiver();
186  break;
187  case TestRole_t::ROUTING_MASTER:
188  routing_master();
189  break;
190  case TestRole_t::TOKEN_GEN:
191  generate_tokens();
192  break;
193  default:
194  throw "No such node type";
195  }
196  TLOG(TLVL_INFO) << "Rank " << my_rank << " complete." ;
197 }
198 
200 {
201  TLOG(TLVL_INFO) << "generate_tokens(): Init" ;
202  printHost("generate_tokens");
203  sleep(1);
204 
205  int token_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
206  if (token_socket < 0)
207  {
208  TLOG(TLVL_ERROR) << "generate_tokens(): I failed to create the socket for sending Routing Tokens!" ;
209  exit(1);
210  }
211  struct sockaddr_in token_addr;
212  auto sts = ResolveHost(routing_master_address_.c_str(), token_port_, token_addr);
213  if(sts == -1)
214  {
215  TLOG(TLVL_ERROR) << "generate_tokens(): Could not resolve host name" ;
216  }
217 
218  connect(token_socket, (struct sockaddr*)&token_addr, sizeof(token_addr));
219 
220  int sent_tokens = 0;
221  std::map<int, int> token_counter;
222  for(auto rank : eb_ranks_)
223  {
224  token_counter[rank] = 0;
225  }
226  while (sent_tokens < token_count_) {
227  int this_rank = eb_ranks_[seedAndRandom() % eb_ranks_.size()];
228  token_counter[this_rank]++;
230  token.header = TOKEN_MAGIC;
231  token.rank = this_rank;
232  token.new_slots_free = 1;
233  token.run_number = run_number_;
234 
235  TLOG(TLVL_INFO) << "generate_tokens(): Sending RoutingToken " << ++sent_tokens << " for rank " << this_rank << " to " << routing_master_address_ ;
236  send(token_socket, &token, sizeof(artdaq::detail::RoutingToken), 0);
237  usleep(token_interval_us_);
238  }
239  auto max_rank = 0;
240  for(auto rank : token_counter)
241  {
242  if (rank.second > max_rank) max_rank = rank.second;
243  }
244  for(auto rank : token_counter)
245  {
247  token.header = TOKEN_MAGIC;
248  token.rank = rank.first;
249  token.new_slots_free = max_rank - rank.second;
250  token.run_number = run_number_;
251 
252  TLOG(TLVL_INFO) << "generate_tokens(): Sending RoutingToken " << ++sent_tokens << " for rank " << rank.first << " to " << routing_master_address_ ;
253  send(token_socket, &token, sizeof(artdaq::detail::RoutingToken), 0);
254  usleep(token_interval_us_);
255 
256  }
257 
258  TLOG(TLVL_INFO) << "generate_tokens(): Waiting at MPI_Barrier" ;
259  MPI_Barrier(MPI_COMM_WORLD);
260  TLOG(TLVL_INFO) << "generate_tokens(): Done with MPI_Barrier" ;
261 }
262 
264 {
265  TLOG(TLVL_INFO) << "table_receiver(): Init" ;
266  printHost("table_receiver");
267 
268 
269  auto table_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
270  if (table_socket < 0)
271  {
272  TLOG(TLVL_ERROR) << "table_receiver(): Error creating socket for receiving data requests!" ;
273  exit(1);
274  }
275 
276  struct sockaddr_in si_me_request;
277 
278  int yes = 1;
279  if (setsockopt(table_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
280  {
281  TLOG(TLVL_ERROR) << "table_receiver(): Unable to enable port reuse on request socket" ;
282  exit(1);
283  }
284  memset(&si_me_request, 0, sizeof(si_me_request));
285  si_me_request.sin_family = AF_INET;
286  si_me_request.sin_port = htons(table_port_);
287  si_me_request.sin_addr.s_addr = htonl(INADDR_ANY);
288  if (bind(table_socket, (struct sockaddr *)&si_me_request, sizeof(si_me_request)) == -1)
289  {
290  TLOG(TLVL_ERROR) << "table_receiver(): Cannot bind request socket to port " << table_port_ ;
291  exit(1);
292  }
293 
294  struct ip_mreq mreq;
295  long int sts = ResolveHost(multicast_address_.c_str(), mreq.imr_multiaddr);
296  if(sts == -1)
297  {
298  TLOG(TLVL_ERROR) << "table_receiver(): Unable to resolve multicast hostname" ;
299  exit(1);
300  }
301  mreq.imr_interface.s_addr = htonl(INADDR_ANY);
302  if (setsockopt(table_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
303  {
304  TLOG(TLVL_ERROR) << "table_receiver(): Unable to join multicast group" ;
305  exit(1);
306  }
307 
308  struct epoll_event ev;
309  int table_epoll_fd = epoll_create1(0);
310  ev.events = EPOLLIN | EPOLLPRI;
311  ev.data.fd = table_socket;
312  if (epoll_ctl(table_epoll_fd, EPOLL_CTL_ADD, table_socket, &ev) == -1)
313  {
314  TLOG(TLVL_ERROR) << "table_receiver(): Could not register listen socket to epoll fd" ;
315  exit(3);
316  }
317 
318  auto ack_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
319  struct sockaddr_in ack_addr;
320  sts = ResolveHost(routing_master_address_.c_str(), ack_port_, ack_addr);
321  if(sts == -1)
322  {
323  TLOG(TLVL_ERROR) << "table_receiver(): Unable to resolve routing master hostname" ;
324  exit(1);
325  }
326 
327  if (table_socket == -1 || table_epoll_fd == -1 || ack_socket == -1)
328  {
329  TLOG(TLVL_INFO) << "table_receiver(): One of the listen sockets was not opened successfully." ;
330  exit(4);
331  }
332  artdaq::Fragment::sequence_id_t max_sequence_id = token_count_;
333  artdaq::Fragment::sequence_id_t current_sequence_id = 0;
334  std::map<artdaq::Fragment::sequence_id_t, int> routing_table;
335  TLOG(TLVL_INFO) << "table_receiver(): Expecting " << max_sequence_id << " as the last Sequence ID in this run" ;
336  while (current_sequence_id < max_sequence_id)
337  {
338  std::vector<epoll_event> table_events_(4);
339  TLOG(TLVL_INFO) << "table_receiver(): Waiting for event on table socket" ;
340  auto nfds = epoll_wait(table_epoll_fd, &table_events_[0], table_events_.size(), -1);
341  if (nfds == -1) {
342  perror("epoll_wait");
343  exit(EXIT_FAILURE);
344  }
345 
346  TLOG(TLVL_INFO) << "table_receiver(): Received " << nfds << " table update(s)" ;
347  for (auto n = 0; n < nfds; ++n) {
350 
351  std::vector<uint8_t> buf(MAX_ROUTING_TABLE_SIZE);
353  auto stss = recv(table_events_[n].data.fd, &buf[0], MAX_ROUTING_TABLE_SIZE, 0);
354 
355 
356  if (stss > static_cast<ssize_t>(sizeof(hdr)))
357  {
358  memcpy(&hdr, &buf[0], sizeof(hdr));
359  }
360  else
361  {
362  TLOG(TLVL_TRACE) << __func__ << ": Incorrect size received. Discarding.";
363  continue;
364  }
365 
366  TLOG(TLVL_INFO) << "table_receiver(): Checking for valid header" ;
367  if (hdr.header == ROUTING_MAGIC) {
368  artdaq::detail::RoutingPacket buffer(hdr.nEntries);
369  TLOG(TLVL_INFO) << "table_receiver(): Receiving data buffer" ;
370  memcpy(&buffer[0], &buf[sizeof(artdaq::detail::RoutingPacketHeader)], sizeof(artdaq::detail::RoutingPacketEntry) * hdr.nEntries);
371 
372  first = buffer[0].sequence_id;
373  last = buffer[buffer.size() - 1].sequence_id;
374 
375  for (auto entry : buffer)
376  {
377  if (routing_table.count(entry.sequence_id))
378  {
379  assert(routing_table[entry.sequence_id] == entry.destination_rank);
380  continue;
381  }
382  routing_table[entry.sequence_id] = entry.destination_rank;
383  TLOG(TLVL_INFO) << "table_receiver(): table_receiver " << my_rank << ": received update: SeqID " << entry.sequence_id << " -> Rank " << entry.destination_rank ;
384  }
385 
387  ack.rank = my_rank;
388  ack.first_sequence_id = first;
389  ack.last_sequence_id = last;
390 
391  TLOG(TLVL_INFO) << "table_receiver(): Sending RoutingAckPacket with first= " << first << " and last= " << last << " to " << routing_master_address_ << ", port " << ack_port_ ;
392  sendto(ack_socket, &ack, sizeof(artdaq::detail::RoutingAckPacket), 0, (struct sockaddr *)&ack_addr, sizeof(ack_addr));
393  current_sequence_id = last;
394  }
395  }
396  }
397 
398  TLOG(TLVL_INFO) << "table_receiver(): Waiting at MPI_Barrier" ;
399  MPI_Barrier(MPI_COMM_WORLD);
400  TLOG(TLVL_INFO) << "table_receiver(): Done with MPI_Barrier" ;
401 }
402 
404 {
405  TLOG(TLVL_INFO) << "routing_master: Init" ;
406  printHost("routing_master");
407 
408  app_name = "RoutingMaster";
409 
410  auto app = std::make_unique<artdaq::RoutingMasterApp>();
411 
412  auto sts = app->initialize(pset_, 0, 0);
413  if (!sts) {
414  TLOG(TLVL_ERROR) << "routing_master: Failed to initalize!";
415  }
416  app->do_start(art::RunID(run_number_), 0, 0);
417  TLOG(TLVL_INFO) << "routing_master: Waiting at MPI_Barrier" ;
418  MPI_Barrier(MPI_COMM_WORLD);
419  TLOG(TLVL_INFO) << "routing_master: Done with MPI_Barrier, calling RoutingMasterCore::stop" ;
420  app->do_stop(0, 0);
421  TLOG(TLVL_INFO) << "routing_master: Done with RoutingMasterCore::stop, calling shutdown" ;
422  app->do_shutdown(0);
423  TLOG(TLVL_INFO) << "routing_master: Done with RoutingMasterCore::shutdown" ;
424 }
425 
426 void RoutingMasterTest::printHost(const std::string& functionName) const
427 {
428  char* doPrint = getenv("PRINT_HOST");
429  if (doPrint == 0) { return; }
430  const int ARRSIZE = 80;
431  char hostname[ARRSIZE];
432  std::string hostString;
433  if (!gethostname(hostname, ARRSIZE))
434  {
435  hostString = hostname;
436  }
437  else
438  {
439  hostString = "unknown";
440  }
441  TLOG(TLVL_INFO) << "Running " << functionName
442  << " on host " << hostString
443  << " with rank " << my_rank << "."
444  ;
445 }
446 
447 void printUsage()
448 {
449  int myid = 0;
450  struct rusage usage;
451  getrusage(RUSAGE_SELF, &usage);
452  std::cout << myid << ":"
453  << " user=" << artdaq::TimeUtils::convertUnixTimeToSeconds(usage.ru_utime)
454  << " sys=" << artdaq::TimeUtils::convertUnixTimeToSeconds(usage.ru_stime)
455  << std::endl;
456 }
457 
458 int main(int argc, char* argv[])
459 {
460  artdaq::configureMessageFacility("routing_master", true);
461  int rc = 1;
462 
463 #if 0
464  std::cerr << "PID: " << getpid() << std::endl;
465  volatile bool attach = true;
466  while (attach)
467  {
468  usleep(100000);
469  }
470 #endif
471 
472  try
473  {
474  RoutingMasterTest p(argc, argv);
475  std::cerr << "Started process " << my_rank << " of " << p.procs_ << ".\n";
476  p.go();
477  rc = 0;
478  }
479  catch (std::string& x)
480  {
481  std::cerr << "Exception (type string) caught in routing_master: "
482  << x
483  << '\n';
484  return 1;
485  }
486  catch (char const* m)
487  {
488  std::cerr << "Exception (type char const*) caught in routing_master: ";
489  if (m)
490  {
491  std::cerr << m;
492  }
493  else
494  {
495  std::cerr << "[the value was a null pointer, so no message is available]";
496  }
497  std::cerr << '\n';
498  }
499  return rc;
500 }
The RoutingMasterTest class runs the routing_master test.
int ResolveHost(char const *host_in, in_addr &addr)
void routing_master()
Load a RoutingMasterCore instance, receive tokens from the token generators, and send table updates to the table receivers.
A wrapper for a MPI program. Similar to MPISentry.
Definition: MPIProg.hh:10
void go()
Start the test, using the role assigned.
static constexpr sequence_id_t InvalidSequenceID
RoutingMasterTest(int argc, char *argv[])
RoutingMasterTest Constructor.
void table_receiver()
Receive Routing Tables from the Routing Master and send acknowledgement packets back.
Fragment::sequence_id_t first_sequence_id
double convertUnixTimeToSeconds(time_t inputUnixTime)
fhicl::ParameterSet getPset(int argc, char *argv[]) const
Parse the command line arguments and load a configuration FHiCL file.
Fragment::sequence_id_t last_sequence_id
void configureMessageFacility(char const *progname, bool useConsole=true, bool printDebug=false)
void generate_tokens()
Generate tokens and send them to the Routing Master.
detail::RawFragmentHeader::sequence_id_t sequence_id_t