1 #include "otsdaq/WorkLoopManager/WorkLoopManager.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/SOAPUtilities/SOAPCommand.h"
5 #include "otsdaq/XmlUtilities/HttpXmlDocument.h"
7 #include <toolbox/task/WorkLoopFactory.h>
17 WorkLoopManager::WorkLoopManager(toolbox::task::ActionSignature* job) : cWorkLoopType_(
"waiting"), job_(job), requestNumber_(0), requestName_(job_->name())
19 __COUT__ <<
"Request name: " << requestName_ <<
" jobName: " << job_->name() << std::endl;
23 WorkLoopManager::~WorkLoopManager(
void) {}
26 HttpXmlDocument WorkLoopManager::processRequest(cgicc::Cgicc& cgi) {
return processRequest(composeWorkLoopName(requestNumber_++, cgi)); }
29 HttpXmlDocument WorkLoopManager::processRequest(
const xoap::MessageReference& message)
31 return processRequest(composeWorkLoopName(requestNumber_++, message), &message);
35 HttpXmlDocument WorkLoopManager::processRequest(std::string workLoopName,
const xoap::MessageReference* message)
38 std::stringstream requestNumberStream;
39 RequestNumber requestNumber = getWorkLoopRequestNumber(workLoopName);
40 requestNumberStream << requestNumber;
41 std::stringstream reportStream;
43 __COUT__ <<
"Processing request! WorkLoops: " << workLoops_.size() <<
" req: " << requestNumber << std::endl;
44 if(workLoops_.size() >= maxWorkLoops)
46 if(!removeTimedOutRequests())
48 reportStream <<
"Too many running requests (" << workLoops_.size() <<
"), try later!" << std::endl;
49 __COUT__ <<
"ERROR: " << reportStream.str() << std::endl;
51 xmlDocument.addTextElementToData(
"RequestStatus",
"ERROR");
52 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
53 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
54 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
58 time(&(workLoops_[requestNumber].requestStartTime));
59 workLoops_[requestNumber].requestLastTimeChecked = workLoops_[requestNumber].requestStartTime;
60 workLoops_[requestNumber].request = getWorkLoopRequest(workLoopName);
61 workLoops_[requestNumber].result =
"EMPTY";
62 workLoops_[requestNumber].progress = 0;
63 workLoops_[requestNumber].done =
false;
64 workLoops_[requestNumber].workLoopName = workLoopName;
66 workLoops_[requestNumber].message = *message;
69 workLoops_[requestNumber].workLoop = toolbox::task::getWorkLoopFactory()->getWorkLoop(workLoopName, cWorkLoopType_);
71 catch(xcept::Exception& e)
73 reportStream <<
"Can't create workloop, try again." << std::endl;
74 __COUT__ <<
"ERROR: " << reportStream.str() << std::endl;
75 xmlDocument.addTextElementToData(
"RequestStatus",
"ERROR");
76 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
77 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
78 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
79 removeWorkLoop(requestNumber);
85 workLoops_[requestNumber].workLoop->submit(job_);
87 catch(xcept::Exception& e)
89 reportStream <<
"Can't submit workloop job, try again." << std::endl;
90 __COUT__ <<
"ERROR: " << reportStream.str() << std::endl;
91 xmlDocument.addTextElementToData(
"RequestStatus",
"ERROR");
92 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
93 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
94 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
95 removeWorkLoop(requestNumber);
100 workLoops_[requestNumber].workLoop->activate();
102 catch(xcept::Exception& e)
104 reportStream <<
"Can't activate workloop, try again." << std::endl;
105 __COUT__ <<
"ERROR: " << reportStream.str() << std::endl;
106 xmlDocument.addTextElementToData(
"RequestStatus",
"ERROR");
107 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
108 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
109 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
110 removeWorkLoop(requestNumber);
114 __COUT__ <<
"SUCCESS: Request is being processed!" << std::endl;
116 xmlDocument.addTextElementToData(
"RequestStatus",
"SUCCESS");
117 xmlDocument.addTextElementToData(
"RequestName", getWorkLoopRequest(workLoopName));
118 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberStream.str());
124 bool WorkLoopManager::report(toolbox::task::WorkLoop* workLoop, std::string result,
float progress,
bool status)
126 RequestNumber requestNumber = getWorkLoopRequestNumber(workLoop);
137 workLoops_[requestNumber].result = result;
138 workLoops_[requestNumber].progress = progress;
139 workLoops_[requestNumber].done = status;
144 xoap::MessageReference WorkLoopManager::getMessage(toolbox::task::WorkLoop* workLoop)
146 RequestNumber requestNumber = getWorkLoopRequestNumber(workLoop);
147 return workLoops_[requestNumber].message;
151 bool WorkLoopManager::getRequestResult(cgicc::Cgicc& cgi,
HttpXmlDocument& xmlDocument)
153 std::stringstream reportStream;
154 std::string requestNumberString = cgi.getElement(
"RequestNumber")->getValue();
155 RequestNumber requestNumber = strtoull(requestNumberString.c_str(), 0, 10);
157 __COUT__ <<
"Request: " << requestName_ <<
" RequestNumber=" << requestNumberString <<
" assigned # " << requestNumber << std::endl;
159 if(workLoops_.find(requestNumber) == workLoops_.end())
161 reportStream <<
"Can't find request " << requestNumber <<
" within the currently active " << workLoops_.size() <<
" requests!";
162 __COUT__ <<
"WARNING: " << reportStream.str() << std::endl;
163 xmlDocument.addTextElementToData(
"RequestStatus",
"WARNING");
164 xmlDocument.addTextElementToData(
"RequestName",
"Unknown");
165 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberString);
166 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
170 if(!workLoops_[requestNumber].done)
172 reportStream <<
"Still processing request " << requestNumber;
173 __COUT__ <<
"WARNING: " << reportStream.str() << std::endl;
175 time(&(workLoops_[requestNumber].requestLastTimeChecked));
177 xmlDocument.addTextElementToData(
"RequestStatus",
"MESSAGE");
178 xmlDocument.addTextElementToData(
"RequestName", workLoops_[requestNumber].request);
179 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberString);
180 xmlDocument.addTextElementToData(
"ErrorReport", reportStream.str());
181 std::stringstream progress;
182 progress << workLoops_[requestNumber].progress;
183 xmlDocument.addTextElementToData(
"RequestProgress", progress.str());
188 xmlDocument.addTextElementToData(
"RequestStatus", workLoops_[requestNumber].result);
189 xmlDocument.addTextElementToData(
"RequestName", workLoops_[requestNumber].request);
190 xmlDocument.addTextElementToData(
"RequestNumber", requestNumberString);
191 std::stringstream progress;
192 progress << workLoops_[requestNumber].progress;
193 xmlDocument.addTextElementToData(
"RequestProgress", progress.str());
195 removeWorkLoop(requestNumber);
200 bool WorkLoopManager::removeWorkLoop(toolbox::task::WorkLoop* workLoop) {
return removeWorkLoop(getWorkLoopRequestNumber(workLoop)); }
203 bool WorkLoopManager::removeWorkLoop(RequestNumber requestNumber)
205 if(workLoops_.find(requestNumber) == workLoops_.end())
207 __COUT__ <<
"WorkLoop " << requestNumber <<
" is not in the WorkLoops list!" << std::endl;
211 if(workLoops_[requestNumber].workLoop == 0)
213 __COUT__ <<
"WorkLoop " << requestNumber <<
" was not created at all!" << std::endl;
214 workLoops_.erase(requestNumber);
220 workLoops_[requestNumber].workLoop->cancel();
222 catch(xcept::Exception& e)
224 __COUT__ <<
"Can't cancel WorkLoop " << requestNumber << std::endl;
231 workLoops_[requestNumber].workLoop->remove(job_);
233 catch(xcept::Exception& e)
243 __COUT__ <<
"Deleting WorkLoop " << requestNumber << std::endl;
244 toolbox::task::getWorkLoopFactory()->removeWorkLoop(workLoops_[requestNumber].workLoopName,
247 workLoops_.erase(requestNumber);
252 bool WorkLoopManager::removeProcessedRequests(
void)
254 std::map<RequestNumber, WorkLoopStruct>::iterator it = workLoops_.begin();
255 while(it != workLoops_.end())
257 removeWorkLoop((it++)->first);
264 bool WorkLoopManager::removeTimedOutRequests(
void)
268 std::map<RequestNumber, WorkLoopStruct>::iterator it = workLoops_.begin();
269 __COUT__ <<
"Removing timed out " << std::endl;
270 for(; it != workLoops_.end(); it++)
271 if(it->second.done && difftime(now, it->second.requestLastTimeChecked) > timeOutInSeconds)
273 __COUT__ <<
"Removing timed out request #" << it->first <<
" after " << difftime(now, it->second.requestLastTimeChecked) <<
" seconds."
275 removeWorkLoop(it->first);
279 __COUT__ <<
"Done Removing timed out " << std::endl;
287 std::string WorkLoopManager::composeWorkLoopName(RequestNumber requestNumber, cgicc::Cgicc& cgi)
289 std::stringstream name;
290 name << requestNumber <<
"-" << cgi.getElement(requestName_)->getValue();
291 __COUT__ <<
"Request: " << requestName_ <<
" Value=" << cgi.getElement(requestName_)->getValue() <<
" WLName: " << name.str() << std::endl;
296 std::string WorkLoopManager::composeWorkLoopName(RequestNumber requestNumber,
const xoap::MessageReference& message)
299 std::stringstream name;
300 name << requestNumber <<
"-" << soapCommand.getCommand();
301 __COUT__ <<
"Request: " << requestName_ <<
" Value=" << soapCommand.getCommand() <<
" WLName: " << name.str() << std::endl;
302 if(soapCommand.hasParameters())
305 char separator =
',';
306 for(SOAPParameters::const_iterator it = soapCommand.getParameters().begin(); it != soapCommand.getParameters().end(); it++)
308 if(it != soapCommand.getParameters().begin())
310 name << it->first <<
"|" << it->second;
317 //==============================================================================
318 WorkLoopManager::RequestNumber WorkLoopManager::getWorkLoopRequestNumber(toolbox::task::WorkLoop* workLoop)
320 return getWorkLoopRequestNumber(workLoop->getName());
323 //==============================================================================
324 WorkLoopManager::RequestNumber WorkLoopManager::getWorkLoopRequestNumber(std::string workLoopName)
326 workLoopName = workLoopName.substr(0, workLoopName.find('-
'));
327 return strtoull(workLoopName.c_str(), 0, 10);
330 //==============================================================================
331 std::string WorkLoopManager::getWorkLoopRequest(toolbox::task::WorkLoop* workLoop) { return getWorkLoopRequest(workLoop->getName()); }
333 //==============================================================================
334 std::string WorkLoopManager::getWorkLoopRequest(std::string workLoopName)
336 return workLoopName.substr(workLoopName.find('-
') + 1, workLoopName.find(std::string("/") + cWorkLoopType_) - workLoopName.find('-
') - 1);
339 //==============================================================================
340 void WorkLoopManager::translateWorkLoopName(toolbox::task::WorkLoop* workLoop, SOAPCommand& soapCommand)
342 std::string request = getWorkLoopRequest(workLoop);
343 if(request.find('<
') == std::string::npos)
345 __COUT__ << "Simple request" << std::endl;
346 soapCommand.setCommand(request);
350 size_t ltPosition = request.find('<
');
351 size_t gtPosition = request.find('>
');
352 size_t begin = ltPosition + 1;
354 size_t commaPosition;
356 soapCommand.setCommand(request.substr(0, ltPosition));
357 while((commaPosition = request.find(',
', begin)) != std::string::npos)
359 orPosition = request.find('|
', begin);
360 soapCommand.setParameter(request.substr(begin, orPosition - begin), request.substr(orPosition + 1, commaPosition - orPosition - 1));
361 begin = commaPosition + 1;
362 __COUT__ << "Comma: " << commaPosition << std::endl;
364 orPosition = request.find('|
', begin);
365 soapCommand.setParameter(request.substr(begin, orPosition - begin), request.substr(orPosition + 1, gtPosition - orPosition - 1));
367 __COUT__ << soapCommand << std::endl;
368 //__COUT__ << name.substr(name.find(',
')+1,name.find('/
')-name.find(',
')-1) <<