3 #define TRACE_NAME (app_name + "_DataReceiverManager").c_str()
4 #include "artdaq/DAQdata/Globals.hh"
5 #include "artdaq/DAQrate/DataReceiverManager.hh"
6 #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
7 #include "artdaq/TransferPlugins/detail/HostMap.hh"
8 #include "cetlib_except/exception.h"
// Constructor member initializers and body (the signature line is not part of this listing).
// Reads tuning parameters from pset, decides which source ranks are enabled, and creates
// one transfer plugin per entry of the sources table.
12 : stop_requested_(false)
13 , stop_requested_time_(0)
16 , source_metric_data_()
17 , source_metric_send_time_()
// Configuration values, each with a fallback used when the key is absent from pset.
23 , receive_timeout_(pset.get<size_t>(
"receive_timeout_usec", 100000))
24 , stop_timeout_ms_(pset.get<size_t>(
"stop_timeout_ms", 1500))
26 , non_reliable_mode_enabled_(pset.get<bool>(
"non_reliable_mode", false))
// NOTE(review): the -1 default is converted to size_t, i.e. the largest representable value.
27 , non_reliable_mode_retry_count_(pset.get<size_t>(
"non_reliable_mode_retry_count", -1))
29 TLOG(TLVL_DEBUG) <<
"Constructor";
// Optional list of enabled source ranks; an empty list is treated as all sources enabled.
30 auto enabled_srcs = pset.get<std::vector<int>>(
"enabled_sources", std::vector<int>());
31 auto enabled_srcs_empty = enabled_srcs.size() == 0;
// Make it obvious in the logs when the configuration permits dropping data.
33 if (non_reliable_mode_enabled_)
35 TLOG(TLVL_WARNING) <<
"DataReceiverManager is configured to drop data after " << non_reliable_mode_retry_count_
36 <<
" failed attempts to put data into the SharedMemoryEventManager! If this is unexpected, please check your configuration!";
39 if (enabled_srcs_empty)
41 TLOG(TLVL_INFO) <<
"enabled_sources not specified, assuming all sources enabled.";
// Flag every explicitly listed rank as enabled.
45 for (
auto& s : enabled_srcs)
47 enabled_sources_[s] =
true;
// Build a host map from the top-level configuration; the per-source loop below passes it
// back into MakeHostMap together with each source table (presumably merging the entries).
51 hostMap_t host_map = MakeHostMap(pset);
// Top-level values that are copied into sources which do not set their own (0 means unset).
52 size_t tcp_receive_buffer_size = pset.get<
size_t>(
"tcp_receive_buffer_size", 0);
53 size_t max_fragment_size_words = pset.get<
size_t>(
"max_fragment_size_words", 0);
55 auto srcs = pset.get<fhicl::ParameterSet>(
"sources", fhicl::ParameterSet());
56 for (
auto& s : srcs.get_pset_names())
58 auto src_pset = srcs.get<fhicl::ParameterSet>(s);
59 host_map = MakeHostMap(src_pset, host_map);
// Rebuild every source table so that each one carries the combined host map and the
// inherited buffer and fragment size settings.
61 auto host_map_pset = MakeHostMapPset(host_map);
62 fhicl::ParameterSet srcs_mod;
63 for (
auto& s : srcs.get_pset_names())
65 auto src_pset = srcs.get<fhicl::ParameterSet>(s);
66 src_pset.erase(
"host_map");
67 src_pset.put<std::vector<fhicl::ParameterSet>>(
"host_map", host_map_pset);
69 if (tcp_receive_buffer_size != 0 && !src_pset.has_key(
"tcp_receive_buffer_size"))
71 src_pset.put<
size_t>(
"tcp_receive_buffer_size", tcp_receive_buffer_size);
73 if (max_fragment_size_words != 0 && !src_pset.has_key(
"max_fragment_size_words"))
75 src_pset.put<
size_t>(
"max_fragment_size_words", max_fragment_size_words);
78 srcs_mod.put<fhicl::ParameterSet>(s, src_pset);
// Create a transfer plugin for each rebuilt source table and register it by source rank.
81 for (
auto& s : srcs_mod.get_pset_names())
85 auto transfer = std::unique_ptr<TransferInterface>(MakeTransferPlugin(srcs_mod, s,
87 auto source_rank = transfer->source_rank();
// With no explicit list every rank is enabled; otherwise ranks not in the list are disabled.
88 if (enabled_srcs_empty) enabled_sources_[source_rank] =
true;
89 else if (!enabled_sources_.count(source_rank)) enabled_sources_[source_rank] =
false;
90 running_sources_[source_rank] =
false;
91 source_plugins_[source_rank] = std::move(transfer);
92 source_metric_send_time_[source_rank] = std::chrono::steady_clock::now();
93 source_metric_data_[source_rank] = source_metric_data();
// Failures while setting up one source are logged as warnings in the handlers below.
// NOTE(review): both handlers take the exception by value, which copies it and slices
// derived types (what() may then lose the derived message); catching by const reference
// would avoid that.
95 catch (cet::exception ex)
97 TLOG(TLVL_WARNING) <<
"cet::exception caught while setting up source " << s <<
": " << ex.what();
99 catch (std::exception ex)
101 TLOG(TLVL_WARNING) <<
"std::exception caught while setting up source " << s <<
": " << ex.what();
// Warning for any remaining exception type (the catch clause itself is not in this listing).
105 TLOG(TLVL_WARNING) <<
"Non-cet exception caught while setting up source " << s <<
".";
// An empty sources table is reported as an error.
108 if (srcs.get_pset_names().size() == 0)
110 TLOG(TLVL_ERROR) <<
"No sources configured!";
// Destructor body (signature and at least one statement are not part of this listing):
// traces entry, resets the shm_manager_ pointer held by this object, then traces exit.
116 TLOG(TLVL_TRACE) <<
"~DataReceiverManager: BEGIN";
118 shm_manager_.reset();
119 TLOG(TLVL_TRACE) <<
"Destructor END";
// Body of start_threads (signature not part of this listing): clears the stop flag and
// launches one receiver thread for every source plugin whose rank is enabled.
125 stop_requested_ =
false;
// Put the shared memory manager, when one is set, into Normal request mode.
126 if (shm_manager_) shm_manager_->setRequestMode(artdaq::detail::RequestMessageMode::Normal);
127 for (
auto& source : source_plugins_)
129 auto& rank = source.first;
// Skip ranks that are unknown to enabled_sources_ or flagged false there.
130 if (enabled_sources_.count(rank) && enabled_sources_[rank].load())
// Start from clean per-rank statistics and counters.
132 source_metric_data_[rank] = source_metric_data();
133 source_metric_send_time_[rank] = std::chrono::steady_clock::now();
135 recv_frag_count_.setSlot(rank, 0);
136 recv_frag_size_.setSlot(rank,0);
137 recv_seq_count_.setSlot(rank,0);
139 running_sources_[rank] =
true;
// Request a stack of 4096 * 2000 bytes for the receiver thread.
140 boost::thread::attributes attrs;
141 attrs.set_stack_size(4096 * 2000);
// The new thread runs runReceiver_ on this object with the rank as its argument.
143 source_threads_[rank] = boost::thread(attrs, boost::bind(&DataReceiverManager::runReceiver_,
this, rank));
// Thread start failures are reported both through TRACE and on stderr, including errno.
145 catch (
const boost::exception& e)
147 TLOG(TLVL_ERROR) <<
"Caught boost::exception starting Receiver " << rank <<
" thread: " << boost::diagnostic_information(e) <<
", errno=" << errno;
148 std::cerr <<
"Caught boost::exception starting Receiver " << rank <<
" thread: " << boost::diagnostic_information(e) <<
", errno=" << errno << std::endl;
// Body of stop_threads (signature not part of this listing): raises the stop flag, waits
// for the receiver threads to finish, then joins them and empties source_threads_.
// The wait loop itself is among the lines missing from this listing.
157 TLOG(TLVL_TRACE) <<
"stop_threads: BEGIN: Setting stop_requested to true, frags=" << count() <<
", bytes=" << byteCount();
// runReceiver_ tests this flag in its loop condition.
160 stop_requested_ =
true;
// Number of sources still flagged as running when the wait begins (used in the log output).
162 auto initial_count = running_sources().size();
163 TLOG(TLVL_TRACE) <<
"stop_threads: Waiting for " << initial_count <<
" running receiver threads to stop";
// Time points for the wait; last_report is refreshed after each progress message below.
164 auto wait_start = std::chrono::steady_clock::now();
165 auto last_report = std::chrono::steady_clock::now();
172 <<
" receiver threads to end (" << running_sources().size() <<
" remain)";
173 last_report = std::chrono::steady_clock::now();
// Sources still flagged as running at this point are reported as a timeout.
176 if (running_sources().size()) {
177 TLOG(TLVL_WARNING) <<
"stop_threads: Timeout expired while waiting for all receiver threads to end. There are "
178 << running_sources().size() <<
" threads remaining.";
// Walk every stored thread; threads that are not joinable are reported as errors.
181 TLOG(TLVL_TRACE) <<
"stop_threads: Joining " << source_threads_.size() <<
" receiver threads";
182 for (
auto it = source_threads_.begin(); it != source_threads_.end(); ++it)
184 TLOG(TLVL_TRACE) <<
"stop_threads: Joining thread for source_rank " << (*it).first;
185 if ((*it).second.joinable())
188 TLOG(TLVL_ERROR) <<
"stop_threads: Thread for source rank " << (*it).first <<
" is not joinable!";
// Drop all thread objects so that a later start begins with an empty map.
190 source_threads_.clear();
192 TLOG(TLVL_TRACE) <<
"stop_threads: END";
// Body of enabled_sources (signature and return statement not part of this listing):
// collects the ranks whose entry in enabled_sources_ evaluates to true.
197 std::set<int> output;
198 for (
auto& src : enabled_sources_)
200 if (src.second) output.insert(src.first);
// Body of running_sources (signature and return statement not part of this listing):
// collects the ranks whose entry in running_sources_ evaluates to true.
207 std::set<int> output;
208 for (
auto& src : running_sources_)
210 if (src.second) output.insert(src.first);
// Receive loop for a single source; start_threads binds this method to one boost::thread
// per enabled rank. Each pass receives a fragment header from the transfer plugin stored
// for source_rank. On one path the payload is received directly into a location obtained
// from shm_manager_; on another the fragment is received as a system fragment
// (EndOfData, Init, EndOfSubrun) and handled here.
// source_rank: rank of the source serviced by this call.
// NOTE: braces and a number of statements and conditions are absent from this listing.
215 void artdaq::DataReceiverManager::runReceiver_(
int source_rank)
// Time points taken around the receive steps (presumably feeding the timing metrics).
// Only end_time is initialized here; the initializer belongs to the last declarator alone.
217 std::chrono::steady_clock::time_point start_time, after_header, before_body,after_body, end_time = std::chrono::steady_clock::now();
// The converted -1 acts as a sentinel for no EndOfData fragment seen yet (compared below).
220 size_t endOfDataCount = -1;
// Pause between shared memory retries: one hundredth of the receive timeout, kept within
// 5000 and 100000 (passed to usleep below, so microseconds).
221 auto sleep_time = receive_timeout_ / 100 > 100000 ? 100000 : receive_timeout_ / 100;
222 if (sleep_time < 5000) sleep_time = 5000;
// NOTE(review): if both operands are integers the division truncates before ceil is
// applied, making ceil a no-op - confirm the member types.
223 auto max_retries = non_reliable_mode_retry_count_ * ceil(receive_timeout_ / sleep_time);
// Keep looping while this rank is known to enabled_sources_ and it is not the case that a
// stop was requested more than stop_timeout_ms_ ago.
225 while (!(stop_requested_ &&
TimeUtils::gettimeofday_us() - stop_requested_time_ > stop_timeout_ms_ * 1000) && enabled_sources_.count(source_rank))
227 TLOG(16) <<
"runReceiver_: Begin loop";
228 std::this_thread::yield();
// Finished once the announced fragment count has been reached and the plugin is not running.
231 if (endOfDataCount <= recv_frag_count_.slotCount(source_rank) && !source_plugins_[source_rank]->isRunning())
233 TLOG(TLVL_DEBUG) <<
"runReceiver_: End of Data conditions met, ending runReceiver loop";
237 start_time = std::chrono::steady_clock::now();
// Receive the next header with receive_timeout_ as timeout; a successful receive is
// expected to return source_rank.
239 TLOG(16) <<
"runReceiver_: Calling receiveFragmentHeader tmo=" << receive_timeout_;
240 ret = source_plugins_[source_rank]->receiveFragmentHeader(header, receive_timeout_);
241 TLOG(16) <<
"runReceiver_: Done with receiveFragmentHeader, ret=" << ret <<
" (should be " << source_rank <<
")";
// Any other return value is handled below (some of the conditions are not in this listing).
242 if (ret != source_rank)
245 TLOG(TLVL_WARNING) <<
"Received Fragment from rank " << ret <<
", but was expecting one from rank " << source_rank <<
"!";
249 TLOG(TLVL_ERROR) <<
"Transfer Plugin returned DATA_END, ending receive loop!";
// Only the thread serving the smallest running rank performs this check
// (std::set iterates in ascending order).
252 if (*running_sources().begin() == source_rank)
254 TLOG(6) <<
"Calling SMEM::CheckPendingBuffers from DRM receiver thread for " << source_rank <<
" to make sure that things aren't stuck";
255 shm_manager_->CheckPendingBuffers();
262 after_header = std::chrono::steady_clock::now();
265 TLOG(TLVL_TRACE) <<
"Received Fragment Header from rank " << source_rank <<
", sequence ID " << header.
sequence_id <<
", timestamp " << header.
timestamp;
// Retry until the shared memory manager hands out a write location for this fragment.
268 while (loc ==
nullptr)
270 loc = shm_manager_->WriteFragmentHeader(header);
// Give up immediately when a stop has been requested; otherwise pause before retrying.
273 if (loc ==
nullptr && stop_requested_)
break;
275 if (loc ==
nullptr) usleep(sleep_time);
// In non-reliable mode, once the retry budget is exceeded, call WriteFragmentHeader with
// its second argument set to true (presumably the drop path announced by the constructor
// warning - TODO confirm).
277 if (non_reliable_mode_enabled_ && retries > max_retries)
279 loc = shm_manager_->WriteFragmentHeader(header,
true);
283 if (loc ==
nullptr && stop_requested_)
break;
287 TLOG(TLVL_ERROR) <<
"runReceiver_: Could not get data location for event " << header.
sequence_id;
290 before_body = std::chrono::steady_clock::now();
// Receive the payload straight into loc; the length is the fragment word count minus the
// header words.
293 TLOG(16) <<
"runReceiver_: Calling receiveFragmentData from rank " << source_rank <<
", sequence ID " << header.
sequence_id <<
", timestamp " << header.
timestamp;
294 auto ret2 = source_plugins_[source_rank]->receiveFragmentData(loc, header.
word_count - header.
num_words());
295 TLOG(16) <<
"runReceiver_: Done with receiveFragmentData, ret2=" << ret2 <<
" (should be " << source_rank <<
")";
// Error reporting for a payload receive whose return code differs from the header receive.
298 TLOG(TLVL_ERROR) <<
"Unexpected return code from receiveFragmentData after receiveFragmentHeader! (Expected: " << ret <<
", Got: " << ret2 <<
")";
299 TLOG(TLVL_ERROR) <<
"Error receiving data from rank " << source_rank <<
", data has been lost! Event " << header.
sequence_id <<
" will most likely be Incomplete!";
305 shm_manager_->DoneWritingFragment(header);
310 shm_manager_->DoneWritingFragment(header);
311 TLOG(TLVL_TRACE) <<
"Done receiving fragment with sequence ID " << header.
sequence_id <<
" from rank " << source_rank;
// Per-rank bookkeeping: one more fragment, and the most recent sequence ID.
313 recv_frag_count_.incSlot(source_rank);
315 recv_seq_count_.setSlot(source_rank, header.
sequence_id);
// Once an EndOfData count is known, log progress towards it.
316 if (endOfDataCount != static_cast<size_t>(-1))
318 TLOG(TLVL_DEBUG) <<
"Received fragment " << header.
sequence_id <<
" from rank " << source_rank
319 <<
" (" << recv_frag_count_.slotCount(source_rank) <<
"/" << endOfDataCount <<
")";
322 after_body = std::chrono::steady_clock::now();
// One more sample in the current metrics interval.
332 source_metric_data_[source_rank].data_point_count++;
// Publish the accumulated receive statistics for this rank.
336 TLOG(6) <<
"runReceiver_: Sending receive stats for rank " << source_rank;
337 metricMan->sendMetric(
"Total Receive Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].delta_t,
"s", 5,
MetricMode::Accumulate);
338 metricMan->sendMetric(
"Total Receive Size From Rank " + std::to_string(source_rank), static_cast<unsigned long>(source_metric_data_[source_rank].data_size),
"B", 5,
MetricMode::Accumulate);
339 metricMan->sendMetric(
"Total Receive Rate From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].data_size / source_metric_data_[source_rank].delta_t,
"B/s", 5,
MetricMode::Average);
341 metricMan->sendMetric(
"Header Receive Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].hdr_delta_t,
"s", 5,
MetricMode::Accumulate);
342 metricMan->sendMetric(
"Header Receive Size From Rank " + std::to_string(source_rank), static_cast<unsigned long>(source_metric_data_[source_rank].header_size),
"B", 5,
MetricMode::Accumulate);
343 metricMan->sendMetric(
"Header Receive Rate From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].header_size / source_metric_data_[source_rank].hdr_delta_t,
"B/s", 5,
MetricMode::Average);
// Payload size is the total received size minus the header size.
345 auto payloadSize = source_metric_data_[source_rank].data_size - source_metric_data_[source_rank].header_size;
346 metricMan->sendMetric(
"Data Receive Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].data_delta_t,
"s", 5,
MetricMode::Accumulate);
347 metricMan->sendMetric(
"Data Receive Size From Rank " + std::to_string(source_rank), static_cast<unsigned long>(payloadSize),
"B", 5,
MetricMode::Accumulate);
348 metricMan->sendMetric(
"Data Receive Rate From Rank " + std::to_string(source_rank), payloadSize / source_metric_data_[source_rank].data_delta_t,
"B/s", 5,
MetricMode::Average);
350 metricMan->sendMetric(
"Data Receive Count From Rank " + std::to_string(source_rank), recv_frag_count_.slotCount(source_rank),
"fragments", 3,
MetricMode::LastPoint);
// The two averages below divide the accumulated times by the number of samples.
352 metricMan->sendMetric(
"Total Shared Memory Wait Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].store_delta_t,
"s", 3,
MetricMode::Accumulate);
353 metricMan->sendMetric(
"Avg Shared Memory Wait Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].store_delta_t / source_metric_data_[source_rank].data_point_count,
"s", 3,
MetricMode::Average);
354 metricMan->sendMetric(
"Avg Fragment Wait Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].dead_t / source_metric_data_[source_rank].data_point_count,
"s", 3,
MetricMode::Average);
356 TLOG(6) <<
"runReceiver_: Done sending receive stats for rank " << source_rank;
// Begin a fresh metrics interval for this rank.
358 source_metric_send_time_[source_rank] = std::chrono::steady_clock::now();
359 source_metric_data_[source_rank] = source_metric_data();
362 end_time = std::chrono::steady_clock::now();
// System fragment path: receive the remaining words into frag, just past its header words.
370 auto ret3 = source_plugins_[source_rank]->receiveFragmentData(frag->headerAddress() + header.
num_words(), header.
word_count - header.
num_words());
// A return code other than source_rank is logged and then thrown as a cet::exception.
371 if (ret3 != source_rank)
373 TLOG(TLVL_ERROR) <<
"Unexpected return code from receiveFragmentData after receiveFragmentHeader while receiving System Fragment! (Expected: " << source_rank <<
", Got: " << ret3 <<
")";
374 throw cet::exception(
"DataReceiverManager") <<
"Unexpected return code from receiveFragmentData after receiveFragmentHeader while receiving System Fragment! (Expected: " << source_rank <<
", Got: " << ret3 <<
")";
// EndOfData handling: switch to EndOfRun request mode and take the expected fragment
// count from the first data word of the fragment; a later EndOfData adds to the count.
380 shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
381 if(endOfDataCount == static_cast<size_t>(-1) ) endOfDataCount = *(frag->dataBegin());
382 else endOfDataCount += *(frag->dataBegin());
383 TLOG(TLVL_DEBUG) <<
"EndOfData Fragment indicates that " << endOfDataCount <<
" fragments are expected from rank " << source_rank
384 <<
" (recvd " << recv_frag_count_.slotCount(source_rank) <<
").";
// Init fragment handling: back to Normal request mode, and ownership of the fragment is
// moved into the shared memory manager.
387 TLOG(TLVL_DEBUG) <<
"Received Init Fragment from rank " << source_rank <<
".";
388 shm_manager_->setRequestMode(detail::RequestMessageMode::Normal);
389 shm_manager_->SetInitFragment(std::move(frag));
392 shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
// EndOfSubrun handling; the condition guarding the else branch is not in this listing.
397 TLOG(TLVL_DEBUG) <<
"Received EndOfSubrun Fragment from rank " << source_rank
398 <<
" with sequence_id " << header.
sequence_id <<
".";
400 else shm_manager_->rolloverSubrun(recv_seq_count_.slotCount(source_rank), header.
timestamp);
403 shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
// Wind-down: flush the plugin buffers, log the exit and clear the running flag for this
// rank, which removes it from the set returned by running_sources.
409 source_plugins_[source_rank]->flush_buffers();
411 TLOG(TLVL_DEBUG) <<
"runReceiver_ " << source_rank <<
" receive loop exited";
412 running_sources_[source_rank] =
false;
std::unique_ptr< Fragment > FragmentPtr
std::set< int > running_sources() const
Get the list of sources which are still receiving data.
static constexpr type_t EndOfDataFragmentType
static constexpr type_t EmptyFragmentType
static constexpr bool isUserFragmentType(type_t fragmentType)
static constexpr type_t DataFragmentType
static constexpr type_t EndOfRunFragmentType
static constexpr type_t InitFragmentType
static constexpr type_t ErrorFragmentType
This TransferInterface is a Receiver.
static constexpr sequence_id_t InvalidSequenceID
DataReceiverManager(const fhicl::ParameterSet &ps, std::shared_ptr< SharedMemoryEventManager > shm)
DataReceiverManager Constructor.
static constexpr type_t ContainerFragmentType
std::set< int > enabled_sources() const
Get the list of enabled sources.
Value that is to be returned when a Transfer plugin determines that no more data will be arriving...
constexpr double GetElapsedTime(std::chrono::steady_clock::time_point then, std::chrono::steady_clock::time_point now=std::chrono::steady_clock::now())
static constexpr type_t ShutdownFragmentType
detail::RawFragmentHeader::RawDataType RawDataType
static constexpr type_t EndOfSubrunFragmentType
virtual ~DataReceiverManager()
DataReceiverManager Destructor.
void start_threads()
Start receiver threads for all enabled sources.
uint64_t gettimeofday_us()
void stop_threads()
Stop receiver threads.