1 #include "otsdaq/WorkLoopManager/WorkLoopManager.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/SOAPUtilities/SOAPCommand.h"
5 #include "otsdaq/XmlUtilities/HttpXmlDocument.h"
7 #include <toolbox/task/WorkLoopFactory.h>
17 WorkLoopManager::WorkLoopManager(toolbox::task::ActionSignature* job)
18 : cWorkLoopType_(
"waiting"), job_(job), requestNumber_(0), requestName_(job_->name())
20 __COUT__ <<
"Request name: " << requestName_ <<
" jobName: " << job_->name()
25 WorkLoopManager::~WorkLoopManager(
void) {}
36 return processRequest(composeWorkLoopName(requestNumber_++, message), &message);
41 const xoap::MessageReference* message)
44 std::stringstream requestNumberStream;
45 RequestNumber requestNumber = getWorkLoopRequestNumber(workLoopName);
46 requestNumberStream << requestNumber;
47 std::stringstream reportStream;
49 __COUT__ <<
"Processing request! WorkLoops: " << workLoops_.size()
50 <<
" req: " << requestNumber << std::endl;
51 if(workLoops_.size() >= maxWorkLoops)
53 if(!removeTimedOutRequests())
55 reportStream <<
"Too many running requests (" << workLoops_.size()
56 <<
"), try later!" << std::endl;
57 __COUT__ <<
"ERROR: " << reportStream.str() << std::endl;
59 xmlDocument.addTextElementToData(
"RequestStatus",
"ERROR");
60 xmlDocument.addTextElementToData(
"RequestName",
61 getWorkLoopRequest(workLoopName));
62 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
63 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
67 time(&(workLoops_[requestNumber].requestStartTime));
68 workLoops_[requestNumber].requestLastTimeChecked =
69 workLoops_[requestNumber].requestStartTime;
70 workLoops_[requestNumber].request = getWorkLoopRequest(workLoopName);
71 workLoops_[requestNumber].result =
"EMPTY";
72 workLoops_[requestNumber].progress = 0;
73 workLoops_[requestNumber].done =
false;
74 workLoops_[requestNumber].workLoopName = workLoopName;
76 workLoops_[requestNumber].message = *message;
79 workLoops_[requestNumber].workLoop =
80 toolbox::task::getWorkLoopFactory()->getWorkLoop(workLoopName,
83 catch(xcept::Exception& e)
85 reportStream <<
"Can't create workloop, try again." << std::endl;
86 __COUT__ <<
"ERROR: " << reportStream.str() << std::endl;
87 xmlDocument.addTextElementToData(
"RequestStatus",
"ERROR");
88 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
89 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
90 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
91 removeWorkLoop(requestNumber);
97 workLoops_[requestNumber].workLoop->submit(job_);
99 catch(xcept::Exception& e)
101 reportStream <<
"Can't submit workloop job, try again." << std::endl;
102 __COUT__ <<
"ERROR: " << reportStream.str() << std::endl;
103 xmlDocument.addTextElementToData(
"RequestStatus",
"ERROR");
104 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
105 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
106 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
107 removeWorkLoop(requestNumber);
112 workLoops_[requestNumber].workLoop->activate();
114 catch(xcept::Exception& e)
116 reportStream <<
"Can't activate workloop, try again." << std::endl;
117 __COUT__ <<
"ERROR: " << reportStream.str() << std::endl;
118 xmlDocument.addTextElementToData(
"RequestStatus",
"ERROR");
119 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
120 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
121 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
122 removeWorkLoop(requestNumber);
126 __COUT__ <<
"SUCCESS: Request is being processed!" << std::endl;
128 xmlDocument.addTextElementToData(
"RequestStatus",
"SUCCESS");
129 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
130 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
136 bool WorkLoopManager::report(toolbox::task::WorkLoop* workLoop,
141 RequestNumber requestNumber = getWorkLoopRequestNumber(workLoop);
152 workLoops_[requestNumber].result = result;
153 workLoops_[requestNumber].progress = progress;
154 workLoops_[requestNumber].done = status;
159 xoap::MessageReference WorkLoopManager::getMessage(toolbox::task::WorkLoop* workLoop)
161 RequestNumber requestNumber = getWorkLoopRequestNumber(workLoop);
162 return workLoops_[requestNumber].message;
168 std::stringstream reportStream;
169 std::string requestNumberString = cgi.getElement(
"RequestNumber")->getValue();
170 RequestNumber requestNumber = strtoull(requestNumberString.c_str(), 0, 10);
172 __COUT__ <<
"Request: " << requestName_ <<
" RequestNumber=" << requestNumberString
173 <<
" assigned # " << requestNumber << std::endl;
175 if(workLoops_.find(requestNumber) == workLoops_.end())
177 reportStream <<
"Can't find request " << requestNumber
178 <<
" within the currently active " << workLoops_.size()
180 __COUT__ <<
"WARNING: " << reportStream.str() << std::endl;
181 xmlDocument.addTextElementToData(
"RequestStatus",
"WARNING");
182 xmlDocument.addTextElementToData(
"RequestName",
"UNKNOWN");
183 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberString);
184 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
188 if(!workLoops_[requestNumber].done)
190 reportStream <<
"Still processing request " << requestNumber;
191 __COUT__ <<
"WARNING: " << reportStream.str() << std::endl;
193 time(&(workLoops_[requestNumber].requestLastTimeChecked));
195 xmlDocument.addTextElementToData(
"RequestStatus",
"MESSAGE");
196 xmlDocument.addTextElementToData(
"RequestName",
197 workLoops_[requestNumber].request);
198 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberString);
199 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
200 std::stringstream progress;
201 progress << workLoops_[requestNumber].progress;
202 xmlDocument.addTextElementToData(
"RequestProgress", progress.str());
207 xmlDocument.addTextElementToData(
"RequestStatus", workLoops_[requestNumber].result);
208 xmlDocument.addTextElementToData(
"RequestName", workLoops_[requestNumber].request);
209 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberString);
210 std::stringstream progress;
211 progress << workLoops_[requestNumber].progress;
212 xmlDocument.addTextElementToData(
"RequestProgress", progress.str());
214 removeWorkLoop(requestNumber);
219 bool WorkLoopManager::removeWorkLoop(toolbox::task::WorkLoop* workLoop)
221 return removeWorkLoop(getWorkLoopRequestNumber(workLoop));
225 bool WorkLoopManager::removeWorkLoop(RequestNumber requestNumber)
227 if(workLoops_.find(requestNumber) == workLoops_.end())
229 __COUT__ <<
"WorkLoop " << requestNumber <<
" is not in the WorkLoops list!"
234 if(workLoops_[requestNumber].workLoop == 0)
236 __COUT__ <<
"WorkLoop " << requestNumber <<
" was not created at all!"
238 workLoops_.erase(requestNumber);
244 workLoops_[requestNumber].workLoop->cancel();
246 catch(xcept::Exception& e)
248 __COUT__ <<
"Can't cancel WorkLoop " << requestNumber << std::endl;
255 workLoops_[requestNumber].workLoop->remove(job_);
257 catch(xcept::Exception& e)
267 __COUT__ <<
"Deleting WorkLoop " << requestNumber << std::endl;
268 toolbox::task::getWorkLoopFactory()->removeWorkLoop(
269 workLoops_[requestNumber].workLoopName,
272 workLoops_.erase(requestNumber);
277 bool WorkLoopManager::removeProcessedRequests(
void)
279 std::map<RequestNumber, WorkLoopStruct>::iterator it = workLoops_.begin();
280 while(it != workLoops_.end())
282 removeWorkLoop((it++)->first);
289 bool WorkLoopManager::removeTimedOutRequests(
void)
293 std::map<RequestNumber, WorkLoopStruct>::iterator it = workLoops_.begin();
294 __COUT__ <<
"Removing timed out " << std::endl;
295 for(; it != workLoops_.end(); it++)
296 if(it->second.done &&
297 difftime(now, it->second.requestLastTimeChecked) > timeOutInSeconds)
299 __COUT__ <<
"Removing timed out request #" << it->first <<
" after "
300 << difftime(now, it->second.requestLastTimeChecked) <<
" seconds."
302 removeWorkLoop(it->first);
306 __COUT__ <<
"Done Removing timed out " << std::endl;
314 std::string WorkLoopManager::composeWorkLoopName(RequestNumber requestNumber,
317 std::stringstream name;
318 name << requestNumber <<
"-" << cgi.getElement(requestName_)->getValue();
319 __COUT__ <<
"Request: " << requestName_
320 <<
" Value=" << cgi.getElement(requestName_)->getValue()
321 <<
" WLName: " << name.str() << std::endl;
326 std::string WorkLoopManager::composeWorkLoopName(RequestNumber requestNumber,
327 const xoap::MessageReference& message)
330 std::stringstream name;
331 name << requestNumber <<
"-" << soapCommand.getCommand();
332 __COUT__ <<
"Request: " << requestName_ <<
" Value=" << soapCommand.getCommand()
333 <<
" WLName: " << name.str() << std::endl;
334 if(soapCommand.hasParameters())
337 char separator =
',';
338 for(SOAPParameters::const_iterator it = soapCommand.getParameters().begin();
339 it != soapCommand.getParameters().end();
342 if(it != soapCommand.getParameters().begin())
344 name << it->first <<
"|" << it->second;
351 //==============================================================================
352 WorkLoopManager::RequestNumber WorkLoopManager::getWorkLoopRequestNumber(
353 toolbox::task::WorkLoop* workLoop)
355 return getWorkLoopRequestNumber(workLoop->getName());
358 //==============================================================================
359 WorkLoopManager::RequestNumber WorkLoopManager::getWorkLoopRequestNumber(
360 std::string workLoopName)
362 workLoopName = workLoopName.substr(0, workLoopName.find('-
'));
363 return strtoull(workLoopName.c_str(), 0, 10);
366 //==============================================================================
367 std::string WorkLoopManager::getWorkLoopRequest(toolbox::task::WorkLoop* workLoop)
369 return getWorkLoopRequest(workLoop->getName());
372 //==============================================================================
373 std::string WorkLoopManager::getWorkLoopRequest(std::string workLoopName)
375 return workLoopName.substr(workLoopName.find('-
') + 1,
376 workLoopName.find(std::string("/") + cWorkLoopType_) -
377 workLoopName.find('-
') - 1);
380 //==============================================================================
381 void WorkLoopManager::translateWorkLoopName(toolbox::task::WorkLoop* workLoop,
382 SOAPCommand& soapCommand)
384 std::string request = getWorkLoopRequest(workLoop);
385 if(request.find('<
') == std::string::npos)
387 __COUT__ << "Simple request" << std::endl;
388 soapCommand.setCommand(request);
392 size_t ltPosition = request.find('<
');
393 size_t gtPosition = request.find('>
');
394 size_t begin = ltPosition + 1;
396 size_t commaPosition;
398 soapCommand.setCommand(request.substr(0, ltPosition));
399 while((commaPosition = request.find(',
', begin)) != std::string::npos)
401 orPosition = request.find('|
', begin);
402 soapCommand.setParameter(
403 request.substr(begin, orPosition - begin),
404 request.substr(orPosition + 1, commaPosition - orPosition - 1));
405 begin = commaPosition + 1;
406 __COUT__ << "Comma: " << commaPosition << std::endl;
408 orPosition = request.find('|
', begin);
409 soapCommand.setParameter(
410 request.substr(begin, orPosition - begin),
411 request.substr(orPosition + 1, gtPosition - orPosition - 1));
413 __COUT__ << soapCommand << std::endl;
414 //__COUT__ << name.substr(name.find(',
')+1,name.find('/
')-name.find(',
')-1) <<
HttpXmlDocument processRequest(cgicc::Cgicc &cgi)
bool getRequestResult(cgicc::Cgicc &cgi, HttpXmlDocument &xmldoc)
Getters.