otsdaq  v2_05_02_indev
UDPReceiver.hh
1 #ifndef artdaq_ots_Generators_UDPReceiver_hh
2 #define artdaq_ots_Generators_UDPReceiver_hh
3 
4 // The UDP Receiver class receives UDP data from an otsdaq application and
5 // puts that data into UDPFragments for further ARTDAQ analysis.
6 //
7 // It currently assumes two things to be true:
8 // 1. The first word of the UDP packet is an 8-bit flag with information
9 // about the status of the sender
10 // 2. The second word is an 8-bit sequence ID, used for detecting
11 // dropped UDP datagrams
12 
13 // Some C++ conventions used:
14 
15 // -Append a "_" to every private member function and variable
16 
17 #include "artdaq-core/Data/Fragment.hh"
18 #include "artdaq/Generators/CommandableFragmentGenerator.hh"
19 #include "fhiclcpp/fwd.h"
20 
21 #include <arpa/inet.h>
22 #include <netinet/in.h>
23 #include <sys/socket.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 
27 #include <array>
28 #include <atomic>
29 #include <chrono>
30 #include <list>
31 #include <mutex>
32 #include <queue>
33 #include <thread>
34 
35 namespace ots
36 {
/// Command codes, stored in a single byte.
/// This is the argument type of UDPReceiver::send().
enum class CommandType : uint8_t
{
    Read        = 0,
    Write       = 1,
    Start_Burst = 2,
    Stop_Burst  = 3,
};
44 
/// Return code carried in the low four bits of a status byte
/// (decoded by UDPReceiver::getReturnCode()).
// NOTE(review): First/Middle/Last presumably mark a packet's position in a
// multi-packet sequence -- confirm against the sender's protocol.
enum class ReturnCode : uint8_t
{
    Read   = 0,
    First  = 1,
    Middle = 2,
    Last   = 3,
};
52 
/// Payload format carried in the high four bits of a status byte
/// (decoded by UDPReceiver::getDataType()).
enum class DataType : uint8_t
{
    Raw    = 0,
    JSON   = 1,
    String = 2,
};
59 
61 {
62  CommandType type;
63  uint8_t dataSize;
64  uint64_t address;
65  uint64_t data[182];
66 };
67 
/// One packet's bytes, held in a std::string.
using packetBuffer_t = std::string;
/// A list of packet buffers.
using packetBuffer_list_t = std::list<packetBuffer_t>;
70 
71 class UDPReceiver : public artdaq::CommandableFragmentGenerator
72 {
73  public:
74  explicit UDPReceiver(fhicl::ParameterSet const& ps);
75  virtual ~UDPReceiver();
76 
77  protected:
78  // The "getNext_" function is used to implement user-specific
79  // functionality; it's a mandatory override of the pure virtual
80  // getNext_ function declared in CommandableFragmentGenerator
81 
82  bool getNext_(artdaq::FragmentPtrs& output) override;
83  void start(void) override;
84  virtual void start_();
85  virtual void stop(void) override;
86  virtual void stopNoMutex(void) override;
87 
88  virtual void ProcessData_(artdaq::FragmentPtrs& output, size_t totalSize);
89 
90  DataType getDataType(uint8_t byte) { return static_cast<DataType>((byte & 0xF0) >> 4); }
91  ReturnCode getReturnCode(uint8_t byte) { return static_cast<ReturnCode>(byte & 0xF); }
92  void send(CommandType flag);
93 
94  packetBuffer_list_t packetBuffers_;
95 
96  bool rawOutput_;
97  std::string rawPath_;
98 
99  // FHiCL-configurable variables. Note that the C++ variable names
100  // are the FHiCL variable names with a "_" appended
101 
102  int dataport_;
103  std::string ip_;
104  int rcvbuf_;
105 
106  // The packet number of the next packet. Used to discover dropped packets
107  uint8_t expectedPacketNumber_;
108 
109  // Socket parameters
110  struct sockaddr_in si_data_;
111  int datasocket_;
112  bool sendCommands_;
113 
114  private:
115  void receiveLoop_();
116  bool isTimerExpired_();
117 
118  std::unique_ptr<std::thread> receiverThread_;
119  std::mutex receiveBufferLock_;
120  packetBuffer_list_t receiveBuffers_;
121 
122  bool fakeDataMode_;
123 
124  // Number of milliseconds per fragment
125  double fragmentWindow_;
126  std::chrono::high_resolution_clock::time_point lastFrag_;
127 };
128 } // namespace ots
129 
130 #endif /* artdaq_demo_Generators_ToySimulator_hh */