otsdaq  v2_05_02_indev
UDPReceiver_generator.cc
1 #include "artdaq/DAQdata/Globals.hh"
2 #define TRACE_NAME "UDPReceiver"
3 
4 #include "artdaq-ots/Generators/UDPReceiver.hh"
5 
6 #include "artdaq-core/Utilities/SimpleLookupPolicy.hh"
7 #include "artdaq-ots/Overlays/FragmentType.hh"
8 #include "artdaq-ots/Overlays/UDPFragmentWriter.hh"
9 #include "artdaq/Generators/GeneratorMacros.hh"
10 #include "canvas/Utilities/Exception.h"
11 #include "fhiclcpp/ParameterSet.h"
12 #include "otsdaq/Macros/CoutMacros.h"
13 
14 #include <sys/poll.h>
15 #include <fstream>
16 #include <iomanip>
17 #include <iostream>
18 #include <iterator>
19 
20 //==============================================================================
21 ots::UDPReceiver::UDPReceiver(fhicl::ParameterSet const& ps)
22  : CommandableFragmentGenerator(ps)
23  , rawOutput_(ps.get<bool>("raw_output_enabled", false))
24  , rawPath_(ps.get<std::string>("raw_output_path", "/tmp"))
25  , dataport_(ps.get<int>("port", 6343))
26  , ip_(ps.get<std::string>("ip", "127.0.0.1"))
27  , rcvbuf_(ps.get<int>("rcvbuf", 0x1000000))
28  , expectedPacketNumber_(0)
29  , datasocket_(-1)
30  , sendCommands_(ps.get<bool>("send_OtsUDP_commands", false))
31  , receiverThread_(nullptr)
32  , fakeDataMode_(ps.get<bool>("fake_data_mode", false))
33  , fragmentWindow_(ps.get<double>("fragment_time_window_ms", 1000))
34  , lastFrag_(std::chrono::high_resolution_clock::now())
35 {
36  TLOG(TLVL_DEBUG) << "Constructor.";
37 
38  if(!fakeDataMode_)
39  {
40  datasocket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
41  if(!datasocket_)
42  {
43  throw art::Exception(art::errors::Configuration) << "UDPReceiver: Error creating socket!";
44  exit(1);
45  }
46 
47  struct sockaddr_in si_me_data;
48  si_me_data.sin_family = AF_INET;
49  si_me_data.sin_port = htons(dataport_);
50  si_me_data.sin_addr.s_addr = htonl(INADDR_ANY);
51  if(bind(datasocket_, (struct sockaddr*)&si_me_data, sizeof(si_me_data)) == -1)
52  {
53  throw art::Exception(art::errors::Configuration) << "UDPReceiver: Cannot bind data socket to port " << dataport_;
54  exit(1);
55  }
56  /*if(fcntl(datasocket_, F_SETFL, O_NONBLOCK) == -1) {
57 
58  throw art::Exception(art::errors::Configuration) << "UDPReceiver: Cannot set
59  socket to nonblocking!" ;
60  }*/
61 
62  if(rcvbuf_ > 0 && setsockopt(datasocket_, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof(rcvbuf_)))
63  {
64  throw art::Exception(art::errors::Configuration) << "UDPReceiver: Could not set receive buffer size: " << rcvbuf_;
65  exit(1);
66  }
67 
68  si_data_.sin_family = AF_INET;
69  si_data_.sin_port = htons(dataport_);
70  if(inet_aton(ip_.c_str(), &si_data_.sin_addr) == 0)
71  {
72  throw art::Exception(art::errors::Configuration) << "UDPReceiver: Could not translate provided IP Address: " << ip_;
73  exit(1);
74  }
75  }
76  TLOG(TLVL_INFO) << "UDP Receiver Construction Complete!";
77 
78  TLOG(TLVL_DEBUG) << "Constructed.";
79 } // end constructor()
80 
81 //==============================================================================
82 ots::UDPReceiver::~UDPReceiver()
83 {
84  TLOG(TLVL_DEBUG) << "Destructor.";
85 
86  // join waits for thread to complete
87  if(receiverThread_ && receiverThread_->joinable())
88  {
89  // only join if thread has started
90  receiverThread_->join();
91  receiverThread_.reset(nullptr);
92  }
93 
94  if(datasocket_)
95  {
96  close(datasocket_);
97  datasocket_ = -1;
98  }
99 
100  TLOG(TLVL_DEBUG) << "Destructed.";
101 } // end destructor()
102 
103 //==============================================================================
104 void ots::UDPReceiver::start()
105 {
106  TLOG(TLVL_DEBUG) << "Starting...";
107 
108  TLOG(TLVL_INFO) << "Starting...";
109 
110  if(!fakeDataMode_)
111  {
112  receiverThread_.reset(new std::thread(&UDPReceiver::receiveLoop_, this));
113  }
114  start_();
115 
116  TLOG(TLVL_DEBUG) << "Started.";
117 } // end start()
118 
119 //==============================================================================
120 void ots::UDPReceiver::receiveLoop_()
121 {
122  while(!should_stop())
123  {
124  struct pollfd ufds[1];
125  ufds[0].fd = datasocket_;
126  ufds[0].events = POLLIN | POLLPRI;
127 
128  int rv = poll(ufds, 1, 1000);
129  if(rv > 0)
130  {
131  TLOG(TLVL_TRACE) << "revents: " << ufds[0].revents << ", "; // ufds[1].revents ;
132  if(ufds[0].revents == POLLIN || ufds[0].revents == POLLPRI)
133  {
134  // FIXME -> IN THE STIB GENERATOR WE DON'T HAVE A HEADER
135  // FIXME -> IN THE STIB GENERATOR WE DON'T HAVE A HEADER
136  // FIXME -> IN THE STIB GENERATOR WE DON'T HAVE A HEADER
137  uint8_t peekBuffer[4];
138  socklen_t dataSz = sizeof(si_data_);
139  recvfrom(datasocket_, peekBuffer, sizeof(peekBuffer), MSG_PEEK, (struct sockaddr*)&si_data_, &dataSz);
140 
141  TLOG(TLVL_TRACE) << "Received UDP Datagram with sequence number " << std::hex << "0x" << static_cast<int>(peekBuffer[1]) << "!" << std::dec;
142  TLOG(TLVL_TRACE) << "peekBuffer[1] == expectedPacketNumber_: " << std::hex << static_cast<int>(peekBuffer[1])
143  << " =?= " << (int)expectedPacketNumber_;
144  TLOG(TLVL_TRACE) << "peekBuffer: 0: " << std::hex << static_cast<int>(peekBuffer[0]) << ", 1: " << std::hex << static_cast<int>(peekBuffer[1])
145  << ", 2: " << std::hex << static_cast<int>(peekBuffer[2]) << ", 3: " << std::hex << static_cast<int>(peekBuffer[3]);
146 
147  uint8_t seqNum = peekBuffer[1];
148  // ReturnCode dataCode = getReturnCode(peekBuffer[0]);
149  if(seqNum >= expectedPacketNumber_ || (seqNum < 64 && expectedPacketNumber_ > 192))
150  {
151  if(seqNum != expectedPacketNumber_)
152  {
153  int delta = seqNum - expectedPacketNumber_;
154  TLOG(TLVL_WARNING) << std::dec << "Sequence Number different than expected! (delta: " << delta << ")";
155  expectedPacketNumber_ = seqNum;
156  }
157 
158  packetBuffer_t receiveBuffer;
159  receiveBuffer.resize(1500);
160  int sts = recvfrom(datasocket_, &receiveBuffer[0], receiveBuffer.size(), 0, (struct sockaddr*)&si_data_, &dataSz);
161  receiveBuffer.resize(sts);
162 
163  if(sts == -1)
164  {
165  TLOG(TLVL_WARNING) << "Error on socket: " << strerror(errno);
166  }
167  else
168  {
169  TLOG(TLVL_TRACE) << "Received " << sts << " bytes.";
170  }
171 
172  std::unique_lock<std::mutex> lock(receiveBufferLock_);
173  TLOG(TLVL_TRACE) << "Now placing UDP datagram with sequence number " << std::hex << (int)seqNum << " into buffer." << std::dec;
174  receiveBuffers_.push_back(receiveBuffer);
175 
176  ++expectedPacketNumber_;
177  }
178  else
179  {
180  // Receiving out-of-order datagram, then moving on...
181  TLOG(TLVL_WARNING) << "Received out-of-order datagram: " << seqNum << " != " << expectedPacketNumber_ << " (expected)";
182  packetBuffer_t receiveBuffer;
183  receiveBuffer.resize(1500);
184  int sts = recvfrom(datasocket_, &receiveBuffer[0], receiveBuffer.size(), 0, (struct sockaddr*)&si_data_, &dataSz);
185  receiveBuffer.resize(sts);
186  }
187  }
188  }
189  }
190  TLOG(TLVL_INFO) << "receive Loop exiting...";
191 }
192 
193 //==============================================================================
194 bool ots::UDPReceiver::getNext_(artdaq::FragmentPtrs& output)
195 {
196  if(should_stop())
197  {
198  return false;
199  }
200 
201  if(!fakeDataMode_)
202  {
203  std::unique_lock<std::mutex> lock(receiveBufferLock_);
204  std::move(receiveBuffers_.begin(), receiveBuffers_.end(), std::inserter(packetBuffers_, packetBuffers_.end()));
205  receiveBuffers_.clear();
206  }
207  else
208  {
209  packetBuffer_t pkt; // std::string
210  pkt.resize(10);
211  pkt[0] = 0x23; // Data type string, return code last
212  for(int ii = 1; ii < 9; ++ii)
213  {
214  pkt[ii] = ii;
215  }
216  pkt[9] = 0;
217  packetBuffers_.push_back(pkt);
218  }
219 
220  if(packetBuffers_.size() > 0)
221  {
222  size_t packetBufferSize = 0;
223  for(auto& buf : packetBuffers_)
224  {
225  packetBufferSize += buf.size();
226  }
227  TLOG(TLVL_TRACE) << "Calling ProcessData, packetBuffers_.size() == " << std::to_string(packetBuffers_.size())
228  << ", sz = " << std::to_string(packetBufferSize);
229  ProcessData_(output, packetBufferSize);
230 
231  packetBuffers_.clear();
232  TLOG(TLVL_TRACE) << "Returning output of size " << output.size();
233  }
234  else
235  {
236  // Sleep 10 times per poll timeout
237  usleep(100000);
238  }
239  return true;
240 }
241 
242 //==============================================================================
243 void ots::UDPReceiver::ProcessData_(artdaq::FragmentPtrs& output, size_t totalSize)
244 {
245  TLOG(TLVL_TRACE) << "ProcessData_ start";
247  metadata.port = dataport_;
248  metadata.address = si_data_.sin_addr.s_addr;
249 
250  std::size_t initial_payload_size = 0;
251 
252  TLOG(TLVL_TRACE) << "Creating Fragment";
253 
254  output.emplace_back(artdaq::Fragment::FragmentBytes(initial_payload_size, ev_counter(), fragment_id(), ots::detail::FragmentType::UDP, metadata));
255 
256  ev_counter_inc();
257 
258  TLOG(TLVL_TRACE) << "Creating UDPFragmentWriter";
259  // We now have a fragment to contain this event:
260  ots::UDPFragmentWriter thisFrag(*output.back());
261 
262  TLOG(TLVL_TRACE) << "Received data, now placing data with UDP sequence number " << std::hex << static_cast<int>((packetBuffers_.front()).at(1))
263  << " into UDPFragment";
264  thisFrag.resize(totalSize + 1);
265  std::ofstream rawOutput;
266  if(rawOutput_)
267  {
268  std::string outputPath = rawPath_ + "/UDPReceiver-" + ip_ + ":" + std::to_string(dataport_) + ".bin";
269  rawOutput.open(outputPath, std::ios::out | std::ios::app | std::ios::binary);
270  }
271 
272  DataType dataType = getDataType((packetBuffers_.front()).at(0));
273  thisFrag.set_hdr_type((int)dataType);
274  int pos = 0;
275  for(auto jj = packetBuffers_.begin(); jj != packetBuffers_.end(); ++jj)
276  {
277  for(size_t ii = 0; ii < jj->size(); ++ii)
278  {
279  // Null-terminate string types
280  if((jj)->at(ii) == 0 && (dataType == DataType::JSON || dataType == DataType::String))
281  {
282  break;
283  }
284 
285  if(rawOutput_)
286  rawOutput.write((char*)&((jj)->at(ii)), sizeof(uint8_t));
287  *(thisFrag.dataBegin() + pos) = (jj)->at(ii);
288  ++pos;
289  }
290  }
291 
292  if(dataType == DataType::JSON || dataType == DataType::String)
293  {
294  *(thisFrag.dataBegin() + pos) = 0;
295  char zero = 0;
296  if(rawOutput_)
297  rawOutput.write(&zero, sizeof(char));
298  }
299  if(rawOutput_)
300  rawOutput.close();
301 }
302 
303 //==============================================================================
304 void ots::UDPReceiver::send(CommandType command)
305 {
306  if(sendCommands_)
307  {
308  CommandPacket packet;
309  packet.type = command;
310  packet.dataSize = 0;
311  sendto(datasocket_, &packet, sizeof(packet), 0, (struct sockaddr*)&si_data_, sizeof(si_data_));
312  }
313 }
314 
315 //==============================================================================
316 bool ots::UDPReceiver::isTimerExpired_()
317 {
318  auto now = std::chrono::high_resolution_clock::now();
319  auto diff = std::chrono::duration<double, std::milli>(now - lastFrag_).count();
320  return diff > fragmentWindow_;
321 }
322 
323 //==============================================================================
324 void ots::UDPReceiver::stop()
325 {
326  TLOG(TLVL_DEBUG) << "Stopping...";
327  //#pragma message "Using default implementation of UDPReceiver::stop()"
328 }
329 
330 //==============================================================================
331 void ots::UDPReceiver::stopNoMutex()
332 {
333  //#pragma message "Using default implementation of UDPReceiver::stopNoMutex()"
334 }
335 
336 //==============================================================================
337 void ots::UDPReceiver::start_()
338 {
339  //#pragma message "Using default implementation of UDPReceiver::start_()"
340 }
341 
342 // The following macro is defined in artdaq's GeneratorMacros.hh header
343 DEFINE_ARTDAQ_COMMANDABLE_GENERATOR(ots::UDPReceiver)