artdaq_demo  v3_04_00
NthEvent_policy.cc
1 #define TRACE_NAME "NthEventPolicy"
2 #include "artdaq/Application/Routing/RoutingMasterPolicy.hh"
3 #include "artdaq/Application/Routing/PolicyMacros.hh"
4 #include "fhiclcpp/ParameterSet.h"
5 #include "cetlib_except/exception.h"
6 
7 namespace artdaq
8 {
14  {
15  public:
// Builds the policy from a FHiCL parameter set; the out-of-line definition reads
// the "nth_event" and "target_receiver" settings from it.
16  explicit NthEventPolicy(fhicl::ParameterSet ps);
17 
// Virtual destructor with an empty body.
18  virtual ~NthEventPolicy() { }
19 
// Produces a detail::RoutingPacket from the currently available tokens;
// declared as an override of a base-class virtual.
20  detail::RoutingPacket GetCurrentTable() override;
21  private:
// Divert interval: sequence IDs that are multiples of nth_ are routed to nth_rank_.
// Initialized from "nth_event"; the constructor rejects a value of 0.
22  size_t nth_;
// Rank that receives the diverted sequence IDs. Initialized from "target_receiver".
23  int nth_rank_;
24  };
25 
/**
 * NthEventPolicy constructor.
 *
 * Reads two settings from ps: "nth_event" (stored in nth_) and "target_receiver"
 * (stored in nth_rank_). Neither get() call passes a default value.
 *
 * Throws cet::exception with category "NthEvent_policy" when nth_event is 0.
 */
36  NthEventPolicy::NthEventPolicy(fhicl::ParameterSet ps)
// NOTE(review): the initializer list below starts with a comma and the listing
// numbering jumps from 36 to 38; an earlier entry (presumably the base-class
// initializer) seems to be missing from this listing — confirm against the
// original source file.
38  , nth_(ps.get<size_t>("nth_event"))
39  , nth_rank_(ps.get<int>("target_receiver"))
40  {
// nth_ is used as a divisor and as a modulus in GetCurrentTable(), so zero is
// rejected here before any routing is attempted.
41  if (nth_ == 0) throw cet::exception("NthEvent_policy") << "nth_event must be greater than 0!";
42  }
43 
/**
 * Construct a routing table from the currently available tokens.
 *
 * Tokens obtained from getTokensSnapshot() are tallied per receiver rank.
 * Sequence IDs are then assigned in passes over the tallied ranks: a sequence ID
 * that is a multiple of nth_ is assigned to nth_rank_, and every other rank in
 * the tally receives one sequence ID per pass. Tokens that are still unused when
 * assignment stops are handed back through addUnusedTokens().
 *
 * Returns the RoutingPacket entries produced during this call; the packet is
 * empty when the initial end condition already holds.
 */
50  detail::RoutingPacket NthEventPolicy::GetCurrentTable()
51  {
// Tally how many tokens each rank currently holds (rank -> token count).
52  auto tokens = getTokensSnapshot();
53  std::map<int, int> table;
54  for (auto token : *tokens.get())
55  {
56  table[token]++;
57  }
// Make sure the target rank has an entry (with zero tokens if it sent none), so
// it is included in table.size() and can be indexed below.
58  if (table.count(nth_rank_) == 0) table[nth_rank_] = 0;
// The snapshot container is emptied here and refilled with leftover tokens at
// the end of this function.
59  tokens->clear();
60 
61  detail::RoutingPacket output;
62  TLOG(5) << "table[nth_rank_]=" << (table[nth_rank_])
63  << ", Next nth=" << (((next_sequence_id_ / nth_) + 1) * nth_)
64  << ", max seq=" << (next_sequence_id_ + table.size() - 1) ;
// Do not start assigning when either
//  (a) fewer ranks appear in the tally than GetReceiverCount() reports, or
//  (b) the target rank has no tokens and either the very next sequence ID is a
//      multiple of nth_, or the next multiple of nth_ is below
//      next_sequence_id_ + table.size() - 1 (the "max seq" value logged above).
65  auto endCondition = table.size() < GetReceiverCount() || (table[nth_rank_] <= 0 && (next_sequence_id_ % nth_ == 0 || ((next_sequence_id_ / nth_) + 1) * nth_ < next_sequence_id_ + table.size() - 1));
// NOTE(review): if the tally contains only nth_rank_ (with tokens) and
// next_sequence_id_ is not a multiple of nth_, a pass assigns nothing and the end
// condition looks unchanged afterwards — confirm this configuration cannot occur.
66  while (!endCondition)
67  {
// One pass over all tallied ranks. Each iteration works on a copy of the map
// entry, so r.second is the count as it was when the entry was reached; the
// decrements below are applied to the map itself.
68  for (auto r : table)
69  {
70  TLOG(5) << "nth_=" << nth_
71  << ", nth_rank=" << nth_rank_
72  << ", r=" << r.first
73  << ", next_sequence_id=" << next_sequence_id_;
// A sequence ID that is a multiple of nth_ goes to the target rank and consumes
// one of its tokens. This check runs for every entry, including the target's own.
// NOTE(review): the decrement is not guarded by a token check at this point; the
// target's count could presumably drop below zero if several multiples of nth_
// fall inside a single pass — verify.
74  if (next_sequence_id_ % nth_ == 0)
75  {
76  TLOG(5) << "Diverting event " << next_sequence_id_ << " to EVB " << nth_rank_ ;
77  output.emplace_back(detail::RoutingPacketEntry(next_sequence_id_++, nth_rank_));
78  table[nth_rank_]--;
79  }
// Every rank other than the target gets the next sequence ID in turn.
80  if (r.first != nth_rank_) {
81  TLOG(5) << "Sending event " << next_sequence_id_ << " to EVB " << r.first ;
82  output.emplace_back(detail::RoutingPacketEntry(next_sequence_id_++, r.first));
// r.second == 1 means this rank's count was 1 before the decrement below, i.e.
// it is now out of tokens: the current pass still finishes, then the loop stops.
83  if (!endCondition) endCondition = r.second == 1;
84  table[r.first]--;
85  }
86  }
87  TLOG(5) << "table[nth_rank_]=" << table[nth_rank_]
88  << ", Next nth=" << (((next_sequence_id_ / nth_) + 1) * nth_)
89  << ", max seq=" << (next_sequence_id_ + table.size() - 1) ;
// Keep a stop request raised during the pass; otherwise re-evaluate condition (b)
// from above for the upcoming pass.
90  endCondition = endCondition || (table[nth_rank_] <= 0 && (next_sequence_id_ % nth_ == 0 || (next_sequence_id_ / nth_) * nth_ + nth_ < next_sequence_id_ + table.size() - 1));
91  }
92 
// Rebuild the token list from what is left: one entry per remaining token. Ranks
// whose count is zero or negative contribute nothing.
93  for (auto r : table)
94  {
95  for (auto i = 0; i < r.second; ++i)
96  {
97  tokens->push_back(r.first);
98  }
99  }
// Hand the leftover tokens back via addUnusedTokens().
100  addUnusedTokens(std::move(tokens));
101 
102  return output;
103  }
104 }
105 
// NOTE(review): presumably registers artdaq::NthEventPolicy as a loadable routing
// policy; the macro is expected to come from PolicyMacros.hh — confirm.
106 DEFINE_ARTDAQ_ROUTING_POLICY(artdaq::NthEventPolicy)
NthEventPolicy(fhicl::ParameterSet ps)
NthEventPolicy Constructor.
An example RoutingMasterPolicy which redirects every Nth event to a designated destination. Other events are Round-Robin'ed to the other configured destinations.
detail::RoutingPacket GetCurrentTable() override
Construct a Routing Table using the current tokens.