1 #include "otsdaq/DataProcessorPlugins/TCPDataListenerProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
// Constructor: builds the producer from the configuration node found at
// `configurationPath` inside theXDAQContextConfigTree.
//
// Child nodes read by the visible initializers:
//   "BufferSize"      (unsigned int) - forwarded together with supervisorApplicationUID,
//                                      bufferUID and processorUID.
//   "ServerIPAddress" (std::string)  - forwarded to TCPSubscribeClient and copied into ipAddress_.
//   "ServerPort"      (unsigned int) - forwarded to TCPSubscribeClient and copied into port_.
//   "DataType"        (std::string)  - copied into dataType_.
// The same node is also handed to Configurable together with the path.
//
// NOTE(review): this excerpt appears to be missing lines (the declaration of
// theXDAQContextConfigTree in the parameter list, the beginning of the
// initializer list that receives the UID/"BufferSize" arguments, and the
// constructor body) - confirm against the complete file before editing.
16 TCPDataListenerProducer::TCPDataListenerProducer(std::string supervisorApplicationUID,
17 std::string bufferUID,
18 std::string processorUID,
20 const std::string& configurationPath)
24 supervisorApplicationUID, bufferUID, processorUID, theXDAQContextConfigTree.getNode(configurationPath).getNode(
"BufferSize").getValue<unsigned int>())
26 ,
Configurable(theXDAQContextConfigTree, configurationPath)
27 ,
TCPSubscribeClient(theXDAQContextConfigTree.getNode(configurationPath).getNode(
"ServerIPAddress").getValue<std::string>(),
28 theXDAQContextConfigTree.getNode(configurationPath).getNode(
"ServerPort").getValue<unsigned int>())
31 , ipAddress_(theXDAQContextConfigTree.getNode(configurationPath).getNode(
"ServerIPAddress").getValue<std::string>())
32 , port_(theXDAQContextConfigTree.getNode(configurationPath).getNode(
"ServerPort").getValue<unsigned int>())
33 , dataType_(theXDAQContextConfigTree.getNode(configurationPath).getNode(
"DataType").getValue<std::string>())
38 TCPDataListenerProducer::~TCPDataListenerProducer(
void) {}
41 void TCPDataListenerProducer::startProcessingData(std::string runNumber)
43 TCPSubscribeClient::connect();
44 TCPSubscribeClient::setReceiveTimeout(1, 1000);
45 DataProducer::startProcessingData(runNumber);
49 void TCPDataListenerProducer::stopProcessingData(
void)
51 DataProducer::stopProcessingData();
52 TCPSubscribeClient::disconnect();
// Work-loop callback. The toolbox::task::WorkLoop* argument is unnamed and
// therefore unused in this function.
//
// @return The current value of WorkLoop::continueWorkLoop_.
//
// NOTE(review): the statements between the signature and the return are not
// visible in this excerpt - presumably one of the write routines
// (slowWrite/fastWrite) is invoked here; confirm against the complete file.
56 bool TCPDataListenerProducer::workLoopThread(toolbox::task::WorkLoop* )
62 return WorkLoop::continueWorkLoop_;
// Receives one std::string from the TCP subscription into data_, fills
// header_ with the server "IPAddress" and "Port" (port as decimal text), and
// passes data_/header_ to DataProducer::write().
//
// Exceptions derived from std::exception are caught and logged via __COUT__.
//
// NOTE(review): the try/if/return scaffolding around these statements and the
// tail of the "no available buffers" log statement are not visible in this
// excerpt - confirm the exact control flow against the complete file.
66 void TCPDataListenerProducer::slowWrite(
void)
73 data_ = TCPSubscribeClient::receive<std::string>();
// NOTE(review): the condition guarding this 1000-microsecond sleep is not
// visible here.
76 std::this_thread::sleep_for(std::chrono::microseconds(1000));
80 catch(
const std::exception& e)
82 __COUT__ <<
"Error: " << e.what() << std::endl;
// Tag the payload with the configured server endpoint.
86 header_[
"IPAddress"] = ipAddress_;
87 header_[
"Port"] = std::to_string(port_);
// A negative return from DataProducer::write() is reported below as
// "no available buffers".
89 while(DataProducer::write(data_, header_) < 0)
91 __COUT__ <<
"There are no available buffers! Retrying...after waiting 10 "
// NOTE(review): the log text mentions waiting "10" (unit not visible here)
// while this sleeps 1000 microseconds - confirm which is intended.
94 std::this_thread::sleep_for(std::chrono::microseconds(1000));
// Writes received data directly into a sub-buffer obtained from
// DataProducer::attachToEmptySubBuffer(dataP_, headerP_):
//   - a negative return from attachToEmptySubBuffer is logged as
//     "no available buffers" and followed by a short sleep;
//   - *dataP_ is filled with receivePacket() when dataType_ == "Packet";
//     receive<std::string>() is also visible as the alternative source;
//   - (*headerP_) gets the server "IPAddress" and "Port" (port as decimal text);
//   - DataProducer::setWrittenSubBuffer<...>() is called at the end.
//
// Exceptions derived from std::exception are caught and logged via __COUT__.
//
// NOTE(review): the try/else/return scaffolding is not visible in this
// excerpt - confirm the exact control flow against the complete file.
100 void TCPDataListenerProducer::fastWrite(
void)
105 if(DataProducer::attachToEmptySubBuffer(dataP_, headerP_) < 0)
107 __COUT__ <<
"There are no available buffers! Retrying...after waiting 10 milliseconds!" << std::endl;
// NOTE(review): the message above says 10 milliseconds, but this sleeps
// 1000 microseconds (1 ms) - confirm which value is intended.
108 std::this_thread::sleep_for(std::chrono::microseconds(1000));
114 if(dataType_ ==
"Packet")
115 *dataP_ = TCPSubscribeClient::receivePacket();
117 *dataP_ = TCPSubscribeClient::receive<std::string>();
// NOTE(review): the action taken for an empty payload is not visible here -
// presumably an early return (e.g. on receive timeout); confirm.
119 if(dataP_->size() == 0)
122 catch(
const std::exception& e)
124 __COUT__ <<
"Error: " << e.what() << std::endl;
// Tag the payload with the configured server endpoint.
128 (*headerP_)[
"IPAddress"] = ipAddress_;
129 (*headerP_)[
"Port"] = std::to_string(port_);
131 DataProducer::setWrittenSubBuffer<std::string, std::map<std::string, std::string>>();