artdaq_demo  v3_04_00
NthEvent_transfer.cc
1 #include "artdaq/TransferPlugins/TransferInterface.hh"
2 #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
3 #include "artdaq-core/Data/Fragment.hh"
4 #include "artdaq-core/Utilities/ExceptionHandler.hh"
5 #include "cetlib/BasicPluginFactory.h"
6 
7 #include "messagefacility/MessageLogger/MessageLogger.h"
8 #include "fhiclcpp/ParameterSet.h"
9 
10 #include <boost/tokenizer.hpp>
11 
12 #include <sys/shm.h>
13 #include <memory>
14 #include <iostream>
15 #include <string>
16 #include <limits>
17 #include <sstream>
18 
22 namespace artdaq
23 {
29  {
30  public:
38  NthEventTransfer(fhicl::ParameterSet const& ps, artdaq::TransferInterface::Role role);
39 
46  TransferInterface::CopyStatus
47  transfer_fragment_min_blocking_mode(artdaq::Fragment const& fragment, size_t send_timeout_usec) override;
48 
54  TransferInterface::CopyStatus
56 
64  size_t receiveTimeout) override
65  {
66  // nth-event discarding is done at the send side. Pass receive calls through to underlying transfer
67  return physical_transfer_->receiveFragment(fragment, receiveTimeout);
68  }
69 
76  int receiveFragmentHeader(detail::RawFragmentHeader& header, size_t receiveTimeout) override
77  {
78  return physical_transfer_->receiveFragmentHeader(header, receiveTimeout);
79  }
80 
87  int receiveFragmentData(RawDataType* destination, size_t wordCount) override
88  {
89  return physical_transfer_->receiveFragmentData(destination, wordCount);
90  }
91 
96  int source_rank() const override { return physical_transfer_->source_rank(); }
97 
102  int destination_rank() const override { return physical_transfer_->destination_rank(); }
103 
104 
109  bool isRunning() override { return physical_transfer_->isRunning(); }
110 
115  void flush_buffers() override { physical_transfer_->flush_buffers(); }
116 
117  private:
118 
119  bool pass(const artdaq::Fragment&) const;
120 
121  std::unique_ptr<TransferInterface> physical_transfer_;
122  size_t nth_;
123  size_t offset_;
124  };
125 
126  NthEventTransfer::NthEventTransfer(fhicl::ParameterSet const& pset, artdaq::TransferInterface::Role role) :
127  TransferInterface(pset, role)
128  , nth_(pset.get<size_t>("nth")),
129  offset_(pset.get<size_t>("offset", 0))
130  {
131  if (pset.has_key("source_rank") || pset.has_key("destination_rank")) {
132  throw cet::exception("NthEvent") << "The parameters \"source_rank\" and \"destination_rank\" must be explicitly defined in the body of the physical_transfer_plugin table, and not outside of it";
133  }
134 
135 
136  if (offset_ >= nth_) {
137  throw cet::exception("NthEvent") << "Offset value of " << offset_ <<
138  " must not be larger than the modulus value of " << nth_;
139  }
140 
141  if (nth_ == 0)
142  {
143  mf::LogWarning("NthEventTransfer") << "0 was passed as the nth parameter to NthEventTransfer. Will change to 1 (0 is undefined behavior)";
144  nth_ = 1;
145  }
146  // Instantiate the TransferInterface plugin used to effect transfers
147  physical_transfer_ = MakeTransferPlugin(pset, "physical_transfer_plugin", role);
148  }
149 
150 
151  TransferInterface::CopyStatus
153  size_t send_timeout_usec)
154  {
155 
156  if (!pass(fragment))
157  {
158  // Do not transfer but return success. Fragment is discarded
159  return TransferInterface::CopyStatus::kSuccess;
160  }
161 
162  // This is the nth Fragment, transfer
163  return physical_transfer_->transfer_fragment_min_blocking_mode(fragment, send_timeout_usec);
164  }
165 
166  TransferInterface::CopyStatus
168  {
169  if (!pass(fragment))
170  {
171  // Do not transfer but return success. Fragment is discarded
172  return TransferInterface::CopyStatus::kSuccess;
173  }
174 
175  // This is the nth Fragment, transfer
176  return physical_transfer_->transfer_fragment_reliable_mode(std::move(fragment));
177  }
178 
179  bool
180  NthEventTransfer::pass(const artdaq::Fragment& fragment) const
181  {
182  bool passed = false;
183 
184  if (fragment.type() == artdaq::Fragment::DataFragmentType) {
185  passed = (fragment.sequenceID() + nth_ - offset_) % nth_ == 0 ? true : false;
186  }
187  else {
188  passed = true;
189  }
190 
191  return passed;
192  }
193 }
194 
195 DEFINE_ARTDAQ_TRANSFER(artdaq::NthEventTransfer)
196 
197 // Local Variables:
198 // mode: c++
199 // End:
Demonstration TransferInterface plugin showing how to discard events Intended for use in the transfer...
int receiveFragment(artdaq::Fragment &fragment, size_t receiveTimeout) override
Receive a fragment from the transfer plugin.
int receiveFragmentHeader(detail::RawFragmentHeader &header, size_t receiveTimeout) override
Receive a Fragment Header from the transport mechanism.
sequence_id_t sequenceID() const
static constexpr type_t DataFragmentType
TransferInterface::CopyStatus transfer_fragment_min_blocking_mode(artdaq::Fragment const &fragment, size_t send_timeout_usec) override
Transfer a Fragment to the destination. May not necessarily be reliable, but will not block longer th...
bool isRunning() override
Determine whether the TransferInterface plugin is able to send/receive data.
type_t type() const
NthEventTransfer(fhicl::ParameterSet const &ps, artdaq::TransferInterface::Role role)
NthEventTransfer Constructor.
detail::RawFragmentHeader::RawDataType RawDataType
int destination_rank() const override
Get the destination rank from the physical transfer.
void flush_buffers() override
Flush any in-flight data. This should be used by the receiver after the receive loop has ended...
TransferInterface::CopyStatus transfer_fragment_reliable_mode(artdaq::Fragment &&fragment) override
Copy a fragment, using the reliable channel. moveFragment assumes ownership of the fragment...
int receiveFragmentData(RawDataType *destination, size_t wordCount) override
Receive the body of a Fragment to the given destination pointer.
int source_rank() const override
Get the source rank from the physical transfer.