1 #include "otsdaq/DataProcessorPlugins/UDPDataListenerProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
// Constructor: builds the UDP listener from values read out of the configuration
// tree node located at `configurationPath`.
//
// NOTE(review): this excerpt is incomplete. The parameter that declares
// `theXDAQContextConfigTree`, the start of the member-initializer list and the
// braces of the body are not visible here, so the comments below describe only
// the statements that are shown.
//
// Configuration fields read by the visible code (all children of the node at
// `configurationPath`):
//   "HostIPAddress"           (std::string)  - first argument to Socket
//   "HostPort"                (unsigned int) - second argument to Socket
//   "BufferSize"              (unsigned int) - last argument of the initializer
//                                              that also takes the three UIDs
//   "SocketReceiveBufferSize" (unsigned int) - value handed to Socket::initialize
15 UDPDataListenerProducer::UDPDataListenerProducer(std::string supervisorApplicationUID,
16 std::string bufferUID,
17 std::string processorUID,
19 const std::string& configurationPath)
21 ,
// Socket base is constructed with the host IP address and port from the configuration.
Socket(theXDAQContextConfigTree.getNode(configurationPath).getNode(
"HostIPAddress").getValue<std::string>(),
22 theXDAQContextConfigTree.getNode(configurationPath).getNode(
"HostPort").getValue<unsigned int>())
// NOTE(review): the name of the base/member receiving the arguments below is not
// visible in this excerpt; presumably DataProducer (it is the class whose write
// helpers are called elsewhere in this file) - confirm against the full source.
25 supervisorApplicationUID, bufferUID, processorUID, theXDAQContextConfigTree.getNode(configurationPath).getNode(
"BufferSize").getValue<unsigned int>())
27 ,
Configurable(theXDAQContextConfigTree, configurationPath)
// Size passed to Socket::initialize() at the end of the constructor.
31 unsigned int socketReceiveBufferSize;
// First assignment: take the size from the "SocketReceiveBufferSize" field.
34 socketReceiveBufferSize = theXDAQContextConfigTree.getNode(configurationPath).getNode(
"SocketReceiveBufferSize").getValue<
unsigned int>();
// Second assignment: hard-coded 0x10000000 (268435456).
// NOTE(review): the control flow between the two assignments is not visible;
// presumably this is a fallback used when the field cannot be read - confirm.
// The unit (presumably bytes) is also not established by the visible code.
39 socketReceiveBufferSize = 0x10000000;
42 Socket::initialize(socketReceiveBufferSize);
// Destructor: the body is empty, so no explicit cleanup is performed here.
46 UDPDataListenerProducer::~UDPDataListenerProducer(
void) {}
// Work-loop callback. The WorkLoop pointer parameter is unnamed and therefore
// not used by this function.
// Returns the current value of WorkLoop::continueWorkLoop_.
// NOTE(review): the statements between the signature and the return are not
// visible in this excerpt; presumably one of the write routines below is
// invoked here - confirm against the full source.
49 bool UDPDataListenerProducer::workLoopThread(toolbox::task::WorkLoop* )
55 return WorkLoop::continueWorkLoop_;
// slowWrite: receives one datagram into the member buffer `data_` and hands it,
// together with `header_`, to DataProducer::write().
// NOTE(review): braces and the tail of the retry loop are not visible in this
// excerpt; comments describe only the statements shown.
59 void UDPDataListenerProducer::slowWrite(
void)
// receive() fills data_, ipAddress_ and port_; a return value of -1 is treated
// as "nothing to process".
63 if(ReceiverSocket::receive(data_, ipAddress_, port_) != -1)
// Record the sender's address and port, converted to strings, in the header map.
68 header_[
"IPAddress"] = NetworkConverters::networkToStringIP(ipAddress_);
69 header_[
"Port"] = NetworkConverters::networkToStringPort(port_);
// Keep retrying for as long as write() reports failure with a negative result.
75 while(DataProducer::write(data_, header_) < 0)
// NOTE(review): the remainder of this log statement and any wait between
// retries are not visible here; the message text suggests a delay follows.
77 __CFG_COUT__ <<
"There are no available buffers! Retrying...after waiting 10 "
// fastWrite: attaches to an empty sub-buffer and receives the datagram directly
// into it (through `dataP_`), then marks the sub-buffer as written.
// NOTE(review): braces and a range of original lines are not visible in this
// excerpt, and the definition continues past the last visible line; comments
// describe only the statements shown.
87 void UDPDataListenerProducer::fastWrite(
void)
// attachToEmptySubBuffer() sets dataP_/headerP_; a negative result is reported
// as "no available buffers".
91 if(DataProducer::attachToEmptySubBuffer(dataP_, headerP_) < 0)
// NOTE(review): what follows this log statement (wait/return) is not visible.
93 __CFG_COUT__ <<
"There are no available buffers! Retrying...after waiting 10 milliseconds!" << std::endl;
// Receive straight into the attached sub-buffer; -1 means nothing was received.
// NOTE(review): the meaning of the trailing arguments (1, 0, true) is not shown
// in this file; presumably timeout seconds, timeout microseconds and a verbosity
// flag - confirm against the ReceiverSocket::receive declaration.
123 if(ReceiverSocket::receive(*dataP_, ipAddress_, port_, 1, 0,
true) != -1)
// NOTE(review): the sender metadata is stored in the member `header_`, not in
// the sub-buffer header obtained above through `headerP_`; confirm this is
// intended, since the visible lines never write through headerP_.
125 header_[
"IPAddress"] = NetworkConverters::networkToStringIP(ipAddress_);
126 header_[
"Port"] = NetworkConverters::networkToStringPort(port_);
127 __CFG_COUT__ <<
"Received data IP: " << header_[
"IPAddress"] <<
" port: " << header_[
"Port"] << __E__;
// Mark the attached sub-buffer as written so that it can be consumed.
128 DataProducer::setWrittenSubBuffer<std::string, std::map<std::string, std::string> >();