tdaq-develop-2025-02-12
BufferImplementation.h
1 #ifndef _ots_BufferImplementation_h_
2 #define _ots_BufferImplementation_h_
3 
4 #include "otsdaq/DataManager/CircularBufferBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/Macros/StringMacros.h"
7 #include "otsdaq/MessageFacility/MessageFacility.h"
8 
9 #include <atomic>
10 #include <iostream>
11 #include <map>
12 #include <string>
13 #include <vector>
14 
15 namespace ots
16 {
// Class template for a buffer made of `numberOfSubBuffers_` sub-buffers.
// Each sub-buffer stores one payload of type D (subBuffers_) alongside one
// header of type H (headers_). Consumers are tracked by name in consumers_.
//
// NOTE(review): the embedded listing numbers jump from 17 to 19, so the
// class-head line (presumably `class BufferImplementation`) is missing from
// this listing - restore it from the upstream header before compiling.
17 template<class D, class H>
19 {
// Bookkeeping kept for each registered consumer (the mapped type of consumers_).
20  struct ConsumerStruct
21  {
// Default state: read position 0 and no status array attached.
// NOTE(review): listing line 23 (the first member initializer) and listing
// line 29 (its matching field) are missing from this listing.
22  ConsumerStruct()
24  , readPointer_(0)
25  , subBuffersStatus_(nullptr)
26  {
27  }
28 
// Read position of this consumer; presumably an index into the sub-buffer
// array - confirm in the .icc implementation.
30  int readPointer_;
// Pointer to atomic flags for this consumer; presumably one flag per
// sub-buffer - allocation and ownership are not visible here, confirm in .icc.
31  std::atomic_bool* subBuffersStatus_;
32  };
33 
34  public:
// Constructor. `producerName` defaults to an empty string and
// `numberOfSubBuffers` defaults to 100.
35  BufferImplementation(const std::string& producerName = "",
36  unsigned int numberOfSubBuffers = 100);
// NOTE(review): listing lines 37-38 are missing here (two declarations are
// not shown in this listing).
39  virtual ~BufferImplementation(void);
40 
// Lifecycle and consumer-management entry points; their behavior is defined
// in the included .icc file and is not visible from this header.
41  void init(void);
42  void reset(void);
43  void resetConsumerList(void);
// NOTE(review): listing line 45 (the remaining parameter(s) and the closing
// `);` of registerConsumer) is missing from this listing.
44  void registerConsumer(const std::string& name,
46  // void unregisterConsumer (const std::string& name);
// Producer-side interface. All of these return int; presumably a negative
// value from the private error enum below signals failure - confirm in .icc.
// attachToEmptySubBuffer takes its pointer arguments by reference, so it can
// repoint `data` and `header` for the caller.
47  int attachToEmptySubBuffer(D*& data, H*& header);
48  int setWrittenSubBuffer(void);
// `header` defaults to a default-constructed H when omitted.
49  int write(const D& buffer, const H& header = H());
// Consumer-side interface, keyed by consumer name. Overloads: payload only,
// payload plus header by reference, and payload plus header through
// pointer references.
50  int read(D& buffer, const std::string& consumer);
51  int read(D& buffer, H& header, const std::string& consumer);
52  int read(D*& buffer, H*& header, const std::string& consumer);
53  int setReadSubBuffer(const std::string& consumer);
// NOTE(review): listing lines 54-56 are missing from this listing.
57 
58  bool isEmpty(void) const;
// Returns the configured number of sub-buffers (numberOfSubBuffers_).
59  unsigned int bufferSize(void) const { return numberOfSubBuffers_; }
60  unsigned int numberOfWrittenBuffers(void) const;
61 
// Read-only access to the consumer map (consumer name -> ConsumerStruct).
62  const std::map<std::string, ConsumerStruct>& getConsumers(void) const
63  {
64  return consumers_;
65  };
66 
// Writes a status dump to `out`; defaults to std::cout when no stream is given.
67  void dumpStatus(std::ostream* out = (std::ostream*)&(std::cout)) const;
68 
69  protected:
// Presumably the subject string used for message-facility logging - confirm
// against the constructor in the .icc file.
70  const std::string mfSubject_;
71 
72  private:
// Negative status codes; presumably returned by the int-returning
// read/write methods above - confirm in .icc.
73  enum
74  {
75  ErrorBufferFull = -1,
76  ErrorBufferLocked = -2,
77  ErrorBufferNotAvailable = -3,
78  ErrorReadBufferOutOfSync = -4
79  };
80 
// Name of the producer that owns this buffer (see the constructor argument).
81  const std::string producerName_;
// Number of sub-buffers; exposed through bufferSize().
82  unsigned int numberOfSubBuffers_;
// Registered consumers keyed by name; exposed through getConsumers().
83  std::map<std::string, ConsumerStruct>
84  consumers_;
// Producer write position; presumably an index into subBuffers_ - confirm in .icc.
85  int writePointer_;
// Producer-side atomic status flags; presumably one per sub-buffer, with
// allocation and ownership handled in the .icc - confirm there.
86  std::atomic_bool* subBuffersStatus_;
// Header storage (type H) and payload storage (type D) for the sub-buffers.
87  std::vector<H> headers_;
88  std::vector<D> subBuffers_;
// Constant naming the "free" flag state; its value is set in the constructor,
// which is not visible here.
89  const bool bufferFree_;
90 
// Internal helpers for pointer advancement and status-flag handling.
// Overloads that take a consumer name presumably act on that consumer's
// ConsumerStruct rather than on the producer-side state - confirm in .icc.
91  unsigned int nextWritePointer(void);
92  unsigned int nextReadPointer(const std::string& consumer);
93  int getFreeBufferIndex(void);
94  unsigned int getReadPointer(const std::string& consumer);
95  void setWritten(unsigned int subBuffer);
96  void setFree(unsigned int subBuffer, const std::string& consumer);
// Both overloads return a reference to an atomic flag from const member functions.
97  std::atomic_bool& isFree(unsigned int subBuffer) const;
98  std::atomic_bool& isFree(unsigned int subBuffer, const std::string& consumer) const;
99 
// Accessors for the header and payload stored at index `subBuffer`, plus a
// helper that takes a payload and header for a given sub-buffer index.
100  H& getHeader(unsigned int subBuffer);
101  D& getSubBuffer(unsigned int subBuffer);
102  void writeSubBuffer(unsigned int subBuffer, const D& buffer, const H& header);
103 };
104 #include "otsdaq/DataManager/BufferImplementation.icc"
105 
106 } // namespace ots
107 #endif
int setReadSubBuffer(const std::string &consumer)