1 #include "otsdaq/DataProcessorPlugins/TCPDataReceiverProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
// Constructor.
// All values in the visible part of the initializer list are read from
// theXDAQContextConfigTree at the node given by configurationPath:
//   - "BufferSize"      as unsigned int
//   - "ServerIPAddress" as std::string
//   - "ServerPort"      as unsigned int
// NOTE(review): this listing is incomplete. The declaration of
// theXDAQContextConfigTree, the names of the initializers that consume the
// first "BufferSize" and "ServerIPAddress"/"ServerPort" reads, the node name used
// for dataType_, and the constructor body are not visible here - confirm against
// the full source file.
16 TCPDataReceiverProducer::TCPDataReceiverProducer(
17 std::string supervisorApplicationUID,
18 std::string bufferUID,
19 std::string processorUID,
21 const std::string& configurationPath)
// Buffer size is taken from the "BufferSize" field of this processor's node.
27 theXDAQContextConfigTree.getNode(configurationPath)
28 .getNode(
"BufferSize")
29 .getValue<unsigned int>())
31 ,
Configurable(theXDAQContextConfigTree, configurationPath)
// Server address and port, read as a (string, unsigned int) argument pair.
33 .getNode(
"ServerIPAddress")
34 .getValue<std::string>(),
35 theXDAQContextConfigTree.getNode(configurationPath)
36 .getNode(
"ServerPort")
37 .getValue<unsigned int>())
// ipAddress_ and port_ re-read the same "ServerIPAddress" / "ServerPort" fields;
// the write methods in this file later copy them into the outgoing header.
40 , ipAddress_(theXDAQContextConfigTree.getNode(configurationPath)
41 .getNode(
"ServerIPAddress")
42 .getValue<std::string>())
43 , port_(theXDAQContextConfigTree.getNode(configurationPath)
44 .getNode(
"ServerPort")
45 .getValue<unsigned int>())
// dataType_ is read as a string; it is compared against "Packet" in the write methods.
46 , dataType_(theXDAQContextConfigTree.getNode(configurationPath)
48 .getValue<std::string>())
// Destructor: empty body, no explicit cleanup is performed here.
53 TCPDataReceiverProducer::~TCPDataReceiverProducer(
void) {}
// Starts data processing for a run.
// Order of operations: connect the TCP subscribe client, set its receive
// timeout, then delegate to DataProducer::startProcessingData(runNumber).
// @param runNumber  forwarded unchanged to DataProducer::startProcessingData.
56 void TCPDataReceiverProducer::startProcessingData(std::string runNumber)
// NOTE(review): the arguments (30, 1000) are presumably a retry count and a
// per-retry wait - confirm against TCPSubscribeClient::connect.
58 TCPSubscribeClient::connect(30, 1000);
// NOTE(review): (1, 0) presumably means 1 second + 0 microseconds - confirm
// against TCPSubscribeClient::setReceiveTimeout.
59 TCPSubscribeClient::setReceiveTimeout(1, 0);
60 DataProducer::startProcessingData(runNumber);
// Stops data processing.
// DataProducer::stopProcessingData() is called first, and only afterwards is the
// TCP subscribe client disconnected.
64 void TCPDataReceiverProducer::stopProcessingData(
void)
66 DataProducer::stopProcessingData();
67 TCPSubscribeClient::disconnect();
77 return WorkLoop::continueWorkLoop_;
// slowWrite: receives data from the TCP subscribe client and hands the members
// data_ and header_ to DataProducer::write().
// NOTE(review): this listing is incomplete - the opening of the try block, the
// branch taken when dataType_ is not "Packet", the destination of the received
// data, and any early return are not visible here. Confirm against the full source.
81 void TCPDataReceiverProducer::slowWrite(
void)
// When the configured data type is "Packet", a packet-oriented receive is used.
88 if(dataType_ ==
"Packet")
90 TCPSubscribeClient::receivePacket();
// 1000 microsecond (1 ms) pause; the condition guarding it is not visible in this listing.
96 std::this_thread::sleep_for(std::chrono::microseconds(1000));
// Any std::exception is logged and followed by a 1 second pause.
100 catch(
const std::exception& e)
102 __COUT__ <<
"Error: " << e.what() << std::endl;
103 std::this_thread::sleep_for(std::chrono::seconds(1));
// Tag the outgoing header with the configured server address and port.
106 header_[
"IPAddress"] = ipAddress_;
107 header_[
"Port"] = std::to_string(port_);
// Retry DataProducer::write() for as long as it returns a negative value.
109 while(DataProducer::write(
data_, header_) < 0)
// NOTE(review): the message mentions "10" (the rest of the literal is cut off in
// this listing) while the sleep below is 1000 microseconds = 1 ms - confirm
// which value is intended.
111 __COUT__ <<
"There are no available buffers! Retrying...after waiting 10 "
114 std::this_thread::sleep_for(std::chrono::microseconds(1000));
// fastWrite: attaches dataP_ / headerP_ to an empty sub-buffer via
// DataProducer::attachToEmptySubBuffer(), receives data from the TCP subscribe
// client, fills in the header through headerP_, and then calls
// DataProducer::setWrittenSubBuffer().
// NOTE(review): this listing is incomplete - the opening of the try block, the
// branch taken when dataType_ is not "Packet", the destination of the received
// data, and any early return are not visible here. Confirm against the full source.
120 void TCPDataReceiverProducer::fastWrite(
void)
// A negative return from attachToEmptySubBuffer is treated as "no buffer available".
125 if(DataProducer::attachToEmptySubBuffer(
dataP_, headerP_) < 0)
// NOTE(review): the message says "10 milliseconds" but the sleep below is
// 1000 microseconds = 1 ms - the text and the delay disagree.
128 <<
"There are no available buffers! Retrying...after waiting 10 milliseconds!"
130 std::this_thread::sleep_for(std::chrono::microseconds(1000));
// When the configured data type is "Packet", a packet-oriented receive is used.
136 if(dataType_ ==
"Packet")
138 TCPSubscribeClient::receivePacket();
// Any std::exception is logged and followed by a 500 millisecond pause.
146 catch(
const std::exception& e)
148 __COUT__ <<
"Error: " << e.what() << std::endl;
149 std::this_thread::sleep_for(std::chrono::milliseconds(500));
// Tag the attached header with the configured server address and port.
152 (*headerP_)[
"IPAddress"] = ipAddress_;
153 (*headerP_)[
"Port"] = std::to_string(port_);
// Template arguments are the data type (std::string) and the header type
// (std::map<std::string, std::string>).
155 DataProducer::setWrittenSubBuffer<std::string, std::map<std::string, std::string>>();
std::string data_
Used by slowWrite(): passed together with header_ to DataProducer::write().
bool workLoopThread(toolbox::task::WorkLoop *workLoop) override
bool TCPDataReceiverProducer::getNextFragment(void)
std::string * dataP_
Used by fastWrite(): attached together with headerP_ via DataProducer::attachToEmptySubBuffer().