tdaq-develop-2025-02-12
UDPReceiver_generator.cc
1 
2 
3 #define TRACEMF_USE_VERBATIM 1 // for trace longer path filenames
4 #include "artdaq/DAQdata/Globals.hh"
5 #define TRACE_NAME "UDPReceiver"
6 
7 #include "artdaq-ots/Generators/UDPReceiver.hh"
8 
9 #include "artdaq-core/Utilities/SimpleLookupPolicy.hh"
10 #include "artdaq-ots/Overlays/FragmentType.hh"
11 #include "artdaq-ots/Overlays/UDPFragmentWriter.hh"
12 #include "artdaq/Generators/GeneratorMacros.hh"
13 #include "canvas/Utilities/Exception.h"
14 #include "fhiclcpp/ParameterSet.h"
15 #include "otsdaq/Macros/CoutMacros.h"
16 
17 #include <sys/poll.h>
18 #include <fstream>
19 #include <iomanip>
20 #include <iostream>
21 #include <iterator>
22 
23 //==============================================================================
24 ots::UDPReceiver::UDPReceiver(fhicl::ParameterSet const& ps)
25  : CommandableFragmentGenerator(ps)
26  , rawOutput_(ps.get<bool>("raw_output_enabled", false))
27  , rawPath_(ps.get<std::string>("raw_output_path", "/tmp"))
28  , dataport_(ps.get<int>("port", 6343))
29  , ip_(ps.get<std::string>("ip", "127.0.0.1"))
30  , rcvbuf_(ps.get<int>("rcvbuf", 0x1000000))
31  , expectedPacketNumber_(0)
32  , datasocket_(-1)
33  , sendCommands_(ps.get<bool>("send_OtsUDP_commands", false))
34  , receiverThread_(nullptr)
35  , fakeDataMode_(ps.get<bool>("fake_data_mode", false))
36  , fragmentWindow_(ps.get<double>("fragment_time_window_ms", 1000))
37  , lastFrag_(std::chrono::high_resolution_clock::now())
38 {
39  TLOG(TLVL_DEBUG) << "Constructor.";
40 
41  if(!fakeDataMode_)
42  {
43  datasocket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
44  if(!datasocket_)
45  {
46  throw art::Exception(art::errors::Configuration)
47  << "UDPReceiver: Error creating socket!";
48  exit(1);
49  }
50 
51  struct sockaddr_in si_me_data;
52  si_me_data.sin_family = AF_INET;
53  si_me_data.sin_port = htons(dataport_);
54  si_me_data.sin_addr.s_addr = htonl(INADDR_ANY);
55  if(bind(datasocket_, (struct sockaddr*)&si_me_data, sizeof(si_me_data)) == -1)
56  {
57  throw art::Exception(art::errors::Configuration)
58  << "UDPReceiver: Cannot bind data socket to port " << dataport_;
59  exit(1);
60  }
61  /*if(fcntl(datasocket_, F_SETFL, O_NONBLOCK) == -1) {
62 
63  throw art::Exception(art::errors::Configuration) << "UDPReceiver: Cannot set
64  socket to nonblocking!" ;
65  }*/
66 
67  if(rcvbuf_ > 0 &&
68  setsockopt(datasocket_, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof(rcvbuf_)))
69  {
70  throw art::Exception(art::errors::Configuration)
71  << "UDPReceiver: Could not set receive buffer size: " << rcvbuf_;
72  exit(1);
73  }
74 
75  si_data_.sin_family = AF_INET;
76  si_data_.sin_port = htons(dataport_);
77  if(inet_aton(ip_.c_str(), &si_data_.sin_addr) == 0)
78  {
79  throw art::Exception(art::errors::Configuration)
80  << "UDPReceiver: Could not translate provided IP Address: " << ip_;
81  exit(1);
82  }
83  }
84  TLOG(TLVL_INFO) << "UDP Receiver Construction Complete!";
85 
86  TLOG(TLVL_DEBUG) << "Constructed.";
87 } // end constructor()
88 
89 //==============================================================================
90 ots::UDPReceiver::~UDPReceiver()
91 {
92  TLOG(TLVL_DEBUG) << "Destructor.";
93 
94  // join waits for thread to complete
95  if(receiverThread_ && receiverThread_->joinable())
96  {
97  // only join if thread has started
98  receiverThread_->join();
99  receiverThread_.reset(nullptr);
100  }
101 
102  if(datasocket_)
103  {
104  close(datasocket_);
105  datasocket_ = -1;
106  }
107 
108  TLOG(TLVL_DEBUG) << "Destructed.";
109 } // end destructor()
110 
111 //==============================================================================
112 void ots::UDPReceiver::start()
113 {
114  TLOG(TLVL_DEBUG) << "Starting...";
115 
116  TLOG(TLVL_INFO) << "Starting...";
117 
118  if(!fakeDataMode_)
119  {
120  receiverThread_.reset(new std::thread(&UDPReceiver::receiveLoop_, this));
121  }
122  start_();
123 
124  TLOG(TLVL_DEBUG) << "Started.";
125 } // end start()
126 
127 //==============================================================================
128 void ots::UDPReceiver::receiveLoop_()
129 {
130  while(!should_stop())
131  {
132  struct pollfd ufds[1];
133  ufds[0].fd = datasocket_;
134  ufds[0].events = POLLIN | POLLPRI;
135 
136  int rv = poll(ufds, 1, 1000);
137  if(rv > 0)
138  {
139  TLOG(TLVL_TRACE) << "revents: " << ufds[0].revents
140  << ", "; // ufds[1].revents ;
141  if(ufds[0].revents == POLLIN || ufds[0].revents == POLLPRI)
142  {
143  // FIXME -> IN THE STIB GENERATOR WE DON'T HAVE A HEADER
144  // FIXME -> IN THE STIB GENERATOR WE DON'T HAVE A HEADER
145  // FIXME -> IN THE STIB GENERATOR WE DON'T HAVE A HEADER
146  uint8_t peekBuffer[4];
147  socklen_t dataSz = sizeof(si_data_);
148  recvfrom(datasocket_,
149  peekBuffer,
150  sizeof(peekBuffer),
151  MSG_PEEK,
152  (struct sockaddr*)&si_data_,
153  &dataSz);
154 
155  TLOG(TLVL_TRACE)
156  << "Received UDP Datagram with sequence number " << std::hex << "0x"
157  << static_cast<int>(peekBuffer[1]) << "!" << std::dec;
158  TLOG(TLVL_TRACE) << "peekBuffer[1] == expectedPacketNumber_: " << std::hex
159  << static_cast<int>(peekBuffer[1])
160  << " =?= " << (int)expectedPacketNumber_;
161  TLOG(TLVL_TRACE)
162  << "peekBuffer: 0: " << std::hex << static_cast<int>(peekBuffer[0])
163  << ", 1: " << std::hex << static_cast<int>(peekBuffer[1])
164  << ", 2: " << std::hex << static_cast<int>(peekBuffer[2])
165  << ", 3: " << std::hex << static_cast<int>(peekBuffer[3]);
166 
167  uint8_t seqNum = peekBuffer[1];
168  // ReturnCode dataCode = getReturnCode(peekBuffer[0]);
169  if(seqNum >= expectedPacketNumber_ ||
170  (seqNum < 64 && expectedPacketNumber_ > 192))
171  {
172  if(seqNum != expectedPacketNumber_)
173  {
174  int delta = seqNum - expectedPacketNumber_;
175  TLOG(TLVL_WARNING)
176  << std::dec
177  << "Sequence Number different than expected! (delta: "
178  << delta << ")";
179  expectedPacketNumber_ = seqNum;
180  }
181 
182  packetBuffer_t receiveBuffer;
183  receiveBuffer.resize(1500);
184  int sts = recvfrom(datasocket_,
185  &receiveBuffer[0],
186  receiveBuffer.size(),
187  0,
188  (struct sockaddr*)&si_data_,
189  &dataSz);
190  receiveBuffer.resize(sts);
191 
192  if(sts == -1)
193  {
194  TLOG(TLVL_WARNING) << "Error on socket: " << strerror(errno);
195  }
196  else
197  {
198  TLOG(TLVL_TRACE) << "Received " << sts << " bytes.";
199  }
200 
201  std::unique_lock<std::mutex> lock(receiveBufferLock_);
202  TLOG(TLVL_TRACE)
203  << "Now placing UDP datagram with sequence number " << std::hex
204  << (int)seqNum << " into buffer." << std::dec;
205  receiveBuffers_.push_back(receiveBuffer);
206 
207  ++expectedPacketNumber_;
208  }
209  else
210  {
211  // Receiving out-of-order datagram, then moving on...
212  TLOG(TLVL_WARNING)
213  << "Received out-of-order datagram: " << seqNum
214  << " != " << expectedPacketNumber_ << " (expected)";
215  packetBuffer_t receiveBuffer;
216  receiveBuffer.resize(1500);
217  int sts = recvfrom(datasocket_,
218  &receiveBuffer[0],
219  receiveBuffer.size(),
220  0,
221  (struct sockaddr*)&si_data_,
222  &dataSz);
223  receiveBuffer.resize(sts);
224  }
225  }
226  }
227  }
228  TLOG(TLVL_INFO) << "receive Loop exiting...";
229 }
230 
231 //==============================================================================
232 bool ots::UDPReceiver::getNext_(artdaq::FragmentPtrs& output)
233 {
234  if(should_stop())
235  {
236  return false;
237  }
238 
239  if(!fakeDataMode_)
240  {
241  std::unique_lock<std::mutex> lock(receiveBufferLock_);
242  std::move(receiveBuffers_.begin(),
243  receiveBuffers_.end(),
244  std::inserter(packetBuffers_, packetBuffers_.end()));
245  receiveBuffers_.clear();
246  }
247  else
248  {
249  packetBuffer_t pkt; // std::string
250  pkt.resize(10);
251  pkt[0] = 0x23; // Data type string, return code last
252  for(int ii = 1; ii < 9; ++ii)
253  {
254  pkt[ii] = ii;
255  }
256  pkt[9] = 0;
257  packetBuffers_.push_back(pkt);
258  }
259 
260  if(packetBuffers_.size() > 0)
261  {
262  size_t packetBufferSize = 0;
263  for(auto& buf : packetBuffers_)
264  {
265  packetBufferSize += buf.size();
266  }
267  TLOG(TLVL_TRACE) << "Calling ProcessData, packetBuffers_.size() == "
268  << std::to_string(packetBuffers_.size())
269  << ", sz = " << std::to_string(packetBufferSize);
270  ProcessData_(output, packetBufferSize);
271 
272  packetBuffers_.clear();
273  TLOG(TLVL_TRACE) << "Returning output of size " << output.size();
274  }
275  else
276  {
277  // Sleep 10 times per poll timeout
278  usleep(100000);
279  }
280  return true;
281 }
282 
283 //==============================================================================
284 void ots::UDPReceiver::ProcessData_(artdaq::FragmentPtrs& output, size_t totalSize)
285 {
286  TLOG(TLVL_TRACE) << "ProcessData_ start";
288  metadata.port = dataport_;
289  metadata.address = si_data_.sin_addr.s_addr;
290 
291  std::size_t initial_payload_size = 0;
292 
293  TLOG(TLVL_TRACE) << "Creating Fragment";
294 
295  output.emplace_back(artdaq::Fragment::FragmentBytes(initial_payload_size,
296  ev_counter(),
297  fragment_id(),
298  ots::detail::FragmentType::UDP,
299  metadata));
300 
301  ev_counter_inc();
302 
303  TLOG(TLVL_TRACE) << "Creating UDPFragmentWriter";
304  // We now have a fragment to contain this event:
305  ots::UDPFragmentWriter thisFrag(*output.back());
306 
307  TLOG(TLVL_TRACE) << "Received data, now placing data with UDP sequence number "
308  << std::hex << static_cast<int>((packetBuffers_.front()).at(1))
309  << " into UDPFragment";
310  thisFrag.resize(totalSize + 1);
311  std::ofstream rawOutput;
312  if(rawOutput_)
313  {
314  std::string outputPath =
315  rawPath_ + "/UDPReceiver-" + ip_ + ":" + std::to_string(dataport_) + ".bin";
316  rawOutput.open(outputPath, std::ios::out | std::ios::app | std::ios::binary);
317  }
318 
319  DataType dataType = getDataType((packetBuffers_.front()).at(0));
320  thisFrag.set_hdr_type((int)dataType);
321  int pos = 0;
322  for(auto jj = packetBuffers_.begin(); jj != packetBuffers_.end(); ++jj)
323  {
324  for(size_t ii = 0; ii < jj->size(); ++ii)
325  {
326  // Null-terminate string types
327  if((jj)->at(ii) == 0 &&
328  (dataType == DataType::JSON || dataType == DataType::String))
329  {
330  break;
331  }
332 
333  if(rawOutput_)
334  rawOutput.write((char*)&((jj)->at(ii)), sizeof(uint8_t));
335  *(thisFrag.dataBegin() + pos) = (jj)->at(ii);
336  ++pos;
337  }
338  }
339 
340  if(dataType == DataType::JSON || dataType == DataType::String)
341  {
342  *(thisFrag.dataBegin() + pos) = 0;
343  char zero = 0;
344  if(rawOutput_)
345  rawOutput.write(&zero, sizeof(char));
346  }
347  if(rawOutput_)
348  rawOutput.close();
349 }
350 
351 //==============================================================================
352 void ots::UDPReceiver::send(CommandType command)
353 {
354  if(sendCommands_)
355  {
356  CommandPacket packet;
357  packet.type = command;
358  packet.dataSize = 0;
359  sendto(datasocket_,
360  &packet,
361  sizeof(packet),
362  0,
363  (struct sockaddr*)&si_data_,
364  sizeof(si_data_));
365  }
366 }
367 
368 //==============================================================================
369 bool ots::UDPReceiver::isTimerExpired_()
370 {
371  auto now = std::chrono::high_resolution_clock::now();
372  auto diff = std::chrono::duration<double, std::milli>(now - lastFrag_).count();
373  return diff > fragmentWindow_;
374 }
375 
376 //==============================================================================
377 void ots::UDPReceiver::stop()
378 {
379  TLOG(TLVL_DEBUG) << "Stopping...";
380  // #pragma message "Using default implementation of UDPReceiver::stop()"
381 }
382 
383 //==============================================================================
384 void ots::UDPReceiver::stopNoMutex()
385 {
386  // #pragma message "Using default implementation of UDPReceiver::stopNoMutex()"
387 }
388 
389 //==============================================================================
390 void ots::UDPReceiver::start_()
391 {
392  // #pragma message "Using default implementation of UDPReceiver::start_()"
393 }
394 
396 DEFINE_ARTDAQ_COMMANDABLE_GENERATOR(ots::UDPReceiver)
bool getNext_(artdaq::FragmentPtrs &output) override