tdaq-develop-2025-02-12
TCPDataReceiverProducer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/TCPDataReceiverProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
6 
7 #include <string.h>
8 #include <cassert>
9 #include <chrono>
10 #include <iostream>
11 #include <thread>
12 
13 using namespace ots;
14 
15 //==============================================================================
16 TCPDataReceiverProducer::TCPDataReceiverProducer(
17  std::string supervisorApplicationUID,
18  std::string bufferUID,
19  std::string processorUID,
20  const ConfigurationTree& theXDAQContextConfigTree,
21  const std::string& configurationPath)
22  : WorkLoop(processorUID)
23  //, Socket ("192.168.133.100", 40000)
24  , DataProducer(supervisorApplicationUID,
25  bufferUID,
26  processorUID,
27  theXDAQContextConfigTree.getNode(configurationPath)
28  .getNode("BufferSize")
29  .getValue<unsigned int>())
30  //, DataProducer (supervisorApplicationUID, bufferUID, processorUID, 100)
31  , Configurable(theXDAQContextConfigTree, configurationPath)
32  , TCPSubscribeClient(theXDAQContextConfigTree.getNode(configurationPath)
33  .getNode("ServerIPAddress")
34  .getValue<std::string>(),
35  theXDAQContextConfigTree.getNode(configurationPath)
36  .getNode("ServerPort")
37  .getValue<unsigned int>())
38  , dataP_(nullptr)
39  , headerP_(nullptr)
40  , ipAddress_(theXDAQContextConfigTree.getNode(configurationPath)
41  .getNode("ServerIPAddress")
42  .getValue<std::string>())
43  , port_(theXDAQContextConfigTree.getNode(configurationPath)
44  .getNode("ServerPort")
45  .getValue<unsigned int>())
46  , dataType_(theXDAQContextConfigTree.getNode(configurationPath)
47  .getNode("DataType")
48  .getValue<std::string>())
49 {
50 }
51 
52 //==============================================================================
53 TCPDataReceiverProducer::~TCPDataReceiverProducer(void) {}
54 
55 //==============================================================================
56 void TCPDataReceiverProducer::startProcessingData(std::string runNumber)
57 {
58  TCPSubscribeClient::connect(30, 1000);
59  TCPSubscribeClient::setReceiveTimeout(1, 0);
60  DataProducer::startProcessingData(runNumber);
61 }
62 
63 //==============================================================================
64 void TCPDataReceiverProducer::stopProcessingData(void)
65 {
66  DataProducer::stopProcessingData();
67  TCPSubscribeClient::disconnect();
68 }
69 
70 //==============================================================================
71 bool TCPDataReceiverProducer::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
73 {
74  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << DataProcessor::processorUID_
75  // << " running, because workloop: " << WorkLoop::continueWorkLoop_ << std::endl;
76  fastWrite();
77  return WorkLoop::continueWorkLoop_;
78 }
79 
80 //==============================================================================
81 void TCPDataReceiverProducer::slowWrite(void)
82 {
83  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << name_ << " running!" <<
84  // std::endl;
85 
86  try
87  {
88  if(dataType_ == "Packet")
89  data_ =
90  TCPSubscribeClient::receivePacket(); // Throws an exception if it fails
91  else //"Raw" || DEFAULT
93  std::string>(); // Throws an exception if it fails
94  if(data_.size() == 0)
95  {
96  std::this_thread::sleep_for(std::chrono::microseconds(1000));
97  return;
98  }
99  }
100  catch(const std::exception& e)
101  {
102  __COUT__ << "Error: " << e.what() << std::endl;
103  std::this_thread::sleep_for(std::chrono::seconds(1));
104  return;
105  }
106  header_["IPAddress"] = ipAddress_;
107  header_["Port"] = std::to_string(port_);
108 
109  while(DataProducer::write(data_, header_) < 0)
110  {
111  __COUT__ << "There are no available buffers! Retrying...after waiting 10 "
112  "milliseconds!"
113  << std::endl;
114  std::this_thread::sleep_for(std::chrono::microseconds(1000));
115  return;
116  }
117 }
118 
119 //==============================================================================
120 void TCPDataReceiverProducer::fastWrite(void)
121 {
122  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << name_ << " running!" <<
123  // std::endl;
124 
125  if(DataProducer::attachToEmptySubBuffer(dataP_, headerP_) < 0)
126  {
127  __COUT__
128  << "There are no available buffers! Retrying...after waiting 10 milliseconds!"
129  << std::endl;
130  std::this_thread::sleep_for(std::chrono::microseconds(1000));
131  return;
132  }
133 
134  try
135  {
136  if(dataType_ == "Packet")
137  *dataP_ =
138  TCPSubscribeClient::receivePacket(); // Throws an exception if it fails
139  else //"Raw" || DEFAULT
141  std::string>(); // Throws an exception if it fails
142 
143  if(dataP_->size() == 0) // When it goes in timeout
144  return;
145  }
146  catch(const std::exception& e)
147  {
148  __COUT__ << "Error: " << e.what() << std::endl;
149  std::this_thread::sleep_for(std::chrono::milliseconds(500));
150  return;
151  }
152  (*headerP_)["IPAddress"] = ipAddress_;
153  (*headerP_)["Port"] = std::to_string(port_);
154 
155  DataProducer::setWrittenSubBuffer<std::string, std::map<std::string, std::string>>();
156 }
157 
158 DEFINE_OTS_PROCESSOR(TCPDataReceiverProducer)
std::string data_
For slow write.
bool workLoopThread(toolbox::task::WorkLoop *workLoop) override
bool TCPDataReceiverProducer::getNextFragment(void)
std::string * dataP_
For fast write.