tdaq-develop-2025-02-12
WorkLoopManager.cc
1 #include "otsdaq/WorkLoopManager/WorkLoopManager.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/SOAPUtilities/SOAPCommand.h"
5 #include "otsdaq/XmlUtilities/HttpXmlDocument.h"
6 
7 #include <toolbox/task/WorkLoopFactory.h>
8 
9 #include <cassert>
10 #include <cstdlib>
11 #include <iostream>
12 #include <sstream>
13 
14 using namespace ots;
15 
16 //==============================================================================
17 WorkLoopManager::WorkLoopManager(toolbox::task::ActionSignature* job)
18  : cWorkLoopType_("waiting"), job_(job), requestNumber_(0), requestName_(job_->name())
19 {
20  __COUT__ << "Request name: " << requestName_ << " jobName: " << job_->name()
21  << std::endl;
22 }
23 
24 //==============================================================================
25 WorkLoopManager::~WorkLoopManager(void) {}
26 
27 //==============================================================================
29 {
30  return processRequest(composeWorkLoopName(requestNumber_++, cgi));
31 }
32 
33 //==============================================================================
34 HttpXmlDocument WorkLoopManager::processRequest(const xoap::MessageReference& message)
35 {
36  return processRequest(composeWorkLoopName(requestNumber_++, message), &message);
37 }
38 
39 //==============================================================================
40 HttpXmlDocument WorkLoopManager::processRequest(std::string workLoopName,
41  const xoap::MessageReference* message)
42 {
43  HttpXmlDocument xmlDocument;
44  std::stringstream requestNumberStream;
45  RequestNumber requestNumber = getWorkLoopRequestNumber(workLoopName);
46  requestNumberStream << requestNumber;
47  std::stringstream reportStream;
48 
49  __COUT__ << "Processing request! WorkLoops: " << workLoops_.size()
50  << " req: " << requestNumber << std::endl;
51  if(workLoops_.size() >= maxWorkLoops)
52  {
53  if(!removeTimedOutRequests())
54  {
55  reportStream << "Too many running requests (" << workLoops_.size()
56  << "), try later!" << std::endl;
57  __COUT__ << "ERROR: " << reportStream.str() << std::endl;
58 
59  xmlDocument.addTextElementToData("RequestStatus", "ERROR");
60  xmlDocument.addTextElementToData("RequestName",
61  getWorkLoopRequest(workLoopName));
62  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
63  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
64  return xmlDocument;
65  }
66  }
67  time(&(workLoops_[requestNumber].requestStartTime));
68  workLoops_[requestNumber].requestLastTimeChecked =
69  workLoops_[requestNumber].requestStartTime;
70  workLoops_[requestNumber].request = getWorkLoopRequest(workLoopName);
71  workLoops_[requestNumber].result = "EMPTY";
72  workLoops_[requestNumber].progress = 0;
73  workLoops_[requestNumber].done = false;
74  workLoops_[requestNumber].workLoopName = workLoopName;
75  if(message != 0)
76  workLoops_[requestNumber].message = *message;
77  try
78  {
79  workLoops_[requestNumber].workLoop =
80  toolbox::task::getWorkLoopFactory()->getWorkLoop(workLoopName,
81  cWorkLoopType_);
82  }
83  catch(xcept::Exception& e)
84  {
85  reportStream << "Can't create workloop, try again." << std::endl;
86  __COUT__ << "ERROR: " << reportStream.str() << std::endl;
87  xmlDocument.addTextElementToData("RequestStatus", "ERROR");
88  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
89  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
90  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
91  removeWorkLoop(requestNumber);
92  return xmlDocument;
93  }
94 
95  try
96  {
97  workLoops_[requestNumber].workLoop->submit(job_);
98  }
99  catch(xcept::Exception& e)
100  {
101  reportStream << "Can't submit workloop job, try again." << std::endl;
102  __COUT__ << "ERROR: " << reportStream.str() << std::endl;
103  xmlDocument.addTextElementToData("RequestStatus", "ERROR");
104  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
105  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
106  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
107  removeWorkLoop(requestNumber);
108  return xmlDocument;
109  }
110  try
111  {
112  workLoops_[requestNumber].workLoop->activate();
113  }
114  catch(xcept::Exception& e)
115  {
116  reportStream << "Can't activate workloop, try again." << std::endl;
117  __COUT__ << "ERROR: " << reportStream.str() << std::endl;
118  xmlDocument.addTextElementToData("RequestStatus", "ERROR");
119  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
120  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
121  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
122  removeWorkLoop(requestNumber);
123  return xmlDocument;
124  }
125 
126  __COUT__ << "SUCCESS: Request is being processed!" << std::endl;
127 
128  xmlDocument.addTextElementToData("RequestStatus", "SUCCESS");
129  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
130  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
131 
132  return xmlDocument;
133 }
134 
135 //==============================================================================
136 bool WorkLoopManager::report(toolbox::task::WorkLoop* workLoop,
137  std::string result,
138  float progress,
139  bool status)
140 {
141  RequestNumber requestNumber = getWorkLoopRequestNumber(workLoop);
142 
143  /*
144  __COUT__ << "Reporting result for request " << getWorkLoopRequest(workLoop) << "
145  with req#: " << requestNumber << std::endl; if(workLoops_.find(requestNumber) ==
146  workLoops_.end())
147  {
148  __COUT__ << "This MUST NEVER happen. You must find out what is wrong!" <<
149  std::endl; assert(0);
150  }
151  */
152  workLoops_[requestNumber].result = result;
153  workLoops_[requestNumber].progress = progress;
154  workLoops_[requestNumber].done = status;
155  return true;
156 }
157 
158 //==============================================================================
159 xoap::MessageReference WorkLoopManager::getMessage(toolbox::task::WorkLoop* workLoop)
160 {
161  RequestNumber requestNumber = getWorkLoopRequestNumber(workLoop);
162  return workLoops_[requestNumber].message;
163 }
164 
165 //==============================================================================
166 bool WorkLoopManager::getRequestResult(cgicc::Cgicc& cgi, HttpXmlDocument& xmlDocument)
167 {
168  std::stringstream reportStream;
169  std::string requestNumberString = cgi.getElement("RequestNumber")->getValue();
170  RequestNumber requestNumber = strtoull(requestNumberString.c_str(), 0, 10);
171 
172  __COUT__ << "Request: " << requestName_ << " RequestNumber=" << requestNumberString
173  << " assigned # " << requestNumber << std::endl;
174 
175  if(workLoops_.find(requestNumber) == workLoops_.end())
176  {
177  reportStream << "Can't find request " << requestNumber
178  << " within the currently active " << workLoops_.size()
179  << " requests!";
180  __COUT__ << "WARNING: " << reportStream.str() << std::endl;
181  xmlDocument.addTextElementToData("RequestStatus", "WARNING");
182  xmlDocument.addTextElementToData("RequestName", "UNKNOWN");
183  xmlDocument.addTextElementToData("RequestNumber", requestNumberString);
184  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
185  return false;
186  }
187 
188  if(!workLoops_[requestNumber].done)
189  {
190  reportStream << "Still processing request " << requestNumber;
191  __COUT__ << "WARNING: " << reportStream.str() << std::endl;
192  // Resetting timer since there is a listener
193  time(&(workLoops_[requestNumber].requestLastTimeChecked));
194 
195  xmlDocument.addTextElementToData("RequestStatus", "MESSAGE");
196  xmlDocument.addTextElementToData("RequestName",
197  workLoops_[requestNumber].request);
198  xmlDocument.addTextElementToData("RequestNumber", requestNumberString);
199  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
200  std::stringstream progress;
201  progress << workLoops_[requestNumber].progress;
202  xmlDocument.addTextElementToData("RequestProgress", progress.str());
203  return false;
204  }
205 
206  // Request done and ready to be retrieved so I can remove it from the queue
207  xmlDocument.addTextElementToData("RequestStatus", workLoops_[requestNumber].result);
208  xmlDocument.addTextElementToData("RequestName", workLoops_[requestNumber].request);
209  xmlDocument.addTextElementToData("RequestNumber", requestNumberString);
210  std::stringstream progress;
211  progress << workLoops_[requestNumber].progress;
212  xmlDocument.addTextElementToData("RequestProgress", progress.str());
213 
214  removeWorkLoop(requestNumber);
215  return true;
216 }
217 
218 //==============================================================================
219 bool WorkLoopManager::removeWorkLoop(toolbox::task::WorkLoop* workLoop)
220 {
221  return removeWorkLoop(getWorkLoopRequestNumber(workLoop));
222 }
223 
224 //==============================================================================
225 bool WorkLoopManager::removeWorkLoop(RequestNumber requestNumber)
226 {
227  if(workLoops_.find(requestNumber) == workLoops_.end())
228  {
229  __COUT__ << "WorkLoop " << requestNumber << " is not in the WorkLoops list!"
230  << std::endl;
231  return false;
232  }
233 
234  if(workLoops_[requestNumber].workLoop == 0)
235  {
236  __COUT__ << "WorkLoop " << requestNumber << " was not created at all!"
237  << std::endl;
238  workLoops_.erase(requestNumber);
239  return false;
240  }
241 
242  try
243  {
244  workLoops_[requestNumber].workLoop->cancel();
245  }
246  catch(xcept::Exception& e)
247  {
248  __COUT__ << "Can't cancel WorkLoop " << requestNumber << std::endl;
249  // diagService_->reportError("Failed to start Job Control monitoring. Exception:
250  // "+string(e.what()),DIAGWARN);
251  return false;
252  }
253  try
254  {
255  workLoops_[requestNumber].workLoop->remove(job_);
256  }
257  catch(xcept::Exception& e)
258  {
259  // ATTENTION!
260  // If the workloop job thread returns false, then the workloop job is
261  // automatically removed and it can't be removed again Leaving this try catch
262  // allows me to be general in the job threads so I can return true (repeat loop)
263  // or false ( loop only once) without crashing
264  // __COUT__ << "WARNING: Can't remove request WorkLoop: " << requestNumber <<
265  // std::endl;
266  }
267  __COUT__ << "Deleting WorkLoop " << requestNumber << std::endl;
268  toolbox::task::getWorkLoopFactory()->removeWorkLoop(
269  workLoops_[requestNumber].workLoopName,
270  cWorkLoopType_); // delete workLoops_[requestNumber].workLoop; is done by the
271  // factory
272  workLoops_.erase(requestNumber);
273  return true;
274 }
275 
276 //==============================================================================
277 bool WorkLoopManager::removeProcessedRequests(void)
278 {
279  std::map<RequestNumber, WorkLoopStruct>::iterator it = workLoops_.begin();
280  while(it != workLoops_.end())
281  if(it->second.done)
282  removeWorkLoop((it++)->first);
283  else
284  ++it;
285  return true;
286 }
287 
288 //==============================================================================
289 bool WorkLoopManager::removeTimedOutRequests(void)
290 {
291  time_t now;
292  time(&now);
293  std::map<RequestNumber, WorkLoopStruct>::iterator it = workLoops_.begin();
294  __COUT__ << "Removing timed out " << std::endl;
295  for(; it != workLoops_.end(); it++)
296  if(it->second.done &&
297  difftime(now, it->second.requestLastTimeChecked) > timeOutInSeconds)
298  {
299  __COUT__ << "Removing timed out request #" << it->first << " after "
300  << difftime(now, it->second.requestLastTimeChecked) << " seconds."
301  << std::endl;
302  removeWorkLoop(it->first);
303  return true; // since I return I don't care if the iterator pointer is
304  // screwed after I erase the element
305  }
306  __COUT__ << "Done Removing timed out " << std::endl;
307  return false;
308 }
309 
313 //==============================================================================
314 std::string WorkLoopManager::composeWorkLoopName(RequestNumber requestNumber,
315  cgicc::Cgicc& cgi)
316 {
317  std::stringstream name;
318  name << requestNumber << "-" << cgi.getElement(requestName_)->getValue();
319  __COUT__ << "Request: " << requestName_
320  << " Value=" << cgi.getElement(requestName_)->getValue()
321  << " WLName: " << name.str() << std::endl;
322  return name.str();
323 }
324 
325 //==============================================================================
326 std::string WorkLoopManager::composeWorkLoopName(RequestNumber requestNumber,
327  const xoap::MessageReference& message)
328 {
329  SOAPCommand soapCommand(message);
330  std::stringstream name;
331  name << requestNumber << "-" << soapCommand.getCommand();
332  __COUT__ << "Request: " << requestName_ << " Value=" << soapCommand.getCommand()
333  << " WLName: " << name.str() << std::endl;
334  if(soapCommand.hasParameters())
335  {
336  name << '<';
337  char separator = ',';
338  for(SOAPParameters::const_iterator it = soapCommand.getParameters().begin();
339  it != soapCommand.getParameters().end();
340  it++)
341  {
342  if(it != soapCommand.getParameters().begin())
343  name << separator;
344  name << it->first << "|" << it->second;
345  }
346  name << '>';
347  }
348  return name.str();
349 }
350 
351 //==============================================================================
352 WorkLoopManager::RequestNumber WorkLoopManager::getWorkLoopRequestNumber(
353  toolbox::task::WorkLoop* workLoop)
354 {
355  return getWorkLoopRequestNumber(workLoop->getName());
356 }
357 
358 //==============================================================================
359 WorkLoopManager::RequestNumber WorkLoopManager::getWorkLoopRequestNumber(
360  std::string workLoopName)
361 {
362  workLoopName = workLoopName.substr(0, workLoopName.find('-'));
363  return strtoull(workLoopName.c_str(), 0, 10);
364 }
365 
366 //==============================================================================
367 std::string WorkLoopManager::getWorkLoopRequest(toolbox::task::WorkLoop* workLoop)
368 {
369  return getWorkLoopRequest(workLoop->getName());
370 }
371 
372 //==============================================================================
373 std::string WorkLoopManager::getWorkLoopRequest(std::string workLoopName)
374 {
375  return workLoopName.substr(workLoopName.find('-') + 1,
376  workLoopName.find(std::string("/") + cWorkLoopType_) -
377  workLoopName.find('-') - 1);
378 }
379 
380 //==============================================================================
381 void WorkLoopManager::translateWorkLoopName(toolbox::task::WorkLoop* workLoop,
382  SOAPCommand& soapCommand)
383 {
384  std::string request = getWorkLoopRequest(workLoop);
385  if(request.find('<') == std::string::npos)
386  {
387  __COUT__ << "Simple request" << std::endl;
388  soapCommand.setCommand(request);
389  }
390  else
391  {
392  size_t ltPosition = request.find('<');
393  size_t gtPosition = request.find('>');
394  size_t begin = ltPosition + 1;
395  size_t orPosition;
396  size_t commaPosition;
397 
398  soapCommand.setCommand(request.substr(0, ltPosition));
399  while((commaPosition = request.find(',', begin)) != std::string::npos)
400  {
401  orPosition = request.find('|', begin);
402  soapCommand.setParameter(
403  request.substr(begin, orPosition - begin),
404  request.substr(orPosition + 1, commaPosition - orPosition - 1));
405  begin = commaPosition + 1;
406  __COUT__ << "Comma: " << commaPosition << std::endl;
407  }
408  orPosition = request.find('|', begin);
409  soapCommand.setParameter(
410  request.substr(begin, orPosition - begin),
411  request.substr(orPosition + 1, gtPosition - orPosition - 1));
412  }
413  __COUT__ << soapCommand << std::endl;
414  //__COUT__ << name.substr(name.find(',')+1,name.find('/')-name.find(',')-1) <<
415  // std::endl;
416 }
HttpXmlDocument processRequest(cgicc::Cgicc &cgi)
bool getRequestResult(cgicc::Cgicc &cgi, HttpXmlDocument &xmldoc)
Getters.