8 #include "art/Framework/Core/EDAnalyzer.h"
9 #include "art/Framework/Core/ModuleMacros.h"
10 #include "art/Framework/Principal/Event.h"
11 #include "art/Framework/Principal/Handle.h"
12 #include "art/Framework/Principal/Run.h"
13 #include "canvas/Utilities/Exception.h"
15 #include "artdaq-core/Data/Fragment.hh"
16 #include "artdaq-ots/Overlays/UDPFragment.hh"
17 #include "otsdaq/Macros/CoutMacros.h"
19 #include <boost/asio.hpp>
20 using boost::asio::ip::udp;
22 #include <arpa/inet.h>
// Per-event hook: builds a JSON description of the event and sends it over UDP
// (see the out-of-line definition later in this file).
39 virtual void analyze(art::Event
const& evt)
override;
// Run-start hook: prints the number of the run that is beginning.
40 virtual void beginRun(art::Run
const& run)
override;
// First argument to evt.getByLabel() when fetching fragments; read from the
// "raw_data_label" parameter (default "daq").
44 std::string raw_data_label_;
// Second argument to evt.getByLabel() when fetching fragments; read from the
// "frag_type" parameter (default "UDP").
45 std::string frag_type_;
// Boost.Asio I/O service handed to the UDP socket's constructor.
46 boost::asio::io_service io_service_;
// Destination of every datagram sent by analyze(); the constructor sets it to
// the IPv4 broadcast address and the configured "port".
48 udp::endpoint remote_endpoint_;
// Constructs the dispatcher from its FHiCL configuration and prepares the UDP
// broadcast socket.
//
// Parameters read from pset (with defaults):
//   "prescale"       (int,    1)     -> prescale_
//   "raw_data_label" (string, "daq") -> raw_data_label_
//   "frag_type"      (string, "UDP") -> frag_type_
//   "port"           (int,    35555) -> port of remote_endpoint_
53 ots::JSONDispatcher::JSONDispatcher(fhicl::ParameterSet
const& pset)
54 : art::EDAnalyzer(pset)
55 , prescale_(pset.get<int>(
"prescale", 1))
56 , raw_data_label_(pset.get<std::string>(
"raw_data_label",
"daq"))
57 , frag_type_(pset.get<std::string>(
"frag_type",
"UDP"))
// Open the socket for UDP over IPv4 on io_service_.
59 , socket_(io_service_, udp::v4())
61 std::cout << __COUT_HDR_FL__ <<
"JSONDispatcher Constructor Start" << std::endl;
// NOTE(review): port is an int but is later passed to udp::endpoint; values
// outside the 16-bit port range are not checked here.
62 int port = pset.get<
int>(
"port", 35555);
// ec receives the outcome of each set_option() call below; its message is
// what the error logs print.
66 boost::system::error_code ec;
67 socket_.set_option(boost::asio::socket_base::reuse_address(
true), ec);
// Logged at error level; construction continues afterwards (best effort).
// NOTE(review): presumably only reached when ec is set — confirm the guard.
70 TLOG(TLVL_ERROR,
"JSONDispatcher") <<
"An error occurred setting reuse_address: " << ec.message() << std::endl;
// Enable the broadcast option on the socket; remote_endpoint_ below is the
// IPv4 broadcast address.
74 socket_.set_option(boost::asio::socket_base::broadcast(
true), ec);
// NOTE(review): presumably only reached when ec is set — confirm the guard.
77 TLOG(TLVL_ERROR,
"JSONDispatcher") <<
"An error occurred setting broadcast: " << ec.message() << std::endl;
// Every datagram sent by analyze() goes to the IPv4 broadcast address on the
// configured port.
82 remote_endpoint_ = udp::endpoint(boost::asio::ip::address_v4::broadcast(), port);
83 std::cout << __COUT_HDR_FL__ <<
"JSONDispatcher Constructor End" << std::endl;
86 ots::JSONDispatcher::~JSONDispatcher() {}
88 void ots::JSONDispatcher::beginRun(art::Run
const& run) { std::cout << __COUT_HDR_FL__ <<
"JSONDispatcher beginning run " << run.run() << std::endl; }
// Builds a JSON description of the event and sends it as one UDP datagram to
// remote_endpoint_ through socket_.
//
// The JSON text starts with the run, subrun and event numbers, followed by a
// "fragments" array filled from the artdaq::Fragments product looked up with
// (raw_data_label_, frag_type_). For a fragment with metadata, the metadata's
// port and dotted-quad address are written; the header's event_size and
// data_type and the payload follow. The payload is written as hex strings when
// data_type is 0 or greater than 2, and as quote-escaped text in the other
// path.
//
// @param evt  Event to serialize; its run(), subRun(), event() and the
//             fragment product are read.
90 void ots::JSONDispatcher::analyze(art::Event
const& evt)
94 art::EventNumber_t eventNumber = evt.event();
95 TLOG(TLVL_INFO,
"JSONDispatcher") <<
"Received event with sequence ID " << eventNumber;
// Prescale test on the event number.
// NOTE(review): a configured prescale of 0 makes this modulo undefined —
// confirm that 0 is rejected by the configuration.
96 if((
int)eventNumber % prescale_ == 0)
100 std::ostringstream outputJSON;
101 outputJSON <<
"{\"run\":" << std::to_string(evt.run()) <<
",\"subrun\":" << std::to_string(evt.subRun())
102 <<
",\"event\":" << std::to_string(eventNumber);
// Look up the fragment collection for this event.
// NOTE(review): raw is dereferenced in the loop below — confirm the handle is
// checked for validity before that point.
112 art::Handle<artdaq::Fragments> raw;
113 evt.getByLabel(raw_data_label_, frag_type_, raw);
114 outputJSON <<
",\"fragments\":[";
120 for(
size_t idx = 0; idx < raw->size(); ++idx)
127 const auto& frag((*raw)[idx]);
// Metadata is optional; emit it only when the fragment carries some.
132 if(frag.hasMetadata())
134 outputJSON <<
"\"metadata\":{";
137 outputJSON <<
"\"port\":" << std::to_string(md->port) <<
",";
// The buffer must hold the longest dotted-quad string plus its terminator.
// The previous sizeof(in_addr) buffer was only 4 bytes while inet_ntop was told
// it had INET_ADDRSTRLEN bytes, so it could write past the end of the array.
138 char buf[INET_ADDRSTRLEN];
139 struct sockaddr_in addr;
// md->address is stored unchanged as s_addr.
// NOTE(review): assumes it is already in network byte order — TODO confirm.
140 addr.sin_addr.s_addr = md->address;
// Pass the real buffer size so it cannot drift from the declaration above.
141 inet_ntop(AF_INET, &(addr.sin_addr), buf, sizeof(buf));
142 outputJSON <<
"\"address\":\"" << std::string(buf) <<
"\"";
145 outputJSON <<
"\"header\":{";
146 outputJSON <<
"\"event_size\":" << std::to_string(bb.hdr_event_size()) <<
",";
147 outputJSON <<
"\"data_type\":" << std::to_string(bb.hdr_data_type());
// Remember the data type: it selects the payload encoding below.
148 type = bb.hdr_data_type();
150 outputJSON <<
"\"data\":";
// Data types 0 and >2: write each payload element as a quoted hex string.
151 if(type == 0 || type > 2)
// The first element is written without a leading comma.
// NOTE(review): dereferences dataBegin() — assumes a non-empty payload; confirm.
154 auto it = bb.dataBegin();
155 outputJSON << std::hex <<
"\"0x" << (int)*it <<
"\"" << std::dec;
// Remaining elements are comma-separated.
158 for(; it != bb.dataEnd(); ++it)
160 outputJSON <<
"," << std::hex <<
"\"0x" << (int)*it <<
"\"" << std::dec;
// Text path: the payload is copied as a C string, so it is read up to the
// first NUL byte.
// NOTE(review): assumes the payload is NUL-terminated — confirm, otherwise this
// reads past dataEnd().
170 std::string output = std::string((
const char*)bb.dataBegin());
// Escape every double quote as \" before the text is written into the JSON.
// Backslashes themselves are not escaped.
173 std::string find =
"\"";
174 std::string replace =
"\\\"";
175 for(std::string::size_type i = 0; (i = output.find(find, i)) != std::string::npos;)
177 output.replace(i, find.length(), replace);
// Skip past the inserted escape so the quote just written is not matched again.
178 i += replace.length();
181 outputJSON << output;
// The "\0" literal is taken as an empty C string, so nothing is appended:
// s equals outputJSON.str() and the datagram carries no trailing NUL.
198 std::string s = outputJSON.str() +
"\0";
200 auto message = boost::asio::buffer(s);
// One datagram per serialized event, sent to the broadcast endpoint.
203 socket_.send_to(message, remote_endpoint_);