1 #include "otsdaq/DataProcessorPlugins/UDPDataListenerProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
15 UDPDataListenerProducer::UDPDataListenerProducer(
16 std::string supervisorApplicationUID,
17 std::string bufferUID,
18 std::string processorUID,
20 const std::string& configurationPath)
22 ,
Socket(theXDAQContextConfigTree.getNode(configurationPath)
23 .getNode(
"HostIPAddress")
24 .getValue<std::string>(),
25 theXDAQContextConfigTree.getNode(configurationPath)
27 .getValue<unsigned int>())
32 theXDAQContextConfigTree.getNode(configurationPath)
33 .getNode(
"BufferSize")
34 .getValue<unsigned int>())
36 ,
Configurable(theXDAQContextConfigTree, configurationPath)
40 unsigned int socketReceiveBufferSize;
43 socketReceiveBufferSize = theXDAQContextConfigTree.
getNode(configurationPath)
44 .
getNode(
"SocketReceiveBufferSize")
50 socketReceiveBufferSize = 0x10000000;
53 Socket::initialize(socketReceiveBufferSize);
57 UDPDataListenerProducer::~UDPDataListenerProducer(
void) {}
66 return WorkLoop::continueWorkLoop_;
70 void UDPDataListenerProducer::slowWrite(
void)
74 if(ReceiverSocket::receive(
data_, ipAddress_, port_) != -1)
79 header_[
"IPAddress"] = NetworkConverters::networkToStringIP(ipAddress_);
80 header_[
"Port"] = NetworkConverters::networkToStringPort(port_);
86 while(DataProducer::write(
data_, header_) < 0)
88 __CFG_COUT__ <<
"There are no available buffers! Retrying...after waiting 10 "
98 void UDPDataListenerProducer::fastWrite(
void)
102 if(DataProducer::attachToEmptySubBuffer(
dataP_, headerP_) < 0)
105 <<
"There are no available buffers! Retrying...after waiting 10 milliseconds!"
136 if(ReceiverSocket::receive(*
dataP_, ipAddress_, port_, 1, 0,
true) != -1)
138 header_[
"IPAddress"] = NetworkConverters::networkToStringIP(ipAddress_);
139 header_[
"Port"] = NetworkConverters::networkToStringPort(port_);
140 __CFG_COUT__ <<
"Received data IP: " << header_[
"IPAddress"]
141 <<
" port: " << header_[
"Port"] << __E__;
142 DataProducer::setWrittenSubBuffer<std::string,
143 std::map<std::string, std::string> >();
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
void getValue(T &value) const
bool workLoopThread(toolbox::task::WorkLoop *workLoop)
bool UDPDataListenerProducer::getNextFragment(void)
std::string * dataP_
For fast write.
std::string data_
For slow write.