tdaq-develop-2025-02-12
UDPDataListenerProducer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/UDPDataListenerProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
6 
7 #include <string.h>
8 #include <unistd.h>
9 #include <cassert>
10 #include <iostream>
11 
12 using namespace ots;
13 
14 //==============================================================================
15 UDPDataListenerProducer::UDPDataListenerProducer(
16  std::string supervisorApplicationUID,
17  std::string bufferUID,
18  std::string processorUID,
19  const ConfigurationTree& theXDAQContextConfigTree,
20  const std::string& configurationPath)
21  : WorkLoop(processorUID)
22  , Socket(theXDAQContextConfigTree.getNode(configurationPath)
23  .getNode("HostIPAddress")
24  .getValue<std::string>(),
25  theXDAQContextConfigTree.getNode(configurationPath)
26  .getNode("HostPort")
27  .getValue<unsigned int>())
28  //, Socket ("192.168.133.100", 40000)
29  , DataProducer(supervisorApplicationUID,
30  bufferUID,
31  processorUID,
32  theXDAQContextConfigTree.getNode(configurationPath)
33  .getNode("BufferSize")
34  .getValue<unsigned int>())
35  //, DataProducer (supervisorApplicationUID, bufferUID, processorUID, 100)
36  , Configurable(theXDAQContextConfigTree, configurationPath)
37  , dataP_(nullptr)
38  , headerP_(nullptr)
39 {
40  unsigned int socketReceiveBufferSize;
41  try // if socketReceiveBufferSize is defined in configuration, use it
42  {
43  socketReceiveBufferSize = theXDAQContextConfigTree.getNode(configurationPath)
44  .getNode("SocketReceiveBufferSize")
45  .getValue<unsigned int>();
46  }
47  catch(...)
48  {
49  // for backwards compatibility, ignore
50  socketReceiveBufferSize = 0x10000000; // default to "large"
51  }
52 
53  Socket::initialize(socketReceiveBufferSize);
54 }
55 
56 //==============================================================================
57 UDPDataListenerProducer::~UDPDataListenerProducer(void) {}
58 
59 //==============================================================================
60 bool UDPDataListenerProducer::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
62 {
63  //__CFG_COUT__DataProcessor::processorUID_ << " running, because workloop: " <<
64  // WorkLoop::continueWorkLoop_ << std::endl;
65  fastWrite();
66  return WorkLoop::continueWorkLoop_;
67 }
68 
69 //==============================================================================
70 void UDPDataListenerProducer::slowWrite(void)
71 {
72  //__CFG_COUT__name_ << " running!" << std::endl;
73 
74  if(ReceiverSocket::receive(data_, ipAddress_, port_) != -1)
75  {
76  //__CFG_COUT__name_ << " Buffer: " << message << std::endl;
77  //__CFG_COUT__processorUID_ << " -> Got some data. From: " << std::hex <<
78  // fromIPAddress << " port: " << fromPort << std::dec << std::endl;
79  header_["IPAddress"] = NetworkConverters::networkToStringIP(ipAddress_);
80  header_["Port"] = NetworkConverters::networkToStringPort(port_);
81  // unsigned long long value;
82  // memcpy((void *)&value, (void *) data_.substr(2).data(),8); //make data
83  // counter
84  // __CFG_COUT__std::hex << value << std::dec << std::endl;
85 
86  while(DataProducer::write(data_, header_) < 0)
87  {
88  __CFG_COUT__ << "There are no available buffers! Retrying...after waiting 10 "
89  "milliseconds!"
90  << std::endl;
91  usleep(10000);
92  return;
93  }
94  }
95 }
96 
97 //==============================================================================
98 void UDPDataListenerProducer::fastWrite(void)
99 {
100  //__CFG_COUT__ << " running!" << std::endl;
101 
102  if(DataProducer::attachToEmptySubBuffer(dataP_, headerP_) < 0)
103  {
104  __CFG_COUT__
105  << "There are no available buffers! Retrying...after waiting 10 milliseconds!"
106  << std::endl;
107  usleep(10000);
108  return;
109  }
110 
111  // if(0) // test data buffers
112  // {
113  // sleep(1);
114  // unsigned long long value = 0xA54321; // this is 8-bytes
115  // std::string& buffer = *dataP_;
116  // buffer.resize(
117  // 8); // NOTE: this is inexpensive according to Lorenzo/documentation
118  // // in C++11 (only increases size once and doesn't decrease size)
119  // memcpy((void*)&buffer[0] /*dest*/, (void*)&value /*src*/, 8 /*numOfBytes*/);
120  //
121  // // size() and length() are equivalent
122  // __CFG_COUT__ << "Writing to buffer " << buffer.size() << " bytes!" << __E__;
123  // __CFG_COUT__ << "Writing to buffer length " << buffer.length() << " bytes!"
124  // << __E__;
125  //
126  // __CFG_COUT__ << "Buffer Data: "
127  // << BinaryStringMacros::binaryNumberToHexString(buffer) << __E__;
128  //
129  // __CFG_COUTV__(DataProcessor::theCircularBuffer_);
130  //
131  // DataProducer::setWrittenSubBuffer<std::string,
132  // std::map<std::string, std::string> >();
133  // return;
134  // }
135 
136  if(ReceiverSocket::receive(*dataP_, ipAddress_, port_, 1, 0, true) != -1)
137  {
138  header_["IPAddress"] = NetworkConverters::networkToStringIP(ipAddress_);
139  header_["Port"] = NetworkConverters::networkToStringPort(port_);
140  __CFG_COUT__ << "Received data IP: " << header_["IPAddress"]
141  << " port: " << header_["Port"] << __E__;
142  DataProducer::setWrittenSubBuffer<std::string,
143  std::map<std::string, std::string> >();
144  }
145 }
146 
147 DEFINE_OTS_PROCESSOR(UDPDataListenerProducer)
ConfigurationTree getNode(const std::string &nodeName, bool doNotThrowOnBrokenUIDLinks=false) const
navigating between nodes
void getValue(T &value) const
bool workLoopThread(toolbox::task::WorkLoop *workLoop)
bool UDPDataListenerProducer::getNextFragment(void)
std::string * dataP_
For fast write.
std::string data_
For slow write.