tdaq-develop-2025-02-12
JSONDispatcher_module.cc
1 
#include "art/Framework/Core/EDAnalyzer.h"
#include "art/Framework/Core/ModuleMacros.h"
#include "art/Framework/Principal/Event.h"
#include "art/Framework/Principal/Handle.h"
#include "art/Framework/Principal/Run.h"
#include "canvas/Utilities/Exception.h"

#include "artdaq-core/Data/Fragment.hh"
#include "artdaq-ots/Overlays/UDPFragment.hh"
#include "otsdaq/Macros/CoutMacros.h"

#include <boost/asio.hpp>
using boost::asio::ip::udp;

#include <arpa/inet.h>
#include <algorithm>
#include <cassert>
#include <cmath>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
30 
31 namespace ots
32 {
33 class JSONDispatcher : public art::EDAnalyzer
34 {
35  public:
36  explicit JSONDispatcher(fhicl::ParameterSet const& pset);
37  virtual ~JSONDispatcher();
38 
39  virtual void analyze(art::Event const& evt) override;
40  virtual void beginRun(art::Run const& run) override;
41 
42  private:
43  int prescale_;
44  std::string raw_data_label_;
45  std::string frag_type_;
46  boost::asio::io_service io_service_;
47  udp::socket socket_;
48  udp::endpoint remote_endpoint_;
49 };
50 
51 } // namespace ots
52 
53 ots::JSONDispatcher::JSONDispatcher(fhicl::ParameterSet const& pset)
54  : art::EDAnalyzer(pset)
55  , prescale_(pset.get<int>("prescale", 1))
56  , raw_data_label_(pset.get<std::string>("raw_data_label", "daq"))
57  , frag_type_(pset.get<std::string>("frag_type", "UDP"))
58  , io_service_()
59  , socket_(io_service_, udp::v4())
60 {
61  std::cout << __COUT_HDR_FL__ << "JSONDispatcher Constructor Start" << std::endl;
62  int port = pset.get<int>("port", 35555);
63  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher port is " << std::to_string(port)
64  // << std::endl; std::cout << __COUT_HDR_FL__ << "JSONDispatcher setting
65  // reuse_address option" << std::endl;
66  boost::system::error_code ec;
67  socket_.set_option(boost::asio::socket_base::reuse_address(true), ec);
68  if(ec)
69  {
70  TLOG(TLVL_ERROR, "JSONDispatcher")
71  << "An error occurred setting reuse_address: " << ec.message() << std::endl;
72  }
73  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher setting broadcast option" <<
74  // std::endl;
75  socket_.set_option(boost::asio::socket_base::broadcast(true), ec);
76  if(ec)
77  {
78  TLOG(TLVL_ERROR, "JSONDispatcher")
79  << "An error occurred setting broadcast: " << ec.message() << std::endl;
80  }
81 
82  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher gettting UDP endpoint" <<
83  // std::endl;
84  remote_endpoint_ = udp::endpoint(boost::asio::ip::address_v4::broadcast(), port);
85  std::cout << __COUT_HDR_FL__ << "JSONDispatcher Constructor End" << std::endl;
86 }
87 
88 ots::JSONDispatcher::~JSONDispatcher() {}
89 
90 void ots::JSONDispatcher::beginRun(art::Run const& run)
91 {
92  std::cout << __COUT_HDR_FL__ << "JSONDispatcher beginning run " << run.run()
93  << std::endl;
94 }
95 
96 void ots::JSONDispatcher::analyze(art::Event const& evt)
97 {
98  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher getting event number to check
99  // prescale" << std::endl;
100  art::EventNumber_t eventNumber = evt.event();
101  TLOG(TLVL_INFO, "JSONDispatcher")
102  << "Received event with sequence ID " << eventNumber;
103  if((int)eventNumber % prescale_ == 0)
104  {
105  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher dispatching event" <<
106  // std::endl;
107  std::ostringstream outputJSON;
108  outputJSON << "{\"run\":" << std::to_string(evt.run())
109  << ",\"subrun\":" << std::to_string(evt.subRun())
110  << ",\"event\":" << std::to_string(eventNumber);
111 
112  // ***********************
113  // *** UDP Fragments ***
114  // ***********************
115 
116  // look for raw UDP data
117 
118  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher getting handle on Fragments" <<
119  // std::endl;
120  art::Handle<artdaq::Fragments> raw;
121  evt.getByLabel(raw_data_label_, frag_type_, raw);
122  outputJSON << ",\"fragments\":[";
123 
124  if(raw.isValid())
125  {
126  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher dumping UDPFragments" <<
127  // std::endl;
128  for(size_t idx = 0; idx < raw->size(); ++idx)
129  {
130  if(idx > 0)
131  {
132  outputJSON << ",";
133  }
134  outputJSON << "{";
135  const auto& frag((*raw)[idx]);
136 
137  ots::UDPFragment bb(frag);
138  int type = 0;
139 
140  if(frag.hasMetadata())
141  {
142  outputJSON << "\"metadata\":{";
143  // std::cout << __COUT_HDR_FL__ << "Fragment metadata: " << std::endl;
144  auto md = frag.metadata<ots::UDPFragment::Metadata>();
145  outputJSON << "\"port\":" << std::to_string(md->port) << ",";
146  char buf[sizeof(in_addr)];
147  struct sockaddr_in addr;
148  addr.sin_addr.s_addr = md->address;
149  inet_ntop(AF_INET, &(addr.sin_addr), buf, INET_ADDRSTRLEN);
150  outputJSON << "\"address\":\"" << std::string(buf) << "\"";
151  outputJSON << "},";
152  }
153  outputJSON << "\"header\":{";
154  outputJSON << "\"event_size\":" << std::to_string(bb.hdr_event_size())
155  << ",";
156  outputJSON << "\"data_type\":" << std::to_string(bb.hdr_data_type());
157  type = bb.hdr_data_type();
158  outputJSON << "},";
159  outputJSON << "\"data\":";
160  if(type == 0 || type > 2)
161  {
162  outputJSON << "[";
163  auto it = bb.dataBegin();
164  outputJSON << std::hex << "\"0x" << (int)*it << "\"" << std::dec;
165  ++it;
166 
167  for(; it != bb.dataEnd(); ++it)
168  {
169  outputJSON << "," << std::hex << "\"0x" << (int)*it << "\""
170  << std::dec;
171  }
172  outputJSON << "]";
173  }
174  else
175  {
176  if(type == 2)
177  {
178  outputJSON << "\"";
179  }
180  std::string output = std::string((const char*)bb.dataBegin());
181  if(type == 2)
182  {
183  std::string find = "\"";
184  std::string replace = "\\\"";
185  for(std::string::size_type i = 0;
186  (i = output.find(find, i)) != std::string::npos;)
187  {
188  output.replace(i, find.length(), replace);
189  i += replace.length();
190  }
191  }
192  outputJSON << output;
193  if(type == 2)
194  {
195  outputJSON << "\"";
196  }
197  }
198  outputJSON << "}";
199  }
200  }
201  else
202  {
203  return;
204  }
205 
206  outputJSON << "]}";
207  // std::cout << __COUT_HDR_FL__ <<"JSONDispatcher filling JSON into buffer" <<
208  // std::endl;
209  std::string s = outputJSON.str() + "\0";
210  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher output: " << s << std::endl;
211  auto message = boost::asio::buffer(s);
212  // std::cout << __COUT_HDR_FL__ <<"JSONDispatcher broadcasting JSON data" <<
213  // std::endl;
214  socket_.send_to(message, remote_endpoint_);
215  // std::cout << __COUT_HDR_FL__ << "JSONDispatcher done with event" << std::endl;
216  }
217 }
218 
219 DEFINE_ART_MODULE(ots::JSONDispatcher)