8 #include "art/Framework/Core/EDAnalyzer.h" 
    9 #include "art/Framework/Core/ModuleMacros.h" 
   10 #include "art/Framework/Principal/Event.h" 
   11 #include "art/Framework/Principal/Handle.h" 
   12 #include "art/Framework/Principal/Run.h" 
   13 #include "canvas/Utilities/Exception.h" 
   15 #include "artdaq-core/Data/Fragment.hh" 
   16 #include "artdaq-ots/Overlays/UDPFragment.hh" 
   17 #include "otsdaq/Macros/CoutMacros.h" 
   19 #include <boost/asio.hpp> 
   20 using boost::asio::ip::udp;
 
   22 #include <arpa/inet.h> 
   39     virtual void analyze(art::Event 
const& evt) 
override;
 
   40     virtual void beginRun(art::Run 
const& run) 
override;
 
   44     std::string             raw_data_label_;
 
   45     std::string             frag_type_;
 
   46     boost::asio::io_service io_service_;
 
   48     udp::endpoint           remote_endpoint_;
 
   53 ots::JSONDispatcher::JSONDispatcher(fhicl::ParameterSet 
const& pset)
 
   54     : art::EDAnalyzer(pset)
 
   55     , prescale_(pset.get<int>(
"prescale", 1))
 
   56     , raw_data_label_(pset.get<std::string>(
"raw_data_label", 
"daq"))
 
   57     , frag_type_(pset.get<std::string>(
"frag_type", 
"UDP"))
 
   59     , socket_(io_service_, udp::v4())
 
   61     std::cout << __COUT_HDR_FL__ << 
"JSONDispatcher Constructor Start" << std::endl;
 
   62     int port = pset.get<
int>(
"port", 35555);
 
   66     boost::system::error_code ec;
 
   67     socket_.set_option(boost::asio::socket_base::reuse_address(
true), ec);
 
   70         TLOG(TLVL_ERROR, 
"JSONDispatcher")
 
   71             << 
"An error occurred setting reuse_address: " << ec.message() << std::endl;
 
   75     socket_.set_option(boost::asio::socket_base::broadcast(
true), ec);
 
   78         TLOG(TLVL_ERROR, 
"JSONDispatcher")
 
   79             << 
"An error occurred setting broadcast: " << ec.message() << std::endl;
 
   84     remote_endpoint_ = udp::endpoint(boost::asio::ip::address_v4::broadcast(), port);
 
   85     std::cout << __COUT_HDR_FL__ << 
"JSONDispatcher Constructor End" << std::endl;
 
   88 ots::JSONDispatcher::~JSONDispatcher() {}
 
   90 void ots::JSONDispatcher::beginRun(art::Run 
const& run)
 
   92     std::cout << __COUT_HDR_FL__ << 
"JSONDispatcher beginning run " << run.run()
 
   96 void ots::JSONDispatcher::analyze(art::Event 
const& evt)
 
  100     art::EventNumber_t eventNumber = evt.event();
 
  101     TLOG(TLVL_INFO, 
"JSONDispatcher")
 
  102         << 
"Received event with sequence ID " << eventNumber;
 
  103     if((
int)eventNumber % prescale_ == 0)
 
  107         std::ostringstream outputJSON;
 
  108         outputJSON << 
"{\"run\":" << std::to_string(evt.run())
 
  109                    << 
",\"subrun\":" << std::to_string(evt.subRun())
 
  110                    << 
",\"event\":" << std::to_string(eventNumber);
 
  120         art::Handle<artdaq::Fragments> raw;
 
  121         evt.getByLabel(raw_data_label_, frag_type_, raw);
 
  122         outputJSON << 
",\"fragments\":[";
 
  128             for(
size_t idx = 0; idx < raw->size(); ++idx)
 
  135                 const auto& frag((*raw)[idx]);
 
  140                 if(frag.hasMetadata())
 
  142                     outputJSON << 
"\"metadata\":{";
 
  145                     outputJSON << 
"\"port\":" << std::to_string(md->port) << 
",";
 
  146                     char               buf[
sizeof(in_addr)];
 
  147                     struct sockaddr_in addr;
 
  148                     addr.sin_addr.s_addr = md->address;
 
  149                     inet_ntop(AF_INET, &(addr.sin_addr), buf, INET_ADDRSTRLEN);
 
  150                     outputJSON << 
"\"address\":\"" << std::string(buf) << 
"\"";
 
  153                 outputJSON << 
"\"header\":{";
 
  154                 outputJSON << 
"\"event_size\":" << std::to_string(bb.hdr_event_size())
 
  156                 outputJSON << 
"\"data_type\":" << std::to_string(bb.hdr_data_type());
 
  157                 type = bb.hdr_data_type();
 
  159                 outputJSON << 
"\"data\":";
 
  160                 if(type == 0 || type > 2)
 
  163                     auto it = bb.dataBegin();
 
  164                     outputJSON << std::hex << 
"\"0x" << (int)*it << 
"\"" << std::dec;
 
  167                     for(; it != bb.dataEnd(); ++it)
 
  169                         outputJSON << 
"," << std::hex << 
"\"0x" << (int)*it << 
"\"" 
  180                     std::string output = std::string((
const char*)bb.dataBegin());
 
  183                         std::string find    = 
"\"";
 
  184                         std::string replace = 
"\\\"";
 
  185                         for(std::string::size_type i = 0;
 
  186                             (i = output.find(find, i)) != std::string::npos;)
 
  188                             output.replace(i, find.length(), replace);
 
  189                             i += replace.length();
 
  192                     outputJSON << output;
 
  209         std::string s = outputJSON.str() + 
"\0";
 
  211         auto message = boost::asio::buffer(s);
 
  214         socket_.send_to(message, remote_endpoint_);