9 #define TRACE_NAME "MetricManager"
10 #include "artdaq-utilities/Plugins/MetricManager.hh"
11 #include "artdaq-utilities/Plugins/makeMetricPlugin.hh"
12 #include "fhiclcpp/ParameterSet.h"
15 #include <boost/exception/all.hpp>
20 metric_send_interval_ms_(15000),
24 missed_metric_calls_(0),
26 metric_cache_max_size_(1000),
27 metric_cache_notify_size_(10) {}
36 TLOG(TLVL_INFO) <<
"Configuring metrics with parameter set: " << pset.to_string() ;
38 std::vector<std::string> names = pset.get_pset_names();
40 metric_plugins_.clear();
42 for (
auto name : names) {
43 if (name ==
"metric_queue_size") {
44 metric_cache_max_size_ = pset.get<
size_t>(
"metric_queue_size");
45 }
else if (name ==
"metric_queue_notify_size") {
46 metric_cache_notify_size_ = pset.get<
size_t>(
"metric_queue_notify_size");
47 }
else if (name ==
"metric_cache_size") {
48 metric_cache_max_size_ = pset.get<
size_t>(
"metric_cache_size");
49 }
else if (name ==
"metric_cache_notify_size") {
50 metric_cache_notify_size_ = pset.get<
size_t>(
"metric_cache_notify_size");
51 }
else if (name ==
"metric_send_maximum_delay_ms") {
52 metric_send_interval_ms_ = pset.get<
int>(
"metric_send_maximum_delay_ms");
55 TLOG(TLVL_DEBUG) <<
"Constructing metric plugin with name " << name ;
56 fhicl::ParameterSet plugin_pset = pset.get<fhicl::ParameterSet>(name);
57 metric_plugins_.push_back(
58 makeMetricPlugin(plugin_pset.get<std::string>(
"metricPluginType",
""), plugin_pset, prefix_));
59 }
catch (
const cet::exception& e) {
60 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::initialize, error loading plugin with name " << name
61 <<
", cet::exception object caught:" << e.explain_self();
62 }
catch (
const boost::exception& e) {
63 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::initialize, error loading plugin with name " << name
64 <<
", boost::exception object caught: " << boost::diagnostic_information(e);
65 }
catch (
const std::exception& e) {
66 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::initialize, error loading plugin with name " << name
67 <<
", std::exception caught: " << e.what();
69 TLOG(TLVL_ERROR) <<
"Unknown Exception caught in MetricManager::initialize, error loading plugin with name "
79 auto lk = std::unique_lock<std::mutex>(metric_mutex_);
81 TLOG(TLVL_DEBUG) <<
"Starting MetricManager" ;
82 for (
auto& metric : metric_plugins_) {
83 if (!metric)
continue;
85 metric->startMetrics();
86 TLOG(TLVL_INFO) <<
"Metric Plugin " << metric->getLibName() <<
" started." ;
89 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::do_start(), error starting plugin with name "
90 << metric->getLibName();
99 auto lk = std::unique_lock<std::mutex>(metric_mutex_);
100 TLOG(TLVL_DEBUG) <<
"Stopping Metrics" ;
102 metric_cv_.notify_all();
103 TLOG(TLVL_DEBUG) <<
"Joining Metric-Sending thread" ;
105 if (metric_sending_thread_.joinable()) metric_sending_thread_.join();
106 TLOG(TLVL_DEBUG) <<
"do_stop Complete" ;
116 initialize(pset, prefix);
120 TLOG(TLVL_DEBUG) <<
"MetricManager is shutting down..." ;
123 auto lk = std::unique_lock<std::mutex>(metric_mutex_);
125 for (
auto& i : metric_plugins_) {
127 std::string name = i->getLibName();
129 TLOG(TLVL_DEBUG) <<
"Metric Plugin " << name <<
" shutdown." ;
131 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::shutdown(), error shutting down metric with name "
135 metric_plugins_.clear();
136 initialized_ =
false;
141 int level,
MetricMode mode, std::string
const& metricPrefix,
142 bool useNameOverride) {
144 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
145 }
else if (!running_) {
146 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
147 }
else if (active_) {
149 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
151 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr) {
152 metric_cache_[name] =
153 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
155 auto size = metric_cache_[name]->DataPointCount;
156 if (size < metric_cache_max_size_) {
157 if (size >= metric_cache_notify_size_) {
158 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
162 metric_cache_[name]->StringValue = value;
163 metric_cache_[name]->DataPointCount = 1;
165 metric_cache_[name]->StringValue +=
" " + value;
166 metric_cache_[name]->DataPointCount++;
169 TLOG(10) <<
"Rejecting metric because queue full" ;
170 missed_metric_calls_++;
174 metric_cv_.notify_all();
179 MetricMode mode, std::string
const& metricPrefix,
bool useNameOverride) {
181 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
182 }
else if (!running_) {
183 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
184 }
else if (active_) {
186 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
188 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr) {
189 metric_cache_[name] =
190 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
192 auto size = metric_cache_[name]->DataPointCount;
193 if (size < metric_cache_max_size_) {
194 if (size >= metric_cache_notify_size_) {
195 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
199 metric_cache_[name]->IntValue = value;
200 metric_cache_[name]->DataPointCount = 1;
202 metric_cache_[name]->IntValue += value;
203 metric_cache_[name]->DataPointCount++;
206 TLOG(10) <<
"Rejecting metric because queue full" ;
207 missed_metric_calls_++;
211 metric_cv_.notify_all();
216 MetricMode mode, std::string
const& metricPrefix,
bool useNameOverride) {
218 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
219 }
else if (!running_) {
220 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
221 }
else if (active_) {
223 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
225 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr) {
226 metric_cache_[name] =
227 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
229 auto size = metric_cache_[name]->DataPointCount;
230 if (size < metric_cache_max_size_) {
231 if (size >= metric_cache_notify_size_) {
232 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
236 metric_cache_[name]->DoubleValue = value;
237 metric_cache_[name]->DataPointCount = 1;
239 metric_cache_[name]->DoubleValue += value;
240 metric_cache_[name]->DataPointCount++;
243 TLOG(10) <<
"Rejecting metric because queue full" ;
244 missed_metric_calls_++;
248 metric_cv_.notify_all();
253 MetricMode mode, std::string
const& metricPrefix,
bool useNameOverride) {
255 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
256 }
else if (!running_) {
257 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
258 }
else if (active_) {
260 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
262 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr) {
263 metric_cache_[name] =
264 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
266 auto size = metric_cache_[name]->DataPointCount;
267 if (size < metric_cache_max_size_) {
268 if (size >= metric_cache_notify_size_) {
269 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
273 metric_cache_[name]->FloatValue = value;
274 metric_cache_[name]->DataPointCount = 1;
276 metric_cache_[name]->FloatValue += value;
277 metric_cache_[name]->DataPointCount++;
280 TLOG(10) <<
"Rejecting metric because queue full" ;
281 missed_metric_calls_++;
285 metric_cv_.notify_all();
290 int level,
MetricMode mode, std::string
const& metricPrefix,
291 bool useNameOverride) {
293 TLOG(TLVL_WARNING) <<
"Attempted to send metric when MetricManager has not yet been initialized!";
294 }
else if (!running_) {
295 TLOG(TLVL_INFO) <<
"Attempted to send metric when MetricManager stopped!";
296 }
else if (active_) {
298 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
300 if (!metric_cache_.count(name) || metric_cache_[name] ==
nullptr) {
301 metric_cache_[name] =
302 std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
304 auto size = metric_cache_[name]->DataPointCount;
305 if (size < metric_cache_max_size_) {
306 if (size >= metric_cache_notify_size_) {
307 TLOG(9) <<
"Metric cache is at size " << size <<
" of " << metric_cache_max_size_ <<
" for metric " << name
311 metric_cache_[name]->UnsignedValue = value;
312 metric_cache_[name]->DataPointCount = 1;
314 metric_cache_[name]->UnsignedValue += value;
315 metric_cache_[name]->DataPointCount++;
318 TLOG(10) <<
"Rejecting metric because queue full" ;
319 missed_metric_calls_++;
323 metric_cv_.notify_all();
// Start the metric-sending thread, whose entry point is sendMetricLoop_().
// NOTE(review): this listing omits some original lines (the embedded numbering jumps 331 -> 333 and
// 338 -> 341), so the opening of the try block and the end of the catch handler are not visible here.
327 void artdaq::MetricManager::startMetricLoop_() {
// If the thread object is still joinable, join it before a new thread is assigned to it below.
328 if (metric_sending_thread_.joinable()) metric_sending_thread_.join();
329 boost::thread::attributes attrs;
// Request a stack of 4096 * 2000 = 8,192,000 bytes for the new thread.
330 attrs.set_stack_size(4096 * 2000);
331 TLOG(TLVL_INFO) <<
"Starting Metric Sending Thread" ;
// Create the thread with the attributes above; it runs sendMetricLoop_() on this instance.
333 metric_sending_thread_ = boost::thread(attrs, boost::bind(&MetricManager::sendMetricLoop_,
this));
334 }
catch (
const boost::exception& e) {
// Report the failure, including errno, through TRACE and again on std::cerr.
// NOTE(review): whatever follows these two statements in the handler (original lines 339-340) is not
// visible in this listing - confirm against the full source.
335 TLOG(TLVL_ERROR) <<
"Caught boost::exception starting Metric Sending thread: " << boost::diagnostic_information(e)
336 <<
", errno=" << errno;
337 std::cerr <<
"Caught boost::exception starting Metric Sending thread: " << boost::diagnostic_information(e)
338 <<
", errno=" << errno << std::endl;
341 TLOG(TLVL_INFO) <<
"Metric Sending thread started";
345 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
346 return metric_cache_.size() == 0;
350 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
353 for (
auto& q : metric_cache_) {
354 size += q.second->DataPointCount;
357 if (metric_cache_.count(name)) size = metric_cache_[name]->DataPointCount;
// Body of the metric-sending thread (it is bound to the thread in startMetricLoop_()).
// The visible statements wait for cached metrics, drain the cache into a local list, hand each
// MetricData to every plugin whose run level is >= the metric's level, and call sendMetrics() /
// stopMetrics() on the plugins.
// NOTE(review): this listing omits a number of original lines (loop/scope braces, try/catch lines and
// the arguments of two emplace_back calls), so the exact nesting of the statements below cannot be
// confirmed from here - check the full source before relying on these comments for control flow.
363 void artdaq::MetricManager::sendMetricLoop_() {
364 TLOG(TLVL_INFO) <<
"sendMetricLoop_ START";
// Default-constructed time_point (the clock's epoch); updated whenever metrics are sent below.
365 auto last_send_time = std::chrono::steady_clock::time_point();
// While the cache is empty and the manager is running, wait on metric_cv_ for up to 100 ms at a time.
367 while (metricQueueEmpty() && running_) {
368 std::unique_lock<std::mutex> lk(metric_mutex_);
369 metric_cv_.wait_for(lk, std::chrono::milliseconds(100));
370 auto now = std::chrono::steady_clock::now();
// If more than metric_send_interval_ms_ milliseconds have elapsed since last_send_time, call
// sendMetrics() on every non-null plugin even though no new data arrived.
371 if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_send_time).count() >
372 metric_send_interval_ms_) {
373 for (
auto& metric : metric_plugins_) {
374 if (metric) metric->sendMetrics();
376 last_send_time = now;
// Timestamp taken before processing; it is passed to sendMetrics(false, processing_start) further down.
380 auto processing_start = std::chrono::steady_clock::now();
381 auto temp_list = std::list<std::unique_ptr<MetricData>>();
// Under metric_cache_mutex_, move every cached MetricData into temp_list and then empty the cache.
383 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
385 for (
auto& q : metric_cache_) {
386 temp_list.emplace_back(std::move(q.second));
388 metric_cache_.clear();
// Read and reset the call counters to 0; each is followed by an emplace_back onto temp_list.
// NOTE(review): the arguments of those two emplace_back calls are not visible in this listing.
390 auto calls = metric_calls_.exchange(0);
391 temp_list.emplace_back(
394 auto missed = missed_metric_calls_.exchange(0);
395 temp_list.emplace_back(
398 TLOG(TLVL_TRACE) <<
"There are " << temp_list.size() <<
" Metrics to process (" << calls <<
" calls, " << missed
// Take the entries off the front of temp_list one at a time until it is empty.
402 while (temp_list.size() > 0) {
403 auto data_ = std::move(temp_list.front());
404 temp_list.pop_front();
// Unless UseNameOverride is set, build the name as prefix_ + "." [+ MetricPrefix + "."] + Name.
406 if (!data_->UseNameOverride) {
407 if (data_->MetricPrefix.size() > 0) {
408 data_->Name = prefix_ +
"." + data_->MetricPrefix +
"." + data_->Name;
// Form used when MetricPrefix is empty (presumably the else branch; the line between is not visible).
410 data_->Name = prefix_ +
"." + data_->Name;
// Offer the metric to each non-null plugin whose run level is >= the metric's Level.
414 for (
auto& metric : metric_plugins_) {
415 if (!metric)
continue;
416 if (metric->getRunLevel() >= data_->Level) {
418 metric->addMetricData(data_);
419 last_send_time = std::chrono::steady_clock::now();
// NOTE(review): presumably inside a catch handler whose try/catch lines are not visible here.
421 TLOG(TLVL_ERROR) <<
"Error in MetricManager::sendMetric: error sending value to metric plugin with name "
422 << metric->getLibName() ;
// Call sendMetrics(false, processing_start) on every non-null plugin.
428 for (
auto& metric : metric_plugins_) {
429 if (!metric)
continue;
430 metric->sendMetrics(
false, processing_start);
// Second drain of the cache, using the same steps as above.
// NOTE(review): presumably the final flush after the main loop has ended; the loop's closing lines
// (original 431-436) are not visible, so confirm against the full source.
437 auto temp_list = std::list<std::unique_ptr<MetricData>>();
439 std::unique_lock<std::mutex> lk(metric_cache_mutex_);
441 for (
auto& q : metric_cache_) {
442 temp_list.emplace_back(std::move(q.second));
444 metric_cache_.clear();
// NOTE(review): as above, the arguments of the two emplace_back calls are not visible in this listing.
446 auto calls = metric_calls_.exchange(0);
447 temp_list.emplace_back(
450 auto missed = missed_metric_calls_.exchange(0);
451 temp_list.emplace_back(
454 TLOG(TLVL_TRACE) <<
"There are " << temp_list.size() <<
" Metrics to process (" << calls <<
" calls, " << missed
458 while (temp_list.size() > 0) {
459 auto data_ = std::move(temp_list.front());
460 temp_list.pop_front();
// Same name construction as in the first drain.
462 if (!data_->UseNameOverride) {
463 if (data_->MetricPrefix.size() > 0) {
464 data_->Name = prefix_ +
"." + data_->MetricPrefix +
"." + data_->Name;
466 data_->Name = prefix_ +
"." + data_->Name;
// Same run-level filter and hand-off to the plugins as in the first drain.
470 for (
auto& metric : metric_plugins_) {
471 if (!metric)
continue;
472 if (metric->getRunLevel() >= data_->Level) {
474 metric->addMetricData(data_);
475 last_send_time = std::chrono::steady_clock::now();
477 TLOG(TLVL_ERROR) <<
"Error in MetricManager::sendMetric: error sending value to metric plugin with name "
478 << metric->getLibName() ;
// Call stopMetrics() on every non-null plugin and log each one.
// NOTE(review): the error message below names do_stop(), but no other function signature is visible
// between sendMetricLoop_() and these lines in this listing.
484 for (
auto& metric : metric_plugins_) {
485 if (!metric)
continue;
487 metric->stopMetrics();
488 TLOG(TLVL_DEBUG) <<
"Metric Plugin " << metric->getLibName() <<
" stopped." ;
490 TLOG(TLVL_ERROR) <<
"Exception caught in MetricManager::do_stop(), error stopping plugin with name "
491 << metric->getLibName();
494 TLOG(TLVL_DEBUG) <<
"MetricManager has been stopped." ;
void shutdown()
Call the destructors for all configured MetricPlugin instances.
void initialize(fhicl::ParameterSet const &pset, std::string const &prefix="")
Initialize the MetricPlugin instances.
void sendMetric(std::string const &name, std::string const &value, std::string const &unit, int level, MetricMode mode, std::string const &metricPrefix="", bool useNameOverride=false)
Send a metric with the given parameters to any MetricPlugins with a threshold level greater than or equal to the given level.
size_t metricQueueSize(std::string const &name="")
Return the size of the named metric queue
void reinitialize(fhicl::ParameterSet const &pset, std::string const &prefix="")
Reinitialize all MetricPlugin Instances.
MetricMode
The Mode of the metric indicates how multiple metric values should be combined within a reporting interval.
MetricManager()
Construct an instance of the MetricManager class.
void do_start()
Perform startup actions for each configured MetricPlugin.
void do_stop()
Stop sending metrics to the MetricPlugin instances.
virtual ~MetricManager() noexcept
MetricManager destructor.
std::unique_ptr< MetricPlugin > makeMetricPlugin(std::string const &generator_plugin_spec, fhicl::ParameterSet const &ps, std::string const &app_name)
Load a given MetricPlugin and return a pointer to it.
Sends both the Accumulate mode and Rate mode metric. (Rate mode metric will append "/s" to metric units.)
Report only the last value recorded. Useful for event counters, run numbers, etc.
void do_resume()
Resume metric sending. Currently a No-Op.
bool metricQueueEmpty()
Returns whether the metric queue is completely empty
void do_pause()
Pause metric sending. Currently a No-Op.