1 #define TRACE_NAME "builder"
3 #include "art/Framework/Art/artapp.h"
4 #include "artdaq-core/Generators/FragmentGenerator.hh"
5 #include "artdaq-core/Data/Fragment.hh"
6 #include "artdaq-core/Generators/makeFragmentGenerator.hh"
8 #include "artdaq/DAQrate/DataSenderManager.hh"
9 #include "artdaq/DAQrate/DataReceiverManager.hh"
10 #include "artdaq-core/Core/SimpleMemoryReader.hh"
13 #include <boost/program_options.hpp>
14 #include "fhiclcpp/make_ParameterSet.h"
15 namespace bpo = boost::program_options;
33 #include <sys/resource.h>
49 Builder(
int argc,
char* argv[], fhicl::ParameterSet pset,
int key);
73 void printHost(
const std::string& functionName)
const;
75 fhicl::ParameterSet daq_pset_;
76 bool const want_sink_;
77 bool const want_periodic_sync_;
78 MPI_Comm local_group_comm_;
85 , want_sink_(daq_pset_.get<bool>(
"want_sink", true))
86 , want_periodic_sync_(daq_pset_.get<bool>(
"want_periodic_sync", false))
89 std::vector<std::string> detectors;
90 daq_pset_.get_if_present(
"detectors", detectors);
91 if (static_cast<size_t>(my_rank) >= detectors.size())
93 builder_role_ = Role::SINK;
97 builder_role_ = Role::DETECTOR;
99 std::string type(pset.get<std::string>(
"transfer_plugin_type",
"Shmem"));
101 int senders = pset.get<
int>(
"num_senders");
102 int receivers = pset.get<
int>(
"num_receivers");
103 int buffer_count = pset.get<
int>(
"buffer_count", 10);
104 int max_payload_size = pset.get<
size_t>(
"fragment_size", 0x100000);
106 std::string hostmap =
"";
107 if (pset.has_key(
"hostmap"))
109 hostmap =
" host_map: @local::hostmap";
112 std::stringstream ss;
113 ss << pset.to_string();
115 for (
int ii = 0; ii < senders; ++ii)
117 ss <<
"s" << ii <<
": { transferPluginType: " << type <<
" source_rank: " << ii <<
" max_fragment_size_words: " << max_payload_size <<
" buffer_count: " << buffer_count <<
" shm_key_offset: " << std::to_string(key) << hostmap <<
"}";
119 ss <<
"} destinations: {";
120 for (
int jj = senders; jj < senders + receivers; ++jj)
122 ss <<
"d" << jj <<
": { transferPluginType: " << type <<
" destination_rank: " << jj <<
" max_fragment_size_words: " << max_payload_size <<
" buffer_count: " << buffer_count <<
" shm_key_offset: " << std::to_string(key) << hostmap <<
"}";
126 daq_pset_ = fhicl::ParameterSet();
127 make_ParameterSet(ss.str(), daq_pset_);
141 MPI_Barrier(MPI_COMM_WORLD);
143 MPI_Comm_split(MPI_COMM_WORLD, static_cast<int>(builder_role_), 0, &local_group_comm_);
144 switch (builder_role_)
154 msg(
"WARNING: a sink was instantiated despite want_sink being false:\n"
155 "set nsinks to 0 in invocation of daqrate?\n");
157 MPI_Barrier(MPI_COMM_WORLD);
164 throw "No such node type";
170 printHost(
"detector");
173 MPI_Comm_rank(local_group_comm_, &detector_rank);
174 assert(!(detector_rank < 0));
175 std::ostringstream det_ps_name_loc;
176 std::vector<std::string> detectors;
177 bool detectors_present = daq_pset_.get_if_present(
"detectors", detectors);
178 size_t detectors_size = detectors.size();
179 if (!(detectors_present && detectors_size))
181 throw cet::exception(
"Configuration")
182 <<
"Unable to find required sequence of detector "
183 <<
"parameter set names, \"detectors\".";
185 fhicl::ParameterSet det_ps =
186 daq_pset_.get<fhicl::ParameterSet>(((detectors_size >
static_cast<size_t>(detector_rank)) ? detectors[detector_rank] : detectors[0]));
187 std::unique_ptr<artdaq::FragmentGenerator>
const
189 (det_ps.get<std::string>(
"generator"),
193 MPI_Barrier(local_group_comm_);
196 size_t fragments_per_source = -1;
197 daq_pset_.get_if_present(
"fragments_per_source", fragments_per_source);
199 size_t fragments_sent = 0;
200 while (fragments_sent < fragments_per_source && gen->getNext(frags))
206 MPI_Barrier(local_group_comm_);
208 for (
auto& fragPtr : frags)
210 std::cout <<
"Program::detector: Sending fragment " << fragments_sent + 1 <<
" of " << fragments_per_source << std::endl;
211 TLOG(TLVL_DEBUG) <<
"Program::detector: Sending fragment " << fragments_sent + 1 <<
" of " << fragments_per_source ;
212 auto sequence_id = fragPtr->sequenceID();
215 if (h.GetSentSequenceIDCount(sequence_id) == gen->
fragmentIDs().size())
217 h.RemoveRoutingTableEntry(sequence_id);
220 if (++fragments_sent == fragments_per_source) {
break; }
221 if (want_periodic_sync_ && (fragments_sent % 100) == 0)
224 MPI_Barrier(local_group_comm_);
229 TLOG(TLVL_DEBUG) <<
"detector waiting " << my_rank ;
231 TLOG(TLVL_DEBUG) <<
"detector done " << my_rank ;
232 MPI_Comm_free(&local_group_comm_);
233 MPI_Barrier(MPI_COMM_WORLD);
240 usleep(1000 * my_rank);
242 auto events = std::make_shared<artdaq::SharedMemoryEventManager>(daq_pset_, daq_pset_);
243 events->startRun(daq_pset_.get<
int>(
"run_number", 100));
253 TLOG(TLVL_DEBUG) <<
"All detectors are done, Sending endOfData Fragment" ;
256 bool endSucceeded =
false;
257 endSucceeded = events->endOfData();
260 TLOG(TLVL_DEBUG) <<
"Sink: reader is done" ;
264 TLOG(TLVL_DEBUG) <<
"Sink: reader failed to complete because the "
265 <<
"endOfData marker could not be pushed onto the queue."
269 TLOG(TLVL_DEBUG) <<
"Sink done " << my_rank ;
270 MPI_Barrier(MPI_COMM_WORLD);
273 void Builder::printHost(
const std::string& functionName)
const
275 char* doPrint = getenv(
"PRINT_HOST");
276 if (doPrint == 0) {
return; }
277 const int ARRSIZE = 80;
278 char hostname[ARRSIZE];
279 std::string hostString;
280 if (!gethostname(hostname, ARRSIZE))
282 hostString = hostname;
286 hostString =
"unknown";
288 TLOG(TLVL_DEBUG) <<
"Running " << functionName
289 <<
" on host " << hostString
290 <<
" with rank " << my_rank <<
"."
298 getrusage(RUSAGE_SELF, &usage);
299 std::cout << myid <<
":"
305 int main(
int argc,
char* argv[])
309 std::ostringstream descstr;
311 <<
" <-c <config-file>> <other-options> [<source-file>]+";
312 bpo::options_description desc(descstr.str());
314 (
"config,c", bpo::value<std::string>(),
"Configuration file.")
315 (
"key,k", bpo::value<int>(),
"Shared Memory Key")
316 (
"help,h",
"produce help message");
317 bpo::variables_map vm;
319 bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
322 catch (bpo::error
const & e) {
323 std::cerr <<
"Exception from command line processing in " << argv[0]
324 <<
": " << e.what() <<
"\n";
327 if (vm.count(
"help")) {
328 std::cout << desc << std::endl;
331 if (!vm.count(
"config")) {
332 std::cerr <<
"Exception from command line processing in " << argv[0]
333 <<
": no configuration file given.\n"
334 <<
"For usage and an options list, please do '"
335 << argv[0] <<
" --help"
342 key = vm[
"key"].as<
int>();
344 fhicl::ParameterSet pset;
345 if (getenv(
"FHICL_FILE_PATH") ==
nullptr) {
347 <<
"INFO: environment variable FHICL_FILE_PATH was not set. Using \".\"\n";
348 setenv(
"FHICL_FILE_PATH",
".", 0);
350 cet::filepath_lookup_after1 lookup_policy(
"FHICL_FILE_PATH");
351 fhicl::make_ParameterSet(vm[
"config"].as<std::string>(), lookup_policy, pset);
356 Builder p(argc, argv, pset,key);
357 std::cerr <<
"Started process " << my_rank <<
" of " << p.procs_ <<
".\n";
361 catch (std::string& x)
363 std::cerr <<
"Exception (type string) caught in driver: "
368 catch (
char const* m)
370 std::cerr <<
"Exception (type char const*) caught in driver: ";
377 std::cerr <<
"[the value was a null pointer, so no message is available]";
std::set< int > running_sources() const
void go()
Start the Builder application, using the type configuration to select which method to run.
A wrapper for an MPI program. Similar to MPISentry.
virtual std::vector< Fragment::fragment_id_t > fragmentIDs()=0
void detector()
Generate data, and send it using DataSenderManager.
std::pair< int, TransferInterface::CopyStatus > sendFragment(Fragment &&frag)
double convertUnixTimeToSeconds(time_t inputUnixTime)
std::unique_ptr< FragmentGenerator > makeFragmentGenerator(std::string const &generator_plugin_spec, fhicl::ParameterSet const &ps)
void configureMessageFacility(char const *progname, bool useConsole=true, bool printDebug=false)
The Builder class runs the builder test.
std::list< FragmentPtr > FragmentPtrs
void sink()
Receive data from source via DataReceiverManager and send it to the EventStore (and art, if configured).
Builder(int argc, char *argv[], fhicl::ParameterSet pset, int key)
Builder Constructor.