tdaq-develop-2025-02-12
TCPDataListenerProducer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/TCPDataListenerProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
6 
7 #include <string.h>
8 #include <cassert>
9 #include <chrono>
10 #include <iostream>
11 #include <thread>
12 
13 using namespace ots;
14 
15 //==============================================================================
16 TCPDataListenerProducer::TCPDataListenerProducer(
17  std::string supervisorApplicationUID,
18  std::string bufferUID,
19  std::string processorUID,
20  const ConfigurationTree& theXDAQContextConfigTree,
21  const std::string& configurationPath)
22  : WorkLoop(processorUID)
23  //, Socket ("192.168.133.100", 40000)
24  , DataProducer(supervisorApplicationUID,
25  bufferUID,
26  processorUID,
27  theXDAQContextConfigTree.getNode(configurationPath)
28  .getNode("BufferSize")
29  .getValue<unsigned int>())
30  //, DataProducer (supervisorApplicationUID, bufferUID, processorUID, 100)
31  , Configurable(theXDAQContextConfigTree, configurationPath)
32  , TCPListenServer(theXDAQContextConfigTree.getNode(configurationPath)
33  .getNode("ServerPort")
34  .getValue<unsigned int>(),
35  theXDAQContextConfigTree.getNode(configurationPath)
36  .getNode("ServerMaxClients")
37  .getValue<unsigned>())
38  , dataP_(nullptr)
39  , headerP_(nullptr)
40  , dataType_(theXDAQContextConfigTree.getNode(configurationPath)
41  .getNode("DataType")
42  .getValue<std::string>())
43  , port_(theXDAQContextConfigTree.getNode(configurationPath)
44  .getNode("ServerPort")
45  .getValue<unsigned int>())
46 {
47 }
48 
49 //==============================================================================
50 TCPDataListenerProducer::~TCPDataListenerProducer(void) {}
51 
52 //==============================================================================
53 void TCPDataListenerProducer::startProcessingData(std::string runNumber)
54 {
55  startAccept();
56  DataProducer::startProcessingData(runNumber);
57 }
58 
59 //==============================================================================
60 void TCPDataListenerProducer::stopProcessingData(void)
61 {
62  DataProducer::stopProcessingData();
63 }
64 
65 //==============================================================================
66 bool TCPDataListenerProducer::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
68 {
69  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << DataProcessor::processorUID_
70  // << " running, because workloop: " << WorkLoop::continueWorkLoop_ << std::endl;
71  fastWrite();
72  return WorkLoop::continueWorkLoop_;
73 }
74 
75 //==============================================================================
76 void TCPDataListenerProducer::slowWrite(void)
77 {
78  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << name_ << " running!" <<
79  // std::endl;
80 
81  try
82  {
83  data_ =
84  TCPListenServer::receive<std::string>(); // Throws an exception if it fails
85  if(data_.size() == 0)
86  {
87  std::this_thread::sleep_for(std::chrono::microseconds(1000));
88  return;
89  }
90  }
91  catch(const std::exception& e)
92  {
93  __COUT__ << "Error: " << e.what() << std::endl;
94  std::this_thread::sleep_for(std::chrono::seconds(1));
95  return;
96  }
97  header_["Port"] = std::to_string(port_);
98 
99  while(DataProducer::write(data_, header_) < 0)
100  {
101  __COUT__ << "There are no available buffers! Retrying...after waiting 10 "
102  "milliseconds!"
103  << std::endl;
104  std::this_thread::sleep_for(std::chrono::microseconds(1000));
105  return;
106  }
107 }
108 
109 //==============================================================================
110 void TCPDataListenerProducer::fastWrite(void)
111 {
112  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << name_ << " running!" <<
113  // std::endl;
114 
115  if(DataProducer::attachToEmptySubBuffer(dataP_, headerP_) < 0)
116  {
117  __COUT__
118  << "There are no available buffers! Retrying...after waiting 10 milliseconds!"
119  << std::endl;
120  std::this_thread::sleep_for(std::chrono::microseconds(1000));
121  return;
122  }
123 
124  try
125  {
126  if(dataType_ == "Packet")
127  *dataP_ =
128  TCPListenServer::receivePacket(); // Throws an exception if it fails
129  else //"Raw" || DEFAULT
130  *dataP_ = TCPListenServer::receive<
131  std::string>(); // Throws an exception if it fails
132 
133  if(dataP_->size() == 0) // When it goes in timeout
134  return;
135  }
136  catch(const std::exception& e)
137  {
138  __COUT__ << "Error: " << e.what() << std::endl;
139  std::this_thread::sleep_for(std::chrono::milliseconds(500));
140  return;
141  }
142  (*headerP_)["Port"] = std::to_string(port_);
143 
144  DataProducer::setWrittenSubBuffer<std::string, std::map<std::string, std::string>>();
145 }
146 
147 DEFINE_OTS_PROCESSOR(TCPDataListenerProducer)
std::string data_
For slow write.
bool workLoopThread(toolbox::task::WorkLoop *workLoop) override
bool TCPDataListenerProducer::getNextFragment(void)
std::string * dataP_
For fast write.