otsdaq  v2_05_02_indev
TCPDataListenerProducer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/TCPDataListenerProducer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
6 
7 #include <string.h>
8 #include <cassert>
9 #include <chrono>
10 #include <iostream>
11 #include <thread>
12 
13 using namespace ots;
14 
15 //==============================================================================
16 TCPDataListenerProducer::TCPDataListenerProducer(std::string supervisorApplicationUID,
17  std::string bufferUID,
18  std::string processorUID,
19  const ConfigurationTree& theXDAQContextConfigTree,
20  const std::string& configurationPath)
21  : WorkLoop(processorUID)
22  //, Socket ("192.168.133.100", 40000)
23  , DataProducer(
24  supervisorApplicationUID, bufferUID, processorUID, theXDAQContextConfigTree.getNode(configurationPath).getNode("BufferSize").getValue<unsigned int>())
25  //, DataProducer (supervisorApplicationUID, bufferUID, processorUID, 100)
26  , Configurable(theXDAQContextConfigTree, configurationPath)
27  , TCPSubscribeClient(theXDAQContextConfigTree.getNode(configurationPath).getNode("ServerIPAddress").getValue<std::string>(),
28  theXDAQContextConfigTree.getNode(configurationPath).getNode("ServerPort").getValue<unsigned int>())
29  , dataP_(nullptr)
30  , headerP_(nullptr)
31  , ipAddress_(theXDAQContextConfigTree.getNode(configurationPath).getNode("ServerIPAddress").getValue<std::string>())
32  , port_(theXDAQContextConfigTree.getNode(configurationPath).getNode("ServerPort").getValue<unsigned int>())
33  , dataType_(theXDAQContextConfigTree.getNode(configurationPath).getNode("DataType").getValue<std::string>())
34 {
35 }
36 
37 //==============================================================================
38 TCPDataListenerProducer::~TCPDataListenerProducer(void) {}
39 
40 //==============================================================================
41 void TCPDataListenerProducer::startProcessingData(std::string runNumber)
42 {
43  TCPSubscribeClient::connect();
44  TCPSubscribeClient::setReceiveTimeout(1, 1000);
45  DataProducer::startProcessingData(runNumber);
46 }
47 
48 //==============================================================================
49 void TCPDataListenerProducer::stopProcessingData(void)
50 {
51  DataProducer::stopProcessingData();
52  TCPSubscribeClient::disconnect();
53 }
54 
55 //==============================================================================
56 bool TCPDataListenerProducer::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
57 // bool TCPDataListenerProducer::getNextFragment(void)
58 {
59  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << DataProcessor::processorUID_
60  // << " running, because workloop: " << WorkLoop::continueWorkLoop_ << std::endl;
61  fastWrite();
62  return WorkLoop::continueWorkLoop_;
63 }
64 
65 //==============================================================================
66 void TCPDataListenerProducer::slowWrite(void)
67 {
68  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << name_ << " running!" <<
69  // std::endl;
70 
71  try
72  {
73  data_ = TCPSubscribeClient::receive<std::string>(); // Throws an exception if it fails
74  if(data_.size() == 0)
75  {
76  std::this_thread::sleep_for(std::chrono::microseconds(1000));
77  return;
78  }
79  }
80  catch(const std::exception& e)
81  {
82  __COUT__ << "Error: " << e.what() << std::endl;
83  ;
84  return;
85  }
86  header_["IPAddress"] = ipAddress_;
87  header_["Port"] = std::to_string(port_);
88 
89  while(DataProducer::write(data_, header_) < 0)
90  {
91  __COUT__ << "There are no available buffers! Retrying...after waiting 10 "
92  "milliseconds!"
93  << std::endl;
94  std::this_thread::sleep_for(std::chrono::microseconds(1000));
95  return;
96  }
97 }
98 
99 //==============================================================================
100 void TCPDataListenerProducer::fastWrite(void)
101 {
102  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << name_ << " running!" <<
103  // std::endl;
104 
105  if(DataProducer::attachToEmptySubBuffer(dataP_, headerP_) < 0)
106  {
107  __COUT__ << "There are no available buffers! Retrying...after waiting 10 milliseconds!" << std::endl;
108  std::this_thread::sleep_for(std::chrono::microseconds(1000));
109  return;
110  }
111 
112  try
113  {
114  if(dataType_ == "Packet")
115  *dataP_ = TCPSubscribeClient::receivePacket(); // Throws an exception if it fails
116  else//"Raw" || DEFAULT
117  *dataP_ = TCPSubscribeClient::receive<std::string>(); // Throws an exception if it fails
118 
119  if(dataP_->size() == 0)
120  return;
121  }
122  catch(const std::exception& e)
123  {
124  __COUT__ << "Error: " << e.what() << std::endl;
125  ;
126  return;
127  }
128  (*headerP_)["IPAddress"] = ipAddress_;
129  (*headerP_)["Port"] = std::to_string(port_);
130 
131  DataProducer::setWrittenSubBuffer<std::string, std::map<std::string, std::string>>();
132 }
133 
134 DEFINE_OTS_PROCESSOR(TCPDataListenerProducer)