otsdaq  v2_05_02_indev
WorkLoopManager.cc
1 #include "otsdaq/WorkLoopManager/WorkLoopManager.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/SOAPUtilities/SOAPCommand.h"
5 #include "otsdaq/XmlUtilities/HttpXmlDocument.h"
6 
7 #include <toolbox/task/WorkLoopFactory.h>
8 
9 #include <cassert>
10 #include <cstdlib>
11 #include <iostream>
12 #include <sstream>
13 
14 using namespace ots;
15 
16 //==============================================================================
17 WorkLoopManager::WorkLoopManager(toolbox::task::ActionSignature* job) : cWorkLoopType_("waiting"), job_(job), requestNumber_(0), requestName_(job_->name())
18 {
19  __COUT__ << "Request name: " << requestName_ << " jobName: " << job_->name() << std::endl;
20 }
21 
22 //==============================================================================
23 WorkLoopManager::~WorkLoopManager(void) {}
24 
25 //==============================================================================
26 HttpXmlDocument WorkLoopManager::processRequest(cgicc::Cgicc& cgi) { return processRequest(composeWorkLoopName(requestNumber_++, cgi)); }
27 
28 //==============================================================================
29 HttpXmlDocument WorkLoopManager::processRequest(const xoap::MessageReference& message)
30 {
31  return processRequest(composeWorkLoopName(requestNumber_++, message), &message);
32 }
33 
34 //==============================================================================
35 HttpXmlDocument WorkLoopManager::processRequest(std::string workLoopName, const xoap::MessageReference* message)
36 {
37  HttpXmlDocument xmlDocument;
38  std::stringstream requestNumberStream;
39  RequestNumber requestNumber = getWorkLoopRequestNumber(workLoopName);
40  requestNumberStream << requestNumber;
41  std::stringstream reportStream;
42 
43  __COUT__ << "Processing request! WorkLoops: " << workLoops_.size() << " req: " << requestNumber << std::endl;
44  if(workLoops_.size() >= maxWorkLoops)
45  {
46  if(!removeTimedOutRequests())
47  {
48  reportStream << "Too many running requests (" << workLoops_.size() << "), try later!" << std::endl;
49  __COUT__ << "ERROR: " << reportStream.str() << std::endl;
50 
51  xmlDocument.addTextElementToData("RequestStatus", "ERROR");
52  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
53  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
54  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
55  return xmlDocument;
56  }
57  }
58  time(&(workLoops_[requestNumber].requestStartTime));
59  workLoops_[requestNumber].requestLastTimeChecked = workLoops_[requestNumber].requestStartTime;
60  workLoops_[requestNumber].request = getWorkLoopRequest(workLoopName);
61  workLoops_[requestNumber].result = "EMPTY";
62  workLoops_[requestNumber].progress = 0;
63  workLoops_[requestNumber].done = false;
64  workLoops_[requestNumber].workLoopName = workLoopName;
65  if(message != 0)
66  workLoops_[requestNumber].message = *message;
67  try
68  {
69  workLoops_[requestNumber].workLoop = toolbox::task::getWorkLoopFactory()->getWorkLoop(workLoopName, cWorkLoopType_);
70  }
71  catch(xcept::Exception& e)
72  {
73  reportStream << "Can't create workloop, try again." << std::endl;
74  __COUT__ << "ERROR: " << reportStream.str() << std::endl;
75  xmlDocument.addTextElementToData("RequestStatus", "ERROR");
76  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
77  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
78  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
79  removeWorkLoop(requestNumber);
80  return xmlDocument;
81  }
82 
83  try
84  {
85  workLoops_[requestNumber].workLoop->submit(job_);
86  }
87  catch(xcept::Exception& e)
88  {
89  reportStream << "Can't submit workloop job, try again." << std::endl;
90  __COUT__ << "ERROR: " << reportStream.str() << std::endl;
91  xmlDocument.addTextElementToData("RequestStatus", "ERROR");
92  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
93  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
94  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
95  removeWorkLoop(requestNumber);
96  return xmlDocument;
97  }
98  try
99  {
100  workLoops_[requestNumber].workLoop->activate();
101  }
102  catch(xcept::Exception& e)
103  {
104  reportStream << "Can't activate workloop, try again." << std::endl;
105  __COUT__ << "ERROR: " << reportStream.str() << std::endl;
106  xmlDocument.addTextElementToData("RequestStatus", "ERROR");
107  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
108  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
109  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
110  removeWorkLoop(requestNumber);
111  return xmlDocument;
112  }
113 
114  __COUT__ << "SUCCESS: Request is being processed!" << std::endl;
115 
116  xmlDocument.addTextElementToData("RequestStatus", "SUCCESS");
117  xmlDocument.addTextElementToData("RequestName", getWorkLoopRequest(workLoopName));
118  xmlDocument.addTextElementToData("RequestNumber", requestNumberStream.str());
119 
120  return xmlDocument;
121 }
122 
123 //==============================================================================
124 bool WorkLoopManager::report(toolbox::task::WorkLoop* workLoop, std::string result, float progress, bool status)
125 {
126  RequestNumber requestNumber = getWorkLoopRequestNumber(workLoop);
127 
128  /*
129  __COUT__ << "Reporting result for request " << getWorkLoopRequest(workLoop) << "
130  with req#: " << requestNumber << std::endl; if(workLoops_.find(requestNumber) ==
131  workLoops_.end())
132  {
133  __COUT__ << "This MUST NEVER happen. You must find out what is wrong!" <<
134  std::endl; assert(0);
135  }
136  */
137  workLoops_[requestNumber].result = result;
138  workLoops_[requestNumber].progress = progress;
139  workLoops_[requestNumber].done = status;
140  return true;
141 }
142 
143 //==============================================================================
144 xoap::MessageReference WorkLoopManager::getMessage(toolbox::task::WorkLoop* workLoop)
145 {
146  RequestNumber requestNumber = getWorkLoopRequestNumber(workLoop);
147  return workLoops_[requestNumber].message;
148 }
149 
150 //==============================================================================
151 bool WorkLoopManager::getRequestResult(cgicc::Cgicc& cgi, HttpXmlDocument& xmlDocument)
152 {
153  std::stringstream reportStream;
154  std::string requestNumberString = cgi.getElement("RequestNumber")->getValue();
155  RequestNumber requestNumber = strtoull(requestNumberString.c_str(), 0, 10);
156 
157  __COUT__ << "Request: " << requestName_ << " RequestNumber=" << requestNumberString << " assigned # " << requestNumber << std::endl;
158 
159  if(workLoops_.find(requestNumber) == workLoops_.end())
160  {
161  reportStream << "Can't find request " << requestNumber << " within the currently active " << workLoops_.size() << " requests!";
162  __COUT__ << "WARNING: " << reportStream.str() << std::endl;
163  xmlDocument.addTextElementToData("RequestStatus", "WARNING");
164  xmlDocument.addTextElementToData("RequestName", "Unknown");
165  xmlDocument.addTextElementToData("RequestNumber", requestNumberString);
166  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
167  return false;
168  }
169 
170  if(!workLoops_[requestNumber].done)
171  {
172  reportStream << "Still processing request " << requestNumber;
173  __COUT__ << "WARNING: " << reportStream.str() << std::endl;
174  // Resetting timer since there is a listener
175  time(&(workLoops_[requestNumber].requestLastTimeChecked));
176 
177  xmlDocument.addTextElementToData("RequestStatus", "MESSAGE");
178  xmlDocument.addTextElementToData("RequestName", workLoops_[requestNumber].request);
179  xmlDocument.addTextElementToData("RequestNumber", requestNumberString);
180  xmlDocument.addTextElementToData("ErrorReport", reportStream.str());
181  std::stringstream progress;
182  progress << workLoops_[requestNumber].progress;
183  xmlDocument.addTextElementToData("RequestProgress", progress.str());
184  return false;
185  }
186 
187  // Request done and ready to be retrieved so I can remove it from the queue
188  xmlDocument.addTextElementToData("RequestStatus", workLoops_[requestNumber].result);
189  xmlDocument.addTextElementToData("RequestName", workLoops_[requestNumber].request);
190  xmlDocument.addTextElementToData("RequestNumber", requestNumberString);
191  std::stringstream progress;
192  progress << workLoops_[requestNumber].progress;
193  xmlDocument.addTextElementToData("RequestProgress", progress.str());
194 
195  removeWorkLoop(requestNumber);
196  return true;
197 }
198 
199 //==============================================================================
200 bool WorkLoopManager::removeWorkLoop(toolbox::task::WorkLoop* workLoop) { return removeWorkLoop(getWorkLoopRequestNumber(workLoop)); }
201 
202 //==============================================================================
203 bool WorkLoopManager::removeWorkLoop(RequestNumber requestNumber)
204 {
205  if(workLoops_.find(requestNumber) == workLoops_.end())
206  {
207  __COUT__ << "WorkLoop " << requestNumber << " is not in the WorkLoops list!" << std::endl;
208  return false;
209  }
210 
211  if(workLoops_[requestNumber].workLoop == 0)
212  {
213  __COUT__ << "WorkLoop " << requestNumber << " was not created at all!" << std::endl;
214  workLoops_.erase(requestNumber);
215  return false;
216  }
217 
218  try
219  {
220  workLoops_[requestNumber].workLoop->cancel();
221  }
222  catch(xcept::Exception& e)
223  {
224  __COUT__ << "Can't cancel WorkLoop " << requestNumber << std::endl;
225  // diagService_->reportError("Failed to start Job Control monitoring. Exception:
226  // "+string(e.what()),DIAGWARN);
227  return false;
228  }
229  try
230  {
231  workLoops_[requestNumber].workLoop->remove(job_);
232  }
233  catch(xcept::Exception& e)
234  {
235  // ATTENTION!
236  // If the workloop job thread returns false, then the workloop job is
237  // automatically removed and it can't be removed again Leaving this try catch
238  // allows me to be general in the job threads so I can return true (repeat loop)
239  // or false ( loop only once) without crashing
240  // __COUT__ << "WARNING: Can't remove request WorkLoop: " << requestNumber <<
241  // std::endl;
242  }
243  __COUT__ << "Deleting WorkLoop " << requestNumber << std::endl;
244  toolbox::task::getWorkLoopFactory()->removeWorkLoop(workLoops_[requestNumber].workLoopName,
245  cWorkLoopType_); // delete workLoops_[requestNumber].workLoop; is done by the
246  // factory
247  workLoops_.erase(requestNumber);
248  return true;
249 }
250 
251 //==============================================================================
252 bool WorkLoopManager::removeProcessedRequests(void)
253 {
254  std::map<RequestNumber, WorkLoopStruct>::iterator it = workLoops_.begin();
255  while(it != workLoops_.end())
256  if(it->second.done)
257  removeWorkLoop((it++)->first);
258  else
259  ++it;
260  return true;
261 }
262 
263 //==============================================================================
264 bool WorkLoopManager::removeTimedOutRequests(void)
265 {
266  time_t now;
267  time(&now);
268  std::map<RequestNumber, WorkLoopStruct>::iterator it = workLoops_.begin();
269  __COUT__ << "Removing timed out " << std::endl;
270  for(; it != workLoops_.end(); it++)
271  if(it->second.done && difftime(now, it->second.requestLastTimeChecked) > timeOutInSeconds)
272  {
273  __COUT__ << "Removing timed out request #" << it->first << " after " << difftime(now, it->second.requestLastTimeChecked) << " seconds."
274  << std::endl;
275  removeWorkLoop(it->first);
276  return true; // since I return I don't care if the iterator pointer is
277  // screwed after I erase the element
278  }
279  __COUT__ << "Done Removing timed out " << std::endl;
280  return false;
281 }
282 
// WorkLoopName Format:
// RequestNumber-CommandToExecute<Argument1Name|Argument1Value,Argument2Name|Argument2Value...>
// (the <...> part is present only when the command has parameters).
// The WorkLoop then appends "/waiting" to the end of the name.
286 //==============================================================================
287 std::string WorkLoopManager::composeWorkLoopName(RequestNumber requestNumber, cgicc::Cgicc& cgi)
288 {
289  std::stringstream name;
290  name << requestNumber << "-" << cgi.getElement(requestName_)->getValue();
291  __COUT__ << "Request: " << requestName_ << " Value=" << cgi.getElement(requestName_)->getValue() << " WLName: " << name.str() << std::endl;
292  return name.str();
293 }
294 
295 //==============================================================================
296 std::string WorkLoopManager::composeWorkLoopName(RequestNumber requestNumber, const xoap::MessageReference& message)
297 {
298  SOAPCommand soapCommand(message);
299  std::stringstream name;
300  name << requestNumber << "-" << soapCommand.getCommand();
301  __COUT__ << "Request: " << requestName_ << " Value=" << soapCommand.getCommand() << " WLName: " << name.str() << std::endl;
302  if(soapCommand.hasParameters())
303  {
304  name << '<';
305  char separator = ',';
306  for(SOAPParameters::const_iterator it = soapCommand.getParameters().begin(); it != soapCommand.getParameters().end(); it++)
307  {
308  if(it != soapCommand.getParameters().begin())
309  name << separator;
310  name << it->first << "|" << it->second;
311  }
312  name << '>';
313  }
314  return name.str();
315 }
316 
317 //==============================================================================
318 WorkLoopManager::RequestNumber WorkLoopManager::getWorkLoopRequestNumber(toolbox::task::WorkLoop* workLoop)
319 {
320  return getWorkLoopRequestNumber(workLoop->getName());
321 }
322 
323 //==============================================================================
324 WorkLoopManager::RequestNumber WorkLoopManager::getWorkLoopRequestNumber(std::string workLoopName)
325 {
326  workLoopName = workLoopName.substr(0, workLoopName.find('-'));
327  return strtoull(workLoopName.c_str(), 0, 10);
328 }
329 
330 //==============================================================================
331 std::string WorkLoopManager::getWorkLoopRequest(toolbox::task::WorkLoop* workLoop) { return getWorkLoopRequest(workLoop->getName()); }
332 
333 //==============================================================================
334 std::string WorkLoopManager::getWorkLoopRequest(std::string workLoopName)
335 {
336  return workLoopName.substr(workLoopName.find('-') + 1, workLoopName.find(std::string("/") + cWorkLoopType_) - workLoopName.find('-') - 1);
337 }
338 
339 //==============================================================================
340 void WorkLoopManager::translateWorkLoopName(toolbox::task::WorkLoop* workLoop, SOAPCommand& soapCommand)
341 {
342  std::string request = getWorkLoopRequest(workLoop);
343  if(request.find('<') == std::string::npos)
344  {
345  __COUT__ << "Simple request" << std::endl;
346  soapCommand.setCommand(request);
347  }
348  else
349  {
350  size_t ltPosition = request.find('<');
351  size_t gtPosition = request.find('>');
352  size_t begin = ltPosition + 1;
353  size_t orPosition;
354  size_t commaPosition;
355 
356  soapCommand.setCommand(request.substr(0, ltPosition));
357  while((commaPosition = request.find(',', begin)) != std::string::npos)
358  {
359  orPosition = request.find('|', begin);
360  soapCommand.setParameter(request.substr(begin, orPosition - begin), request.substr(orPosition + 1, commaPosition - orPosition - 1));
361  begin = commaPosition + 1;
362  __COUT__ << "Comma: " << commaPosition << std::endl;
363  }
364  orPosition = request.find('|', begin);
365  soapCommand.setParameter(request.substr(begin, orPosition - begin), request.substr(orPosition + 1, gtPosition - orPosition - 1));
366  }
367  __COUT__ << soapCommand << std::endl;
368  //__COUT__ << name.substr(name.find(',')+1,name.find('/')-name.find(',')-1) <<
369  // std::endl;
370 }