tdaq-develop-2025-02-12
CircularBuffer.h
1 #ifndef _ots_CircularBuffer_h_
2 #define _ots_CircularBuffer_h_
3 
4 #include "otsdaq/DataManager/BufferImplementation.h"
5 #include "otsdaq/DataManager/CircularBufferBase.h"
6 
7 #include "otsdaq/Macros/CoutMacros.h"
8 #include "otsdaq/MessageFacility/MessageFacility.h"
9 
10 #include <atomic>
11 #include <iostream>
12 #include <map>
13 #include <string>
14 
15 namespace ots
16 {
17 template<class D, class H>
19 {
20  public:
21  CircularBuffer(const std::string& dataBufferId);
22  virtual ~CircularBuffer(void);
23 
24  void reset(void);
25  void resetConsumerList(void);
26  bool isEmpty(void) const;
27  unsigned int getTotalNumberOfSubBuffers(void) const;
28  unsigned int getProducerBufferSize(const std::string& producerID) const;
29 
30  inline int read(D& buffer, const std::string& consumerID)
31  {
32  H dummyHeader;
33  return read(buffer, dummyHeader, consumerID);
34  }
35 
36  inline int read(D& buffer, H& header, const std::string& consumerID)
37  {
38  setNextProducerBuffer(consumerID);
39  unsigned int readCounter = theBuffer_.size() - 1;
40  // ++megaCounter_;
41  // if(megaCounter_%10000000 == 0)
42  // std::cout << __COUT_HDR_FL__ << __COUT_HDR_FL__
43  // << "Consumer: " << consumerID
44  // << " Reading producer: " << lastReadBuffer_[consumerID]->first
45  // << " Buffer empty? " << lastReadBuffer_[consumerID]->second.isEmpty()
46  // << " written buffers: " <<
47  // lastReadBuffer_[consumerID]->second.numberOfWrittenBuffers()
48  // << std::endl;
49  int readReturnVal;
50  while((readReturnVal = lastReadBuffer_[consumerID]->second.read(
51  buffer, header, consumerID)) < 0 &&
52  readCounter > 0)
53  {
54  setNextProducerBuffer(consumerID);
55  --readCounter;
56  }
57  return readReturnVal;
58  }
59 
60  int read(D*& buffer, H*& header, const std::string& consumerID)
61  {
62  setNextProducerBuffer(consumerID);
63  unsigned int readCounter = theBuffer_.size() - 1;
64  // ++megaCounter_;
65  // if(megaCounter_%10000000 == 0)
66  // std::cout << __COUT_HDR_FL__ << __COUT_HDR_FL__
67  // << "Consumer: " << consumerID
68  // << " Reading producer: " << lastReadBuffer_[consumerID]->first
69  // << " Buffer empty? " << lastReadBuffer_[consumerID]->second.isEmpty()
70  // << " written buffers: " <<
71  // lastReadBuffer_[consumerID]->second.numberOfWrittenBuffers()
72  // << std::endl;
73  int readReturnVal;
74  while((readReturnVal = lastReadBuffer_[consumerID]->second.read(
75  buffer, header, consumerID)) < 0 &&
76  readCounter > 0)
77  {
78  setNextProducerBuffer(consumerID);
79  --readCounter;
80  }
81  return readReturnVal;
82  }
83 
84  BufferImplementation<D, H>& getLastReadBuffer(const std::string& consumerID)
85  {
86  return lastReadBuffer_[consumerID]->second;
87  }
88  BufferImplementation<D, H>& getBuffer(const std::string& producerID)
89  {
90  // __COUTV__(producerID);
91  // __COUTV__(int(theBuffer_.find(producerID) == theBuffer_.end()));
92  return theBuffer_[producerID];
93  }
94 
95  // void unregisterConsumer (const std::string& consumerID);
96  // void unregisterProducer (const std::string& producerID);
97 
98  private:
99  std::map<std::string /*producer id*/,
100  BufferImplementation<D, H> /*one producer, many consumers*/>
101  theBuffer_;
102 
103  void registerProducer(const std::string& producerID,
104  unsigned int numberOfSubBuffers = 100);
105  void registerConsumer(const std::string& consumerID,
107 
108  void setNextProducerBuffer(const std::string& consumer);
109 
110  std::map<std::string /*consumer id*/,
111  /*iterator within producer map*/ typename std::
112  map<std::string, BufferImplementation<D, H>>::iterator>
113  lastReadBuffer_;
114 
115  std::map<std::string, CircularBufferBase::ConsumerPriority> consumers_;
116 };
117 #include "otsdaq/DataManager/CircularBuffer.icc"
118 
119 } // namespace ots
120 #endif
void reset(void)
This DOES NOT reset the consumer list.