artdaq_utilities  v1_05_00
MetricManager.cc
1 // MetricManager.cc: MetricManager class implementation file
2 // Author: Eric Flumerfelt
3 // Last Modified: 11/14/2014
4 //
5 // MetricManager loads a user-specified set of plugins, sends them their configuration,
6 // and sends them data as it is received. It also maintains the state of the plugins
7 // relative to the application state.
8 
9 #define TRACE_NAME "MetricManager"
10 #include "artdaq-utilities/Plugins/MetricManager.hh"
11 #include "artdaq-utilities/Plugins/makeMetricPlugin.hh"
12 #include "fhiclcpp/ParameterSet.h"
13 #include "tracemf.h"
14 
15 #include <boost/exception/all.hpp>
16 #include <chrono>
17 
19  : metric_plugins_(0), metric_send_interval_ms_(15000), system_metric_collector_(nullptr), initialized_(false), running_(false), active_(false), missed_metric_calls_(0), metric_calls_(0), metric_cache_max_size_(1000), metric_cache_notify_size_(10) {}
20 
21 artdaq::MetricManager::~MetricManager() noexcept { shutdown(); }
22 
23 void artdaq::MetricManager::initialize(fhicl::ParameterSet const& pset, std::string const& prefix)
24 {
25  prefix_ = prefix;
26  if (initialized_)
27  {
28  shutdown();
29  }
30  TLOG(TLVL_INFO) << "Configuring metrics with parameter set: " << pset.to_string();
31 
32  std::vector<std::string> names = pset.get_names();
33 
34  metric_plugins_.clear();
35  bool send_system_metrics = false;
36  bool send_process_metrics = false;
37 
38  for (auto name : names)
39  {
40  if (name == "metric_queue_size")
41  {
42  metric_cache_max_size_ = pset.get<size_t>("metric_queue_size");
43  }
44  else if (name == "metric_queue_notify_size")
45  {
46  metric_cache_notify_size_ = pset.get<size_t>("metric_queue_notify_size");
47  }
48  else if (name == "metric_cache_size")
49  {
50  metric_cache_max_size_ = pset.get<size_t>("metric_cache_size");
51  }
52  else if (name == "metric_cache_notify_size")
53  {
54  metric_cache_notify_size_ = pset.get<size_t>("metric_cache_notify_size");
55  }
56  else if (name == "metric_send_maximum_delay_ms")
57  {
58  metric_send_interval_ms_ = pset.get<int>("metric_send_maximum_delay_ms");
59  }
60  else if (name == "send_system_metrics")
61  {
62  send_system_metrics = pset.get<bool>("send_system_metrics");
63  }
64  else if (name == "send_process_metrics")
65  {
66  send_process_metrics = pset.get<bool>("send_process_metrics");
67  }
68  else
69  {
70  try
71  {
72  TLOG(TLVL_DEBUG) << "Constructing metric plugin with name " << name;
73  fhicl::ParameterSet plugin_pset = pset.get<fhicl::ParameterSet>(name);
74  metric_plugins_.push_back(
75  makeMetricPlugin(plugin_pset.get<std::string>("metricPluginType", ""), plugin_pset, prefix_));
76  }
77  catch (const cet::exception& e)
78  {
79  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::initialize, error loading plugin with name " << name
80  << ", cet::exception object caught:" << e.explain_self();
81  }
82  catch (const boost::exception& e)
83  {
84  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::initialize, error loading plugin with name " << name
85  << ", boost::exception object caught: " << boost::diagnostic_information(e);
86  }
87  catch (const std::exception& e)
88  {
89  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::initialize, error loading plugin with name " << name
90  << ", std::exception caught: " << e.what();
91  }
92  catch (...)
93  {
94  TLOG(TLVL_ERROR) << "Unknown Exception caught in MetricManager::initialize, error loading plugin with name "
95  << name;
96  }
97  }
98  }
99 
100  if (send_system_metrics || send_process_metrics)
101  {
102  system_metric_collector_.reset(new SystemMetricCollector(send_process_metrics, send_system_metrics));
103  }
104 
105  initialized_ = true;
106 }
107 
109 {
110  auto lk = std::unique_lock<std::mutex>(metric_mutex_);
111  if (!running_)
112  {
113  TLOG(TLVL_DEBUG) << "Starting MetricManager";
114  for (auto& metric : metric_plugins_)
115  {
116  if (!metric) continue;
117  try
118  {
119  metric->startMetrics();
120  TLOG(TLVL_INFO) << "Metric Plugin " << metric->getLibName() << " started.";
121  active_ = true;
122  }
123  catch (...)
124  {
125  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::do_start(), error starting plugin with name "
126  << metric->getLibName();
127  }
128  }
129  running_ = true;
130  startMetricLoop_();
131  }
132 }
133 
135 {
136  auto lk = std::unique_lock<std::mutex>(metric_mutex_);
137  TLOG(TLVL_DEBUG) << "Stopping Metrics";
138  running_ = false;
139  metric_cv_.notify_all();
140  TLOG(TLVL_DEBUG) << "Joining Metric-Sending thread";
141  lk.unlock();
142  if (metric_sending_thread_.joinable()) metric_sending_thread_.join();
143  TLOG(TLVL_DEBUG) << "do_stop Complete";
144 }
145 
147 { /*do_stop();*/
148 }
150 { /*do_start();*/
151 }
152 
153 void artdaq::MetricManager::reinitialize(fhicl::ParameterSet const& pset, std::string const& prefix)
154 {
155  shutdown();
156  initialize(pset, prefix);
157 }
158 
160 {
161  TLOG(TLVL_DEBUG) << "MetricManager is shutting down...";
162  do_stop();
163 
164  auto lk = std::unique_lock<std::mutex>(metric_mutex_);
165  if (initialized_)
166  {
167  for (auto& i : metric_plugins_)
168  {
169  try
170  {
171  std::string name = i->getLibName();
172  i.reset(nullptr);
173  TLOG(TLVL_DEBUG) << "Metric Plugin " << name << " shutdown.";
174  }
175  catch (...)
176  {
177  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::shutdown(), error shutting down metric with name "
178  << i->getLibName();
179  }
180  }
181  metric_plugins_.clear();
182  initialized_ = false;
183  }
184 }
185 
186 void artdaq::MetricManager::sendMetric(std::string const& name, std::string const& value, std::string const& unit,
187  int level, MetricMode mode, std::string const& metricPrefix,
188  bool useNameOverride)
189 {
190  if (!initialized_)
191  {
192  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
193  }
194  else if (!running_)
195  {
196  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
197  }
198  else if (active_)
199  {
200  {
201  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
202  metric_calls_++;
203  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr)
204  {
205  metric_cache_[name] =
206  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
207  }
208  else
209  {
210  auto size = metric_cache_[name]->DataPointCount;
211  if (size < metric_cache_max_size_)
212  {
213  if (size >= metric_cache_notify_size_)
214  {
215  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
216  << ".";
217  }
218  if (mode == MetricMode::LastPoint)
219  {
220  metric_cache_[name]->StringValue = value;
221  metric_cache_[name]->DataPointCount = 1;
222  }
223  else
224  {
225  metric_cache_[name]->StringValue += " " + value;
226  metric_cache_[name]->DataPointCount++;
227  }
228  }
229  else
230  {
231  TLOG(10) << "Rejecting metric because queue full";
232  missed_metric_calls_++;
233  }
234  }
235  }
236  metric_cv_.notify_all();
237  }
238 }
239 
240 void artdaq::MetricManager::sendMetric(std::string const& name, int const& value, std::string const& unit, int level,
241  MetricMode mode, std::string const& metricPrefix, bool useNameOverride)
242 {
243  if (!initialized_)
244  {
245  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
246  }
247  else if (!running_)
248  {
249  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
250  }
251  else if (active_)
252  {
253  {
254  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
255  metric_calls_++;
256  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr)
257  {
258  metric_cache_[name] =
259  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
260  }
261  else
262  {
263  auto size = metric_cache_[name]->DataPointCount;
264  if (size < metric_cache_max_size_)
265  {
266  if (size >= metric_cache_notify_size_)
267  {
268  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
269  << ".";
270  }
271  metric_cache_[name]->AddPoint(value);
272  }
273  else
274  {
275  TLOG(10) << "Rejecting metric because queue full";
276  missed_metric_calls_++;
277  }
278  }
279  }
280  metric_cv_.notify_all();
281  }
282 }
283 
284 void artdaq::MetricManager::sendMetric(std::string const& name, double const& value, std::string const& unit, int level,
285  MetricMode mode, std::string const& metricPrefix, bool useNameOverride)
286 {
287  if (!initialized_)
288  {
289  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
290  }
291  else if (!running_)
292  {
293  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
294  }
295  else if (active_)
296  {
297  {
298  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
299  metric_calls_++;
300  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr)
301  {
302  metric_cache_[name] =
303  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
304  }
305  else
306  {
307  auto size = metric_cache_[name]->DataPointCount;
308  if (size < metric_cache_max_size_)
309  {
310  if (size >= metric_cache_notify_size_)
311  {
312  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
313  << ".";
314  }
315  metric_cache_[name]->AddPoint(value);
316  }
317  else
318  {
319  TLOG(10) << "Rejecting metric because queue full";
320  missed_metric_calls_++;
321  }
322  }
323  }
324  metric_cv_.notify_all();
325  }
326 }
327 
328 void artdaq::MetricManager::sendMetric(std::string const& name, float const& value, std::string const& unit, int level,
329  MetricMode mode, std::string const& metricPrefix, bool useNameOverride)
330 {
331  if (!initialized_)
332  {
333  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
334  }
335  else if (!running_)
336  {
337  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
338  }
339  else if (active_)
340  {
341  {
342  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
343  metric_calls_++;
344  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr)
345  {
346  metric_cache_[name] =
347  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
348  }
349  else
350  {
351  auto size = metric_cache_[name]->DataPointCount;
352  if (size < metric_cache_max_size_)
353  {
354  if (size >= metric_cache_notify_size_)
355  {
356  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
357  << ".";
358  }
359  metric_cache_[name]->AddPoint(value);
360  }
361  else
362  {
363  TLOG(10) << "Rejecting metric because queue full";
364  missed_metric_calls_++;
365  }
366  }
367  }
368  metric_cv_.notify_all();
369  }
370 }
371 
372 void artdaq::MetricManager::sendMetric(std::string const& name, long unsigned int const& value, std::string const& unit,
373  int level, MetricMode mode, std::string const& metricPrefix,
374  bool useNameOverride)
375 {
376  if (!initialized_)
377  {
378  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
379  }
380  else if (!running_)
381  {
382  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
383  }
384  else if (active_)
385  {
386  {
387  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
388  metric_calls_++;
389  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr)
390  {
391  metric_cache_[name] =
392  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
393  }
394  else
395  {
396  auto size = metric_cache_[name]->DataPointCount;
397  if (size < metric_cache_max_size_)
398  {
399  if (size >= metric_cache_notify_size_)
400  {
401  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
402  << ".";
403  }
404  metric_cache_[name]->AddPoint(value);
405  }
406  else
407  {
408  TLOG(10) << "Rejecting metric because queue full";
409  missed_metric_calls_++;
410  }
411  }
412  }
413  metric_cv_.notify_all();
414  }
415 }
416 
417 void artdaq::MetricManager::startMetricLoop_()
418 {
419  if (metric_sending_thread_.joinable()) metric_sending_thread_.join();
420  boost::thread::attributes attrs;
421  attrs.set_stack_size(4096 * 2000); // 8000 KB
422  TLOG(TLVL_INFO) << "Starting Metric Sending Thread";
423  try
424  {
425  metric_sending_thread_ = boost::thread(attrs, boost::bind(&MetricManager::sendMetricLoop_, this));
426  }
427  catch (const boost::exception& e)
428  {
429  TLOG(TLVL_ERROR) << "Caught boost::exception starting Metric Sending thread: " << boost::diagnostic_information(e)
430  << ", errno=" << errno;
431  std::cerr << "Caught boost::exception starting Metric Sending thread: " << boost::diagnostic_information(e)
432  << ", errno=" << errno << std::endl;
433  exit(5);
434  }
435  TLOG(TLVL_INFO) << "Metric Sending thread started";
436 }
437 
439 {
440  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
441  return metric_cache_.size() == 0;
442 }
443 
444 size_t artdaq::MetricManager::metricQueueSize(std::string const& name)
445 {
446  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
447  size_t size = 0;
448  if (name == "")
449  {
450  for (auto& q : metric_cache_)
451  {
452  size += q.second->DataPointCount;
453  }
454  }
455  else
456  {
457  if (metric_cache_.count(name)) size = metric_cache_[name]->DataPointCount;
458  }
459 
460  return size;
461 }
462 
463 void artdaq::MetricManager::sendMetricLoop_()
464 {
465  TLOG(TLVL_INFO) << "sendMetricLoop_ START";
466  auto last_send_time = std::chrono::steady_clock::time_point();
467  while (running_)
468  {
469  while (metricQueueEmpty() && running_)
470  {
471  std::unique_lock<std::mutex> lk(metric_mutex_);
472  metric_cv_.wait_for(lk, std::chrono::milliseconds(100));
473  auto now = std::chrono::steady_clock::now();
474  if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_send_time).count() >
475  metric_send_interval_ms_)
476  {
477  for (auto& metric : metric_plugins_)
478  {
479  if (metric) metric->sendMetrics();
480  }
481  last_send_time = now;
482  }
483  }
484 
485  auto processing_start = std::chrono::steady_clock::now();
486  auto temp_list = std::list<std::unique_ptr<MetricData>>();
487  {
488  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
489 
490  for (auto& q : metric_cache_)
491  {
492  temp_list.emplace_back(std::move(q.second));
493  }
494  metric_cache_.clear();
495 
496  auto calls = metric_calls_.exchange(0);
497  temp_list.emplace_back(
498  new MetricData("Metric Calls", calls, "metrics", 4, MetricMode::Accumulate | MetricMode::Rate, "", false));
499 
500  auto missed = missed_metric_calls_.exchange(0);
501  temp_list.emplace_back(
502  new MetricData("Missed Metric Calls", missed, "metrics", 4, MetricMode::Accumulate | MetricMode::Rate, "", false));
503 
504  TLOG(TLVL_TRACE) << "There are " << temp_list.size() << " Metrics to process (" << calls << " calls, " << missed
505  << " missed)";
506 
507  if (system_metric_collector_ != nullptr)
508  {
509  TLOG(TLVL_TRACE) << "Collecting System metrics (CPU, RAM, Network)";
510  auto systemMetrics = system_metric_collector_->SendMetrics();
511  for (auto& m : systemMetrics) { temp_list.emplace_back(std::move(m)); }
512  }
513  }
514 
515  while (temp_list.size() > 0)
516  {
517  auto data_ = std::move(temp_list.front());
518  temp_list.pop_front();
519  if (data_->Type == MetricType::InvalidMetric) continue;
520  if (!data_->UseNameOverride)
521  {
522  if (data_->MetricPrefix.size() > 0)
523  {
524  data_->Name = prefix_ + "." + data_->MetricPrefix + "." + data_->Name;
525  }
526  else
527  {
528  data_->Name = prefix_ + "." + data_->Name;
529  }
530  }
531 
532  for (auto& metric : metric_plugins_)
533  {
534  if (!metric) continue;
535  if (metric->IsLevelEnabled(data_->Level))
536  {
537  try
538  {
539  metric->addMetricData(data_);
540  last_send_time = std::chrono::steady_clock::now();
541  }
542  catch (...)
543  {
544  TLOG(TLVL_ERROR) << "Error in MetricManager::sendMetric: error sending value to metric plugin with name "
545  << metric->getLibName();
546  }
547  }
548  }
549  }
550 
551  for (auto& metric : metric_plugins_)
552  {
553  if (!metric) continue;
554  metric->sendMetrics(false, processing_start);
555  }
556 
557  // Limit rate of metrics going to plugins
558  usleep(10000);
559  }
560 
561  auto temp_list = std::list<std::unique_ptr<MetricData>>();
562  {
563  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
564 
565  for (auto& q : metric_cache_)
566  {
567  temp_list.emplace_back(std::move(q.second));
568  }
569  metric_cache_.clear();
570 
571  auto calls = metric_calls_.exchange(0);
572  temp_list.emplace_back(
573  new MetricData("Metric Calls", calls, "metrics", 4, MetricMode::Accumulate | MetricMode::Rate, "", false));
574 
575  auto missed = missed_metric_calls_.exchange(0);
576  temp_list.emplace_back(
577  new MetricData("Missed Metric Calls", missed, "metrics", 4, MetricMode::Accumulate | MetricMode::Rate, "", false));
578 
579  TLOG(TLVL_TRACE) << "There are " << temp_list.size() << " Metrics to process (" << calls << " calls, " << missed
580  << " missed)";
581  }
582 
583  while (temp_list.size() > 0)
584  {
585  auto data_ = std::move(temp_list.front());
586  temp_list.pop_front();
587  if (data_->Type == MetricType::InvalidMetric) continue;
588  if (!data_->UseNameOverride)
589  {
590  if (data_->MetricPrefix.size() > 0)
591  {
592  data_->Name = prefix_ + "." + data_->MetricPrefix + "." + data_->Name;
593  }
594  else
595  {
596  data_->Name = prefix_ + "." + data_->Name;
597  }
598  }
599 
600  for (auto& metric : metric_plugins_)
601  {
602  if (!metric) continue;
603  if (metric->IsLevelEnabled(data_->Level))
604  {
605  try
606  {
607  metric->addMetricData(data_);
608  last_send_time = std::chrono::steady_clock::now();
609  }
610  catch (...)
611  {
612  TLOG(TLVL_ERROR) << "Error in MetricManager::sendMetric: error sending value to metric plugin with name "
613  << metric->getLibName();
614  }
615  }
616  }
617  }
618 
619  for (auto& metric : metric_plugins_)
620  {
621  if (!metric) continue;
622  try
623  {
624  metric->stopMetrics();
625  TLOG(TLVL_DEBUG) << "Metric Plugin " << metric->getLibName() << " stopped.";
626  }
627  catch (...)
628  {
629  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::do_stop(), error stopping plugin with name "
630  << metric->getLibName();
631  }
632  }
633  TLOG(TLVL_DEBUG) << "MetricManager has been stopped.";
634 }
void shutdown()
Call the destructors for all configured MetricPlugin instances.
void initialize(fhicl::ParameterSet const &pset, std::string const &prefix="")
Initialize the MetricPlugin instances.
void sendMetric(std::string const &name, std::string const &value, std::string const &unit, int level, MetricMode mode, std::string const &metricPrefix="", bool useNameOverride=false)
Send a metric with the given parameters to any MetricPlugins with a threshold level >= level...
size_t metricQueueSize(std::string const &name="")
Return the size of the named metric queue
void reinitialize(fhicl::ParameterSet const &pset, std::string const &prefix="")
Reinitialize all MetricPlugin Instances.
Report the sum of all values. Use for counters to report accurate results.
MetricManager()
Construct an instance of the MetricManager class.
void do_start()
Perform startup actions for each configured MetricPlugin.
void do_stop()
Stop sending metrics to the MetricPlugin instances.
virtual ~MetricManager() noexcept
MetricManager destructor.
std::unique_ptr< MetricPlugin > makeMetricPlugin(std::string const &generator_plugin_spec, fhicl::ParameterSet const &ps, std::string const &app_name)
Load a given MetricPlugin and return a pointer to it.
Report only the last value recorded. Useful for event counters, run numbers, etc. ...
MetricMode
The Mode of the metric indicates how multiple metric values should be combined within a reporting int...
Definition: MetricData.hh:29
Default, invalid value.
void do_resume()
Resume metric sending. Currently a No-Op.
bool metricQueueEmpty()
Returns whether the metric queue is completely empty
void do_pause()
Pause metric sending. Currently a No-Op.