1 #define TRACE_NAME "builder"
3 #include "art/Framework/Art/artapp.h"
4 #include "artdaq-core/Generators/FragmentGenerator.hh"
5 #include "artdaq-core/Data/Fragment.hh"
6 #include "artdaq-core/Generators/makeFragmentGenerator.hh"
8 #include "artdaq/DAQrate/DataSenderManager.hh"
9 #include "artdaq/DAQrate/DataReceiverManager.hh"
10 #include "artdaq-core/Core/SimpleMemoryReader.hh"
13 #include <boost/program_options.hpp>
14 #include "fhiclcpp/make_ParameterSet.h"
15 namespace bpo = boost::program_options;
33 #include <sys/resource.h>
49 Builder(
int argc,
char* argv[], fhicl::ParameterSet pset,
int key);
73 void printHost(
const std::string& functionName)
const;
75 fhicl::ParameterSet daq_pset_;
76 bool const want_sink_;
77 bool const want_periodic_sync_;
78 MPI_Comm local_group_comm_;
85 , want_sink_(daq_pset_.get<bool>(
"want_sink", true))
86 , want_periodic_sync_(daq_pset_.get<bool>(
"want_periodic_sync", false))
89 std::vector<std::string> detectors;
90 daq_pset_.get_if_present(
"detectors", detectors);
91 if (static_cast<size_t>(my_rank) >= detectors.size())
93 builder_role_ = Role::SINK;
97 builder_role_ = Role::DETECTOR;
99 std::string type(pset.get<std::string>(
"transfer_plugin_type",
"Shmem"));
101 int senders = pset.get<
int>(
"num_senders");
102 int receivers = pset.get<
int>(
"num_receivers");
103 int buffer_count = pset.get<
int>(
"buffer_count", 10);
104 int max_payload_size = pset.get<
size_t>(
"fragment_size", 0x100000);
106 std::string hostmap =
"";
107 if (pset.has_key(
"hostmap"))
109 hostmap =
" host_map: @local::hostmap";
112 std::stringstream ss;
113 ss << pset.to_string();
115 for (
int ii = 0; ii < senders; ++ii)
117 ss <<
"s" << ii <<
": { transferPluginType: " << type <<
" source_rank: " << ii <<
" max_fragment_size_words: " << max_payload_size <<
" buffer_count: " << buffer_count <<
" shm_key_offset: " << std::to_string(key) << hostmap <<
"}";
119 ss <<
"} destinations: {";
120 for (
int jj = senders; jj < senders + receivers; ++jj)
122 ss <<
"d" << jj <<
": { transferPluginType: " << type <<
" destination_rank: " << jj <<
" max_fragment_size_words: " << max_payload_size <<
" buffer_count: " << buffer_count <<
" shm_key_offset: " << std::to_string(key) << hostmap <<
"}";
126 daq_pset_ = fhicl::ParameterSet();
127 make_ParameterSet(ss.str(), daq_pset_);
141 MPI_Barrier(MPI_COMM_WORLD);
143 MPI_Comm_split(MPI_COMM_WORLD, static_cast<int>(builder_role_), 0, &local_group_comm_);
144 switch (builder_role_)
154 msg(
"WARNING: a sink was instantiated despite want_sink being false:\n"
155 "set nsinks to 0 in invocation of daqrate?\n");
157 MPI_Barrier(MPI_COMM_WORLD);
164 throw "No such node type";
170 printHost(
"detector");
173 MPI_Comm_rank(local_group_comm_, &detector_rank);
174 assert(!(detector_rank < 0));
175 std::ostringstream det_ps_name_loc;
176 std::vector<std::string> detectors;
177 bool detectors_present = daq_pset_.get_if_present(
"detectors", detectors);
178 size_t detectors_size = detectors.size();
179 if (!(detectors_present && detectors_size))
181 throw cet::exception(
"Configuration")
182 <<
"Unable to find required sequence of detector "
183 <<
"parameter set names, \"detectors\".";
185 fhicl::ParameterSet det_ps =
186 daq_pset_.get<fhicl::ParameterSet>(((detectors_size >
static_cast<size_t>(detector_rank)) ? detectors[detector_rank] : detectors[0]));
187 std::unique_ptr<artdaq::FragmentGenerator>
const
189 (det_ps.get<std::string>(
"generator"),
193 MPI_Barrier(local_group_comm_);
196 size_t fragments_per_source = -1;
197 daq_pset_.get_if_present(
"fragments_per_source", fragments_per_source);
199 size_t fragments_sent = 0;
200 while (fragments_sent < fragments_per_source && gen->getNext(frags))
206 MPI_Barrier(local_group_comm_);
208 for (
auto& fragPtr : frags)
210 std::cout <<
"Program::detector: Sending fragment " << fragments_sent + 1 <<
" of " << fragments_per_source << std::endl;
211 TLOG(TLVL_DEBUG) <<
"Program::detector: Sending fragment " << fragments_sent + 1 <<
" of " << fragments_per_source ;
213 if (++fragments_sent == fragments_per_source) {
break; }
214 if (want_periodic_sync_ && (fragments_sent % 100) == 0)
217 MPI_Barrier(local_group_comm_);
222 TLOG(TLVL_DEBUG) <<
"detector waiting " << my_rank ;
224 TLOG(TLVL_DEBUG) <<
"detector done " << my_rank ;
225 MPI_Comm_free(&local_group_comm_);
226 MPI_Barrier(MPI_COMM_WORLD);
233 usleep(1000 * my_rank);
235 auto events = std::make_shared<artdaq::SharedMemoryEventManager>(daq_pset_, daq_pset_);
236 events->startRun(daq_pset_.get<
int>(
"run_number", 100));
246 TLOG(TLVL_DEBUG) <<
"All detectors are done, Sending endOfData Fragment" ;
249 bool endSucceeded =
false;
250 endSucceeded = events->endOfData();
253 TLOG(TLVL_DEBUG) <<
"Sink: reader is done" ;
257 TLOG(TLVL_DEBUG) <<
"Sink: reader failed to complete because the "
258 <<
"endOfData marker could not be pushed onto the queue."
262 TLOG(TLVL_DEBUG) <<
"Sink done " << my_rank ;
263 MPI_Barrier(MPI_COMM_WORLD);
266 void Builder::printHost(
const std::string& functionName)
const
268 char* doPrint = getenv(
"PRINT_HOST");
269 if (doPrint == 0) {
return; }
270 const int ARRSIZE = 80;
271 char hostname[ARRSIZE];
272 std::string hostString;
273 if (!gethostname(hostname, ARRSIZE))
275 hostString = hostname;
279 hostString =
"unknown";
281 TLOG(TLVL_DEBUG) <<
"Running " << functionName
282 <<
" on host " << hostString
283 <<
" with rank " << my_rank <<
"."
291 getrusage(RUSAGE_SELF, &usage);
292 std::cout << myid <<
":"
298 int main(
int argc,
char* argv[])
302 std::ostringstream descstr;
304 <<
" <-c <config-file>> <other-options> [<source-file>]+";
305 bpo::options_description desc(descstr.str());
307 (
"config,c", bpo::value<std::string>(),
"Configuration file.")
308 (
"key,k", bpo::value<int>(),
"Shared Memory Key")
309 (
"help,h",
"produce help message");
310 bpo::variables_map vm;
312 bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
315 catch (bpo::error
const & e) {
316 std::cerr <<
"Exception from command line processing in " << argv[0]
317 <<
": " << e.what() <<
"\n";
320 if (vm.count(
"help")) {
321 std::cout << desc << std::endl;
324 if (!vm.count(
"config")) {
325 std::cerr <<
"Exception from command line processing in " << argv[0]
326 <<
": no configuration file given.\n"
327 <<
"For usage and an options list, please do '"
328 << argv[0] <<
" --help"
335 key = vm[
"key"].as<
int>();
337 fhicl::ParameterSet pset;
338 if (getenv(
"FHICL_FILE_PATH") ==
nullptr) {
340 <<
"INFO: environment variable FHICL_FILE_PATH was not set. Using \".\"\n";
341 setenv(
"FHICL_FILE_PATH",
".", 0);
343 cet::filepath_lookup_after1 lookup_policy(
"FHICL_FILE_PATH");
344 fhicl::make_ParameterSet(vm[
"config"].as<std::string>(), lookup_policy, pset);
349 Builder p(argc, argv, pset,key);
350 std::cerr <<
"Started process " << my_rank <<
" of " << p.procs_ <<
".\n";
354 catch (std::string& x)
356 std::cerr <<
"Exception (type string) caught in driver: "
361 catch (
char const* m)
363 std::cerr <<
"Exception (type char const*) caught in driver: ";
370 std::cerr <<
"[the value was a null pointer, so no message is available]";
std::set< int > running_sources() const
void go()
Start the Builder application, using the type configuration to select which method to run...
A wrapper for a MPI program. Similar to MPISentry.
void detector()
Generate data, and send it using DataSenderManager.
std::pair< int, TransferInterface::CopyStatus > sendFragment(Fragment &&frag)
double convertUnixTimeToSeconds(time_t inputUnixTime)
std::unique_ptr< FragmentGenerator > makeFragmentGenerator(std::string const &generator_plugin_spec, fhicl::ParameterSet const &ps)
void configureMessageFacility(char const *progname, bool useConsole=true, bool printDebug=false)
The Builder class runs the builder test.
std::list< FragmentPtr > FragmentPtrs
void sink()
Receive data from source via DataReceiverManager, send it to the EventStore (and art, if configured)
Builder(int argc, char *argv[], fhicl::ParameterSet pset, int key)
Builder Constructor.