// artdaq v3_04_00
// DataReceiverManager.cc
1 #include <chrono>
2 
3 #define TRACE_NAME (app_name + "_DataReceiverManager").c_str()
4 #include "artdaq/DAQdata/Globals.hh"
5 #include "artdaq/DAQrate/DataReceiverManager.hh"
6 #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
7 #include "artdaq/TransferPlugins/detail/HostMap.hh"
8 #include "cetlib_except/exception.h"
9 #include <iomanip>
10 
11 artdaq::DataReceiverManager::DataReceiverManager(const fhicl::ParameterSet& pset, std::shared_ptr<SharedMemoryEventManager> shm)
12  : stop_requested_(false)
13  , stop_requested_time_(0)
14  , source_threads_()
15  , source_plugins_()
16  , source_metric_data_()
17  , source_metric_send_time_()
18  , enabled_sources_()
19  , running_sources_()
20  , recv_frag_count_()
21  , recv_frag_size_()
22  , recv_seq_count_()
23  , receive_timeout_(pset.get<size_t>("receive_timeout_usec", 100000))
24  , stop_timeout_ms_(pset.get<size_t>("stop_timeout_ms", 1500))
25  , shm_manager_(shm)
26  , non_reliable_mode_enabled_(pset.get<bool>("non_reliable_mode", false))
27  , non_reliable_mode_retry_count_(pset.get<size_t>("non_reliable_mode_retry_count", -1))
28 {
29  TLOG(TLVL_DEBUG) << "Constructor";
30  auto enabled_srcs = pset.get<std::vector<int>>("enabled_sources", std::vector<int>());
31  auto enabled_srcs_empty = enabled_srcs.size() == 0;
32 
33  if (non_reliable_mode_enabled_)
34  {
35  TLOG(TLVL_WARNING) << "DataReceiverManager is configured to drop data after " << non_reliable_mode_retry_count_
36  << " failed attempts to put data into the SharedMemoryEventManager! If this is unexpected, please check your configuration!";
37  }
38 
39  if (enabled_srcs_empty)
40  {
41  TLOG(TLVL_INFO) << "enabled_sources not specified, assuming all sources enabled.";
42  }
43  else
44  {
45  for (auto& s : enabled_srcs)
46  {
47  enabled_sources_[s] = true;
48  }
49  }
50 
51  hostMap_t host_map = MakeHostMap(pset);
52  size_t tcp_receive_buffer_size = pset.get<size_t>("tcp_receive_buffer_size", 0);
53  size_t max_fragment_size_words = pset.get<size_t>("max_fragment_size_words", 0);
54 
55  auto srcs = pset.get<fhicl::ParameterSet>("sources", fhicl::ParameterSet());
56  for (auto& s : srcs.get_pset_names())
57  {
58  auto src_pset = srcs.get<fhicl::ParameterSet>(s);
59  host_map = MakeHostMap(src_pset, host_map);
60  }
61  auto host_map_pset = MakeHostMapPset(host_map);
62  fhicl::ParameterSet srcs_mod;
63  for (auto& s : srcs.get_pset_names())
64  {
65  auto src_pset = srcs.get<fhicl::ParameterSet>(s);
66  src_pset.erase("host_map");
67  src_pset.put<std::vector<fhicl::ParameterSet>>("host_map", host_map_pset);
68 
69  if (tcp_receive_buffer_size != 0 && !src_pset.has_key("tcp_receive_buffer_size"))
70  {
71  src_pset.put<size_t>("tcp_receive_buffer_size", tcp_receive_buffer_size);
72  }
73  if (max_fragment_size_words != 0 && !src_pset.has_key("max_fragment_size_words"))
74  {
75  src_pset.put<size_t>("max_fragment_size_words", max_fragment_size_words);
76  }
77 
78  srcs_mod.put<fhicl::ParameterSet>(s, src_pset);
79  }
80 
81  for (auto& s : srcs_mod.get_pset_names())
82  {
83  try
84  {
85  auto transfer = std::unique_ptr<TransferInterface>(MakeTransferPlugin(srcs_mod, s,
87  auto source_rank = transfer->source_rank();
88  if (enabled_srcs_empty) enabled_sources_[source_rank] = true;
89  else if (!enabled_sources_.count(source_rank)) enabled_sources_[source_rank] = false;
90  running_sources_[source_rank] = false;
91  source_plugins_[source_rank] = std::move(transfer);
92  source_metric_send_time_[source_rank] = std::chrono::steady_clock::now();
93  source_metric_data_[source_rank] = source_metric_data();
94  }
95  catch (cet::exception ex)
96  {
97  TLOG(TLVL_WARNING) << "cet::exception caught while setting up source " << s << ": " << ex.what();
98  }
99  catch (std::exception ex)
100  {
101  TLOG(TLVL_WARNING) << "std::exception caught while setting up source " << s << ": " << ex.what();
102  }
103  catch (...)
104  {
105  TLOG(TLVL_WARNING) << "Non-cet exception caught while setting up source " << s << ".";
106  }
107  }
108  if (srcs.get_pset_names().size() == 0)
109  {
110  TLOG(TLVL_ERROR) << "No sources configured!";
111  }
112 }
113 
115 {
116  TLOG(TLVL_TRACE) << "~DataReceiverManager: BEGIN";
117  stop_threads();
118  shm_manager_.reset();
119  TLOG(TLVL_TRACE) << "Destructor END";
120 }
121 
122 
124 {
125  stop_requested_ = false;
126  if (shm_manager_) shm_manager_->setRequestMode(artdaq::detail::RequestMessageMode::Normal);
127  for (auto& source : source_plugins_)
128  {
129  auto& rank = source.first;
130  if (enabled_sources_.count(rank) && enabled_sources_[rank].load())
131  {
132  source_metric_data_[rank] = source_metric_data();
133  source_metric_send_time_[rank] = std::chrono::steady_clock::now();
134 
135  recv_frag_count_.setSlot(rank, 0);
136  recv_frag_size_.setSlot(rank,0);
137  recv_seq_count_.setSlot(rank,0);
138 
139  running_sources_[rank] = true;
140  boost::thread::attributes attrs;
141  attrs.set_stack_size(4096 * 2000); // 2000 KB
142  try {
143  source_threads_[rank] = boost::thread(attrs, boost::bind(&DataReceiverManager::runReceiver_, this, rank));
144  }
145  catch (const boost::exception& e)
146  {
147  TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
148  std::cerr << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
149  exit(5);
150  }
151  }
152  }
153 }
154 
156 {
157  TLOG(TLVL_TRACE) << "stop_threads: BEGIN: Setting stop_requested to true, frags=" << count() << ", bytes=" << byteCount();
158 
159  stop_requested_time_ = TimeUtils::gettimeofday_us();
160  stop_requested_ = true;
161 
162  auto initial_count = running_sources().size();
163  TLOG(TLVL_TRACE) << "stop_threads: Waiting for " << initial_count << " running receiver threads to stop";
164  auto wait_start = std::chrono::steady_clock::now();
165  auto last_report = std::chrono::steady_clock::now();
166  while (running_sources().size() && TimeUtils::GetElapsedTime(wait_start) < 60.0)
167  {
168  usleep(10000);
169  if (TimeUtils::GetElapsedTime(last_report) > 1.0)
170  {
171  TLOG(TLVL_DEBUG) << "stop_threads: Waited " << TimeUtils::GetElapsedTime(wait_start) << " s for " << initial_count
172  << " receiver threads to end (" << running_sources().size() << " remain)";
173  last_report = std::chrono::steady_clock::now();
174  }
175  }
176  if (running_sources().size()) {
177  TLOG(TLVL_WARNING) << "stop_threads: Timeout expired while waiting for all receiver threads to end. There are "
178  << running_sources().size() << " threads remaining.";
179  }
180 
181  TLOG(TLVL_TRACE) << "stop_threads: Joining " << source_threads_.size() << " receiver threads";
182  for (auto it = source_threads_.begin(); it != source_threads_.end(); ++it)
183  {
184  TLOG(TLVL_TRACE) << "stop_threads: Joining thread for source_rank " << (*it).first;
185  if ((*it).second.joinable())
186  (*it).second.join();
187  else
188  TLOG(TLVL_ERROR) << "stop_threads: Thread for source rank " << (*it).first << " is not joinable!";
189  }
190  source_threads_.clear(); // To prevent error messages from shutdown-after-stop
191 
192  TLOG(TLVL_TRACE) << "stop_threads: END";
193 }
194 
196 {
197  std::set<int> output;
198  for (auto& src : enabled_sources_)
199  {
200  if (src.second) output.insert(src.first);
201  }
202  return output;
203 }
204 
206 {
207  std::set<int> output;
208  for (auto& src : running_sources_)
209  {
210  if (src.second) output.insert(src.first);
211  }
212  return output;
213 }
214 
215 void artdaq::DataReceiverManager::runReceiver_(int source_rank)
216 {
217  std::chrono::steady_clock::time_point start_time, after_header, before_body,after_body, end_time = std::chrono::steady_clock::now();
218  int ret;
220  size_t endOfDataCount = -1;
221  auto sleep_time = receive_timeout_ / 100 > 100000 ? 100000 : receive_timeout_ / 100;
222  if (sleep_time < 5000) sleep_time = 5000;
223  auto max_retries = non_reliable_mode_retry_count_ * ceil(receive_timeout_ / sleep_time);
224 
225  while (!(stop_requested_ && TimeUtils::gettimeofday_us() - stop_requested_time_ > stop_timeout_ms_ * 1000) && enabled_sources_.count(source_rank))
226  {
227  TLOG(16) << "runReceiver_: Begin loop";
228  std::this_thread::yield();
229 
230  // Don't stop receiving until we haven't received anything for 1 second
231  if (endOfDataCount <= recv_frag_count_.slotCount(source_rank) && !source_plugins_[source_rank]->isRunning())
232  {
233  TLOG(TLVL_DEBUG) << "runReceiver_: End of Data conditions met, ending runReceiver loop";
234  break;
235  }
236 
237  start_time = std::chrono::steady_clock::now();
238 
239  TLOG(16) << "runReceiver_: Calling receiveFragmentHeader tmo=" << receive_timeout_;
240  ret = source_plugins_[source_rank]->receiveFragmentHeader(header, receive_timeout_);
241  TLOG(16) << "runReceiver_: Done with receiveFragmentHeader, ret=" << ret << " (should be " << source_rank << ")";
242  if (ret != source_rank)
243  {
244  if (ret >= 0) {
245  TLOG(TLVL_WARNING) << "Received Fragment from rank " << ret << ", but was expecting one from rank " << source_rank << "!";
246  }
247  else if (ret == TransferInterface::DATA_END)
248  {
249  TLOG(TLVL_ERROR) << "Transfer Plugin returned DATA_END, ending receive loop!";
250  break;
251  }
252  if (*running_sources().begin() == source_rank) // Only do this for the first sender in the running_sources_ map
253  {
254  TLOG(6) << "Calling SMEM::CheckPendingBuffers from DRM receiver thread for " << source_rank << " to make sure that things aren't stuck";
255  shm_manager_->CheckPendingBuffers();
256  }
257 
258  usleep(sleep_time);
259  continue; // Receive timeout or other oddness
260  }
261 
262  after_header = std::chrono::steady_clock::now();
263 
265  TLOG(TLVL_TRACE) << "Received Fragment Header from rank " << source_rank << ", sequence ID " << header.sequence_id << ", timestamp " << header.timestamp;
266  RawDataType* loc = nullptr;
267  size_t retries = 0;
268  while (loc == nullptr)//&& TimeUtils::GetElapsedTimeMicroseconds(after_header)) < receive_timeout_)
269  {
270  loc = shm_manager_->WriteFragmentHeader(header);
271 
272  // Break here and outside of the loop to go to the cleanup steps at the end of runReceiver_
273  if (loc == nullptr && stop_requested_) break;
274 
275  if (loc == nullptr) usleep(sleep_time);
276  retries++;
277  if (non_reliable_mode_enabled_ && retries > max_retries)
278  {
279  loc = shm_manager_->WriteFragmentHeader(header, true);
280  }
281  }
282  // Break here to go to cleanup at the end of runReceiver_
283  if (loc == nullptr && stop_requested_) break;
284  if (loc == nullptr)
285  {
286  // Could not enqueue event!
287  TLOG(TLVL_ERROR) << "runReceiver_: Could not get data location for event " << header.sequence_id;
288  continue;
289  }
290  before_body = std::chrono::steady_clock::now();
291 
293  TLOG(16) << "runReceiver_: Calling receiveFragmentData from rank " << source_rank << ", sequence ID " << header.sequence_id << ", timestamp " << header.timestamp;
294  auto ret2 = source_plugins_[source_rank]->receiveFragmentData(loc, header.word_count - header.num_words());
295  TLOG(16) << "runReceiver_: Done with receiveFragmentData, ret2=" << ret2 << " (should be " << source_rank << ")";
296 
297  if (ret != ret2) {
298  TLOG(TLVL_ERROR) << "Unexpected return code from receiveFragmentData after receiveFragmentHeader! (Expected: " << ret << ", Got: " << ret2 << ")";
299  TLOG(TLVL_ERROR) << "Error receiving data from rank " << source_rank << ", data has been lost! Event " << header.sequence_id << " will most likely be Incomplete!";
300 
301  // Mark the Fragment as invalid
302  /* \todo Make a RawFragmentHeader field that marks it as invalid while maintaining previous type! */
303  hdrLoc->type = Fragment::ErrorFragmentType;
304 
305  shm_manager_->DoneWritingFragment(header);
306  //throw cet::exception("DataReceiverManager") << "Unexpected return code from receiveFragmentData after receiveFragmentHeader! (Expected: " << ret << ", Got: " << ret2 << ")";
307  continue;
308  }
309 
310  shm_manager_->DoneWritingFragment(header);
311  TLOG(TLVL_TRACE) << "Done receiving fragment with sequence ID " << header.sequence_id << " from rank " << source_rank;
312 
313  recv_frag_count_.incSlot(source_rank);
314  recv_frag_size_.incSlot(source_rank, header.word_count * sizeof(RawDataType));
315  recv_seq_count_.setSlot(source_rank, header.sequence_id);
316  if (endOfDataCount != static_cast<size_t>(-1))
317  {
318  TLOG(TLVL_DEBUG) << "Received fragment " << header.sequence_id << " from rank " << source_rank
319  << " (" << recv_frag_count_.slotCount(source_rank) << "/" << endOfDataCount << ")";
320  }
321 
322  after_body = std::chrono::steady_clock::now();
323 
324  source_metric_data_[source_rank].hdr_delta_t += TimeUtils::GetElapsedTime(start_time, after_header);
325  source_metric_data_[source_rank].store_delta_t += TimeUtils::GetElapsedTime(after_header, before_body);
326  source_metric_data_[source_rank].data_delta_t += TimeUtils::GetElapsedTime(before_body, after_body);
327  source_metric_data_[source_rank].delta_t += TimeUtils::GetElapsedTime(start_time, after_body);
328  source_metric_data_[source_rank].dead_t += TimeUtils::GetElapsedTime(end_time, start_time);
329 
330  source_metric_data_[source_rank].data_size += header.word_count * sizeof(RawDataType);
331  source_metric_data_[source_rank].header_size += header.num_words() * sizeof(RawDataType);
332  source_metric_data_[source_rank].data_point_count++;
333 
334  if (metricMan && TimeUtils::GetElapsedTime(source_metric_send_time_[source_rank]) > 1)
335  {//&& recv_frag_count_.slotCount(source_rank) % 100 == 0) {
336  TLOG(6) << "runReceiver_: Sending receive stats for rank " << source_rank;
337  metricMan->sendMetric("Total Receive Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].delta_t, "s", 5, MetricMode::Accumulate);
338  metricMan->sendMetric("Total Receive Size From Rank " + std::to_string(source_rank), static_cast<unsigned long>(source_metric_data_[source_rank].data_size), "B", 5, MetricMode::Accumulate);
339  metricMan->sendMetric("Total Receive Rate From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].data_size / source_metric_data_[source_rank].delta_t, "B/s", 5, MetricMode::Average);
340 
341  metricMan->sendMetric("Header Receive Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].hdr_delta_t, "s", 5, MetricMode::Accumulate);
342  metricMan->sendMetric("Header Receive Size From Rank " + std::to_string(source_rank), static_cast<unsigned long>(source_metric_data_[source_rank].header_size), "B", 5, MetricMode::Accumulate);
343  metricMan->sendMetric("Header Receive Rate From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].header_size / source_metric_data_[source_rank].hdr_delta_t, "B/s", 5, MetricMode::Average);
344 
345  auto payloadSize = source_metric_data_[source_rank].data_size - source_metric_data_[source_rank].header_size;
346  metricMan->sendMetric("Data Receive Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].data_delta_t, "s", 5, MetricMode::Accumulate);
347  metricMan->sendMetric("Data Receive Size From Rank " + std::to_string(source_rank), static_cast<unsigned long>(payloadSize), "B", 5, MetricMode::Accumulate);
348  metricMan->sendMetric("Data Receive Rate From Rank " + std::to_string(source_rank), payloadSize / source_metric_data_[source_rank].data_delta_t, "B/s", 5, MetricMode::Average);
349 
350  metricMan->sendMetric("Data Receive Count From Rank " + std::to_string(source_rank), recv_frag_count_.slotCount(source_rank), "fragments", 3, MetricMode::LastPoint);
351 
352  metricMan->sendMetric("Total Shared Memory Wait Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].store_delta_t, "s", 3, MetricMode::Accumulate);
353  metricMan->sendMetric("Avg Shared Memory Wait Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].store_delta_t / source_metric_data_[source_rank].data_point_count, "s", 3, MetricMode::Average);
354  metricMan->sendMetric("Avg Fragment Wait Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].dead_t / source_metric_data_[source_rank].data_point_count, "s", 3, MetricMode::Average);
355 
356  TLOG(6) << "runReceiver_: Done sending receive stats for rank " << source_rank;
357 
358  source_metric_send_time_[source_rank] = std::chrono::steady_clock::now();
359  source_metric_data_[source_rank] = source_metric_data();
360  }
361 
362  end_time = std::chrono::steady_clock::now();
363  }
365  {
366  TLOG(TLVL_DEBUG) << "Received System Fragment from rank " << source_rank << " of type " << detail::RawFragmentHeader::SystemTypeToString(header.type) << ".";
367 
368  FragmentPtr frag(new Fragment(header.word_count - header.num_words()));
369  memcpy(frag->headerAddress(), &header, header.num_words() * sizeof(RawDataType));
370  auto ret3 = source_plugins_[source_rank]->receiveFragmentData(frag->headerAddress() + header.num_words(), header.word_count - header.num_words());
371  if (ret3 != source_rank)
372  {
373  TLOG(TLVL_ERROR) << "Unexpected return code from receiveFragmentData after receiveFragmentHeader while receiving System Fragment! (Expected: " << source_rank << ", Got: " << ret3 << ")";
374  throw cet::exception("DataReceiverManager") << "Unexpected return code from receiveFragmentData after receiveFragmentHeader while receiving System Fragment! (Expected: " << source_rank << ", Got: " << ret3 << ")";
375  }
376 
377  switch (header.type)
378  {
380  shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
381  if(endOfDataCount == static_cast<size_t>(-1) ) endOfDataCount = *(frag->dataBegin());
382  else endOfDataCount += *(frag->dataBegin());
383  TLOG(TLVL_DEBUG) << "EndOfData Fragment indicates that " << endOfDataCount << " fragments are expected from rank " << source_rank
384  << " (recvd " << recv_frag_count_.slotCount(source_rank) << ").";
385  break;
387  TLOG(TLVL_DEBUG) << "Received Init Fragment from rank " << source_rank << ".";
388  shm_manager_->setRequestMode(detail::RequestMessageMode::Normal);
389  shm_manager_->SetInitFragment(std::move(frag));
390  break;
392  shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
393  //shm_manager_->endRun();
394  break;
396  //shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
397  TLOG(TLVL_DEBUG) << "Received EndOfSubrun Fragment from rank " << source_rank
398  << " with sequence_id " << header.sequence_id << ".";
399  if (header.sequence_id != Fragment::InvalidSequenceID) shm_manager_->rolloverSubrun(header.sequence_id, header.timestamp);
400  else shm_manager_->rolloverSubrun(recv_seq_count_.slotCount(source_rank), header.timestamp);
401  break;
403  shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
404  break;
405  }
406  }
407  }
408 
409  source_plugins_[source_rank]->flush_buffers();
410 
411  TLOG(TLVL_DEBUG) << "runReceiver_ " << source_rank << " receive loop exited";
412  running_sources_[source_rank] = false;
413 }
std::unique_ptr< Fragment > FragmentPtr
std::set< int > running_sources() const
Get the list of sources which are still receiving data.
static constexpr type_t EndOfDataFragmentType
static constexpr type_t EmptyFragmentType
static constexpr bool isUserFragmentType(type_t fragmentType)
static constexpr std::size_t num_words()
static constexpr type_t DataFragmentType
static constexpr type_t EndOfRunFragmentType
static constexpr type_t InitFragmentType
static constexpr type_t ErrorFragmentType
This TransferInterface is a Receiver.
static constexpr sequence_id_t InvalidSequenceID
DataReceiverManager(const fhicl::ParameterSet &ps, std::shared_ptr< SharedMemoryEventManager > shm)
DataReceiverManager Constructor.
static constexpr type_t ContainerFragmentType
std::set< int > enabled_sources() const
Get the list of enabled sources.
Value that is to be returned when a Transfer plugin determines that no more data will be arriving...
constexpr double GetElapsedTime(std::chrono::steady_clock::time_point then, std::chrono::steady_clock::time_point now=std::chrono::steady_clock::now())
static constexpr type_t ShutdownFragmentType
detail::RawFragmentHeader::RawDataType RawDataType
static std::string SystemTypeToString(type_t type)
static constexpr type_t EndOfSubrunFragmentType
virtual ~DataReceiverManager()
DataReceiverManager Destructor.
void start_threads()
Start receiver threads for all enabled sources.
uint64_t gettimeofday_us()
void stop_threads()
Stop receiver threads.