otsdaq  v2_05_02_indev
JSONDispatcher_module.cc
1 // Class: JSONDispatcher
3 // Module Type: analyzer
4 // File: JSONDispatcher_module.cc
5 // Description: Broadcasts the fragments of each selected event as a JSON message over UDP.
7 
8 #include "art/Framework/Core/EDAnalyzer.h"
9 #include "art/Framework/Core/ModuleMacros.h"
10 #include "art/Framework/Principal/Event.h"
11 #include "art/Framework/Principal/Handle.h"
12 #include "art/Framework/Principal/Run.h"
13 #include "canvas/Utilities/Exception.h"
14 
15 #include "artdaq-core/Data/Fragment.hh"
16 #include "artdaq-ots/Overlays/UDPFragment.hh"
17 #include "otsdaq/Macros/CoutMacros.h"
18 
19 #include <boost/asio.hpp>
20 using boost::asio::ip::udp;
21 
22 #include <arpa/inet.h>
23 #include <algorithm>
24 #include <cassert>
25 #include <cmath>
26 #include <fstream>
27 #include <iomanip>
28 #include <iostream>
29 #include <vector>
30 
31 namespace ots
32 {
33 class JSONDispatcher : public art::EDAnalyzer
34 {
35  public:
36  explicit JSONDispatcher(fhicl::ParameterSet const& pset);
37  virtual ~JSONDispatcher();
38 
39  virtual void analyze(art::Event const& evt) override;
40  virtual void beginRun(art::Run const& run) override;
41 
42  private:
43  int prescale_;
44  std::string raw_data_label_;
45  std::string frag_type_;
46  boost::asio::io_service io_service_;
47  udp::socket socket_;
48  udp::endpoint remote_endpoint_;
49 };
50 
51 } // namespace ots
52 
53 ots::JSONDispatcher::JSONDispatcher(fhicl::ParameterSet const& pset)
54  : art::EDAnalyzer(pset)
55  , prescale_(pset.get<int>("prescale", 1))
56  , raw_data_label_(pset.get<std::string>("raw_data_label", "daq"))
57  , frag_type_(pset.get<std::string>("frag_type", "UDP"))
58  , io_service_()
59  , socket_(io_service_, udp::v4())
60 {
61  std::cout << __COUT_HDR_FL__ << "JSONDispatcher Constructor Start" << std::endl;
62  int port = pset.get<int>("port", 35555);
63  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher port is " << std::to_string(port)
64  // << std::endl; std::cout << __COUT_HDR_FL__ << "JSONDispatcher setting
65  // reuse_address option" << std::endl;
66  boost::system::error_code ec;
67  socket_.set_option(boost::asio::socket_base::reuse_address(true), ec);
68  if(ec)
69  {
70  TLOG(TLVL_ERROR, "JSONDispatcher") << "An error occurred setting reuse_address: " << ec.message() << std::endl;
71  }
72  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher setting broadcast option" <<
73  // std::endl;
74  socket_.set_option(boost::asio::socket_base::broadcast(true), ec);
75  if(ec)
76  {
77  TLOG(TLVL_ERROR, "JSONDispatcher") << "An error occurred setting broadcast: " << ec.message() << std::endl;
78  }
79 
80  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher gettting UDP endpoint" <<
81  // std::endl;
82  remote_endpoint_ = udp::endpoint(boost::asio::ip::address_v4::broadcast(), port);
83  std::cout << __COUT_HDR_FL__ << "JSONDispatcher Constructor End" << std::endl;
84 }
85 
86 ots::JSONDispatcher::~JSONDispatcher() {}
87 
88 void ots::JSONDispatcher::beginRun(art::Run const& run) { std::cout << __COUT_HDR_FL__ << "JSONDispatcher beginning run " << run.run() << std::endl; }
89 
90 void ots::JSONDispatcher::analyze(art::Event const& evt)
91 {
92  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher getting event number to check
93  // prescale" << std::endl;
94  art::EventNumber_t eventNumber = evt.event();
95  TLOG(TLVL_INFO, "JSONDispatcher") << "Received event with sequence ID " << eventNumber;
96  if((int)eventNumber % prescale_ == 0)
97  {
98  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher dispatching event" <<
99  // std::endl;
100  std::ostringstream outputJSON;
101  outputJSON << "{\"run\":" << std::to_string(evt.run()) << ",\"subrun\":" << std::to_string(evt.subRun())
102  << ",\"event\":" << std::to_string(eventNumber);
103 
104  // ***********************
105  // *** UDP Fragments ***
106  // ***********************
107 
108  // look for raw UDP data
109 
110  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher getting handle on Fragments" <<
111  // std::endl;
112  art::Handle<artdaq::Fragments> raw;
113  evt.getByLabel(raw_data_label_, frag_type_, raw);
114  outputJSON << ",\"fragments\":[";
115 
116  if(raw.isValid())
117  {
118  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher dumping UDPFragments" <<
119  // std::endl;
120  for(size_t idx = 0; idx < raw->size(); ++idx)
121  {
122  if(idx > 0)
123  {
124  outputJSON << ",";
125  }
126  outputJSON << "{";
127  const auto& frag((*raw)[idx]);
128 
129  ots::UDPFragment bb(frag);
130  int type = 0;
131 
132  if(frag.hasMetadata())
133  {
134  outputJSON << "\"metadata\":{";
135  // std::cout << __COUT_HDR_FL__ << "Fragment metadata: " << std::endl;
136  auto md = frag.metadata<ots::UDPFragment::Metadata>();
137  outputJSON << "\"port\":" << std::to_string(md->port) << ",";
138  char buf[sizeof(in_addr)];
139  struct sockaddr_in addr;
140  addr.sin_addr.s_addr = md->address;
141  inet_ntop(AF_INET, &(addr.sin_addr), buf, INET_ADDRSTRLEN);
142  outputJSON << "\"address\":\"" << std::string(buf) << "\"";
143  outputJSON << "},";
144  }
145  outputJSON << "\"header\":{";
146  outputJSON << "\"event_size\":" << std::to_string(bb.hdr_event_size()) << ",";
147  outputJSON << "\"data_type\":" << std::to_string(bb.hdr_data_type());
148  type = bb.hdr_data_type();
149  outputJSON << "},";
150  outputJSON << "\"data\":";
151  if(type == 0 || type > 2)
152  {
153  outputJSON << "[";
154  auto it = bb.dataBegin();
155  outputJSON << std::hex << "\"0x" << (int)*it << "\"" << std::dec;
156  ++it;
157 
158  for(; it != bb.dataEnd(); ++it)
159  {
160  outputJSON << "," << std::hex << "\"0x" << (int)*it << "\"" << std::dec;
161  }
162  outputJSON << "]";
163  }
164  else
165  {
166  if(type == 2)
167  {
168  outputJSON << "\"";
169  }
170  std::string output = std::string((const char*)bb.dataBegin());
171  if(type == 2)
172  {
173  std::string find = "\"";
174  std::string replace = "\\\"";
175  for(std::string::size_type i = 0; (i = output.find(find, i)) != std::string::npos;)
176  {
177  output.replace(i, find.length(), replace);
178  i += replace.length();
179  }
180  }
181  outputJSON << output;
182  if(type == 2)
183  {
184  outputJSON << "\"";
185  }
186  }
187  outputJSON << "}";
188  }
189  }
190  else
191  {
192  return;
193  }
194 
195  outputJSON << "]}";
196  // std::cout << __COUT_HDR_FL__ <<"JSONDispatcher filling JSON into buffer" <<
197  // std::endl;
198  std::string s = outputJSON.str() + "\0";
199  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher output: " << s << std::endl;
200  auto message = boost::asio::buffer(s);
201  // std::cout << __COUT_HDR_FL__ <<"JSONDispatcher broadcasting JSON data" <<
202  // std::endl;
203  socket_.send_to(message, remote_endpoint_);
204  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher done with event" << std::endl;
205  }
206 }
207 
// Module registration for ots::JSONDispatcher. The macro presumably comes from
// art/Framework/Core/ModuleMacros.h (included above) -- confirm.
208 DEFINE_ART_MODULE(ots::JSONDispatcher)