tdaq-develop-2025-02-12
TCPDataStreamerConsumer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/TCPDataStreamerConsumer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 
6 #include <unistd.h>
7 #include <cassert>
8 #include <iostream>
9 
10 using namespace ots;
11 
12 //==============================================================================
14  std::string supervisorApplicationUID,
15  std::string bufferUID,
16  std::string processorUID,
17  const ConfigurationTree& theXDAQContextConfigTree,
18  const std::string& configurationPath)
19  : WorkLoop(processorUID)
20  , TCPDataStreamerBase(theXDAQContextConfigTree.getNode(configurationPath)
21  .getNode("StreamToPort")
22  .getValue<unsigned int>())
23  , DataConsumer(
24  supervisorApplicationUID, bufferUID, processorUID, HighConsumerPriority)
25  , Configurable(theXDAQContextConfigTree, configurationPath)
29 {
30  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
31  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
32  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
33  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
34  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
35  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
36  // Socket::initialize(); //dont call this! TCPDataStreamer() calls it
37  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << "done!" << std::endl;
38 }
39 
40 //==============================================================================
41 TCPDataStreamerConsumer::~TCPDataStreamerConsumer(void) {}
42 
43 //==============================================================================
44 bool TCPDataStreamerConsumer::workLoopThread(toolbox::task::WorkLoop* workLoop)
45 {
46  fastRead();
47  return WorkLoop::continueWorkLoop_;
48 }
49 
50 //==============================================================================
51 void TCPDataStreamerConsumer::fastRead(void)
52 {
53  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << processorUID_ << " running!"
54  // << std::endl;
55  if(DataConsumer::read(dataP_, headerP_) < 0)
56  {
57  usleep(100);
58  return;
59  }
60  // unsigned int reconverted = (((*headerP_)["IPAddress"][0]&0xff)<<24) +
61  // (((*headerP_)["IPAddress"][1]&0xff)<<16) + (((*headerP_)["IPAddress"][2]&0xff)<<8)
62  // +
63  // ((*headerP_)["IPAddress"][3]&0xff); std::cout << __COUT_HDR_FL__ <<
64  // __PRETTY_FUNCTION__ << processorUID_ << " -> Got some data. From: " << std::hex <<
65  // reconverted << std::dec << std::endl;
66 
67  // std::cout << __COUT_HDR_FL__ << dataP_->length() << std::endl;
68  TCPDataStreamerBase::send(*dataP_);
69  DataConsumer::setReadSubBuffer<std::string, std::map<std::string, std::string>>();
70 }
71 
72 //==============================================================================
73 void TCPDataStreamerConsumer::slowRead(void)
74 {
75  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << processorUID_ << " running!"
76  // << std::endl; This is making a copy!!!
77  if(DataConsumer::read(data_, header_) < 0)
78  {
79  usleep(1000);
80  return;
81  }
82  // unsigned int reconverted = ((header_["IPAddress"][0]&0xff)<<24) +
83  // ((header_["IPAddress"][1]&0xff)<<16) + ((header_["IPAddress"][2]&0xff)<<8) +
84  // (header_["IPAddress"][3]&0xff); std::cout << __COUT_HDR_FL__ <<
85  // __PRETTY_FUNCTION__
86  // << processorUID_ << " -> Got some data. From: " << std::hex << reconverted <<
87  // std::dec << std::endl;
88 
89  TCPDataStreamerBase::send(data_);
90 }
91 
92 DEFINE_OTS_PROCESSOR(TCPDataStreamerConsumer)
int read(D &buffer, H &header)
Copies the buffer into the passed parameters.
Definition: DataConsumer.h:38
std::string data_
For slow read.
TCPDataStreamerConsumer(std::string supervisorApplicationUID, std::string bufferUID, std::string processorUID, const ConfigurationTree &theXDAQContextConfigTree, const std::string &configurationPath)
std::string * dataP_
For fast read.