otsdaq_mu2e  v2_04_02
DataGenReceiver_generator.cc
1 #include "otsdaq-demo/Generators/DataGenReceiver.hh"
2 
3 //#include "art/Utilities/Exception.h"
4 #include "artdaq-core/Utilities/SimpleLookupPolicy.h"
5 #include "artdaq/Application/GeneratorMacros.hh"
6 #include "cetlib/exception.h"
7 #include "fhiclcpp/ParameterSet.h"
8 #include "otsdaq-demo/Overlays/DataGenFragmentWriter.hh"
9 #include "otsdaq-demo/Overlays/FragmentType.hh"
10 
11 #include <sys/poll.h>
12 #include <fstream>
13 #include <iomanip>
14 #include <iostream>
15 #include <iterator>
16 
17 //==============================================================================
18 ots::DataGenReceiver::DataGenReceiver(fhicl::ParameterSet const& ps)
19  : WorkLoop("DataGenReceiver")
20  , DataConsumer(ps.get<std::string>("SupervisorApplicationUID", "ARTDAQDataManager"),
21  ps.get<std::string>("BufferUID", "ARTDAQBuffer"),
22  ps.get<std::string>("ProcessorUID", "DataGenReceiver"),
23  HighConsumerPriority)
24  , CommandableFragmentGenerator(ps)
25  , rawOutput_(ps.get<bool>("raw_output_enabled", false))
26  , rawPath_(ps.get<std::string>("raw_output_path", "/tmp"))
27  , dataport_(ps.get<int>("port", 6343))
28  , ip_(ps.get<std::string>("ip", "127.0.0.1"))
29  , expectedPacketNumber_(0)
30  , sendCommands_(ps.get<bool>("send_OtsUDP_commands", false))
31  , fragmentWindow_(ps.get<double>("fragment_time_window_ms", 1000))
32  , lastFrag_(std::chrono::high_resolution_clock::now())
33 {
34  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
35  mf::LogInfo("DataGenReceiver")
36  << "MY TRIGGER MODE IS: " << ps.get<std::string>("trigger_mode", "UNDEFINED!!!");
37  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << "registering to buffer!"
38  << std::endl;
39  registerToBuffer();
40 }
41 
42 //==============================================================================
43 bool ots::DataGenReceiver::getNext_(artdaq::FragmentPtrs& output)
44 {
45  //__COUT__ << "READING DATA!" << std::endl;
46  if(should_stop())
47  return false;
48 
49  // unsigned long block;
50  if(read<std::string, std::map<std::string, std::string>>(buffer_) < 0)
51  usleep(10000);
52  else
53  {
54  // std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << name_ << " Buffer:
55  // " << buffer << std::endl;
56  unsigned long long value;
57  memcpy((void*)&value, (void*)buffer_.substr(2).data(),
58  8); // make data counter
59  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::hex << value
60  << std::dec << std::endl;
61  ProcessData_(output);
62  }
63  return true;
64 }
65 
66 //==============================================================================
67 void ots::DataGenReceiver::ProcessData_(artdaq::FragmentPtrs& frags)
68 {
69  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::endl;
70  unsigned long long value;
71  memcpy((void*)&value, (void*)buffer_.substr(2).data(),
72  8); // make data counter
73  std::cout << __COUT_HDR_FL__ << __PRETTY_FUNCTION__ << std::hex << value << std::dec
74  << std::endl;
75 
77  // metadata.port = dataport_;
78  // metadata.address = si_data_.sin_addr.s_addr;
79  metadata.port = 2000;
80  metadata.address = 0xc0aabb11;
81 
82  std::ofstream output;
83  // std::cout << __COUT_HDR_FL__ << "SAVING FILE??? " << rawOutput_ <<
84  // std::endl;
85  if(rawOutput_)
86  {
87  std::string outputPath = rawPath_ + "/DataGenReceiver-" + ip_ + ":" +
88  std::to_string(dataport_) + ".bin";
89  // std::cout << __COUT_HDR_FL__ << "FILE DATA PATH: " << outputPath <<
90  // std::endl;
91  output.open(outputPath, std::ios::out | std::ios::app | std::ios::binary);
92  }
93 
94  mf::LogInfo("DataGenReceiver")
95  << "Starting DataGenReceiver Packet Processing Loop" << std::endl;
96  // for( auto packet = packetBuffers_.begin(); packet !=
97  // packetBuffers_.end(); ++packet ) {
98 
99  std::size_t initial_payload_size = 0;
100 
101  frags.emplace_back(artdaq::Fragment::FragmentBytes(initial_payload_size,
102  ev_counter(),
103  fragment_id(),
104  ots::detail::FragmentType::DataGen,
105  metadata));
106  ev_counter_inc();
107 
108  //******* WE BUILD THE EVENT *************
109  // We now have a fragment to contain this event:
110  ots::DataGenFragmentWriter thisFrag(*frags.back());
111  thisFrag.resize(sizeof(uint8_t) * 64050);
112  std::cout << __COUT_HDR_FL__ << "DJN2 after, datasizebytes=" << thisFrag.asksize()
113  << std::endl
114  << std::endl;
115 
116  memcpy(thisFrag.dataBegin(), (&buffer_[2]), sizeof(DataGenFragment::DataBlob));
117 
118  //******* DONE BUILDING THE EVENT *************
119  if(rawOutput_)
120  {
121  output.write(&buffer_[0], sizeof(DataGenFragment::DataBlob));
122  }
123  // }
124  mf::LogInfo("DataGenReceiver") << "Done with DataGenReceiver Packet "
125  "Processing Loop. Deleting PacketBuffers"
126  << std::endl;
127  // packetBuffers_.clear();
128  return;
129 }
130 
131 //==============================================================================
132 void ots::DataGenReceiver::start()
133 {
134 #pragma message "Using default implementation of DataGenReceiver::start_()"
135 }
136 
137 //==============================================================================
138 void ots::DataGenReceiver::stop()
139 {
140 #pragma message "Using default implementation of DataGenReceiver::stop()"
141 }
142 
143 //==============================================================================
144 void ots::DataGenReceiver::stopNoMutex(){
145 #pragma message "Using default implementation of DataGenReceiver::stopNoMutex()"
146 }
147 
148 // The following macro is defined in artdaq's GeneratorMacros.hh header
149 DEFINE_ARTDAQ_COMMANDABLE_GENERATOR(ots::DataGenReceiver)