otsdaq_prepmodernization  v2_05_02_indev
NimStreamConsumer_processor.cc
1 #include "otsdaq-prepmodernization/DataProcessorPlugins/NimStreamConsumer.h"
2 
3 #include "otsdaq/Macros/CoutMacros.h"
4 #include "otsdaq/Macros/ProcessorPluginMacros.h"
5 #include "otsdaq/MessageFacility/MessageFacility.h"
6 
7 using namespace ots;
8 
9 //==============================================================================
10 NimStreamConsumer::NimStreamConsumer(std::string supervisorApplicationUID,
11  std::string bufferUID,
12  std::string processorUID,
13  const ConfigurationTree& theXDAQContextConfigTree,
14  const std::string& configurationPath)
15  : WorkLoop(processorUID)
16  , VisualVConsumer(supervisorApplicationUID,
17  bufferUID,
18  processorUID,
19  theXDAQContextConfigTree,
20  configurationPath)
21 //, DataConsumer(supervisorApplicationUID, bufferUID, processorUID, LowConsumerPriority)
22 //, Configurable(theXDAQContextConfigTree, configurationPath)
23 {
24 }
25 
26 //==============================================================================
27 NimStreamConsumer::~NimStreamConsumer(void) { timeline.clear(); }
28 
29 //==============================================================================
30 // void NimStreamConsumer::startProcessingData(std::string runNumber)
31 // {
32 // DataConsumer::startProcessingData(runNumber);
33 // }
34 //
35 // //==============================================================================
36 // void NimStreamConsumer::stopProcessingData(void)
37 // {
38 // DataConsumer::stopProcessingData();
39 // }
40 
41 //==============================================================================
42 bool NimStreamConsumer::workLoopThread(toolbox::task::WorkLoop* workLoop)
43 {
44  //__MOUT__ << DataProcessor::processorUID_ << " running, because workloop: " <<
45  // WorkLoop::continueWorkLoop_ << std::endl;
46  slowRead(); // fastRead();
47 
48  timeline_pt newPt;
49  unsigned long long int data_int = 0;
50  int pos = 0;
51  __COUT__ << data_str.length() << __E__;
52  while((data_str.length() >= 64))
53  {
54  std::string word = data_str.substr(pos, 64);
55  __COUT__ << "Word Len: " << word.length() << __E__;
56  __COUT__ << "data_str len: " << sizeof(data_int) << __E__;
57  memcpy(&data_int, &word, 64);
58  //__COUT__ << data_ << __E__;
59  newPt.timestamp = (data_int & 0x0000FFFFFFFF0000) >> 2; // One Quadword, Mask out
60  // header and state data
61  // #TODO VERIFY THAT
62  // TIMESTAMP IS 32b
63  newPt.y_0 = (data_int & 0x0000000000000010) >> 1; // Get y0
64  newPt.y_1 = (data_int & 0x0000000000000020) >> 1; // Get y1
65  newPt.y_2 = (data_int & 0x0000000000000040) >> 1; // Get y2
66  newPt.y_3 = (data_int & 0x0000000000000080) >> 1; // Get y3
67  timeline.push_back(newPt);
68  __COUT__ << newPt.timestamp << __E__;
69  //__COUT__ << data_int << __E__;
70  data_str.erase(pos, pos + 64);
71  __COUT__ << "Erase Successful" << __E__;
72  // pos += 64;
73  // data_int = 0;
74  }
75 
76  return WorkLoop::continueWorkLoop_;
77 }
78 
79 //==============================================================================
80 void NimStreamConsumer::fastRead(void)
81 {
82  __MOUT__ << processorUID_ << " running!" << std::endl;
83  // This is making a copy!!!
84  if(DataConsumer::read(dataP_, headerP_) < 0)
85  {
86  usleep(100);
87  return;
88  }
89  __MOUT__ << DataProcessor::processorUID_ << " UID: " << supervisorApplicationUID_
90  << std::endl;
91 
92  // HW emulator
93  // Burst Type | Sequence | 8B data
94  __MOUT__ << "Size fill: " << dataP_->length() << std::endl;
95 
96  DataConsumer::setReadSubBuffer<std::string, std::map<std::string, std::string>>();
97 }
98 
99 //==============================================================================
100 void NimStreamConsumer::slowRead(void)
101 {
102  __MOUT__ << DataProcessor::processorUID_ << " running!" << std::endl;
103  // This is making a copy!!!
104  if(DataConsumer::read(data_, header_) < 0)
105  {
106  usleep(1000);
107  return;
108  }
109  else
110  {
111  __COUT__ << "Size of data_; " << data_.length()
112  << " Size of data_str: " << data_str.length() << __E__;
113  //__COUT__ << "data_: " << data_ << __E__;
114  //__COUT__ << "data_str: " << data_str << __E__;
115  data_str = data_str + data_;
116  }
117  __MOUT__ << DataProcessor::processorUID_ << " UID: " << supervisorApplicationUID_
118  << std::endl;
119 }
120 //===============================================================================
121 std::string NimStreamConsumer::getNext(std::map<std::string, std::string> args)
122 {
123  timeline_pt closePt;
124  std::vector<timeline_pt> retPts;
125  int retCtr = stoi(args["count"]);
126  int timestamp = stoi(args["timestamp"]);
127  int toTimestamp;
128  std::string retStr = "{ ";
129  std::string y0Str = "\" y0 \" : \"";
130  std::string y1Str = "\" y1 \" : \"";
131  std::string y2Str = "\" y2 \" : \"";
132  std::string y3Str = "\" y3 \" : \"";
133  std::string tsStr = "\" timestamp \" : \"";
134  // Find the closest timestamp and return up to "count" timestamp points via vector for
135  // the VisualDataManager to interpret and send to the js frontend
136  if(timeline.size() > 0)
137  {
138  for(const auto& cPt : timeline)
139  {
140  if(cPt.timestamp <= (timestamp + retCtr))
141  {
142  if(cPt.timestamp >= timestamp)
143  {
144  retPts.push_back(cPt);
145  }
146  }
147  else
148  {
149  break;
150  }
151  }
152  for(const auto& pt : retPts)
153  {
154  try
155  {
156  timeline_pt tmpPt = *(&pt + 1);
157  toTimestamp = tmpPt.timestamp;
158  __COUT__ << toTimestamp << __E__;
159  }
160  catch(...)
161  {
162  toTimestamp = timestamp + retCtr;
163  __COUT__ << toTimestamp << __E__;
164  }
165 
166  for(int k = pt.timestamp; k < toTimestamp; k++)
167  {
168  y0Str += std::to_string(pt.y_0);
169  y1Str += std::to_string(pt.y_1);
170  y2Str += std::to_string(pt.y_2);
171  y3Str += std::to_string(pt.y_3);
172  tsStr += std::to_string(pt.timestamp);
173  }
174  y0Str += "\",";
175  y1Str += "\",";
176  y2Str += "\",";
177  y3Str += "\",";
178  tsStr += "\"";
179  retStr += ("{" + y0Str + y1Str + y2Str + y3Str + tsStr + " }");
180  };
181  retStr += "}";
182  __COUT__ << retStr << __E__;
183  }
184  else
185  {
186  retStr = "No Entries in timeline!";
187  }
188 
189  return retStr;
190 }
191 
192 DEFINE_OTS_PROCESSOR(NimStreamConsumer)