1 #define TRACE_NAME "routing_master_t"
3 #include "artdaq/DAQrate/detail/RoutingPacket.hh"
6 #include "cetlib/filepath_maker.h"
7 #include "fhiclcpp/ParameterSet.h"
8 #include "fhiclcpp/make_ParameterSet.h"
10 #include <boost/program_options.hpp>
11 #include "artdaq/Application/RoutingMasterCore.hh"
12 #include "artdaq/Application/RoutingMasterApp.hh"
14 namespace bpo = boost::program_options;
28 #include <arpa/inet.h>
29 #include <netinet/in.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
36 #include <sys/resource.h>
// Parses the command line (an "-c <config-file>" option) and loads the named FHiCL file.
87 fhicl::ParameterSet
getPset(
int argc,
char* argv[])
const;
// Role played by this rank; the constructor assigns TOKEN_GEN, ROUTING_MASTER or TABLE_RECEIVER.
90 enum class TestRole_t : int
// Logs the function name, host name and rank, but only when the PRINT_HOST environment variable is set.
97 void printHost(
const std::string& functionName)
const;
// Complete configuration returned by getPset().
99 fhicl::ParameterSet
const pset_;
// The "daq" sub-table of pset_, holding the network and policy settings.
100 fhicl::ParameterSet
const daq_pset_;
// Routing master host: target of routing tokens and of table acknowledgements ("routing_master_hostname").
103 std::string routing_master_address_;
// Multicast group joined to receive routing-table updates ("table_update_address").
104 std::string multicast_address_;
// Ranks that routing tokens are generated for (daq.policy.receiver_ranks).
108 std::vector<int> eb_ranks_;
// Pause between consecutive tokens in generate_tokens(), in microseconds.
110 size_t token_interval_us_;
// Member initializers: host names and ports come from the "daq" table (defaults given inline);
// token_count, token_interval_us and run_number come from the top level. Unlike the other keys,
// run_number is read without a default value.
116 , pset_(getPset(argc, argv))
117 , daq_pset_(pset_.get<fhicl::ParameterSet>(
"daq"))
118 , routing_master_address_(daq_pset_.get<std::string>(
"routing_master_hostname",
"localhost"))
119 , multicast_address_(daq_pset_.get<std::string>(
"table_update_address",
"227.128.12.28"))
120 , token_port_(daq_pset_.get<int>(
"routing_token_port", 35555))
121 , table_port_(daq_pset_.get<int>(
"table_update_port", 35556))
122 , ack_port_(daq_pset_.get<int>(
"table_acknowledge_port", 35557))
123 , token_count_(pset_.get<int>(
"token_count", 1000))
124 , token_interval_us_(pset_.get<size_t>(
"token_interval_us", 5000))
125 , run_number_(pset_.get<size_t>(
"run_number"))
// my_rank is not assigned in the visible lines; presumably the MPI wrapper sets it beforehand (TODO confirm).
127 assert(!(my_rank < 0));
// Each rank takes exactly one role; the conditions selecting between these assignments are not visible here.
131 role_ = TestRole_t::TOKEN_GEN;
134 role_ = TestRole_t::ROUTING_MASTER;
137 role_ = TestRole_t::TABLE_RECEIVER;
// Destination ranks for generated tokens, taken from daq.policy.receiver_ranks.
140 auto policy_pset = daq_pset_.get<fhicl::ParameterSet>(
"policy");
141 eb_ranks_ = policy_pset.get<std::vector<int>>(
"receiver_ranks");
// Declares a single "-c/--config <file>" option; unrecognized options are allowed through.
146 std::ostringstream descstr;
147 descstr <<
"-- <-c <config-file>>";
148 bpo::options_description desc(descstr.str());
150 (
"config,c", bpo::value<std::string>(),
"Configuration file.");
151 bpo::variables_map vm;
154 bpo::store(bpo::command_line_parser(argc, argv).
155 options(desc).allow_unregistered().run(), vm);
// Parse errors are reported on stderr and rethrown as a C string (main() catches char const*).
// NOTE(review): the message names Config::getArtPset rather than getPset.
158 catch (bpo::error
const& e)
160 std::cerr <<
"Exception from command line processing in Config::getArtPset: " << e.what() <<
"\n";
161 throw "cmdline parsing error.";
// The config option is mandatory.
163 if (!vm.count(
"config"))
165 std::cerr <<
"Expected \"-- -c <config-file>\" fhicl file specification.\n";
166 throw "cmdline parsing error.";
// Locate the file through the FHICL_FILE_PATH search path and parse it into pset.
168 fhicl::ParameterSet pset;
169 cet::filepath_lookup lookup_policy(
"FHICL_FILE_PATH");
170 fhicl::make_ParameterSet(vm[
"config"].as<std::string>(), lookup_policy, pset);
// Start-up barrier: every rank waits here before running its role.
177 TLOG(TLVL_INFO) <<
"Entering MPI_Barrier";
178 MPI_Barrier(MPI_COMM_WORLD);
179 TLOG(TLVL_INFO) <<
"Done with Barrier";
// Dispatch on the assigned role (presumably a switch on role_; the switch header and calls are not visible here).
184 case TestRole_t::TABLE_RECEIVER:
187 case TestRole_t::ROUTING_MASTER:
190 case TestRole_t::TOKEN_GEN:
// Any other role is reported as a C-string exception.
194 throw "No such node type";
196 TLOG(TLVL_INFO) <<
"Rank " << my_rank <<
" complete." ;
201 TLOG(TLVL_INFO) <<
"generate_tokens(): Init" ;
202 printHost(
"generate_tokens");
// Tokens are sent over a TCP connection to the routing master's token port.
205 int token_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
206 if (token_socket < 0)
208 TLOG(TLVL_ERROR) <<
"generate_tokens(): I failed to create the socket for sending Routing Tokens!" ;
211 struct sockaddr_in token_addr;
212 auto sts =
ResolveHost(routing_master_address_.c_str(), token_port_, token_addr);
215 TLOG(TLVL_ERROR) <<
"generate_tokens(): Could not resolve host name" ;
// NOTE(review): the return value of connect() is discarded here, so a failed connection is not reported at this point.
218 connect(token_socket, (
struct sockaddr*)&token_addr,
sizeof(token_addr));
// Per-rank tally of tokens sent, starting at zero for every receiver rank.
221 std::map<int, int> token_counter;
222 for(
auto rank : eb_ranks_)
224 token_counter[rank] = 0;
// Main phase: send token_count_ tokens, each for a rank chosen by seedAndRandom() (defined elsewhere)
// modulo the number of receiver ranks, pausing token_interval_us_ between tokens.
// NOTE(review): an empty receiver_ranks list would make this a modulo by zero.
226 while (sent_tokens < token_count_) {
227 int this_rank = eb_ranks_[seedAndRandom() % eb_ranks_.size()];
228 token_counter[this_rank]++;
230 token.
header = TOKEN_MAGIC;
231 token.
rank = this_rank;
235 TLOG(TLVL_INFO) <<
"generate_tokens(): Sending RoutingToken " << ++sent_tokens <<
" for rank " << this_rank <<
" to " << routing_master_address_ ;
237 usleep(token_interval_us_);
// Find the highest number of tokens any single rank received...
240 for(
auto rank : token_counter)
242 if (rank.second > max_rank) max_rank = rank.second;
// ...then send further tokens per rank. The inner loop bounds using max_rank are not visible here;
// presumably each rank is topped up to max_rank tokens (TODO confirm).
244 for(
auto rank : token_counter)
247 token.
header = TOKEN_MAGIC;
248 token.
rank = rank.first;
252 TLOG(TLVL_INFO) <<
"generate_tokens(): Sending RoutingToken " << ++sent_tokens <<
" for rank " << rank.first <<
" to " << routing_master_address_ ;
254 usleep(token_interval_us_);
// Final barrier; the routing master and table receivers also finish with MPI_Barrier.
258 TLOG(TLVL_INFO) <<
"generate_tokens(): Waiting at MPI_Barrier" ;
259 MPI_Barrier(MPI_COMM_WORLD);
260 TLOG(TLVL_INFO) <<
"generate_tokens(): Done with MPI_Barrier" ;
265 TLOG(TLVL_INFO) <<
"table_receiver(): Init" ;
266 printHost(
"table_receiver");
// UDP socket on which routing-table updates arrive.
269 auto table_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
270 if (table_socket < 0)
// NOTE(review): this message, and the port-reuse and bind messages below, mention "requests"/"request socket",
// although this socket receives table updates.
272 TLOG(TLVL_ERROR) <<
"table_receiver(): Error creating socket for receiving data requests!" ;
276 struct sockaddr_in si_me_request;
// Enable address reuse, presumably so several receivers on one host can bind the same port.
279 if (setsockopt(table_socket, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(yes)) < 0)
281 TLOG(TLVL_ERROR) <<
"table_receiver(): Unable to enable port reuse on request socket" ;
// Bind to table_port_ on all local interfaces.
284 memset(&si_me_request, 0,
sizeof(si_me_request));
285 si_me_request.sin_family = AF_INET;
286 si_me_request.sin_port = htons(table_port_);
287 si_me_request.sin_addr.s_addr = htonl(INADDR_ANY);
288 if (bind(table_socket, (
struct sockaddr *)&si_me_request,
sizeof(si_me_request)) == -1)
290 TLOG(TLVL_ERROR) <<
"table_receiver(): Cannot bind request socket to port " << table_port_ ;
// Join the multicast group given by multicast_address_ on any interface.
295 long int sts =
ResolveHost(multicast_address_.c_str(), mreq.imr_multiaddr);
298 TLOG(TLVL_ERROR) <<
"table_receiver(): Unable to resolve multicast hostname" ;
301 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
302 if (setsockopt(table_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0)
304 TLOG(TLVL_ERROR) <<
"table_receiver(): Unable to join multicast group" ;
// epoll instance used to wait for readable data (EPOLLIN | EPOLLPRI) on the table socket.
308 struct epoll_event ev;
309 int table_epoll_fd = epoll_create1(0);
310 ev.events = EPOLLIN | EPOLLPRI;
311 ev.data.fd = table_socket;
312 if (epoll_ctl(table_epoll_fd, EPOLL_CTL_ADD, table_socket, &ev) == -1)
314 TLOG(TLVL_ERROR) <<
"table_receiver(): Could not register listen socket to epoll fd" ;
// UDP socket and address for acknowledgements sent back to the routing master on ack_port_.
318 auto ack_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
319 struct sockaddr_in ack_addr;
320 sts =
ResolveHost(routing_master_address_.c_str(), ack_port_, ack_addr);
323 TLOG(TLVL_ERROR) <<
"table_receiver(): Unable to resolve routing master hostname" ;
// NOTE(review): this failure is logged at TLVL_INFO rather than TLVL_ERROR.
327 if (table_socket == -1 || table_epoll_fd == -1 || ack_socket == -1)
329 TLOG(TLVL_INFO) <<
"table_receiver(): One of the listen sockets was not opened successfully." ;
// Records every sequence ID -> destination rank seen so far. The loop runs until the last
// acknowledged sequence ID reaches max_sequence_id (computed on lines not shown here).
334 std::map<artdaq::Fragment::sequence_id_t, int> routing_table;
335 TLOG(TLVL_INFO) <<
"table_receiver(): Expecting " << max_sequence_id <<
" as the last Sequence ID in this run" ;
336 while (current_sequence_id < max_sequence_id)
// Block indefinitely (timeout -1) until events are ready; at most 4 are returned per call.
338 std::vector<epoll_event> table_events_(4);
339 TLOG(TLVL_INFO) <<
"table_receiver(): Waiting for event on table socket" ;
340 auto nfds = epoll_wait(table_epoll_fd, &table_events_[0], table_events_.size(), -1);
342 perror(
"epoll_wait");
346 TLOG(TLVL_INFO) <<
"table_receiver(): Received " << nfds <<
" table update(s)" ;
347 for (
auto n = 0; n < nfds; ++n) {
// Read one datagram of up to MAX_ROUTING_TABLE_SIZE bytes.
351 std::vector<uint8_t> buf(MAX_ROUTING_TABLE_SIZE);
353 auto stss = recv(table_events_[n].data.fd, &buf[0], MAX_ROUTING_TABLE_SIZE, 0);
// Only datagrams strictly larger than the header are accepted.
// NOTE(review): a datagram of exactly sizeof(hdr) bytes is discarded as "Incorrect size".
356 if (stss > static_cast<ssize_t>(
sizeof(hdr)))
358 memcpy(&hdr, &buf[0],
sizeof(hdr));
362 TLOG(TLVL_TRACE) << __func__ <<
": Incorrect size received. Discarding.";
366 TLOG(TLVL_INFO) <<
"table_receiver(): Checking for valid header" ;
367 if (hdr.
header == ROUTING_MAGIC) {
// Sized for hdr.nEntries entries; the copy from buf happens on lines not visible here.
368 artdaq::detail::RoutingPacket buffer(hdr.
nEntries);
369 TLOG(TLVL_INFO) <<
"table_receiver(): Receiving data buffer" ;
// NOTE(review): presumably RoutingPacket is vector-like; buffer[0] and buffer.size() - 1 would be
// out of range if nEntries were 0.
372 first = buffer[0].sequence_id;
373 last = buffer[buffer.size() - 1].sequence_id;
// A sequence ID seen again must map to the same rank as before (checked only by assert).
375 for (
auto entry : buffer)
377 if (routing_table.count(entry.sequence_id))
379 assert(routing_table[entry.sequence_id] == entry.destination_rank);
382 routing_table[entry.sequence_id] = entry.destination_rank;
383 TLOG(TLVL_INFO) <<
"table_receiver(): table_receiver " << my_rank <<
": received update: SeqID " << entry.sequence_id <<
" -> Rank " << entry.destination_rank ;
// Acknowledge the range [first, last] to the routing master (the send call is not visible here),
// then record progress.
391 TLOG(TLVL_INFO) <<
"table_receiver(): Sending RoutingAckPacket with first= " << first <<
" and last= " << last <<
" to " << routing_master_address_ <<
", port " << ack_port_ ;
393 current_sequence_id = last;
398 TLOG(TLVL_INFO) <<
"table_receiver(): Waiting at MPI_Barrier" ;
399 MPI_Barrier(MPI_COMM_WORLD);
400 TLOG(TLVL_INFO) <<
"table_receiver(): Done with MPI_Barrier" ;
405 TLOG(TLVL_INFO) <<
"routing_master: Init" ;
406 printHost(
"routing_master");
// app_name is declared outside this function (not visible here).
408 app_name =
"RoutingMaster";
// Run a RoutingMasterApp configured from the full pset_, started with run number run_number_.
410 auto app = std::make_unique<artdaq::RoutingMasterApp>();
412 auto sts = app->initialize(pset_, 0, 0);
// NOTE(review): the log text misspells "initialize"; whether execution continues after this failure
// is not visible here.
414 TLOG(TLVL_ERROR) <<
"routing_master: Failed to initalize!";
416 app->do_start(art::RunID(run_number_), 0, 0);
// Wait at the barrier until the other ranks reach theirs. The stop and shutdown calls named in the
// log messages below are on lines not shown here.
417 TLOG(TLVL_INFO) <<
"routing_master: Waiting at MPI_Barrier" ;
418 MPI_Barrier(MPI_COMM_WORLD);
419 TLOG(TLVL_INFO) <<
"routing_master: Done with MPI_Barrier, calling RoutingMasterCore::stop" ;
421 TLOG(TLVL_INFO) <<
"routing_master: Done with RoutingMasterCore::stop, calling shutdown" ;
423 TLOG(TLVL_INFO) <<
"routing_master: Done with RoutingMasterCore::shutdown" ;
// Logs which host this rank is running functionName on; does nothing unless PRINT_HOST is set.
426 void RoutingMasterTest::printHost(
const std::string& functionName)
const
428 char* doPrint = getenv(
"PRINT_HOST");
429 if (doPrint == 0) {
return; }
430 const int ARRSIZE = 80;
431 char hostname[ARRSIZE];
432 std::string hostString;
// gethostname() returns 0 on success; otherwise the host is reported as "unknown".
// NOTE(review): POSIX leaves null-termination unspecified when the name is truncated to ARRSIZE bytes.
433 if (!gethostname(hostname, ARRSIZE))
435 hostString = hostname;
439 hostString =
"unknown";
441 TLOG(TLVL_INFO) <<
"Running " << functionName
442 <<
" on host " << hostString
443 <<
" with rank " << my_rank <<
"."
// Queries resource usage of the calling process (RUSAGE_SELF); the enclosing function is not visible here.
451 getrusage(RUSAGE_SELF, &usage);
452 std::cout << myid <<
":"
// Program entry point: prints the PID and reports exceptions thrown as std::string or as C strings.
458 int main(
int argc,
char* argv[])
464 std::cerr <<
"PID: " << getpid() << std::endl;
// Presumably a debugger-attach hook used together with the PID printout above; its use is not
// visible here (TODO confirm).
465 volatile bool attach =
true;
475 std::cerr <<
"Started process " << my_rank <<
" of " << p.procs_ <<
".\n";
// getPset() and go() throw C-string literals, which the char const* handler below reports.
479 catch (std::string& x)
481 std::cerr <<
"Exception (type string) caught in routing_master: "
486 catch (
char const* m)
488 std::cerr <<
"Exception (type char const*) caught in routing_master: ";
495 std::cerr <<
"[the value was a null pointer, so no message is available]";
The RoutingMasterTest class runs the routing_master test.
int ResolveHost(char const *host_in, in_addr &addr)
void routing_master()
Load a RoutingMasterCore instance, receive tokens from the token generators, and send table updates to the table receivers.
A wrapper for an MPI program. Similar to MPISentry.
void go()
Start the test, using the role assigned.
static constexpr sequence_id_t InvalidSequenceID
RoutingMasterTest(int argc, char *argv[])
RoutingMasterTest Constructor.
void table_receiver()
Receive Routing Tables from the Routing Master and send acknowledgement packets back.
Fragment::sequence_id_t first_sequence_id
double convertUnixTimeToSeconds(time_t inputUnixTime)
fhicl::ParameterSet getPset(int argc, char *argv[]) const
Parse the command line arguments and load a configuration FHiCL file.
Fragment::sequence_id_t last_sequence_id
void configureMessageFacility(char const *progname, bool useConsole=true, bool printDebug=false)
void generate_tokens()
Generate tokens and send them to the Routing Master.
detail::RawFragmentHeader::sequence_id_t sequence_id_t