otsdaq  v2_05_02_indev
TCPDataStreamerConsumer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/TCPDataStreamerConsumer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 
6 #include <unistd.h>
7 #include <cassert>
8 #include <iostream>
9 
10 using namespace ots;
11 
12 //==============================================================================
13 TCPDataStreamerConsumer::TCPDataStreamerConsumer(std::string supervisorApplicationUID,
14  std::string bufferUID,
15  std::string processorUID,
16  const ConfigurationTree& theXDAQContextConfigTree,
17  const std::string& configurationPath)
18  : WorkLoop(processorUID)
19  , TCPDataStreamerBase(theXDAQContextConfigTree.getNode(configurationPath).getNode("StreamToPort").getValue<unsigned int>())
20  , DataConsumer(supervisorApplicationUID, bufferUID, processorUID, HighConsumerPriority)
21  , Configurable(theXDAQContextConfigTree, configurationPath)
22 //, Socket ("192.168.133.1", 47200)
23 //, DataConsumer ("ARTDAQDataManager", 1, "ARTDAQBuffer", "ARTDAQDataStreamer0",
24 // HighConsumerPriority) , streamToSocket_("192.168.133.1", 50100)
25 {
26  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
27  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
28  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
29  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
30  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
31  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
32  // Socket::initialize(); //dont call this! TCPDataStreamer() calls it
33  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << "done!" << std::endl;
34 }
35 
36 //==============================================================================
37 TCPDataStreamerConsumer::~TCPDataStreamerConsumer(void) {}
38 
39 //==============================================================================
40 bool TCPDataStreamerConsumer::workLoopThread(toolbox::task::WorkLoop* workLoop)
41 {
42  fastRead();
43  return WorkLoop::continueWorkLoop_;
44 }
45 
46 //==============================================================================
47 void TCPDataStreamerConsumer::fastRead(void)
48 {
49  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << processorUID_ << " running!"
50  // << std::endl;
51  if(DataConsumer::read(dataP_, headerP_) < 0)
52  {
53  usleep(100);
54  return;
55  }
56  // unsigned int reconverted = (((*headerP_)["IPAddress"][0]&0xff)<<24) +
57  // (((*headerP_)["IPAddress"][1]&0xff)<<16) + (((*headerP_)["IPAddress"][2]&0xff)<<8)
58  // +
59  // ((*headerP_)["IPAddress"][3]&0xff); std::cout << __COUT_HDR_FL__ <<
60  // __PRETTY_FUNCTION__ << processorUID_ << " -> Got some data. From: " << std::hex <<
61  // reconverted << std::dec << std::endl;
62 
63  // std::cout << __COUT_HDR_FL__ << dataP_->length() << std::endl;
64  TCPDataStreamerBase::send(*dataP_);
65  DataConsumer::setReadSubBuffer<std::string, std::map<std::string, std::string>>();
66 }
67 
68 //==============================================================================
69 void TCPDataStreamerConsumer::slowRead(void)
70 {
71  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << processorUID_ << " running!"
72  // << std::endl; This is making a copy!!!
73  if(DataConsumer::read(data_, header_) < 0)
74  {
75  usleep(1000);
76  return;
77  }
78  // unsigned int reconverted = ((header_["IPAddress"][0]&0xff)<<24) +
79  // ((header_["IPAddress"][1]&0xff)<<16) + ((header_["IPAddress"][2]&0xff)<<8) +
80  // (header_["IPAddress"][3]&0xff); std::cout << __COUT_HDR_FL__ <<
81  // __PRETTY_FUNCTION__
82  // << processorUID_ << " -> Got some data. From: " << std::hex << reconverted <<
83  // std::dec << std::endl;
84 
85  TCPDataStreamerBase::send(data_);
86 }
87 
88 DEFINE_OTS_PROCESSOR(TCPDataStreamerConsumer)