1 #include "artdaq/DAQdata/Globals.hh"
2 #define TRACE_NAME "UDPReceiver"
4 #include "artdaq-ots/Generators/UDPReceiver.hh"
6 #include "artdaq-core/Utilities/SimpleLookupPolicy.hh"
7 #include "artdaq-ots/Overlays/FragmentType.hh"
8 #include "artdaq-ots/Overlays/UDPFragmentWriter.hh"
9 #include "artdaq/Generators/GeneratorMacros.hh"
10 #include "canvas/Utilities/Exception.h"
11 #include "fhiclcpp/ParameterSet.h"
12 #include "otsdaq/Macros/CoutMacros.h"
// Builds the generator from its FHiCL configuration, then creates, binds and
// configures the UDP data socket.
//
// Configuration keys read from `ps` (defaults are the second argument of each get<>):
//   raw_output_enabled, raw_output_path, port, ip, rcvbuf, send_OtsUDP_commands,
//   fake_data_mode, fragment_time_window_ms.
//
// Throws art::Exception(art::errors::Configuration) on the failure paths below
// (socket creation, bind, SO_RCVBUF, IP address translation).
//
// NOTE(review): this listing shows no braces and skips some listing numbers, so a few
// guards are not visible here; the notes below mark where that matters.
21 ots::UDPReceiver::UDPReceiver(fhicl::ParameterSet
const& ps)
22 : CommandableFragmentGenerator(ps)
23 , rawOutput_(ps.get<bool>(
"raw_output_enabled", false))
24 , rawPath_(ps.get<std::string>(
"raw_output_path",
"/tmp"))
25 , dataport_(ps.get<int>(
"port", 6343))
26 , ip_(ps.get<std::string>(
"ip",
"127.0.0.1"))
// Default receive buffer request is 0x1000000 bytes (16 MiB); it is handed to SO_RCVBUF below.
27 , rcvbuf_(ps.get<int>(
"rcvbuf", 0x1000000))
28 , expectedPacketNumber_(0)
30 , sendCommands_(ps.get<bool>(
"send_OtsUDP_commands", false))
31 , receiverThread_(nullptr)
32 , fakeDataMode_(ps.get<bool>(
"fake_data_mode", false))
33 , fragmentWindow_(ps.get<double>(
"fragment_time_window_ms", 1000))
34 , lastFrag_(std::chrono::high_resolution_clock::now())
36 TLOG(TLVL_DEBUG) <<
"Constructor.";
// Create the UDP (datagram) socket used for receiving data and for send().
40 datasocket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// NOTE(review): the condition guarding this throw is not visible in this listing
// (listing lines 41-42 are absent); presumably a check that socket() failed -- confirm.
43 throw art::Exception(art::errors::Configuration) <<
"UDPReceiver: Error creating socket!";
// Local address: listen on every interface (INADDR_ANY) at the configured port.
// NOTE(review): si_me_data is not zero-initialized; only the three fields below are set.
47 struct sockaddr_in si_me_data;
48 si_me_data.sin_family = AF_INET;
49 si_me_data.sin_port = htons(dataport_);
50 si_me_data.sin_addr.s_addr = htonl(INADDR_ANY);
51 if(bind(datasocket_, (
struct sockaddr*)&si_me_data,
sizeof(si_me_data)) == -1)
53 throw art::Exception(art::errors::Configuration) <<
"UDPReceiver: Cannot bind data socket to port " << dataport_;
// Only touch SO_RCVBUF when a positive size was configured; a non-zero setsockopt
// return is treated as a configuration error.
62 if(rcvbuf_ > 0 && setsockopt(datasocket_, SOL_SOCKET, SO_RCVBUF, &rcvbuf_,
sizeof(rcvbuf_)))
64 throw art::Exception(art::errors::Configuration) <<
"UDPReceiver: Could not set receive buffer size: " << rcvbuf_;
// Remote endpoint (configured ip_ / dataport_). si_data_ is later passed to recvfrom()
// and sendto(); inet_aton() returns 0 when the address string cannot be parsed.
68 si_data_.sin_family = AF_INET;
69 si_data_.sin_port = htons(dataport_);
70 if(inet_aton(ip_.c_str(), &si_data_.sin_addr) == 0)
72 throw art::Exception(art::errors::Configuration) <<
"UDPReceiver: Could not translate provided IP Address: " << ip_;
76 TLOG(TLVL_INFO) <<
"UDP Receiver Construction Complete!";
78 TLOG(TLVL_DEBUG) <<
"Constructed.";
// Destructor: waits for the receiver thread (if one exists and is joinable) and
// releases the thread object.
// NOTE(review): listing lines 93-99 are not visible here, so any further cleanup
// (e.g. of datasocket_) cannot be confirmed from this view.
82 ots::UDPReceiver::~UDPReceiver()
84 TLOG(TLVL_DEBUG) <<
"Destructor.";
// join() blocks until receiveLoop_() returns; that loop only exits once should_stop() is true.
87 if(receiverThread_ && receiverThread_->joinable())
90 receiverThread_->join();
91 receiverThread_.reset(
nullptr);
100 TLOG(TLVL_DEBUG) <<
"Destructed.";
// Starts data taking by launching receiveLoop_() on a new std::thread owned by receiverThread_.
// NOTE(review): listing lines 109-111 and 113-115 are not visible; whether the thread
// launch is conditional cannot be confirmed from this view.
104 void ots::UDPReceiver::start()
106 TLOG(TLVL_DEBUG) <<
"Starting...";
108 TLOG(TLVL_INFO) <<
"Starting...";
// reset() replaces whatever thread object receiverThread_ previously held.
112 receiverThread_.reset(
new std::thread(&UDPReceiver::receiveLoop_,
this));
116 TLOG(TLVL_DEBUG) <<
"Started.";
// Receiver-thread body (launched from start()). Until should_stop() becomes true it
// polls datasocket_, peeks at the first bytes of each datagram to read its sequence
// number (byte 1), then reads the datagram. Datagrams taken by the first branch are
// appended to receiveBuffers_ under receiveBufferLock_; the ones reported as
// "out-of-order" are read and dropped.
//
// NOTE(review): this listing omits braces and several listing lines (e.g. 129-130,
// 162-164, 177-180), so some guards/else branches are inferred from the log messages
// rather than visible; see the notes below.
120 void ots::UDPReceiver::receiveLoop_()
122 while(!should_stop())
124 struct pollfd ufds[1];
125 ufds[0].fd = datasocket_;
126 ufds[0].events = POLLIN | POLLPRI;
// Wait up to 1000 ms so the loop re-checks should_stop() regularly.
// NOTE(review): no use of rv is visible in this listing -- presumably an elided guard; confirm.
128 int rv = poll(ufds, 1, 1000);
131 TLOG(TLVL_TRACE) <<
"revents: " << ufds[0].revents <<
", ";
// NOTE(review): revents is compared for exact equality, so a combination of flags
// (e.g. POLLIN together with POLLPRI or an error flag) does not enter this branch.
132 if(ufds[0].revents == POLLIN || ufds[0].revents == POLLPRI)
// Peek (MSG_PEEK leaves the datagram queued) at the first 4 bytes to read the header.
// NOTE(review): the return value of this recvfrom() is not checked, so peekBuffer may
// be read uninitialized if the peek fails or returns fewer than 4 bytes.
137 uint8_t peekBuffer[4];
138 socklen_t dataSz =
sizeof(si_data_);
139 recvfrom(datasocket_, peekBuffer,
sizeof(peekBuffer), MSG_PEEK, (
struct sockaddr*)&si_data_, &dataSz);
141 TLOG(TLVL_TRACE) <<
"Received UDP Datagram with sequence number " << std::hex <<
"0x" <<
static_cast<int>(peekBuffer[1]) <<
"!" << std::dec;
142 TLOG(TLVL_TRACE) <<
"peekBuffer[1] == expectedPacketNumber_: " << std::hex << static_cast<int>(peekBuffer[1])
143 <<
" =?= " << (
int)expectedPacketNumber_;
144 TLOG(TLVL_TRACE) << "peekBuffer: 0: " << std::hex << static_cast<
int>(peekBuffer[0]) << ", 1: " << std::hex << static_cast<
int>(peekBuffer[1])
145 << ", 2: " << std::hex << static_cast<
int>(peekBuffer[2]) << ", 3: " << std::hex << static_cast<
int>(peekBuffer[3]);
// Byte 1 of the datagram is the 8-bit sequence number.
147 uint8_t seqNum = peekBuffer[1];
// Accept the datagram when it is at or ahead of the expected number, or when the
// counter looks like it wrapped (new value below 64 while the expected one is above 192).
149 if(seqNum >= expectedPacketNumber_ || (seqNum < 64 && expectedPacketNumber_ > 192))
// A gap was detected: warn and resynchronize the expected number to what arrived.
151 if(seqNum != expectedPacketNumber_)
153 int delta = seqNum - expectedPacketNumber_;
154 TLOG(TLVL_WARNING) << std::dec <<
"Sequence Number different than expected! (delta: " << delta <<
")";
155 expectedPacketNumber_ = seqNum;
// Consume the datagram into a 1500-byte buffer and trim it to the size received.
// NOTE(review): resize(sts) runs before the "Error on socket" message below; if
// recvfrom() returned -1 the resize is given a negative value -- confirm how the
// elided lines around listing 162-164 handle this.
158 packetBuffer_t receiveBuffer;
159 receiveBuffer.resize(1500);
160 int sts = recvfrom(datasocket_, &receiveBuffer[0], receiveBuffer.size(), 0, (
struct sockaddr*)&si_data_, &dataSz);
161 receiveBuffer.resize(sts);
165 TLOG(TLVL_WARNING) <<
"Error on socket: " << strerror(errno);
169 TLOG(TLVL_TRACE) <<
"Received " << sts <<
" bytes.";
// receiveBuffers_ is also accessed by getNext_() under the same mutex.
172 std::unique_lock<std::mutex> lock(receiveBufferLock_);
173 TLOG(TLVL_TRACE) <<
"Now placing UDP datagram with sequence number " << std::hex << (int)seqNum <<
" into buffer." << std::dec;
174 receiveBuffers_.push_back(receiveBuffer);
176 ++expectedPacketNumber_;
// Presumably the else-branch of the sequence check above (the `else` itself is not
// visible in this listing): the datagram is read off the socket and not stored.
// NOTE(review): seqNum (uint8_t) is streamed here without the int cast used in the
// other messages, so a std::ostream-like logger would print it as a character.
181 TLOG(TLVL_WARNING) <<
"Received out-of-order datagram: " << seqNum <<
" != " << expectedPacketNumber_ <<
" (expected)";
182 packetBuffer_t receiveBuffer;
183 receiveBuffer.resize(1500);
184 int sts = recvfrom(datasocket_, &receiveBuffer[0], receiveBuffer.size(), 0, (
struct sockaddr*)&si_data_, &dataSz);
185 receiveBuffer.resize(sts);
190 TLOG(TLVL_INFO) <<
"receive Loop exiting...";
// Moves the datagrams collected by the receiver thread into packetBuffers_ and, when
// any are present, hands them to ProcessData_() which appends to `output`.
//
// output: fragment list that ProcessData_() appends to.
// NOTE(review): the return statement(s) and several guards are not visible in this
// listing (e.g. listing lines 195-202, 206-211, 233-242), so the return value and the
// exact conditions for each step below cannot be confirmed from this view.
194 bool ots::UDPReceiver::getNext_(artdaq::FragmentPtrs& output)
// Take the same mutex the receiver thread holds while pushing into receiveBuffers_,
// then drain that container into packetBuffers_.
203 std::unique_lock<std::mutex> lock(receiveBufferLock_);
204 std::move(receiveBuffers_.begin(), receiveBuffers_.end(), std::inserter(packetBuffers_, packetBuffers_.end()));
205 receiveBuffers_.clear();
// Appends `pkt` on each of 8 iterations (ii = 1..8). The construction of `pkt` and the
// guard for this loop are not visible -- presumably the fake_data_mode path; confirm.
212 for(
int ii = 1; ii < 9; ++ii)
217 packetBuffers_.push_back(pkt);
220 if(packetBuffers_.size() > 0)
// Total byte count across all buffered packets; passed to ProcessData_() as totalSize.
222 size_t packetBufferSize = 0;
223 for(
auto& buf : packetBuffers_)
225 packetBufferSize += buf.size();
227 TLOG(TLVL_TRACE) <<
"Calling ProcessData, packetBuffers_.size() == " << std::to_string(packetBuffers_.size())
228 <<
", sz = " << std::to_string(packetBufferSize);
229 ProcessData_(output, packetBufferSize);
231 packetBuffers_.clear();
232 TLOG(TLVL_TRACE) <<
"Returning output of size " << output.size();
// Appends one UDP-type fragment to `output` and copies the bytes of every packet in
// packetBuffers_ into it, byte by byte; the same bytes are also written to a raw
// output stream.
//
// output:    fragment list that receives the new fragment.
// totalSize: combined size in bytes of all packets in packetBuffers_ (computed by the caller).
//
// NOTE(review): the declarations of `metadata`, `thisFrag`, `pos` and `zero`, and
// several guards, are not visible in this listing; the notes below mark the gaps.
243 void ots::UDPReceiver::ProcessData_(artdaq::FragmentPtrs& output,
size_t totalSize)
245 TLOG(TLVL_TRACE) <<
"ProcessData_ start";
// Fragment metadata records the configured port and the sender address last stored in si_data_.
247 metadata.port = dataport_;
248 metadata.address = si_data_.sin_addr.s_addr;
250 std::size_t initial_payload_size = 0;
252 TLOG(TLVL_TRACE) <<
"Creating Fragment";
// The fragment is created with a zero-byte payload here and resized further down.
254 output.emplace_back(artdaq::Fragment::FragmentBytes(initial_payload_size, ev_counter(), fragment_id(), ots::detail::FragmentType::UDP, metadata));
258 TLOG(TLVL_TRACE) <<
"Creating UDPFragmentWriter";
// The log below reads byte 1 of the first packet via front()/at(1); this assumes
// packetBuffers_ is non-empty (the caller only calls when size() > 0).
262 TLOG(TLVL_TRACE) <<
"Received data, now placing data with UDP sequence number " << std::hex << static_cast<int>((packetBuffers_.front()).at(1))
263 <<
" into UDPFragment";
// One byte more than the packet data: the extra byte holds the 0 written at the end
// for JSON/String data.
264 thisFrag.resize(totalSize + 1);
265 std::ofstream rawOutput;
// Raw dump file: <raw_output_path>/UDPReceiver-<ip>:<port>.bin, opened in binary append mode.
// NOTE(review): the guard for opening/writing this file is not visible in this listing
// -- presumably rawOutput_; confirm.
268 std::string outputPath = rawPath_ +
"/UDPReceiver-" + ip_ +
":" + std::to_string(dataport_) +
".bin";
269 rawOutput.open(outputPath, std::ios::out | std::ios::app | std::ios::binary);
// The fragment's header type is derived from byte 0 of the first packet only.
272 DataType dataType = getDataType((packetBuffers_.front()).at(0));
273 thisFrag.set_hdr_type((
int)dataType);
275 for(
auto jj = packetBuffers_.begin(); jj != packetBuffers_.end(); ++jj)
277 for(
size_t ii = 0; ii < jj->size(); ++ii)
// NOTE(review): the action taken for a 0 byte in JSON/String data is not visible
// (listing lines 281-285 are absent) -- confirm.
280 if((jj)->at(ii) == 0 && (dataType == DataType::JSON || dataType == DataType::String))
286 rawOutput.write((
char*)&((jj)->at(ii)),
sizeof(uint8_t));
// NOTE(review): the update of `pos` is not visible in this listing.
287 *(thisFrag.dataBegin() + pos) = (jj)->at(ii);
// Text-like payloads get a terminating 0 byte in the fragment and in the raw stream.
292 if(dataType == DataType::JSON || dataType == DataType::String)
294 *(thisFrag.dataBegin() + pos) = 0;
297 rawOutput.write(&zero,
sizeof(
char));
// Sends a CommandPacket carrying `command` to the endpoint stored in si_data_, using datasocket_.
//
// command: value stored in the packet's `type` field.
// NOTE(review): listing lines 305-307 and 310 are not visible, so any guard (e.g. on
// sendCommands_) or further packet fields cannot be confirmed from this view. The
// return value of sendto() is not checked.
304 void ots::UDPReceiver::send(CommandType command)
308 CommandPacket packet;
309 packet.type = command;
311 sendto(datasocket_, &packet,
sizeof(packet), 0, (
struct sockaddr*)&si_data_,
sizeof(si_data_));
316 bool ots::UDPReceiver::isTimerExpired_()
318 auto now = std::chrono::high_resolution_clock::now();
319 auto diff = std::chrono::duration<double, std::milli>(now - lastFrag_).count();
320 return diff > fragmentWindow_;
// Stop transition: logs that the generator is stopping.
// NOTE(review): the remainder of the body (listing lines 327-330) is not visible in
// this listing. The receiver thread is joined in the destructor; whether it is also
// joined here cannot be confirmed from this view.
324 void ots::UDPReceiver::stop()
326 TLOG(TLVL_DEBUG) <<
"Stopping...";
// Definition of stopNoMutex().
// NOTE(review): the body (listing lines 332-336) is not visible in this listing.
331 void ots::UDPReceiver::stopNoMutex()
337 void ots::UDPReceiver::start_()