9 #define TRACE_NAME "MetricManager"
10 #include "artdaq-utilities/Plugins/MetricManager.hh"
11 #include "artdaq-utilities/Plugins/makeMetricPlugin.hh"
12 #include "fhiclcpp/ParameterSet.h"
15 #include <boost/exception/all.hpp>
// Member initializer list: starts with no plugins, metric_send_interval_ms_ of
// 15000, no system metric collector, initialized_/running_/active_ all false,
// both call counters at zero, metric_cache_max_size_ of 1000 and
// metric_cache_notify_size_ of 10. The constructor body is empty.
// NOTE(review): the constructor signature line is not visible in this listing.
19 : metric_plugins_(0), metric_send_interval_ms_(15000), system_metric_collector_(nullptr), initialized_(false), running_(false), active_(false), missed_metric_calls_(0), metric_calls_(0), metric_cache_max_size_(1000), metric_cache_notify_size_(10) {}
// Configure the manager from `pset`: read the manager-level settings, build a
// plugin from each remaining entry, and optionally create the system metric
// collector.
// NOTE(review): the function signature, braces, `try`/`else` lines and some
// short statements are not visible in this listing; the log text below
// identifies this code as MetricManager::initialize.
30 TLOG(TLVL_INFO) <<
"Configuring metrics with parameter set: " << pset.to_string();
32 std::vector<std::string> names = pset.get_names();
// Discard any plugins held from an earlier configuration.
34 metric_plugins_.clear();
35 bool send_system_metrics =
false;
36 bool send_process_metrics =
false;
// Walk every top-level name in the parameter set. `name` is declared by value,
// so each iteration copies the string.
38 for (
auto name : names)
// "metric_queue_size" and "metric_cache_size" both write metric_cache_max_size_.
40 if (name ==
"metric_queue_size")
42 metric_cache_max_size_ = pset.get<
size_t>(
"metric_queue_size");
// "metric_queue_notify_size" and "metric_cache_notify_size" both write
// metric_cache_notify_size_.
44 else if (name ==
"metric_queue_notify_size")
46 metric_cache_notify_size_ = pset.get<
size_t>(
"metric_queue_notify_size");
48 else if (name ==
"metric_cache_size")
50 metric_cache_max_size_ = pset.get<
size_t>(
"metric_cache_size");
52 else if (name ==
"metric_cache_notify_size")
54 metric_cache_notify_size_ = pset.get<
size_t>(
"metric_cache_notify_size");
// The key is called "..._maximum_delay_ms" but is stored as the send interval.
56 else if (name ==
"metric_send_maximum_delay_ms")
58 metric_send_interval_ms_ = pset.get<
int>(
"metric_send_maximum_delay_ms");
60 else if (name ==
"send_system_metrics")
62 send_system_metrics = pset.get<
bool>(
"send_system_metrics");
64 else if (name ==
"send_process_metrics")
66 send_process_metrics = pset.get<
bool>(
"send_process_metrics");
// Plugin construction: the entry is read as a nested ParameterSet and handed to
// makeMetricPlugin together with its "metricPluginType" value (default "") and
// prefix_.
// NOTE(review): presumably the final `else` branch, wrapped in a `try` — those
// lines are not visible here.
72 TLOG(TLVL_DEBUG) <<
"Constructing metric plugin with name " << name;
73 fhicl::ParameterSet plugin_pset = pset.get<fhicl::ParameterSet>(name);
74 metric_plugins_.push_back(
75 makeMetricPlugin(plugin_pset.get<std::string>(
"metricPluginType",
""), plugin_pset, prefix_));
// Each visible handler logs the failure at error level together with the
// plugin name and the exception's own description.
77 catch (
const cet::exception& e)
79 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::initialize, error loading plugin with name " << name
80 <<
", cet::exception object caught:" << e.explain_self();
82 catch (
const boost::exception& e)
84 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::initialize, error loading plugin with name " << name
85 <<
", boost::exception object caught: " << boost::diagnostic_information(e);
87 catch (
const std::exception& e)
89 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::initialize, error loading plugin with name " << name
90 <<
", std::exception caught: " << e.what();
// Message used for exceptions of any other type (the handler line itself is
// not visible in this listing).
94 TLOG(TLVL_ERROR) <<
"Unknown Exception caught in MetricManager::initialize, error loading plugin with name "
// The collector is created only when at least one of the two flags is set.
// Argument order is process flag first, then system flag.
100 if (send_system_metrics || send_process_metrics)
102 system_metric_collector_.reset(
new SystemMetricCollector(send_process_metrics, send_system_metrics));
// Start every configured metric plugin while holding metric_mutex_.
// NOTE(review): the function signature, braces and several statements are not
// visible in this listing; the error text below identifies this code as
// MetricManager::do_start().
110 auto lk = std::unique_lock<std::mutex>(metric_mutex_);
113 TLOG(TLVL_DEBUG) <<
"Starting MetricManager";
114 for (
auto& metric : metric_plugins_)
// Null plugin slots are skipped.
116 if (!metric)
continue;
119 metric->startMetrics();
120 TLOG(TLVL_INFO) <<
"Metric Plugin " << metric->getLibName() <<
" started.";
// Error report naming the plugin that failed to start (presumably inside a
// catch handler; the handler line is not visible here).
125 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::do_start(), error starting plugin with name "
126 << metric->getLibName();
// Stop metric sending: take metric_mutex_, wake every waiter on metric_cv_, and
// join the metric-sending thread if it is joinable.
// NOTE(review): the function signature and some statements are not visible in
// this listing. sendMetricLoop_ also locks metric_mutex_, so confirm that `lk`
// is released before the join below.
136 auto lk = std::unique_lock<std::mutex>(metric_mutex_);
137 TLOG(TLVL_DEBUG) <<
"Stopping Metrics";
139 metric_cv_.notify_all();
140 TLOG(TLVL_DEBUG) <<
"Joining Metric-Sending thread";
142 if (metric_sending_thread_.joinable()) metric_sending_thread_.join();
143 TLOG(TLVL_DEBUG) <<
"do_stop Complete";
// Run initialize() with the supplied parameter set and prefix.
// NOTE(review): the enclosing signature and any preceding statements are not
// visible in this listing.
156 initialize(pset, prefix);
// Shut the manager down: iterate over the plugins, then clear the plugin list
// and mark the manager as uninitialized.
// NOTE(review): the function signature, braces and several statements are not
// visible in this listing; the error text below identifies this code as
// MetricManager::shutdown().
161 TLOG(TLVL_DEBUG) <<
"MetricManager is shutting down...";
164 auto lk = std::unique_lock<std::mutex>(metric_mutex_);
167 for (
auto& i : metric_plugins_)
// The library name is copied into a local, which the log message below uses.
171 std::string name = i->getLibName();
173 TLOG(TLVL_DEBUG) <<
"Metric Plugin " << name <<
" shutdown.";
// Error report for a plugin that failed to shut down (presumably inside a
// catch handler; the handler line is not visible here).
177 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::shutdown(), error shutting down metric with name "
181 metric_plugins_.clear();
182 initialized_ =
false;
187 int level,
MetricMode mode, std::string
const& metricPrefix,
188 bool useNameOverride)
// String-valued sendMetric: stores `value` in the per-name cache entry's
// StringValue for the sending thread to pick up.
// NOTE(review): the first line of the signature, the braces and the guard
// conditions are not visible in this listing.
// Messages for calls made before initialization or while stopped.
192 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
196 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
// metric_cache_mutex_ is held while metric_cache_ is read and modified.
201 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
// No entry for this name, or a null entry: create a new MetricData.
203 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr)
205 metric_cache_[name] =
206 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
// Existing entry: it is only updated while its point count is below
// metric_cache_max_size_.
210 auto size = metric_cache_[name]->DataPointCount;
211 if (size < metric_cache_max_size_)
// Trace message once the count has reached metric_cache_notify_size_.
213 if (size >= metric_cache_notify_size_)
215 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
// Two update forms follow: overwrite StringValue and reset the count to 1, or
// append the value after a space and increment the count. The condition that
// selects between them is not visible in this listing.
220 metric_cache_[name]->StringValue = value;
221 metric_cache_[name]->DataPointCount = 1;
225 metric_cache_[name]->StringValue +=
" " + value;
226 metric_cache_[name]->DataPointCount++;
// Rejected call: logged and counted in missed_metric_calls_.
231 TLOG(10) <<
"Rejecting metric because queue full";
232 missed_metric_calls_++;
// Wake any thread waiting on metric_cv_.
236 metric_cv_.notify_all();
241 MetricMode mode, std::string
const& metricPrefix,
bool useNameOverride)
// sendMetric overload that records `value` with MetricData::AddPoint.
// NOTE(review): the first line of the signature (including the type of
// `value`), the braces and the guard conditions are not visible in this listing.
// Messages for calls made before initialization or while stopped.
245 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
249 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
// metric_cache_mutex_ is held while metric_cache_ is read and modified.
254 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
// No entry for this name, or a null entry: create a new MetricData.
256 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr)
258 metric_cache_[name] =
259 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
// Existing entry: a point is only added while its point count is below
// metric_cache_max_size_.
263 auto size = metric_cache_[name]->DataPointCount;
264 if (size < metric_cache_max_size_)
// Trace message once the count has reached metric_cache_notify_size_.
266 if (size >= metric_cache_notify_size_)
268 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
271 metric_cache_[name]->AddPoint(value);
// Rejected call: logged and counted in missed_metric_calls_.
275 TLOG(10) <<
"Rejecting metric because queue full";
276 missed_metric_calls_++;
// Wake any thread waiting on metric_cv_.
280 metric_cv_.notify_all();
285 MetricMode mode, std::string
const& metricPrefix,
bool useNameOverride)
// sendMetric overload that records `value` with MetricData::AddPoint.
// NOTE(review): the first line of the signature (including the type of
// `value`), the braces and the guard conditions are not visible in this listing.
// Messages for calls made before initialization or while stopped.
289 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
293 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
// metric_cache_mutex_ is held while metric_cache_ is read and modified.
298 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
// No entry for this name, or a null entry: create a new MetricData.
300 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr)
302 metric_cache_[name] =
303 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
// Existing entry: a point is only added while its point count is below
// metric_cache_max_size_.
307 auto size = metric_cache_[name]->DataPointCount;
308 if (size < metric_cache_max_size_)
// Trace message once the count has reached metric_cache_notify_size_.
310 if (size >= metric_cache_notify_size_)
312 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
315 metric_cache_[name]->AddPoint(value);
// Rejected call: logged and counted in missed_metric_calls_.
319 TLOG(10) <<
"Rejecting metric because queue full";
320 missed_metric_calls_++;
// Wake any thread waiting on metric_cv_.
324 metric_cv_.notify_all();
329 MetricMode mode, std::string
const& metricPrefix,
bool useNameOverride)
// sendMetric overload that records `value` with MetricData::AddPoint.
// NOTE(review): the first line of the signature (including the type of
// `value`), the braces and the guard conditions are not visible in this listing.
// Messages for calls made before initialization or while stopped.
333 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
337 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
// metric_cache_mutex_ is held while metric_cache_ is read and modified.
342 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
// No entry for this name, or a null entry: create a new MetricData.
344 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr)
346 metric_cache_[name] =
347 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
// Existing entry: a point is only added while its point count is below
// metric_cache_max_size_.
351 auto size = metric_cache_[name]->DataPointCount;
352 if (size < metric_cache_max_size_)
// Trace message once the count has reached metric_cache_notify_size_.
354 if (size >= metric_cache_notify_size_)
356 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
359 metric_cache_[name]->AddPoint(value);
// Rejected call: logged and counted in missed_metric_calls_.
363 TLOG(10) <<
"Rejecting metric because queue full";
364 missed_metric_calls_++;
// Wake any thread waiting on metric_cv_.
368 metric_cv_.notify_all();
373 int level,
MetricMode mode, std::string
const& metricPrefix,
374 bool useNameOverride)
// sendMetric overload that records `value` with MetricData::AddPoint.
// NOTE(review): the first line of the signature (including the type of
// `value`), the braces and the guard conditions are not visible in this listing.
// Messages for calls made before initialization or while stopped.
378 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
382 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
// metric_cache_mutex_ is held while metric_cache_ is read and modified.
387 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
// No entry for this name, or a null entry: create a new MetricData.
389 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr)
391 metric_cache_[name] =
392 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
// Existing entry: a point is only added while its point count is below
// metric_cache_max_size_.
396 auto size = metric_cache_[name]->DataPointCount;
397 if (size < metric_cache_max_size_)
// Trace message once the count has reached metric_cache_notify_size_.
399 if (size >= metric_cache_notify_size_)
401 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
404 metric_cache_[name]->AddPoint(value);
// Rejected call: logged and counted in missed_metric_calls_.
408 TLOG(10) <<
"Rejecting metric because queue full";
409 missed_metric_calls_++;
// Wake any thread waiting on metric_cv_.
413 metric_cv_.notify_all();
// Launch sendMetricLoop_ on metric_sending_thread_.
// NOTE(review): braces, the `try` line and possibly other short statements are
// not visible in this listing.
417 void artdaq::MetricManager::startMetricLoop_()
// A previous thread that is still joinable is joined before the handle is
// reassigned below.
419 if (metric_sending_thread_.joinable()) metric_sending_thread_.join();
420 boost::thread::attributes attrs;
// Requested stack size for the new thread: 4096 * 2000 bytes.
421 attrs.set_stack_size(4096 * 2000);
422 TLOG(TLVL_INFO) <<
"Starting Metric Sending Thread";
// The new thread runs sendMetricLoop_ bound to this instance.
425 metric_sending_thread_ = boost::thread(attrs, boost::bind(&MetricManager::sendMetricLoop_,
this));
// A boost::exception raised while starting the thread is reported both to the
// TRACE log and to stderr, together with the current errno.
427 catch (
const boost::exception& e)
429 TLOG(TLVL_ERROR) <<
"Caught boost::exception starting Metric Sending thread: " << boost::diagnostic_information(e)
430 <<
", errno=" << errno;
431 std::cerr <<
"Caught boost::exception starting Metric Sending thread: " << boost::diagnostic_information(e)
432 <<
", errno=" << errno << std::endl;
435 TLOG(TLVL_INFO) <<
"Metric Sending thread started";
// Returns true when metric_cache_ holds no entries; the check is made while
// holding metric_cache_mutex_.
// NOTE(review): the function signature is not visible in this listing.
440 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
441 return metric_cache_.size() == 0;
// Computes a data-point count from metric_cache_ while holding
// metric_cache_mutex_.
// NOTE(review): the function signature, the declaration of `size`, the
// condition choosing between the two computations below and the return
// statement are not visible in this listing.
446 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
// Total across the cache: sum of DataPointCount over every entry.
450 for (
auto& q : metric_cache_)
452 size += q.second->DataPointCount;
// Single metric: its own DataPointCount, when `name` has an entry.
457 if (metric_cache_.count(name)) size = metric_cache_[name]->DataPointCount;
// Metric-sending loop: waits for cached metrics, hands them to every plugin,
// then drains the cache once more and stops the plugins.
// NOTE(review): braces, the outer loop header, `try`/`catch` lines and the
// arguments of some calls are not visible in this listing.
463 void artdaq::MetricManager::sendMetricLoop_()
465 TLOG(TLVL_INFO) <<
"sendMetricLoop_ START";
// Default-constructed time_point, so the first interval check below compares
// against the clock's epoch.
466 auto last_send_time = std::chrono::steady_clock::time_point();
// Idle wait: while the cache is empty and the manager is running, wait on
// metric_cv_ for up to 100 ms at a time.
469 while (metricQueueEmpty() && running_)
471 std::unique_lock<std::mutex> lk(metric_mutex_);
472 metric_cv_.wait_for(lk, std::chrono::milliseconds(100));
473 auto now = std::chrono::steady_clock::now();
// When more than metric_send_interval_ms_ has passed since the last send, call
// sendMetrics() on every non-null plugin even though nothing new is cached.
474 if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_send_time).count() >
475 metric_send_interval_ms_)
477 for (
auto& metric : metric_plugins_)
479 if (metric) metric->sendMetrics();
481 last_send_time = now;
485 auto processing_start = std::chrono::steady_clock::now();
486 auto temp_list = std::list<std::unique_ptr<MetricData>>();
// Under metric_cache_mutex_, move every cached MetricData into the local list
// and empty the cache.
488 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
490 for (
auto& q : metric_cache_)
492 temp_list.emplace_back(std::move(q.second));
494 metric_cache_.clear();
// Both call counters are read and zeroed with exchange(0); an entry is appended
// to the list after each one (the emplace_back arguments are not visible here).
496 auto calls = metric_calls_.exchange(0);
497 temp_list.emplace_back(
500 auto missed = missed_metric_calls_.exchange(0);
501 temp_list.emplace_back(
504 TLOG(TLVL_TRACE) <<
"There are " << temp_list.size() <<
" Metrics to process (" << calls <<
" calls, " << missed
// When a system metric collector exists, its metrics are appended to the list.
507 if (system_metric_collector_ !=
nullptr)
509 TLOG(TLVL_TRACE) <<
"Collecting System metrics (CPU, RAM, Network)";
510 auto systemMetrics = system_metric_collector_->SendMetrics();
511 for (
auto& m : systemMetrics) { temp_list.emplace_back(std::move(m)); }
// Process the list front to back, taking ownership of one MetricData at a time.
515 while (temp_list.size() > 0)
517 auto data_ = std::move(temp_list.front());
518 temp_list.pop_front();
// Unless UseNameOverride is set, the name becomes prefix_ "." [MetricPrefix "."]
// Name, with MetricPrefix included only when it is non-empty.
520 if (!data_->UseNameOverride)
522 if (data_->MetricPrefix.size() > 0)
524 data_->Name = prefix_ +
"." + data_->MetricPrefix +
"." + data_->Name;
528 data_->Name = prefix_ +
"." + data_->Name;
// The metric is given to every non-null plugin whose level check accepts
// data_->Level; last_send_time is refreshed after each hand-off.
532 for (
auto& metric : metric_plugins_)
534 if (!metric)
continue;
535 if (metric->IsLevelEnabled(data_->Level))
539 metric->addMetricData(data_);
540 last_send_time = std::chrono::steady_clock::now();
// Error report naming the plugin that failed (presumably inside a catch
// handler; the handler line is not visible here).
544 TLOG(TLVL_ERROR) <<
"Error in MetricManager::sendMetric: error sending value to metric plugin with name "
545 << metric->getLibName();
// After the list has been processed, each non-null plugin's sendMetrics is
// called with `false` and the time at which this pass started.
551 for (
auto& metric : metric_plugins_)
553 if (!metric)
continue;
554 metric->sendMetrics(
false, processing_start);
// Second drain pass: the same move-out, counter reset, renaming and hand-off
// steps as above, but without system metrics and without a sendMetrics call.
// NOTE(review): presumably runs once the main loop has ended; the loop's
// closing line is not visible here.
561 auto temp_list = std::list<std::unique_ptr<MetricData>>();
563 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
565 for (
auto& q : metric_cache_)
567 temp_list.emplace_back(std::move(q.second));
569 metric_cache_.clear();
571 auto calls = metric_calls_.exchange(0);
572 temp_list.emplace_back(
575 auto missed = missed_metric_calls_.exchange(0);
576 temp_list.emplace_back(
579 TLOG(TLVL_TRACE) <<
"There are " << temp_list.size() <<
" Metrics to process (" << calls <<
" calls, " << missed
583 while (temp_list.size() > 0)
585 auto data_ = std::move(temp_list.front());
586 temp_list.pop_front();
588 if (!data_->UseNameOverride)
590 if (data_->MetricPrefix.size() > 0)
592 data_->Name = prefix_ +
"." + data_->MetricPrefix +
"." + data_->Name;
596 data_->Name = prefix_ +
"." + data_->Name;
600 for (
auto& metric : metric_plugins_)
602 if (!metric)
continue;
603 if (metric->IsLevelEnabled(data_->Level))
607 metric->addMetricData(data_);
608 last_send_time = std::chrono::steady_clock::now();
612 TLOG(TLVL_ERROR) <<
"Error in MetricManager::sendMetric: error sending value to metric plugin with name "
613 << metric->getLibName();
// Finally, stopMetrics() is called on every non-null plugin. The error text
// below names do_stop(), although it is emitted from this function.
619 for (
auto& metric : metric_plugins_)
621 if (!metric)
continue;
624 metric->stopMetrics();
625 TLOG(TLVL_DEBUG) <<
"Metric Plugin " << metric->getLibName() <<
" stopped.";
629 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::do_stop(), error stopping plugin with name "
630 << metric->getLibName();
633 TLOG(TLVL_DEBUG) <<
"MetricManager has been stopped.";
void shutdown()
Call the destructors for all configured MetricPlugin instances.
void initialize(fhicl::ParameterSet const &pset, std::string const &prefix="")
Initialize the MetricPlugin instances.
void sendMetric(std::string const &name, std::string const &value, std::string const &unit, int level, MetricMode mode, std::string const &metricPrefix="", bool useNameOverride=false)
Send a metric with the given parameters to any MetricPlugins with a threshold level >= to level...
size_t metricQueueSize(std::string const &name="")
Return the size of the named metric queue
void reinitialize(fhicl::ParameterSet const &pset, std::string const &prefix="")
Reinitialize all MetricPlugin Instances.
Report the sum of all values. Use for counters to report accurate results.
MetricManager()
Construct an instance of the MetricManager class.
void do_start()
Perform startup actions for each configured MetricPlugin.
void do_stop()
Stop sending metrics to the MetricPlugin instances.
virtual ~MetricManager() noexcept
MetricManager destructor.
std::unique_ptr< MetricPlugin > makeMetricPlugin(std::string const &generator_plugin_spec, fhicl::ParameterSet const &ps, std::string const &app_name)
Load a given MetricPlugin and return a pointer to it.
Report only the last value recorded. Useful for event counters, run numbers, etc. ...
MetricMode
The Mode of the metric indicates how multiple metric values should be combined within a reporting int...
void do_resume()
Resume metric sending. Currently a No-Op.
bool metricQueueEmpty()
Returns whether the metric queue is completely empty
void do_pause()
Pause metric sending. Currently a No-Op.