otsdaq  v2_05_02_indev
UDPDataStreamerConsumer_processor.cc
1 #include "otsdaq/DataProcessorPlugins/UDPDataStreamerConsumer.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/Macros/ProcessorPluginMacros.h"
4 #include "otsdaq/MessageFacility/MessageFacility.h"
5 
6 #include <unistd.h>
7 #include <cassert>
8 #include <iostream>
9 
10 using namespace ots;
11 
12 //==============================================================================
13 UDPDataStreamerConsumer::UDPDataStreamerConsumer(std::string supervisorApplicationUID,
14  std::string bufferUID,
15  std::string processorUID,
16  const ConfigurationTree& theXDAQContextConfigTree,
17  const std::string& configurationPath)
18  : Socket(theXDAQContextConfigTree.getNode(configurationPath).getNode("HostIPAddress").getValue<std::string>(),
19  theXDAQContextConfigTree.getNode(configurationPath).getNode("HostPort").getValue<unsigned int>())
20  , WorkLoop(processorUID)
21  , UDPDataStreamerBase(theXDAQContextConfigTree.getNode(configurationPath).getNode("HostIPAddress").getValue<std::string>(),
22  theXDAQContextConfigTree.getNode(configurationPath).getNode("HostPort").getValue<unsigned int>(),
23  theXDAQContextConfigTree.getNode(configurationPath).getNode("StreamToIPAddress").getValue<std::string>(),
24  theXDAQContextConfigTree.getNode(configurationPath).getNode("StreamToPort").getValue<unsigned int>())
25  , DataConsumer(supervisorApplicationUID, bufferUID, processorUID, HighConsumerPriority)
26  , Configurable(theXDAQContextConfigTree, configurationPath)
27 //, Socket ("192.168.133.1", 47200)
28 //, DataConsumer ("ARTDAQDataManager", 1, "ARTDAQBuffer", "ARTDAQDataStreamer0",
29 // HighConsumerPriority) , streamToSocket_("192.168.133.1", 50100)
30 {
31  // Socket::initialize(); //dont call this! UDPDataStreamerBase() calls it
32  __COUT__ << "done!" << std::endl;
33 }
34 
35 //==============================================================================
36 UDPDataStreamerConsumer::~UDPDataStreamerConsumer(void) {}
37 
38 //==============================================================================
39 bool UDPDataStreamerConsumer::workLoopThread(toolbox::task::WorkLoop* /*workLoop*/)
40 {
41  fastRead();
42  return WorkLoop::continueWorkLoop_;
43 }
44 
45 //==============================================================================
46 void UDPDataStreamerConsumer::fastRead(void)
47 {
48  //__COUT__ << processorUID_ << " running!" << std::endl;
49  if(DataConsumer::read(dataP_, headerP_) < 0)
50  {
51  usleep(100);
52  return;
53  }
54  // unsigned int reconverted = (((*headerP_)["IPAddress"][0]&0xff)<<24) +
55  // (((*headerP_)["IPAddress"][1]&0xff)<<16) + (((*headerP_)["IPAddress"][2]&0xff)<<8)
56  // +
57  // ((*headerP_)["IPAddress"][3]&0xff);
58  //__COUT__ << processorUID_ << " -> Got some data. From: " << std::hex << reconverted
59  //<< std::dec << std::endl;
60 
61  // std::cout << __COUT_HDR_FL__ << dataP_->length() << std::endl;
62  TransmitterSocket::send(streamToSocket_, *dataP_);
63  DataConsumer::setReadSubBuffer<std::string, std::map<std::string, std::string>>();
64 }
65 
66 //==============================================================================
67 void UDPDataStreamerConsumer::slowRead(void)
68 {
69  //__COUT__ << processorUID_ << " running!" << std::endl;
70  // This is making a copy!!!
71  if(DataConsumer::read(data_, header_) < 0)
72  {
73  usleep(1000);
74  return;
75  }
76  // unsigned int reconverted = ((header_["IPAddress"][0]&0xff)<<24) +
77  // ((header_["IPAddress"][1]&0xff)<<16) + ((header_["IPAddress"][2]&0xff)<<8) +
78  // (header_["IPAddress"][3]&0xff);
79  //__COUT__ << processorUID_ << " -> Got some data. From: " << std::hex << reconverted
80  //<< std::dec << std::endl;
81 
82  TransmitterSocket::send(streamToSocket_, data_);
83 }
84 
// Invokes DEFINE_OTS_PROCESSOR for this class.
// NOTE(review): presumably this macro (likely from ProcessorPluginMacros.h)
// emits the plugin registration/factory hook for UDPDataStreamerConsumer --
// confirm against the macro definition.
85 DEFINE_OTS_PROCESSOR(UDPDataStreamerConsumer)