1 #include "otsdaq-utilities/Console/ConsoleSupervisor.h"
2 #include <xdaq/NamespaceURI.h>
3 #include "otsdaq/CgiDataUtilities/CgiDataUtilities.h"
4 #include "otsdaq/Macros/CoutMacros.h"
5 #include "otsdaq/MessageFacility/MessageFacility.h"
6 #include "otsdaq/NetworkUtilities/ReceiverSocket.h"
7 #include "otsdaq/XmlUtilities/HttpXmlDocument.h"
// Directory that holds the per-user Console preference files:
// $SERVICE_DATA_PATH/ConsolePreferences/ (environment variable read through __ENV__).
24 #define USER_CONSOLE_PREF_PATH \
25 std::string(__ENV__("SERVICE_DATA_PATH")) + "/ConsolePreferences/"
// Extension of a preference file; request() builds "<username>.pref" from it.
26 #define USERS_PREFERENCES_FILETYPE "pref"
// QUIET_CFG_FILE: path under $USER_DATA/MessageFacilityConfigurations/ that
// messageFacilityReceiverWorkLoop() opens to read the receive port and IP.
// CONSOLE_SPECIAL_ERROR / CONSOLE_SPECIAL_WARNING: string prefixes (fixed timestamp,
// level, "Console" label, __FILE__ and __LINE__) that are prepended to messages the
// Console generates itself before they are stored in messages_.
// __MF_SUBJECT__: "Console".
// NOTE(review): the trailing continuation lines of QUIET_CFG_FILE and of both
// CONSOLE_SPECIAL_* macros are not visible in this excerpt, so their full expansions
// cannot be confirmed from here.
28 #define QUIET_CFG_FILE \
29 std::string(__ENV__("USER_DATA")) + \
30 "/MessageFacilityConfigurations/" \
33 #define CONSOLE_SPECIAL_ERROR \
34 std::string("|30-Aug-2019 15:30:17 CDT|0|||Error|Console||-1||ConsoleSupervisor|") + \
35 std::string(__FILE__) + std::string("|") + std::to_string(__LINE__) + \
37 #define CONSOLE_SPECIAL_WARNING \
39 "|30-Aug-2019 15:30:17 CDT|0|||Warning|Console||-1||ConsoleSupervisor|") + \
40 std::string(__FILE__) + std::string("|") + std::to_string(__LINE__) + \
44 #define __MF_SUBJECT__ "Console"
// Constructor.
//   stub - XDAQ application stub, forwarded unchanged to CoreSupervisorBase.
// Starts the running message counter at 0 and sets the cap on the number of
// retained messages (maxMessageCount_) to 100000.
// NOTE(review): gaps in the embedded line numbering indicate that some statements
// of this constructor are not visible in this excerpt.
47 ConsoleSupervisor::ConsoleSupervisor(xdaq::ApplicationStub* stub)
48 : CoreSupervisorBase(stub), messageCount_(0), maxMessageCount_(100000)
50 __SUP_COUT__ <<
"Constructor started." << __E__;
// Create the user-preferences directory with mode 0755. The return value of
// mkdir() is not inspected here, so failure (including "already exists") is ignored.
55 mkdir(((std::string)USER_CONSOLE_PREF_PATH).c_str(), 0755);
59 __SUP_COUT__ <<
"Constructor complete." << __E__;
// Destructor: all teardown is delegated to destroy().
63 ConsoleSupervisor::~ConsoleSupervisor()
{
	destroy();
}
// init(): starts the message-facility receive loop by calling the static
// messageFacilityReceiverWorkLoop() with a ConsoleSupervisor pointer `cs`.
// NOTE(review): how that call is dispatched (e.g. on which thread) and where `cs`
// is bound are not visible in this excerpt.
65 void ConsoleSupervisor::init(
void)
70 ConsoleSupervisor::messageFacilityReceiverWorkLoop(cs);
// destroy(): teardown hook invoked by the destructor.
// NOTE(review): the body is not visible in this excerpt.
77 void ConsoleSupervisor::destroy(
void)
// Receive loop for message-facility packets.
//   cs - supervisor whose messages_ list, messageCount_ counter, maxMessageCount_
//        cap and messageMutex_ are used below.
// Visible behaviour: reads a port and an IP string from QUIET_CFG_FILE, constructs
// and initializes a ReceiverSocket on them, then appends received buffers to
// cs->messages_, checks per-source sequence IDs for gaps, trims the list to the
// cap and periodically emits heartbeat messages. The whole body is a
// function-try-block whose handlers log the error.
// NOTE(review): many short lines (braces, declarations of tmp/myip/myport/buffer/i,
// the loop header, the receive call itself) are not visible in this excerpt, so the
// exact nesting of the statements below cannot be confirmed from here.
86 void ConsoleSupervisor::messageFacilityReceiverWorkLoop(
ConsoleSupervisor* cs)
try
88 __COUT__ << std::endl;
// Open the configuration file that carries the receive port and IP.
90 std::string configFile = QUIET_CFG_FILE;
91 FILE* fp = fopen(configFile.c_str(),
"r");
94 __SS__ <<
"File with port info could not be loaded: " << QUIET_CFG_FILE
96 __COUT__ <<
"\n" << ss.str();
// Port: skip the first whitespace-delimited token of the line, then read an int.
103 sscanf(tmp,
"%*s %d", &myport);
// IP/host: skip the first token, then read a string.
// NOTE(review): "%s" carries no field width, so nothing here bounds the write into
// myip (declared outside this excerpt) - confirm the buffer size or add a width.
107 sscanf(tmp,
"%*s %s", myip);
110 ReceiverSocket rsock(myip, myport);
// NOTE(review): 0xA00000 (10 MiB) is presumably the socket receive-buffer size -
// confirm against ReceiverSocket::initialize().
113 rsock.initialize(0xA00000 );
// The message mutex is taken before the shared message list is modified below.
119 std::lock_guard<std::mutex> lock(cs->messageMutex_);
125 __SS__ <<
"FATAL Console error. Could not initialize socket on port "
127 <<
". Perhaps the port is already in use? Check for multiple stale "
128 "instances of otsdaq processes, or notify admins."
129 <<
" Multiple instances of otsdaq on the same node should be "
130 "possible, but port numbers must be unique."
136 __SS__ <<
"FATAL Console error. Could not initialize socket on port " << myport
137 <<
". Perhaps it is already in use? Exiting Console receive loop."
139 __COUT__ << ss.str();
// Record the failure as a Console-generated error entry (CONSOLE_SPECIAL_ERROR
// prefix), stamped with the running message counter.
141 cs->messages_.emplace_back(CONSOLE_SPECIAL_ERROR + ss.str(), cs->messageCount_++);
// Drop the oldest entry once the list exceeds the cap.
143 if(cs->messages_.size() > cs->maxMessageCount_)
145 cs->messages_.erase(cs->messages_.begin());
153 int heartbeatCount = 0;
154 int selfGeneratedMessageCount = 0;
// Last sequence ID seen for each source ID; used below to detect missed packets.
156 std::map<unsigned int, unsigned int>
157 sourceLastSequenceID;
160 long long newSourceId;
161 unsigned int newSequenceId;
// Emit a sample message at DEBUG level.
164 __MOUT__ <<
"DEBUG messages look like this." << __E__;
173 buffer, 1 , 0 ,
false ) !=
177 if(buffer.size() == 1)
185 __COUT__ <<
"Console has first message." << __E__;
// Emit sample messages at INFO, WARNING and ERROR level.
189 __MOUT_INFO__ <<
"INFO messages look like this." << __E__;
190 __MOUT_WARN__ <<
"WARNING messages look like this." << __E__;
191 __MOUT_ERR__ <<
"ERROR messages look like this." << __E__;
// Count down the tally of self-generated messages (it is incremented each time a
// heartbeat is emitted further below).
203 if(selfGeneratedMessageCount)
204 --selfGeneratedMessageCount;
// Append the received buffer to the shared list with the mutex held, stamped with
// the next message count.
212 std::lock_guard<std::mutex> lock(cs->messageMutex_);
214 cs->messages_.emplace_back(buffer, cs->messageCount_++);
// Source and sequence IDs as reported by the entry that was just appended.
217 newSourceId = cs->messages_.back().getSourceIDAsNumber();
218 newSequenceId = cs->messages_.back().getSequenceIDAsNumber();
// Gap check, only for a source ID other than -1 that is already in the map: the
// new sequence ID is compared against the last one + 1, with a separate clause for
// newSequenceId == 0.
// NOTE(review): part of this condition is not visible in this excerpt; the
// newSequenceId == 0 clause presumably handles counter wrap-around - confirm.
223 if(newSourceId != -1 &&
224 sourceLastSequenceID.find(newSourceId) !=
225 sourceLastSequenceID.end() &&
226 ((newSequenceId == 0 && sourceLastSequenceID[newSourceId] !=
229 sourceLastSequenceID[newSourceId] + 1))
// NOTE(review): newSequenceId is unsigned, so "newSequenceId - 1" is reported as a
// very large number when newSequenceId is 0.
232 __SS__ <<
"Missed packets from " << cs->messages_.back().getSource()
233 <<
"! Sequence IDs " << sourceLastSequenceID[newSourceId] + 1
234 <<
" to " << newSequenceId - 1 <<
"." << __E__;
235 std::cout << ss.str();
// Also store the gap report as a Console-generated warning entry.
238 cs->messages_.emplace_back(CONSOLE_SPECIAL_WARNING + ss.str(),
239 cs->messageCount_++);
// Remember the latest sequence ID for this source.
243 sourceLastSequenceID[newSourceId] = newSequenceId;
// Erase from the front until the list is back within maxMessageCount_.
245 while(cs->messages_.size() > 0 && cs->messages_.size() > cs->maxMessageCount_)
247 cs->messages_.erase(cs->messages_.begin());
// Heartbeat (visible part of the condition): while heartbeatCount is below 60*5,
// fire when heartbeatCount % 60 == 59.
261 (heartbeatCount < 60 * 5 &&
262 heartbeatCount % 60 == 59))
264 ++selfGeneratedMessageCount;
265 __MOUT__ <<
"Console is alive and waiting... (if no messages, next "
266 "heartbeat is in two minutes)"
// Otherwise fire when heartbeatCount % (60*30) == 59.
269 else if(heartbeatCount % (60 * 30) == 59)
271 ++selfGeneratedMessageCount;
272 __MOUT__ <<
"Console is alive and waiting a long time... (if no "
273 "messages, next heartbeat is in one hour)"
// Give-up condition: i (declared outside this excerpt) reached 120, or five
// self-generated messages are outstanding; the log text below states that the
// receive loop is exited in this case.
284 if(i == 120 || selfGeneratedMessageCount == 5)
287 __COUTV__(selfGeneratedMessageCount);
288 __COUT__ <<
"No messages received at Console Supervisor. Exiting Console "
289 "messageFacilityReceiverWorkLoop"
// Handlers of the function-try-block: a std::runtime_error is logged with its
// what() text; the second log line below reports any other error.
// NOTE(review): the header of the second handler is not visible in this excerpt.
296 catch(
const std::runtime_error& e)
298 __COUT_ERR__ <<
"Error caught at Console Supervisor thread: " << e.what() << __E__;
302 __COUT_ERR__ <<
"Unknown error caught at Console Supervisor thread." << __E__;
// Default page handler.
//   (unnamed xgi::Input*) - unused.
//   out                   - stream the HTML response is written to.
// Logs the application's local ID, then writes a minimal HTML document whose single
// frame loads /WebPath/html/Console.html with that local ID as the `urn` query
// parameter.
// NOTE(review): the HTML <frameset> attributes are normally spelled `cols`/`rows`;
// `col`/`row` as written here are not standard - confirm whether this is intended.
306 void ConsoleSupervisor::defaultPage(xgi::Input* , xgi::Output* out)
308 __SUP_COUT__ <<
"ApplicationDescriptor LID="
309 << getApplicationDescriptor()->getLocalId() << std::endl;
310 *out <<
"<!DOCTYPE HTML><html lang='en'><frameset col='100%' row='100%'><frame "
311 "src='/WebPath/html/Console.html?urn="
312 << getApplicationDescriptor()->getLocalId() <<
"'></frameset></html>";
// Forces supervisor property values for this supervisor: sets the
// AutomatedRequestTypes property through CorePropertySupervisorBase.
// NOTE(review): the value passed for the property is not visible in this excerpt.
318 void ConsoleSupervisor::forceSupervisorPropertyValues()
320 CorePropertySupervisorBase::setSupervisorProperty(
321 CorePropertySupervisorBase::SUPERVISOR_PROPERTIES.AutomatedRequestTypes,
// Dispatches one web request to the Console.
//   requestType - "GetConsoleMsgs", "SaveUserPreferences" or "LoadUserPreferences";
//                 any other value is reported as not recognized.
//   xmlOut      - response document; results and "Error" elements are added to it.
//   userInfo    - requesting user; username_ selects the preference file.
// POST fields are read from `cgiIn`.
// NOTE(review): the declaration of cgiIn and several short lines (braces, returns,
// the fopen failure checks, fclose, the tmpStr declaration) are not visible in this
// excerpt.
331 void ConsoleSupervisor::request(
const std::string& requestType,
333 HttpXmlDocument& xmlOut,
334 const WebUsers::RequestUserInfo& userInfo)
// --- GetConsoleMsgs: return the messages newer than the caller's last count. ---
346 if(requestType ==
"GetConsoleMsgs")
349 std::string lastUpdateCountStr = CgiDataUtilities::postData(cgiIn,
"lcount");
351 if(lastUpdateCountStr ==
"")
353 __SUP_COUT_ERR__ <<
"Invalid Parameters! lastUpdateCount="
354 << lastUpdateCountStr << std::endl;
355 xmlOut.addTextElementToData(
"Error",
356 "Error - Invalid parameters for GetConsoleMsgs.");
// std::stoull accepts a leading '-', so "-1" becomes (size_t)-1, the value that
// insertMessageRefresh() treats specially.
// NOTE(review): std::stoull throws std::invalid_argument / std::out_of_range on
// non-numeric or oversized input; no handler is visible in this excerpt.
360 size_t lastUpdateCount = std::stoull(lastUpdateCountStr);
364 insertMessageRefresh(&xmlOut, lastUpdateCount);
// --- SaveUserPreferences: write the five display settings to the user's file. ---
366 else if(requestType ==
"SaveUserPreferences")
368 int colorIndex = CgiDataUtilities::postDataAsInt(cgiIn,
"colorIndex");
369 int showSideBar = CgiDataUtilities::postDataAsInt(cgiIn,
"showSideBar");
370 int noWrap = CgiDataUtilities::postDataAsInt(cgiIn,
"noWrap");
371 int messageOnly = CgiDataUtilities::postDataAsInt(cgiIn,
"messageOnly");
372 int hideLineNumers = CgiDataUtilities::postDataAsInt(cgiIn,
"hideLineNumers");
374 __SUP_COUT__ <<
"requestType " << requestType << std::endl;
375 __SUP_COUT__ <<
"colorIndex: " << colorIndex << std::endl;
376 __SUP_COUT__ <<
"showSideBar: " << showSideBar << std::endl;
377 __SUP_COUT__ <<
"noWrap: " << noWrap << std::endl;
378 __SUP_COUT__ <<
"messageOnly: " << messageOnly << std::endl;
379 __SUP_COUT__ <<
"hideLineNumers: " << hideLineNumers << std::endl;
// An empty user name cannot be mapped to a preference file.
381 if(userInfo.username_ ==
"")
383 __SUP_COUT_ERR__ <<
"Invalid user found! user=" << userInfo.username_
// Error text corrected: it previously read "InvauserInfo.username_user", apparently
// a search-and-replace accident; it now matches the LoadUserPreferences branch.
385 xmlOut.addTextElementToData(
"Error",
386 "Error - Invalid user found.");
// Preference file: <USER_CONSOLE_PREF_PATH><username>.<USERS_PREFERENCES_FILETYPE>
390 std::string fn = (std::string)USER_CONSOLE_PREF_PATH + userInfo.username_ +
"." +
391 (std::string)USERS_PREFERENCES_FILETYPE;
393 __SUP_COUT__ <<
"Save preferences: " << fn << std::endl;
394 FILE* fp = fopen(fn.c_str(),
"w");
398 __THROW__(ss.str() +
"Could not open file: " + fn);
// One "<name> <value>" line per setting; LoadUserPreferences reads them back in
// this same order.
400 fprintf(fp,
"colorIndex %d\n", colorIndex);
401 fprintf(fp,
"showSideBar %d\n", showSideBar);
402 fprintf(fp,
"noWrap %d\n", noWrap);
403 fprintf(fp,
"messageOnly %d\n", messageOnly);
404 fprintf(fp,
"hideLineNumers %d\n", hideLineNumers);
// --- LoadUserPreferences: return the stored settings, or defaults. ---
407 else if(requestType ==
"LoadUserPreferences")
409 __SUP_COUT__ <<
"requestType " << requestType << std::endl;
// Initialized to the same defaults that are returned when no preference file
// exists, so a short or malformed file never leads to indeterminate values being
// logged and returned (the fscanf results below are not checked).
411 unsigned int colorIndex = 0, showSideBar = 0, noWrap = 1, messageOnly = 0, hideLineNumers = 1;
413 if(userInfo.username_ ==
"")
415 __SUP_COUT_ERR__ <<
"Invalid user found! user=" << userInfo.username_
417 xmlOut.addTextElementToData(
"Error",
"Error - Invalid user found.");
421 std::string fn = (std::string)USER_CONSOLE_PREF_PATH + userInfo.username_ +
"." +
422 (std::string)USERS_PREFERENCES_FILETYPE;
424 __SUP_COUT__ <<
"Load preferences: " << fn << std::endl;
426 FILE* fp = fopen(fn.c_str(),
"r");
// Defaults reported when the preference file is not available.
430 __SUP_COUT__ <<
"Returning defaults." << std::endl;
431 xmlOut.addTextElementToData(
"colorIndex",
"0");
432 xmlOut.addTextElementToData(
"showSideBar",
"0");
433 xmlOut.addTextElementToData(
"noWrap",
"1");
434 xmlOut.addTextElementToData(
"messageOnly",
"0");
435 xmlOut.addTextElementToData(
"hideLineNumers",
"1");
// Each line is "<name> <value>": the name token is skipped, the value is read.
438 fscanf(fp,
"%*s %u", &colorIndex);
439 fscanf(fp,
"%*s %u", &showSideBar);
440 fscanf(fp,
"%*s %u", &noWrap);
441 fscanf(fp,
"%*s %u", &messageOnly);
442 fscanf(fp,
"%*s %u", &hideLineNumers);
444 __SUP_COUT__ <<
"colorIndex: " << colorIndex << std::endl;
445 __SUP_COUT__ <<
"showSideBar: " << showSideBar << std::endl;
446 __SUP_COUT__ <<
"noWrap: " << noWrap << std::endl;
447 __SUP_COUT__ <<
"messageOnly: " << messageOnly << std::endl;
448 __SUP_COUT__ <<
"hideLineNumers: " << hideLineNumers << std::endl;
// Format each value as text and add it to the response.
451 sprintf(tmpStr,
"%u", colorIndex);
452 xmlOut.addTextElementToData(
"colorIndex", tmpStr);
453 sprintf(tmpStr,
"%u", showSideBar);
454 xmlOut.addTextElementToData(
"showSideBar", tmpStr);
455 sprintf(tmpStr,
"%u", noWrap);
456 xmlOut.addTextElementToData(
"noWrap", tmpStr);
457 sprintf(tmpStr,
"%u", messageOnly);
458 xmlOut.addTextElementToData(
"messageOnly", tmpStr);
459 sprintf(tmpStr,
"%u", hideLineNumers);
460 xmlOut.addTextElementToData(
"hideLineNumers", tmpStr);
// --- Any other request type is reported as not recognized. ---
464 __SUP_SS__ <<
"requestType Request, " << requestType <<
", not recognized."
// Adds to xmlOut every stored message whose count is newer than lastUpdateCount.
//   xmlOut          - response document; receives "last_update_count", a "messages"
//                     parent element and, under it, one group of "message_*"
//                     elements per message.
//   lastUpdateCount - count of the last message the caller already has; (size_t)-1
//                     skips the fast-forward step so the scan starts at the oldest
//                     stored message.
// At most the latest 250 messages are sent; when more are pending, a
// Console-generated warning entry reporting the skipped amount is inserted into
// messages_.
// NOTE(review): several short lines (braces, returns, the throw after the
// "Invalid lastUpdateCount" text, the `continue` statements of the field filter)
// are not visible in this excerpt.
493 void ConsoleSupervisor::insertMessageRefresh(HttpXmlDocument* xmlOut,
494 const size_t lastUpdateCount)
// Take the message mutex before the first read of messages_: the receiver loop
// appends to and erases from that list with this same mutex held, so the emptiness
// check and the back() reads below must not run unlocked. The guard lives until the
// end of the function.
511 std::lock_guard<std::mutex> lock(messageMutex_);
498 if(messages_.size() == 0)
// Validate the caller's count against the newest stored message.
502 if(lastUpdateCount > messages_.back().getCount() && lastUpdateCount != (size_t)-1)
504 __SS__ <<
"Invalid lastUpdateCount: " << lastUpdateCount
505 <<
" messagesArray size = " << messages_.back().getCount() << std::endl;
513 xmlOut->addTextElementToData(
"last_update_count",
514 std::to_string(messages_.back().getCount()));
// Parent element under which the per-message fields are added.
516 refreshParent_ = xmlOut->addTextElementToData(
"messages",
"");
518 bool requestOutOfSync =
false;
519 std::string requestOutOfSyncMsg;
// Fast-forward past everything the caller already has.
521 size_t refreshReadPointer = 0;
522 if(lastUpdateCount != (
size_t)-1)
524 while(refreshReadPointer < messages_.size() &&
525 messages_[refreshReadPointer].getCount() <= lastUpdateCount)
527 ++refreshReadPointer;
531 if(refreshReadPointer >= messages_.size())
// Limit the catch-up to the latest 250 messages.
534 if(messages_.size() - refreshReadPointer > 250)
536 __SUP_COUT__ <<
"Only sending latest 250 messages!";
538 auto oldrrp = refreshReadPointer;
539 refreshReadPointer = messages_.size() - 250;
542 __SS__ <<
"Skipping " << (refreshReadPointer - oldrrp)
543 <<
" messages because the web console has fallen behind!" << std::endl;
544 __COUT__ << ss.str();
// Insert the skip notice into the stored list right after the first message that
// will be sent; it is stamped with the caller's lastUpdateCount.
546 ConsoleMessageStruct msg(CONSOLE_SPECIAL_WARNING + ss.str(), lastUpdateCount);
547 auto it = messages_.begin();
548 std::advance(it, refreshReadPointer + 1);
549 messages_.insert(it, msg);
// Emit the remaining messages, oldest first.
553 for(; refreshReadPointer < messages_.size(); ++refreshReadPointer)
// Bound by reference: the entry is only read here and messages_ is not modified
// inside this loop, so copying each message is unnecessary.
555 auto& msg = messages_[refreshReadPointer];
556 if(msg.getCount() < lastUpdateCount)
// Only the first out-of-sync occurrence is recorded.
558 if(!requestOutOfSync)
560 requestOutOfSync =
true;
561 __SS__ <<
"Request is out of sync! Message count should be more recent "
562 "than update clock! "
563 << msg.getCount() <<
" < " << lastUpdateCount << std::endl;
564 requestOutOfSyncMsg = ss.str();
// Every field is emitted as "message_<fieldName>"; "Source" and "SourceID" are
// tested for separately (presumably to skip them - the statements following the
// two checks are not visible in this excerpt).
571 for(
auto& field : msg.fields)
573 if(field.second.fieldName ==
"Source")
575 if(field.second.fieldName ==
"SourceID")
578 xmlOut->addTextElementToParent(
"message_" + field.second.fieldName,
579 field.second.fieldValue,
// Timestamp and global count of the message.
584 xmlOut->addTextElementToParent(
"message_Time", msg.getTime(), refreshParent_);
586 xmlOut->addTextElementToParent(
587 "message_Count", std::to_string(msg.getCount()), refreshParent_);
// Log the recorded out-of-sync text.
591 __SUP_COUT__ << requestOutOfSyncMsg;