otsdaq  v2_05_02_indev
CircularBuffer.h
1 #ifndef _ots_CircularBuffer_h_
2 #define _ots_CircularBuffer_h_
3 
4 #include "otsdaq/DataManager/BufferImplementation.h"
5 #include "otsdaq/DataManager/CircularBufferBase.h"
6 
7 #include "otsdaq/Macros/CoutMacros.h"
8 #include "otsdaq/MessageFacility/MessageFacility.h"
9 
10 #include <atomic>
11 #include <iostream>
12 #include <map>
13 #include <string>
14 
15 namespace ots
16 {
17 template<class D, class H>
19 {
20  public:
21  CircularBuffer(const std::string& dataBufferId);
22  virtual ~CircularBuffer(void);
23 
24  void reset(void); // This DOES NOT reset the consumer list
25  void resetConsumerList(void);
26  bool isEmpty(void) const;
27  unsigned int getTotalNumberOfSubBuffers(void) const;
28  unsigned int getProducerBufferSize(const std::string& producerID) const;
29 
30  inline int read(D& buffer, const std::string& consumerID)
31  {
32  H dummyHeader;
33  return read(buffer, dummyHeader, consumerID);
34  }
35 
36  inline int read(D& buffer, H& header, const std::string& consumerID)
37  {
38  setNextProducerBuffer(consumerID);
39  unsigned int readCounter = theBuffer_.size() - 1;
40  // ++megaCounter_;
41  // if(megaCounter_%10000000 == 0)
42  // std::cout << __COUT_HDR_FL__ << __COUT_HDR_FL__
43  // << "Consumer: " << consumerID
44  // << " Reading producer: " << lastReadBuffer_[consumerID]->first
45  // << " Buffer empty? " << lastReadBuffer_[consumerID]->second.isEmpty()
46  // << " written buffers: " <<
47  // lastReadBuffer_[consumerID]->second.numberOfWrittenBuffers()
48  // << std::endl;
49  int readReturnVal;
50  while((readReturnVal = lastReadBuffer_[consumerID]->second.read(buffer, header, consumerID)) < 0 && readCounter > 0)
51  {
52  setNextProducerBuffer(consumerID);
53  --readCounter;
54  }
55  return readReturnVal;
56  }
57 
58  int read(D*& buffer, H*& header, const std::string& consumerID)
59  {
60  setNextProducerBuffer(consumerID);
61  unsigned int readCounter = theBuffer_.size() - 1;
62  // ++megaCounter_;
63  // if(megaCounter_%10000000 == 0)
64  // std::cout << __COUT_HDR_FL__ << __COUT_HDR_FL__
65  // << "Consumer: " << consumerID
66  // << " Reading producer: " << lastReadBuffer_[consumerID]->first
67  // << " Buffer empty? " << lastReadBuffer_[consumerID]->second.isEmpty()
68  // << " written buffers: " <<
69  // lastReadBuffer_[consumerID]->second.numberOfWrittenBuffers()
70  // << std::endl;
71  int readReturnVal;
72  while((readReturnVal = lastReadBuffer_[consumerID]->second.read(buffer, header, consumerID)) < 0 && readCounter > 0)
73  {
74  setNextProducerBuffer(consumerID);
75  --readCounter;
76  }
77  return readReturnVal;
78  }
79 
80  BufferImplementation<D, H>& getLastReadBuffer(const std::string& consumerID) { return lastReadBuffer_[consumerID]->second; }
81  BufferImplementation<D, H>& getBuffer(const std::string& producerID)
82  {
83  // __COUTV__(producerID);
84  // __COUTV__(int(theBuffer_.find(producerID) == theBuffer_.end()));
85  return theBuffer_[producerID];
86  }
87 
88  // void unregisterConsumer (const std::string& consumerID);
89  // void unregisterProducer (const std::string& producerID);
90 
91  private:
92  std::map<std::string /*producer id*/, BufferImplementation<D, H> /*one producer, many consumers*/> theBuffer_;
93 
94  void registerProducer(const std::string& producerID, unsigned int numberOfSubBuffers = 100);
95  void registerConsumer(const std::string& consumerID, CircularBufferBase::ConsumerPriority priority);
96 
97  void setNextProducerBuffer(const std::string& consumer);
98 
99  std::map<std::string /*consumer id*/,
100  /*iterator within producer map*/ typename std::map<std::string, BufferImplementation<D, H>>::iterator>
101  lastReadBuffer_;
102 
103  std::map<std::string, CircularBufferBase::ConsumerPriority> consumers_;
104 };
105 #include "otsdaq/DataManager/CircularBuffer.icc"
106 
107 } // namespace ots
108 #endif