otsdaq_utilities  v2_05_02_indev
ConsoleSupervisor.cc
1 #include "otsdaq-utilities/Console/ConsoleSupervisor.h"
2 #include <xdaq/NamespaceURI.h>
3 #include "otsdaq/CgiDataUtilities/CgiDataUtilities.h"
4 #include "otsdaq/Macros/CoutMacros.h"
5 #include "otsdaq/MessageFacility/MessageFacility.h"
6 #include "otsdaq/NetworkUtilities/ReceiverSocket.h"
7 #include "otsdaq/XmlUtilities/HttpXmlDocument.h"
8 
9 #include <dirent.h> //for DIR
10 #include <sys/stat.h> //for mkdir
11 #include <fstream>
12 #include <iostream>
13 #include <string>
14 #include <thread> //for std::thread
15 
16 using namespace ots;
17 
18 // UDP Message Format:
19 // UDPMESSAGE|TIMESTAMP|SEQNUM|HOSTNAME|HOSTADDR|SEVERITY|CATEGORY|APPLICATION|PID|ITERATION|MODULE|(FILE|LINE)|MESSAGE
20 // FILE and LINE are only printed for s67+
21 
22 XDAQ_INSTANTIATOR_IMPL(ConsoleSupervisor)
23 
// Directory (under SERVICE_DATA_PATH) holding one preferences file per user.
// All expression macros below are fully parenthesized so they expand safely
// next to any operator (member access, comparison, etc.).
#define USER_CONSOLE_PREF_PATH \
    (std::string(__ENV__("SERVICE_DATA_PATH")) + "/ConsolePreferences/")
// File extension of a user preferences file.
#define USERS_PREFERENCES_FILETYPE "pref"

// Message Facility forwarder configuration; the receiver thread reads its
// port and IP from this file.
#define QUIET_CFG_FILE         \
    (std::string(__ENV__("USER_DATA")) + \
     "/MessageFacilityConfigurations/QuietForwarder.cfg")

// Prefix of a locally generated Error message, laid out like the UDP message
// format (the timestamp is a fixed placeholder). __FILE__/__LINE__ refer to
// the place where the macro is used. Append the message text to it.
#define CONSOLE_SPECIAL_ERROR                                                  \
    (std::string(                                                              \
         "|30-Aug-2019 15:30:17 CDT|0|||Error|Console||-1||ConsoleSupervisor|") + \
     std::string(__FILE__) + std::string("|") + std::to_string(__LINE__) +     \
     std::string("|"))
// Same as CONSOLE_SPECIAL_ERROR, but with Warning severity.
#define CONSOLE_SPECIAL_WARNING                                                  \
    (std::string(                                                                \
         "|30-Aug-2019 15:30:17 CDT|0|||Warning|Console||-1||ConsoleSupervisor|") + \
     std::string(__FILE__) + std::string("|") + std::to_string(__LINE__) +       \
     std::string("|"))

// Message Facility subject used by the logging macros in this file.
#undef __MF_SUBJECT__
#define __MF_SUBJECT__ "Console"
45 
46 //==============================================================================
47 ConsoleSupervisor::ConsoleSupervisor(xdaq::ApplicationStub* stub)
48  : CoreSupervisorBase(stub), messageCount_(0), maxMessageCount_(100000)
49 {
50  __SUP_COUT__ << "Constructor started." << __E__;
51 
52  INIT_MF("." /*directory used is USER_DATA/LOG/.*/);
53 
54  // attempt to make directory structure (just in case)
55  mkdir(((std::string)USER_CONSOLE_PREF_PATH).c_str(), 0755);
56 
57  init();
58 
59  __SUP_COUT__ << "Constructor complete." << __E__;
60 }
61 
62 //==============================================================================
63 ConsoleSupervisor::~ConsoleSupervisor(void) { destroy(); }
64 //==============================================================================
65 void ConsoleSupervisor::init(void)
66 {
67  // start mf msg listener
68  std::thread(
69  [](ConsoleSupervisor* cs) {
70  ConsoleSupervisor::messageFacilityReceiverWorkLoop(cs);
71  },
72  this)
73  .detach();
74 } // end init()
75 
76 //==============================================================================
77 void ConsoleSupervisor::destroy(void)
78 {
79  // called by destructor
80 } // end destroy()
81 
82 //==============================================================================
83 // messageFacilityReceiverWorkLoop ~~
84 // Thread for printing Message Facility messages without decorations
85 // Note: Uses std::mutex to avoid conflict with reading thread.
86 void ConsoleSupervisor::messageFacilityReceiverWorkLoop(ConsoleSupervisor* cs) try
87 {
88  __COUT__ << std::endl;
89 
90  std::string configFile = QUIET_CFG_FILE;
91  FILE* fp = fopen(configFile.c_str(), "r");
92  if(!fp)
93  {
94  __SS__ << "File with port info could not be loaded: " << QUIET_CFG_FILE
95  << std::endl;
96  __COUT__ << "\n" << ss.str();
97  __SS_THROW__;
98  }
99  char tmp[100];
100  fgets(tmp, 100, fp); // receive port (ignore)
101  fgets(tmp, 100, fp); // destination port *** used here ***
102  int myport;
103  sscanf(tmp, "%*s %d", &myport);
104 
105  fgets(tmp, 100, fp); // destination ip *** used here ***
106  char myip[100];
107  sscanf(tmp, "%*s %s", myip);
108  fclose(fp);
109 
110  ReceiverSocket rsock(myip, myport); // Take Port from Configuration
111  try
112  {
113  rsock.initialize(0xA00000 /*socketReceiveBufferSize*/);
114  }
115  catch(...)
116  {
117  // lockout the messages array for the remainder of the scope
118  // this guarantees the reading thread can safely access the messages
119  std::lock_guard<std::mutex> lock(cs->messageMutex_);
120 
121  // NOTE: if we do not want this to be fatal, do not throw here, just print out
122 
123  if(1) // generate special message and throw for failed socket
124  {
125  __SS__ << "FATAL Console error. Could not initialize socket on port "
126  << myport
127  << ". Perhaps the port is already in use? Check for multiple stale "
128  "instances of otsdaq processes, or notify admins."
129  << " Multiple instances of otsdaq on the same node should be "
130  "possible, but port numbers must be unique."
131  << std::endl;
132  __SS_THROW__;
133  }
134 
135  // generate special message to indicate failed socket
136  __SS__ << "FATAL Console error. Could not initialize socket on port " << myport
137  << ". Perhaps it is already in use? Exiting Console receive loop."
138  << std::endl;
139  __COUT__ << ss.str();
140 
141  cs->messages_.emplace_back(CONSOLE_SPECIAL_ERROR + ss.str(), cs->messageCount_++);
142 
143  if(cs->messages_.size() > cs->maxMessageCount_)
144  {
145  cs->messages_.erase(cs->messages_.begin());
146  }
147 
148  return;
149  }
150 
151  std::string buffer;
152  int i = 0;
153  int heartbeatCount = 0;
154  int selfGeneratedMessageCount = 0;
155 
156  std::map<unsigned int, unsigned int>
157  sourceLastSequenceID; // map from sourceID to
158  // lastSequenceID to
159  // identify missed messages
160  long long newSourceId;
161  unsigned int newSequenceId;
162 
163  // force a starting message
164  __MOUT__ << "DEBUG messages look like this." << __E__;
165 
166  while(1)
167  {
168  // if receive succeeds display message
169 
170  //__COUTV__(i);
171 
172  if(rsock.receive(
173  buffer, 1 /*timeoutSeconds*/, 0 /*timeoutUSeconds*/, false /*verbose*/) !=
174  -1)
175  {
176  // use 1-byte "ping" to keep socket alive
177  if(buffer.size() == 1)
178  {
179  // std::cout << "Ping!" << __E__;
180  continue;
181  }
182 
183  if(i != 200)
184  {
185  __COUT__ << "Console has first message." << __E__;
186  i = 200; // mark so things are good for all time. (this indicates things
187  // are configured to be sent here)
188 
189  __MOUT_INFO__ << "INFO messages look like this." << __E__;
190  __MOUT_WARN__ << "WARNING messages look like this." << __E__;
191  __MOUT_ERR__ << "ERROR messages look like this." << __E__;
192 
193  // //to debug special packets
194  // __SS__ << "???";
195  // cs->messages_[cs->writePointer_].set(CONSOLE_SPECIAL_ERROR
196  //+ ss.str(),
197  // cs->messageCount_++);
198  //
199  // if(++cs->writePointer_ == cs->messages_.size()) //handle
200  // wrap-around cs->writePointer_ = 0;
201  }
202 
203  if(selfGeneratedMessageCount)
204  --selfGeneratedMessageCount; // decrement internal message count
205  else // reset heartbeat if external messages are coming through
206  heartbeatCount = 0;
207 
208  //__COUT__ << buffer << std::endl;
209 
210  // lockout the messages array for the remainder of the scope
211  // this guarantees the reading thread can safely access the messages
212  std::lock_guard<std::mutex> lock(cs->messageMutex_);
213 
214  cs->messages_.emplace_back(buffer, cs->messageCount_++);
215 
216  // check if sequence ID is out of order
217  newSourceId = cs->messages_.back().getSourceIDAsNumber();
218  newSequenceId = cs->messages_.back().getSequenceIDAsNumber();
219 
220  //__COUT__ << "newSourceId: " << newSourceId << std::endl;
221  //__COUT__ << "newSequenceId: " << newSequenceId << std::endl;
222 
223  if(newSourceId != -1 &&
224  sourceLastSequenceID.find(newSourceId) !=
225  sourceLastSequenceID.end() && // ensure not first packet received
226  ((newSequenceId == 0 && sourceLastSequenceID[newSourceId] !=
227  (unsigned int)-1) || // wrap around case
228  newSequenceId !=
229  sourceLastSequenceID[newSourceId] + 1)) // normal sequence case
230  {
231  // missed some messages!
232  __SS__ << "Missed packets from " << cs->messages_.back().getSource()
233  << "! Sequence IDs " << sourceLastSequenceID[newSourceId] + 1
234  << " to " << newSequenceId - 1 << "." << __E__;
235  std::cout << ss.str();
236 
237  // generate special message to indicate missed packets
238  cs->messages_.emplace_back(CONSOLE_SPECIAL_WARNING + ss.str(),
239  cs->messageCount_++);
240  }
241 
242  // save the new last sequence ID
243  sourceLastSequenceID[newSourceId] = newSequenceId;
244 
245  while(cs->messages_.size() > 0 && cs->messages_.size() > cs->maxMessageCount_)
246  {
247  cs->messages_.erase(cs->messages_.begin());
248  }
249  }
250  else
251  {
252  if(i < 120) // if nothing received for 120 seconds, then something is wrong
253  // with Console configuration
254  ++i;
255 
256  sleep(1); // sleep one second, if timeout
257 
258  // every 60 heartbeatCount (2 seconds each = 1 sleep and 1 timeout) print a
259  // heartbeat message
260  if(i != 200 || // show first message, if not already a message
261  (heartbeatCount < 60 * 5 &&
262  heartbeatCount % 60 == 59)) // every ~2 min for first 5 messages
263  {
264  ++selfGeneratedMessageCount; // increment internal message count
265  __MOUT__ << "Console is alive and waiting... (if no messages, next "
266  "heartbeat is in two minutes)"
267  << std::endl;
268  }
269  else if(heartbeatCount % (60 * 30) == 59) // approx every hour
270  {
271  ++selfGeneratedMessageCount; // increment internal message count
272  __MOUT__ << "Console is alive and waiting a long time... (if no "
273  "messages, next heartbeat is in one hour)"
274  << std::endl;
275  }
276 
277  ++heartbeatCount;
278  }
279 
280  // if nothing received for 2 minutes seconds, then something is wrong with Console
281  // configuration after 5 seconds there is a self-send. Which will at least
282  // confirm configuration. OR if 5 generated messages and never cleared.. then
283  // the forwarding is not working.
284  if(i == 120 || selfGeneratedMessageCount == 5)
285  {
286  __COUTV__(i);
287  __COUTV__(selfGeneratedMessageCount);
288  __COUT__ << "No messages received at Console Supervisor. Exiting Console "
289  "messageFacilityReceiverWorkLoop"
290  << std::endl;
291  break; // assume something wrong, and break loop
292  }
293  }
294 
295 } // end messageFacilityReceiverWorkLoop()
296 catch(const std::runtime_error& e)
297 {
298  __COUT_ERR__ << "Error caught at Console Supervisor thread: " << e.what() << __E__;
299 }
300 catch(...)
301 {
302  __COUT_ERR__ << "Unknown error caught at Console Supervisor thread." << __E__;
303 }
304 
305 //==============================================================================
306 void ConsoleSupervisor::defaultPage(xgi::Input* /*in*/, xgi::Output* out)
307 {
308  __SUP_COUT__ << "ApplicationDescriptor LID="
309  << getApplicationDescriptor()->getLocalId() << std::endl;
310  *out << "<!DOCTYPE HTML><html lang='en'><frameset col='100%' row='100%'><frame "
311  "src='/WebPath/html/Console.html?urn="
312  << getApplicationDescriptor()->getLocalId() << "'></frameset></html>";
313 } // end defaultPage()
314 
315 //==============================================================================
316 // forceSupervisorPropertyValues
317 // override to force supervisor property values (and ignore user settings)
318 void ConsoleSupervisor::forceSupervisorPropertyValues()
319 {
320  CorePropertySupervisorBase::setSupervisorProperty(
321  CorePropertySupervisorBase::SUPERVISOR_PROPERTIES.AutomatedRequestTypes,
322  "GetConsoleMsgs");
323  // CorePropertySupervisorBase::setSupervisorProperty(CorePropertySupervisorBase::SUPERVISOR_PROPERTIES.NeedUsernameRequestTypes,
324  // "SaveUserPreferences | LoadUserPreferences");
325 } // end forceSupervisorPropertyValues()
326 
327 //==============================================================================
328 // Request
329 // Handles Web Interface requests to Console supervisor.
330 // Does not refresh cookie for automatic update checks.
331 void ConsoleSupervisor::request(const std::string& requestType,
332  cgicc::Cgicc& cgiIn,
333  HttpXmlDocument& xmlOut,
334  const WebUsers::RequestUserInfo& userInfo)
335 {
336  //__SUP_COUT__ << "requestType " << requestType << std::endl;
337 
338  // Commands:
339  // GetConsoleMsgs
340  // SaveUserPreferences
341  // LoadUserPreferences
342 
343  // Note: to report to logbook admin status use
344  // xmlOut.addTextElementToData(XML_ADMIN_STATUS,refreshTempStr_);
345 
346  if(requestType == "GetConsoleMsgs")
347  {
348  // lindex of -1 means first time and user just gets update lcount and lindex
349  std::string lastUpdateCountStr = CgiDataUtilities::postData(cgiIn, "lcount");
350 
351  if(lastUpdateCountStr == "")
352  {
353  __SUP_COUT_ERR__ << "Invalid Parameters! lastUpdateCount="
354  << lastUpdateCountStr << std::endl;
355  xmlOut.addTextElementToData("Error",
356  "Error - Invalid parameters for GetConsoleMsgs.");
357  return;
358  }
359 
360  size_t lastUpdateCount = std::stoull(lastUpdateCountStr);
361 
362  // __SUP_COUT__ << "lastUpdateCount=" << lastUpdateCount << std::endl;
363 
364  insertMessageRefresh(&xmlOut, lastUpdateCount);
365  }
366  else if(requestType == "SaveUserPreferences")
367  {
368  int colorIndex = CgiDataUtilities::postDataAsInt(cgiIn, "colorIndex");
369  int showSideBar = CgiDataUtilities::postDataAsInt(cgiIn, "showSideBar");
370  int noWrap = CgiDataUtilities::postDataAsInt(cgiIn, "noWrap");
371  int messageOnly = CgiDataUtilities::postDataAsInt(cgiIn, "messageOnly");
372  int hideLineNumers = CgiDataUtilities::postDataAsInt(cgiIn, "hideLineNumers");
373 
374  __SUP_COUT__ << "requestType " << requestType << std::endl;
375  __SUP_COUT__ << "colorIndex: " << colorIndex << std::endl;
376  __SUP_COUT__ << "showSideBar: " << showSideBar << std::endl;
377  __SUP_COUT__ << "noWrap: " << noWrap << std::endl;
378  __SUP_COUT__ << "messageOnly: " << messageOnly << std::endl;
379  __SUP_COUT__ << "hideLineNumers: " << hideLineNumers << std::endl;
380 
381  if(userInfo.username_ == "") // should never happen?
382  {
383  __SUP_COUT_ERR__ << "Invalid user found! user=" << userInfo.username_
384  << std::endl;
385  xmlOut.addTextElementToData("Error",
386  "Error - InvauserInfo.username_user found.");
387  return;
388  }
389 
390  std::string fn = (std::string)USER_CONSOLE_PREF_PATH + userInfo.username_ + "." +
391  (std::string)USERS_PREFERENCES_FILETYPE;
392 
393  __SUP_COUT__ << "Save preferences: " << fn << std::endl;
394  FILE* fp = fopen(fn.c_str(), "w");
395  if(!fp)
396  {
397  __SS__;
398  __THROW__(ss.str() + "Could not open file: " + fn);
399  }
400  fprintf(fp, "colorIndex %d\n", colorIndex);
401  fprintf(fp, "showSideBar %d\n", showSideBar);
402  fprintf(fp, "noWrap %d\n", noWrap);
403  fprintf(fp, "messageOnly %d\n", messageOnly);
404  fprintf(fp, "hideLineNumers %d\n", hideLineNumers);
405  fclose(fp);
406  }
407  else if(requestType == "LoadUserPreferences")
408  {
409  __SUP_COUT__ << "requestType " << requestType << std::endl;
410 
411  unsigned int colorIndex, showSideBar, noWrap, messageOnly, hideLineNumers;
412 
413  if(userInfo.username_ == "") // should never happen?
414  {
415  __SUP_COUT_ERR__ << "Invalid user found! user=" << userInfo.username_
416  << std::endl;
417  xmlOut.addTextElementToData("Error", "Error - Invalid user found.");
418  return;
419  }
420 
421  std::string fn = (std::string)USER_CONSOLE_PREF_PATH + userInfo.username_ + "." +
422  (std::string)USERS_PREFERENCES_FILETYPE;
423 
424  __SUP_COUT__ << "Load preferences: " << fn << std::endl;
425 
426  FILE* fp = fopen(fn.c_str(), "r");
427  if(!fp)
428  {
429  // return defaults
430  __SUP_COUT__ << "Returning defaults." << std::endl;
431  xmlOut.addTextElementToData("colorIndex", "0");
432  xmlOut.addTextElementToData("showSideBar", "0");
433  xmlOut.addTextElementToData("noWrap", "1");
434  xmlOut.addTextElementToData("messageOnly", "0");
435  xmlOut.addTextElementToData("hideLineNumers", "1");
436  return;
437  }
438  fscanf(fp, "%*s %u", &colorIndex);
439  fscanf(fp, "%*s %u", &showSideBar);
440  fscanf(fp, "%*s %u", &noWrap);
441  fscanf(fp, "%*s %u", &messageOnly);
442  fscanf(fp, "%*s %u", &hideLineNumers);
443  fclose(fp);
444  __SUP_COUT__ << "colorIndex: " << colorIndex << std::endl;
445  __SUP_COUT__ << "showSideBar: " << showSideBar << std::endl;
446  __SUP_COUT__ << "noWrap: " << noWrap << std::endl;
447  __SUP_COUT__ << "messageOnly: " << messageOnly << std::endl;
448  __SUP_COUT__ << "hideLineNumers: " << hideLineNumers << std::endl;
449 
450  char tmpStr[20];
451  sprintf(tmpStr, "%u", colorIndex);
452  xmlOut.addTextElementToData("colorIndex", tmpStr);
453  sprintf(tmpStr, "%u", showSideBar);
454  xmlOut.addTextElementToData("showSideBar", tmpStr);
455  sprintf(tmpStr, "%u", noWrap);
456  xmlOut.addTextElementToData("noWrap", tmpStr);
457  sprintf(tmpStr, "%u", messageOnly);
458  xmlOut.addTextElementToData("messageOnly", tmpStr);
459  sprintf(tmpStr, "%u", hideLineNumers);
460  xmlOut.addTextElementToData("hideLineNumers", tmpStr);
461  }
462  else
463  {
464  __SUP_SS__ << "requestType Request, " << requestType << ", not recognized."
465  << __E__;
466  __SUP_SS_THROW__;
467  }
468 } // end request()
469 
470 //==============================================================================
471 // ConsoleSupervisor::insertMessageRefresh()
472 // if lastUpdateClock is current, return nothing
473 // else return new messages
474 // (note: lastUpdateIndex==(unsigned int)-1 first time and returns as much as possible//
475 // nothing but lastUpdateClock)
476 //
477 // format of xml:
478 //
479 // <last_update_count/>
480 // <last_update_index/>
481 // <messages>
482 // <message_FIELDNAME*/>
483 //"Level"
484 //"Label"
485 //"Source"
486 //"Msg"
487 //"Time"
488 //"Count"
489 // </messages>
490 //
491 // NOTE: Uses std::mutex to avoid conflict with writing thread. (this is the reading
492 // thread)
493 void ConsoleSupervisor::insertMessageRefresh(HttpXmlDocument* xmlOut,
494  const size_t lastUpdateCount)
495 {
496  //__SUP_COUT__ << std::endl;
497 
498  if(messages_.size() == 0)
499  return;
500 
501  // validate lastUpdateCount
502  if(lastUpdateCount > messages_.back().getCount() && lastUpdateCount != (size_t)-1)
503  {
504  __SS__ << "Invalid lastUpdateCount: " << lastUpdateCount
505  << " messagesArray size = " << messages_.back().getCount() << std::endl;
506  __SS_THROW__;
507  }
508 
509  // lockout the messages array for the remainder of the scope
510  // this guarantees the reading thread can safely access the messages
511  std::lock_guard<std::mutex> lock(messageMutex_);
512 
513  xmlOut->addTextElementToData("last_update_count",
514  std::to_string(messages_.back().getCount()));
515 
516  refreshParent_ = xmlOut->addTextElementToData("messages", "");
517 
518  bool requestOutOfSync = false;
519  std::string requestOutOfSyncMsg;
520 
521  size_t refreshReadPointer = 0;
522  if(lastUpdateCount != (size_t)-1)
523  {
524  while(refreshReadPointer < messages_.size() &&
525  messages_[refreshReadPointer].getCount() <= lastUpdateCount)
526  {
527  ++refreshReadPointer;
528  }
529  }
530 
531  if(refreshReadPointer >= messages_.size())
532  return;
533 
534  if(messages_.size() - refreshReadPointer > 250)
535  {
536  __SUP_COUT__ << "Only sending latest 250 messages!";
537 
538  auto oldrrp = refreshReadPointer;
539  refreshReadPointer = messages_.size() - 250;
540 
541  // generate special message to indicate failed socket
542  __SS__ << "Skipping " << (refreshReadPointer - oldrrp)
543  << " messages because the web console has fallen behind!" << std::endl;
544  __COUT__ << ss.str();
545 
546  ConsoleMessageStruct msg(CONSOLE_SPECIAL_WARNING + ss.str(), lastUpdateCount);
547  auto it = messages_.begin();
548  std::advance(it, refreshReadPointer + 1);
549  messages_.insert(it, msg);
550  }
551 
552  // output oldest to new
553  for(; refreshReadPointer < messages_.size(); ++refreshReadPointer)
554  {
555  auto msg = messages_[refreshReadPointer];
556  if(msg.getCount() < lastUpdateCount)
557  {
558  if(!requestOutOfSync) // record out of sync message once only
559  {
560  requestOutOfSync = true;
561  __SS__ << "Request is out of sync! Message count should be more recent "
562  "than update clock! "
563  << msg.getCount() << " < " << lastUpdateCount << std::endl;
564  requestOutOfSyncMsg = ss.str();
565  }
566  // assume these messages are new (due to a system restart)
567  // continue;
568  }
569 
570  // for all fields, give value
571  for(auto& field : msg.fields)
572  {
573  if(field.second.fieldName == "Source")
574  continue; // skip, not userful
575  if(field.second.fieldName == "SourceID")
576  continue; // skip, not userful
577 
578  xmlOut->addTextElementToParent("message_" + field.second.fieldName,
579  field.second.fieldValue,
580  refreshParent_);
581  }
582 
583  // give timestamp also
584  xmlOut->addTextElementToParent("message_Time", msg.getTime(), refreshParent_);
585  // give clock also
586  xmlOut->addTextElementToParent(
587  "message_Count", std::to_string(msg.getCount()), refreshParent_);
588  }
589 
590  if(requestOutOfSync) // if request was out of sync, show message
591  __SUP_COUT__ << requestOutOfSyncMsg;
592 }