otsdaq  v2_05_02_indev
BufferImplementation.h
1 #ifndef _ots_BufferImplementation_h_
2 #define _ots_BufferImplementation_h_
3 
4 #include "otsdaq/DataManager/CircularBufferBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/Macros/StringMacros.h"
7 #include "otsdaq/MessageFacility/MessageFacility.h"
8 
9 #include <atomic>
10 #include <iostream>
11 #include <map>
12 #include <string>
13 #include <vector>
14 
15 namespace ots
16 {
17 template<class D, class H>
19 {
20  struct ConsumerStruct
21  {
22  ConsumerStruct() : priority_(CircularBufferBase::LowConsumerPriority), readPointer_(0), subBuffersStatus_(nullptr) {}
23 
24  CircularBufferBase::ConsumerPriority priority_;
25  int readPointer_;
26  std::atomic_bool* subBuffersStatus_; // Status of the Circular Buffer:
27  };
28 
29  public:
30  BufferImplementation(const std::string& producerName = "", unsigned int numberOfSubBuffers = 100);
33  virtual ~BufferImplementation(void);
34 
35  void init(void);
36  void reset(void);
37  void resetConsumerList(void);
38  void registerConsumer(const std::string& name, CircularBufferBase::ConsumerPriority priority);
39  // void unregisterConsumer (const std::string& name);
40  int attachToEmptySubBuffer(D*& data, H*& header);
41  int setWrittenSubBuffer(void);
42  int write(const D& buffer, const H& header = H());
43  int read(D& buffer, const std::string& consumer);
44  int read(D& buffer, H& header, const std::string& consumer);
45  int read(D*& buffer, H*& header, const std::string& consumer);
46  int setReadSubBuffer(const std::string& consumer); // Must be used in conjunction
47  // with attachToEmptySubBuffer
48  // because it attach to the
49  // nextWritePointer buffer
50 
51  bool isEmpty(void) const;
52  unsigned int bufferSize(void) const { return numberOfSubBuffers_; }
53  unsigned int numberOfWrittenBuffers(void) const;
54 
55  const std::map<std::string, ConsumerStruct>& getConsumers(void) const { return consumers_; };
56 
57  void dumpStatus(std::ostream* out = (std::ostream*)&(std::cout)) const;
58 
59  protected:
60  const std::string mfSubject_;
61 
62  private:
63  enum
64  {
65  ErrorBufferFull = -1,
66  ErrorBufferLocked = -2,
67  ErrorBufferNotAvailable = -3,
68  ErrorReadBufferOutOfSync = -4
69  };
70 
71  const std::string producerName_;
72  unsigned int numberOfSubBuffers_;
73  std::map<std::string, ConsumerStruct> consumers_; // Pointers to the blocks which the consumers are reading
74  int writePointer_; // Pointer to the available free buffer, -1 means no free buffers!
75  std::atomic_bool* subBuffersStatus_; // Status of the Circular Buffer:
76  std::vector<H> headers_; // Buffer Header
77  std::vector<D> subBuffers_; // Buffers filled with data
78  const bool bufferFree_;
79 
80  unsigned int nextWritePointer(void);
81  unsigned int nextReadPointer(const std::string& consumer);
82  int getFreeBufferIndex(void); // can return -1 if there are no free buffers!
83  unsigned int getReadPointer(const std::string& consumer);
84  void setWritten(unsigned int subBuffer);
85  void setFree(unsigned int subBuffer, const std::string& consumer);
86  std::atomic_bool& isFree(unsigned int subBuffer) const;
87  std::atomic_bool& isFree(unsigned int subBuffer, const std::string& consumer) const;
88 
89  H& getHeader(unsigned int subBuffer);
90  D& getSubBuffer(unsigned int subBuffer);
91  void writeSubBuffer(unsigned int subBuffer, const D& buffer, const H& header);
92 };
93 #include "otsdaq/DataManager/BufferImplementation.icc"
94 
95 } // namespace ots
96 #endif