otsdaq  v2_05_02_indev
UDPDataListenerProducer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/UDPDataListenerProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
6 
7 #include <string.h>
8 #include <unistd.h>
9 #include <cassert>
10 #include <iostream>
11 
12 using namespace ots;
13 
14 //==============================================================================
15 UDPDataListenerProducer::UDPDataListenerProducer(std::string supervisorApplicationUID,
16  std::string bufferUID,
17  std::string processorUID,
18  const ConfigurationTree& theXDAQContextConfigTree,
19  const std::string& configurationPath)
20  : WorkLoop(processorUID)
21  , Socket(theXDAQContextConfigTree.getNode(configurationPath).getNode("HostIPAddress").getValue<std::string>(),
22  theXDAQContextConfigTree.getNode(configurationPath).getNode("HostPort").getValue<unsigned int>())
23  //, Socket ("192.168.133.100", 40000)
24  , DataProducer(
25  supervisorApplicationUID, bufferUID, processorUID, theXDAQContextConfigTree.getNode(configurationPath).getNode("BufferSize").getValue<unsigned int>())
26  //, DataProducer (supervisorApplicationUID, bufferUID, processorUID, 100)
27  , Configurable(theXDAQContextConfigTree, configurationPath)
28  , dataP_(nullptr)
29  , headerP_(nullptr)
30 {
31  unsigned int socketReceiveBufferSize;
32  try // if socketReceiveBufferSize is defined in configuration, use it
33  {
34  socketReceiveBufferSize = theXDAQContextConfigTree.getNode(configurationPath).getNode("SocketReceiveBufferSize").getValue<unsigned int>();
35  }
36  catch(...)
37  {
38  // for backwards compatibility, ignore
39  socketReceiveBufferSize = 0x10000000; // default to "large"
40  }
41 
42  Socket::initialize(socketReceiveBufferSize);
43 }
44 
45 //==============================================================================
46 UDPDataListenerProducer::~UDPDataListenerProducer(void) {}
47 
48 //==============================================================================
49 bool UDPDataListenerProducer::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
50 // bool UDPDataListenerProducer::getNextFragment(void)
51 {
52  //__CFG_COUT__DataProcessor::processorUID_ << " running, because workloop: " <<
53  // WorkLoop::continueWorkLoop_ << std::endl;
54  fastWrite();
55  return WorkLoop::continueWorkLoop_;
56 }
57 
58 //==============================================================================
59 void UDPDataListenerProducer::slowWrite(void)
60 {
61  //__CFG_COUT__name_ << " running!" << std::endl;
62 
63  if(ReceiverSocket::receive(data_, ipAddress_, port_) != -1)
64  {
65  //__CFG_COUT__name_ << " Buffer: " << message << std::endl;
66  //__CFG_COUT__processorUID_ << " -> Got some data. From: " << std::hex <<
67  // fromIPAddress << " port: " << fromPort << std::dec << std::endl;
68  header_["IPAddress"] = NetworkConverters::networkToStringIP(ipAddress_);
69  header_["Port"] = NetworkConverters::networkToStringPort(port_);
70  // unsigned long long value;
71  // memcpy((void *)&value, (void *) data_.substr(2).data(),8); //make data
72  // counter
73  // __CFG_COUT__std::hex << value << std::dec << std::endl;
74 
75  while(DataProducer::write(data_, header_) < 0)
76  {
77  __CFG_COUT__ << "There are no available buffers! Retrying...after waiting 10 "
78  "milliseconds!"
79  << std::endl;
80  usleep(10000);
81  return;
82  }
83  }
84 }
85 
86 //==============================================================================
87 void UDPDataListenerProducer::fastWrite(void)
88 {
89  //__CFG_COUT__ << " running!" << std::endl;
90 
91  if(DataProducer::attachToEmptySubBuffer(dataP_, headerP_) < 0)
92  {
93  __CFG_COUT__ << "There are no available buffers! Retrying...after waiting 10 milliseconds!" << std::endl;
94  usleep(10000);
95  return;
96  }
97 
98  // if(0) // test data buffers
99  // {
100  // sleep(1);
101  // unsigned long long value = 0xA54321; // this is 8-bytes
102  // std::string& buffer = *dataP_;
103  // buffer.resize(
104  // 8); // NOTE: this is inexpensive according to Lorenzo/documentation
105  // // in C++11 (only increases size once and doesn't decrease size)
106  // memcpy((void*)&buffer[0] /*dest*/, (void*)&value /*src*/, 8 /*numOfBytes*/);
107  //
108  // // size() and length() are equivalent
109  // __CFG_COUT__ << "Writing to buffer " << buffer.size() << " bytes!" << __E__;
110  // __CFG_COUT__ << "Writing to buffer length " << buffer.length() << " bytes!"
111  // << __E__;
112  //
113  // __CFG_COUT__ << "Buffer Data: "
114  // << BinaryStringMacros::binaryNumberToHexString(buffer) << __E__;
115  //
116  // __CFG_COUTV__(DataProcessor::theCircularBuffer_);
117  //
118  // DataProducer::setWrittenSubBuffer<std::string,
119  // std::map<std::string, std::string> >();
120  // return;
121  // }
122 
123  if(ReceiverSocket::receive(*dataP_, ipAddress_, port_, 1, 0, true) != -1)
124  {
125  header_["IPAddress"] = NetworkConverters::networkToStringIP(ipAddress_);
126  header_["Port"] = NetworkConverters::networkToStringPort(port_);
127  __CFG_COUT__ << "Received data IP: " << header_["IPAddress"] << " port: " << header_["Port"] << __E__;
128  DataProducer::setWrittenSubBuffer<std::string, std::map<std::string, std::string> >();
129  }
130 }
131 
// Invokes the DEFINE_OTS_PROCESSOR macro for this class.
// NOTE(review): presumably declared in otsdaq/Macros/ProcessorPluginMacros.h and
// used to register the class as a processor plugin — the expansion is not
// visible in this file; confirm.
132 DEFINE_OTS_PROCESSOR(UDPDataListenerProducer)