1 #include "otsdaq-prepmodernization/DataProcessorPlugins/NimStreamConsumer.h"
3 #include "otsdaq/Macros/CoutMacros.h"
4 #include "otsdaq/Macros/ProcessorPluginMacros.h"
5 #include "otsdaq/MessageFacility/MessageFacility.h"
10 NimStreamConsumer::NimStreamConsumer(std::string supervisorApplicationUID,
11 std::string bufferUID,
12 std::string processorUID,
13 const ConfigurationTree& theXDAQContextConfigTree,
14 const std::string& configurationPath)
15 : WorkLoop(processorUID)
16 , VisualVConsumer(supervisorApplicationUID,
19 theXDAQContextConfigTree,
27 NimStreamConsumer::~NimStreamConsumer(
void) { timeline.clear(); }
42 bool NimStreamConsumer::workLoopThread(toolbox::task::WorkLoop* workLoop)
49 unsigned long long int data_int = 0;
51 __COUT__ << data_str.length() << __E__;
52 while((data_str.length() >= 64))
54 std::string word = data_str.substr(pos, 64);
55 __COUT__ <<
"Word Len: " << word.length() << __E__;
56 __COUT__ <<
"data_str len: " <<
sizeof(data_int) << __E__;
57 memcpy(&data_int, &word, 64);
59 newPt.timestamp = (data_int & 0x0000FFFFFFFF0000) >> 2;
63 newPt.y_0 = (data_int & 0x0000000000000010) >> 1;
64 newPt.y_1 = (data_int & 0x0000000000000020) >> 1;
65 newPt.y_2 = (data_int & 0x0000000000000040) >> 1;
66 newPt.y_3 = (data_int & 0x0000000000000080) >> 1;
67 timeline.push_back(newPt);
68 __COUT__ << newPt.timestamp << __E__;
70 data_str.erase(pos, pos + 64);
71 __COUT__ <<
"Erase Successful" << __E__;
76 return WorkLoop::continueWorkLoop_;
80 void NimStreamConsumer::fastRead(
void)
82 __MOUT__ << processorUID_ <<
" running!" << std::endl;
84 if(DataConsumer::read(dataP_, headerP_) < 0)
89 __MOUT__ << DataProcessor::processorUID_ <<
" UID: " << supervisorApplicationUID_
94 __MOUT__ <<
"Size fill: " << dataP_->length() << std::endl;
96 DataConsumer::setReadSubBuffer<std::string, std::map<std::string, std::string>>();
100 void NimStreamConsumer::slowRead(
void)
102 __MOUT__ << DataProcessor::processorUID_ <<
" running!" << std::endl;
104 if(DataConsumer::read(data_, header_) < 0)
111 __COUT__ <<
"Size of data_; " << data_.length()
112 <<
" Size of data_str: " << data_str.length() << __E__;
115 data_str = data_str + data_;
117 __MOUT__ << DataProcessor::processorUID_ <<
" UID: " << supervisorApplicationUID_
121 std::string NimStreamConsumer::getNext(std::map<std::string, std::string> args)
124 std::vector<timeline_pt> retPts;
125 int retCtr = stoi(args[
"count"]);
126 int timestamp = stoi(args[
"timestamp"]);
128 std::string retStr =
"{ ";
129 std::string y0Str =
"\" y0 \" : \"";
130 std::string y1Str =
"\" y1 \" : \"";
131 std::string y2Str =
"\" y2 \" : \"";
132 std::string y3Str =
"\" y3 \" : \"";
133 std::string tsStr =
"\" timestamp \" : \"";
136 if(timeline.size() > 0)
138 for(
const auto& cPt : timeline)
140 if(cPt.timestamp <= (timestamp + retCtr))
142 if(cPt.timestamp >= timestamp)
144 retPts.push_back(cPt);
152 for(
const auto& pt : retPts)
156 timeline_pt tmpPt = *(&pt + 1);
157 toTimestamp = tmpPt.timestamp;
158 __COUT__ << toTimestamp << __E__;
162 toTimestamp = timestamp + retCtr;
163 __COUT__ << toTimestamp << __E__;
166 for(
int k = pt.timestamp; k < toTimestamp; k++)
168 y0Str += std::to_string(pt.y_0);
169 y1Str += std::to_string(pt.y_1);
170 y2Str += std::to_string(pt.y_2);
171 y3Str += std::to_string(pt.y_3);
172 tsStr += std::to_string(pt.timestamp);
179 retStr += (
"{" + y0Str + y1Str + y2Str + y3Str + tsStr +
" }");
182 __COUT__ << retStr << __E__;
186 retStr =
"No Entries in timeline!";