tdaq-develop-2025-02-12
UDPDataStreamerConsumer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/UDPDataStreamerConsumer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 
6 #include <unistd.h>
7 #include <cassert>
8 #include <iostream>
9 
10 using namespace ots;
11 
12 //==============================================================================
14  std::string supervisorApplicationUID,
15  std::string bufferUID,
16  std::string processorUID,
17  const ConfigurationTree& theXDAQContextConfigTree,
18  const std::string& configurationPath)
19  : Socket(theXDAQContextConfigTree.getNode(configurationPath)
20  .getNode("HostIPAddress")
21  .getValue<std::string>(),
22  theXDAQContextConfigTree.getNode(configurationPath)
23  .getNode("HostPort")
24  .getValue<unsigned int>())
25  , WorkLoop(processorUID)
26  , UDPDataStreamerBase(theXDAQContextConfigTree.getNode(configurationPath)
27  .getNode("HostIPAddress")
28  .getValue<std::string>(),
29  theXDAQContextConfigTree.getNode(configurationPath)
30  .getNode("HostPort")
31  .getValue<unsigned int>(),
32  theXDAQContextConfigTree.getNode(configurationPath)
33  .getNode("StreamToIPAddress")
34  .getValue<std::string>(),
35  theXDAQContextConfigTree.getNode(configurationPath)
36  .getNode("StreamToPort")
37  .getValue<unsigned int>())
38  , DataConsumer(
39  supervisorApplicationUID, bufferUID, processorUID, HighConsumerPriority)
40  , Configurable(theXDAQContextConfigTree, configurationPath)
44 {
45  // Socket::initialize(); //dont call this! UDPDataStreamerBase() calls it
46  __COUT__ << "done!" << std::endl;
47 }
48 
49 //==============================================================================
50 UDPDataStreamerConsumer::~UDPDataStreamerConsumer(void) {}
51 
52 //==============================================================================
53 bool UDPDataStreamerConsumer::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
54 {
55  fastRead();
56  return WorkLoop::continueWorkLoop_;
57 }
58 
59 //==============================================================================
60 void UDPDataStreamerConsumer::fastRead(void)
61 {
62  //__COUT__ << processorUID_ << " running!" << std::endl;
63  if(DataConsumer::read(dataP_, headerP_) < 0)
64  {
65  usleep(100);
66  return;
67  }
68  // unsigned int reconverted = (((*headerP_)["IPAddress"][0]&0xff)<<24) +
69  // (((*headerP_)["IPAddress"][1]&0xff)<<16) + (((*headerP_)["IPAddress"][2]&0xff)<<8)
70  // +
71  // ((*headerP_)["IPAddress"][3]&0xff);
72  //__COUT__ << processorUID_ << " -> Got some data. From: " << std::hex << reconverted
73  //<< std::dec << std::endl;
74 
75  // std::cout << __COUT_HDR_FL__ << dataP_->length() << std::endl;
76  TransmitterSocket::send(streamToSocket_, *dataP_);
77  DataConsumer::setReadSubBuffer<std::string, std::map<std::string, std::string>>();
78 }
79 
80 //==============================================================================
81 void UDPDataStreamerConsumer::slowRead(void)
82 {
83  //__COUT__ << processorUID_ << " running!" << std::endl;
84  // This is making a copy!!!
85  if(DataConsumer::read(data_, header_) < 0)
86  {
87  usleep(1000);
88  return;
89  }
90  // unsigned int reconverted = ((header_["IPAddress"][0]&0xff)<<24) +
91  // ((header_["IPAddress"][1]&0xff)<<16) + ((header_["IPAddress"][2]&0xff)<<8) +
92  // (header_["IPAddress"][3]&0xff);
93  //__COUT__ << processorUID_ << " -> Got some data. From: " << std::hex << reconverted
94  //<< std::dec << std::endl;
95 
96  TransmitterSocket::send(streamToSocket_, data_);
97 }
98 
99 DEFINE_OTS_PROCESSOR(UDPDataStreamerConsumer)
int read(D &buffer, H &header)
Copies the buffer into the passed parameters.
Definition: DataConsumer.h:38
std::string * dataP_
For fast read.
std::string data_
For slow read.
UDPDataStreamerConsumer(std::string supervisorApplicationUID, std::string bufferUID, std::string processorUID, const ConfigurationTree &theXDAQContextConfigTree, const std::string &configurationPath)