1 #include "otsdaq/DataProcessorPlugins/TCPDataListenerProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
16 TCPDataListenerProducer::TCPDataListenerProducer(
17 std::string supervisorApplicationUID,
18 std::string bufferUID,
19 std::string processorUID,
21 const std::string& configurationPath)
27 theXDAQContextConfigTree.getNode(configurationPath)
28 .getNode(
"BufferSize")
29 .getValue<unsigned int>())
31 ,
Configurable(theXDAQContextConfigTree, configurationPath)
33 .getNode(
"ServerPort")
34 .getValue<unsigned int>(),
35 theXDAQContextConfigTree.getNode(configurationPath)
36 .getNode(
"ServerMaxClients")
37 .getValue<unsigned>())
40 , dataType_(theXDAQContextConfigTree.getNode(configurationPath)
42 .getValue<std::string>())
43 , port_(theXDAQContextConfigTree.getNode(configurationPath)
44 .getNode(
"ServerPort")
45 .getValue<unsigned int>())
50 TCPDataListenerProducer::~TCPDataListenerProducer(
void) {}
53 void TCPDataListenerProducer::startProcessingData(std::string runNumber)
56 DataProducer::startProcessingData(runNumber);
60 void TCPDataListenerProducer::stopProcessingData(
void)
62 DataProducer::stopProcessingData();
72 return WorkLoop::continueWorkLoop_;
76 void TCPDataListenerProducer::slowWrite(
void)
84 TCPListenServer::receive<std::string>();
87 std::this_thread::sleep_for(std::chrono::microseconds(1000));
91 catch(
const std::exception& e)
93 __COUT__ <<
"Error: " << e.what() << std::endl;
94 std::this_thread::sleep_for(std::chrono::seconds(1));
97 header_[
"Port"] = std::to_string(port_);
99 while(DataProducer::write(
data_, header_) < 0)
101 __COUT__ <<
"There are no available buffers! Retrying...after waiting 10 "
104 std::this_thread::sleep_for(std::chrono::microseconds(1000));
110 void TCPDataListenerProducer::fastWrite(
void)
115 if(DataProducer::attachToEmptySubBuffer(
dataP_, headerP_) < 0)
118 <<
"There are no available buffers! Retrying...after waiting 10 milliseconds!"
120 std::this_thread::sleep_for(std::chrono::microseconds(1000));
126 if(dataType_ ==
"Packet")
128 TCPListenServer::receivePacket();
130 *
dataP_ = TCPListenServer::receive<
136 catch(
const std::exception& e)
138 __COUT__ <<
"Error: " << e.what() << std::endl;
139 std::this_thread::sleep_for(std::chrono::milliseconds(500));
142 (*headerP_)[
"Port"] = std::to_string(port_);
144 DataProducer::setWrittenSubBuffer<std::string, std::map<std::string, std::string>>();
std::string data_
For slow write.
bool workLoopThread(toolbox::task::WorkLoop *workLoop) override
bool TCPDataListenerProducer::getNextFragment(void)
std::string * dataP_
For fast write.