artdaq_utilities  v1_04_10
MetricManager.cc
1 // MetricManager.cc: MetricManager class implementation file
2 // Author: Eric Flumerfelt
3 // Last Modified: 11/14/2014
4 //
5 // MetricManager loads a user-specified set of plugins, sends them their configuration,
6 // and sends them data as it is received. It also maintains the state of the plugins
7 // relative to the application state.
8 
9 #define TRACE_NAME "MetricManager"
10 #include "artdaq-utilities/Plugins/MetricManager.hh"
11 #include "artdaq-utilities/Plugins/makeMetricPlugin.hh"
12 #include "fhiclcpp/ParameterSet.h"
13 #include "tracemf.h"
14 
15 #include <boost/exception/all.hpp>
16 #include <chrono>
17 
19  : metric_plugins_(0),
20  metric_send_interval_ms_(15000),
21  initialized_(false),
22  running_(false),
23  active_(false),
24  missed_metric_calls_(0),
25  metric_calls_(0),
26  metric_cache_max_size_(1000),
27  metric_cache_notify_size_(10) {}
28 
29 artdaq::MetricManager::~MetricManager() noexcept { shutdown(); }
30 
31 void artdaq::MetricManager::initialize(fhicl::ParameterSet const& pset, std::string const& prefix) {
32  prefix_ = prefix;
33  if (initialized_) {
34  shutdown();
35  }
36  TLOG(TLVL_INFO) << "Configuring metrics with parameter set: " << pset.to_string() ;
37 
38  std::vector<std::string> names = pset.get_pset_names();
39 
40  metric_plugins_.clear();
41 
42  for (auto name : names) {
43  if (name == "metric_queue_size") {
44  metric_cache_max_size_ = pset.get<size_t>("metric_queue_size");
45  } else if (name == "metric_queue_notify_size") {
46  metric_cache_notify_size_ = pset.get<size_t>("metric_queue_notify_size");
47  } else if (name == "metric_cache_size") {
48  metric_cache_max_size_ = pset.get<size_t>("metric_cache_size");
49  } else if (name == "metric_cache_notify_size") {
50  metric_cache_notify_size_ = pset.get<size_t>("metric_cache_notify_size");
51  } else if (name == "metric_send_maximum_delay_ms") {
52  metric_send_interval_ms_ = pset.get<int>("metric_send_maximum_delay_ms");
53  } else {
54  try {
55  TLOG(TLVL_DEBUG) << "Constructing metric plugin with name " << name ;
56  fhicl::ParameterSet plugin_pset = pset.get<fhicl::ParameterSet>(name);
57  metric_plugins_.push_back(
58  makeMetricPlugin(plugin_pset.get<std::string>("metricPluginType", ""), plugin_pset, prefix_));
59  } catch (const cet::exception& e) {
60  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::initialize, error loading plugin with name " << name
61  << ", cet::exception object caught:" << e.explain_self();
62  } catch (const boost::exception& e) {
63  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::initialize, error loading plugin with name " << name
64  << ", boost::exception object caught: " << boost::diagnostic_information(e);
65  } catch (const std::exception& e) {
66  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::initialize, error loading plugin with name " << name
67  << ", std::exception caught: " << e.what();
68  } catch (...) {
69  TLOG(TLVL_ERROR) << "Unknown Exception caught in MetricManager::initialize, error loading plugin with name "
70  << name;
71  }
72  }
73  }
74 
75  initialized_ = true;
76 }
77 
79  auto lk = std::unique_lock<std::mutex>(metric_mutex_);
80  if (!running_) {
81  TLOG(TLVL_DEBUG) << "Starting MetricManager" ;
82  for (auto& metric : metric_plugins_) {
83  if (!metric) continue;
84  try {
85  metric->startMetrics();
86  TLOG(TLVL_INFO) << "Metric Plugin " << metric->getLibName() << " started." ;
87  active_ = true;
88  } catch (...) {
89  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::do_start(), error starting plugin with name "
90  << metric->getLibName();
91  }
92  }
93  running_ = true;
94  startMetricLoop_();
95  }
96 }
97 
99  auto lk = std::unique_lock<std::mutex>(metric_mutex_);
100  TLOG(TLVL_DEBUG) << "Stopping Metrics" ;
101  running_ = false;
102  metric_cv_.notify_all();
103  TLOG(TLVL_DEBUG) << "Joining Metric-Sending thread" ;
104  lk.unlock();
105  if (metric_sending_thread_.joinable()) metric_sending_thread_.join();
106  TLOG(TLVL_DEBUG) << "do_stop Complete" ;
107 }
108 
109 void artdaq::MetricManager::do_pause() { /*do_stop();*/
110 }
111 void artdaq::MetricManager::do_resume() { /*do_start();*/
112 }
113 
114 void artdaq::MetricManager::reinitialize(fhicl::ParameterSet const& pset, std::string const& prefix) {
115  shutdown();
116  initialize(pset, prefix);
117 }
118 
120  TLOG(TLVL_DEBUG) << "MetricManager is shutting down..." ;
121  do_stop();
122 
123  auto lk = std::unique_lock<std::mutex>(metric_mutex_);
124  if (initialized_) {
125  for (auto& i : metric_plugins_) {
126  try {
127  std::string name = i->getLibName();
128  i.reset(nullptr);
129  TLOG(TLVL_DEBUG) << "Metric Plugin " << name << " shutdown." ;
130  } catch (...) {
131  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::shutdown(), error shutting down metric with name "
132  << i->getLibName();
133  }
134  }
135  metric_plugins_.clear();
136  initialized_ = false;
137  }
138 }
139 
140 void artdaq::MetricManager::sendMetric(std::string const& name, std::string const& value, std::string const& unit,
141  int level, MetricMode mode, std::string const& metricPrefix,
142  bool useNameOverride) {
143  if (!initialized_) {
144  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
145  } else if (!running_) {
146  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
147  } else if (active_) {
148  {
149  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
150  metric_calls_++;
151  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr) {
152  metric_cache_[name] =
153  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
154  } else {
155  auto size = metric_cache_[name]->DataPointCount;
156  if (size < metric_cache_max_size_) {
157  if (size >= metric_cache_notify_size_) {
158  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
159  << ".";
160  }
161  if (mode == MetricMode::LastPoint) {
162  metric_cache_[name]->StringValue = value;
163  metric_cache_[name]->DataPointCount = 1;
164  } else {
165  metric_cache_[name]->StringValue += " " + value;
166  metric_cache_[name]->DataPointCount++;
167  }
168  } else {
169  TLOG(10) << "Rejecting metric because queue full" ;
170  missed_metric_calls_++;
171  }
172  }
173 }
174  metric_cv_.notify_all();
175  }
176  }
177 
178 void artdaq::MetricManager::sendMetric(std::string const& name, int const& value, std::string const& unit, int level,
179  MetricMode mode, std::string const& metricPrefix, bool useNameOverride) {
180  if (!initialized_) {
181  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
182  } else if (!running_) {
183  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
184  } else if (active_) {
185  {
186  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
187  metric_calls_++;
188  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr) {
189  metric_cache_[name] =
190  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
191  } else {
192  auto size = metric_cache_[name]->DataPointCount;
193  if (size < metric_cache_max_size_) {
194  if (size >= metric_cache_notify_size_) {
195  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
196  << ".";
197  }
198  if (mode == MetricMode::LastPoint) {
199  metric_cache_[name]->IntValue = value;
200  metric_cache_[name]->DataPointCount = 1;
201  } else {
202  metric_cache_[name]->IntValue += value;
203  metric_cache_[name]->DataPointCount++;
204  }
205  } else {
206  TLOG(10) << "Rejecting metric because queue full" ;
207  missed_metric_calls_++;
208  }
209  }
210 }
211  metric_cv_.notify_all();
212  }
213  }
214 
215 void artdaq::MetricManager::sendMetric(std::string const& name, double const& value, std::string const& unit, int level,
216  MetricMode mode, std::string const& metricPrefix, bool useNameOverride) {
217  if (!initialized_) {
218  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
219  } else if (!running_) {
220  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
221  } else if (active_) {
222  {
223  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
224  metric_calls_++;
225  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr) {
226  metric_cache_[name] =
227  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
228  } else {
229  auto size = metric_cache_[name]->DataPointCount;
230  if (size < metric_cache_max_size_) {
231  if (size >= metric_cache_notify_size_) {
232  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
233  << ".";
234  }
235  if (mode == MetricMode::LastPoint) {
236  metric_cache_[name]->DoubleValue = value;
237  metric_cache_[name]->DataPointCount = 1;
238  } else {
239  metric_cache_[name]->DoubleValue += value;
240  metric_cache_[name]->DataPointCount++;
241  }
242  } else {
243  TLOG(10) << "Rejecting metric because queue full" ;
244  missed_metric_calls_++;
245  }
246  }
247 }
248  metric_cv_.notify_all();
249  }
250  }
251 
252 void artdaq::MetricManager::sendMetric(std::string const& name, float const& value, std::string const& unit, int level,
253  MetricMode mode, std::string const& metricPrefix, bool useNameOverride) {
254  if (!initialized_) {
255  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
256  } else if (!running_) {
257  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
258  } else if (active_) {
259  {
260  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
261  metric_calls_++;
262  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr) {
263  metric_cache_[name] =
264  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
265  } else {
266  auto size = metric_cache_[name]->DataPointCount;
267  if (size < metric_cache_max_size_) {
268  if (size >= metric_cache_notify_size_) {
269  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
270  << ".";
271  }
272  if (mode == MetricMode::LastPoint) {
273  metric_cache_[name]->FloatValue = value;
274  metric_cache_[name]->DataPointCount = 1;
275  } else {
276  metric_cache_[name]->FloatValue += value;
277  metric_cache_[name]->DataPointCount++;
278  }
279  } else {
280  TLOG(10) << "Rejecting metric because queue full" ;
281  missed_metric_calls_++;
282  }
283  }
284 }
285  metric_cv_.notify_all();
286  }
287  }
288 
289 void artdaq::MetricManager::sendMetric(std::string const& name, long unsigned int const& value, std::string const& unit,
290  int level, MetricMode mode, std::string const& metricPrefix,
291  bool useNameOverride) {
292  if (!initialized_) {
293  TLOG(TLVL_WARNING) << "Attempted to send metric when MetricManager has not yet been initialized!";
294  } else if (!running_) {
295  TLOG(TLVL_INFO) << "Attempted to send metric when MetricManager stopped!";
296  } else if (active_) {
297  {
298  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
299  metric_calls_++;
300  if (!metric_cache_.count(name) || metric_cache_[name] == nullptr) {
301  metric_cache_[name] =
302  std::make_unique<MetricData>(name, value, unit, level, mode, metricPrefix, useNameOverride);
303  } else {
304  auto size = metric_cache_[name]->DataPointCount;
305  if (size < metric_cache_max_size_) {
306  if (size >= metric_cache_notify_size_) {
307  TLOG(9) << "Metric cache is at size " << size << " of " << metric_cache_max_size_ << " for metric " << name
308  << ".";
309  }
310  if (mode == MetricMode::LastPoint) {
311  metric_cache_[name]->UnsignedValue = value;
312  metric_cache_[name]->DataPointCount = 1;
313  } else {
314  metric_cache_[name]->UnsignedValue += value;
315  metric_cache_[name]->DataPointCount++;
316  }
317  } else {
318  TLOG(10) << "Rejecting metric because queue full" ;
319  missed_metric_calls_++;
320  }
321  }
322  }
323  metric_cv_.notify_all();
324  }
325 }
326 
327 void artdaq::MetricManager::startMetricLoop_() {
328  if (metric_sending_thread_.joinable()) metric_sending_thread_.join();
329  boost::thread::attributes attrs;
330  attrs.set_stack_size(4096 * 2000); // 8000 KB
331  TLOG(TLVL_INFO) << "Starting Metric Sending Thread" ;
332  try {
333  metric_sending_thread_ = boost::thread(attrs, boost::bind(&MetricManager::sendMetricLoop_, this));
334  } catch (const boost::exception& e) {
335  TLOG(TLVL_ERROR) << "Caught boost::exception starting Metric Sending thread: " << boost::diagnostic_information(e)
336  << ", errno=" << errno;
337  std::cerr << "Caught boost::exception starting Metric Sending thread: " << boost::diagnostic_information(e)
338  << ", errno=" << errno << std::endl;
339  exit(5);
340  }
341  TLOG(TLVL_INFO) << "Metric Sending thread started";
342 }
343 
345  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
346  return metric_cache_.size() == 0;
347 }
348 
349 size_t artdaq::MetricManager::metricQueueSize(std::string const& name) {
350  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
351  size_t size = 0;
352  if (name == "") {
353  for (auto& q : metric_cache_) {
354  size += q.second->DataPointCount;
355  }
356  } else {
357  if (metric_cache_.count(name)) size = metric_cache_[name]->DataPointCount;
358  }
359 
360  return size;
361 }
362 
363 void artdaq::MetricManager::sendMetricLoop_() {
364  TLOG(TLVL_INFO) << "sendMetricLoop_ START";
365  auto last_send_time = std::chrono::steady_clock::time_point();
366  while (running_) {
367  while (metricQueueEmpty() && running_) {
368  std::unique_lock<std::mutex> lk(metric_mutex_);
369  metric_cv_.wait_for(lk, std::chrono::milliseconds(100));
370  auto now = std::chrono::steady_clock::now();
371  if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_send_time).count() >
372  metric_send_interval_ms_) {
373  for (auto& metric : metric_plugins_) {
374  if (metric) metric->sendMetrics();
375  }
376  last_send_time = now;
377  }
378  }
379 
380  auto processing_start = std::chrono::steady_clock::now();
381  auto temp_list = std::list<std::unique_ptr<MetricData>>();
382  {
383  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
384 
385  for (auto& q : metric_cache_) {
386  temp_list.emplace_back(std::move(q.second));
387  }
388  metric_cache_.clear();
389 
390  auto calls = metric_calls_.exchange(0);
391  temp_list.emplace_back(
392  new MetricData("Metric Calls", calls, "metrics", 4, MetricMode::AccumulateAndRate, "", false));
393 
394  auto missed = missed_metric_calls_.exchange(0);
395  temp_list.emplace_back(
396  new MetricData("Missed Metric Calls", missed, "metrics", 4, MetricMode::AccumulateAndRate, "", false));
397 
398  TLOG(TLVL_TRACE) << "There are " << temp_list.size() << " Metrics to process (" << calls << " calls, " << missed
399  << " missed)";
400  }
401 
402  while (temp_list.size() > 0) {
403  auto data_ = std::move(temp_list.front());
404  temp_list.pop_front();
405  if (data_->Type == MetricType::InvalidMetric) continue;
406  if (!data_->UseNameOverride) {
407  if (data_->MetricPrefix.size() > 0) {
408  data_->Name = prefix_ + "." + data_->MetricPrefix + "." + data_->Name;
409  } else {
410  data_->Name = prefix_ + "." + data_->Name;
411  }
412  }
413 
414  for (auto& metric : metric_plugins_) {
415  if (!metric) continue;
416  if (metric->getRunLevel() >= data_->Level) {
417  try {
418  metric->addMetricData(data_);
419  last_send_time = std::chrono::steady_clock::now();
420  } catch (...) {
421  TLOG(TLVL_ERROR) << "Error in MetricManager::sendMetric: error sending value to metric plugin with name "
422  << metric->getLibName() ;
423  }
424  }
425  }
426  }
427 
428  for (auto& metric : metric_plugins_) {
429  if (!metric) continue;
430  metric->sendMetrics(false, processing_start);
431  }
432 
433  // Limit rate of metrics going to plugins
434  usleep(10000);
435  }
436 
437  auto temp_list = std::list<std::unique_ptr<MetricData>>();
438  {
439  std::unique_lock<std::mutex> lk(metric_cache_mutex_);
440 
441  for (auto& q : metric_cache_) {
442  temp_list.emplace_back(std::move(q.second));
443  }
444  metric_cache_.clear();
445 
446  auto calls = metric_calls_.exchange(0);
447  temp_list.emplace_back(
448  new MetricData("Metric Calls", calls, "metrics", 4, MetricMode::AccumulateAndRate, "", false));
449 
450  auto missed = missed_metric_calls_.exchange(0);
451  temp_list.emplace_back(
452  new MetricData("Missed Metric Calls", missed, "metrics", 4, MetricMode::AccumulateAndRate, "", false));
453 
454  TLOG(TLVL_TRACE) << "There are " << temp_list.size() << " Metrics to process (" << calls << " calls, " << missed
455  << " missed)";
456  }
457 
458  while (temp_list.size() > 0) {
459  auto data_ = std::move(temp_list.front());
460  temp_list.pop_front();
461  if (data_->Type == MetricType::InvalidMetric) continue;
462  if (!data_->UseNameOverride) {
463  if (data_->MetricPrefix.size() > 0) {
464  data_->Name = prefix_ + "." + data_->MetricPrefix + "." + data_->Name;
465  } else {
466  data_->Name = prefix_ + "." + data_->Name;
467  }
468  }
469 
470  for (auto& metric : metric_plugins_) {
471  if (!metric) continue;
472  if (metric->getRunLevel() >= data_->Level) {
473  try {
474  metric->addMetricData(data_);
475  last_send_time = std::chrono::steady_clock::now();
476  } catch (...) {
477  TLOG(TLVL_ERROR) << "Error in MetricManager::sendMetric: error sending value to metric plugin with name "
478  << metric->getLibName() ;
479  }
480  }
481  }
482  }
483 
484  for (auto& metric : metric_plugins_) {
485  if (!metric) continue;
486  try {
487  metric->stopMetrics();
488  TLOG(TLVL_DEBUG) << "Metric Plugin " << metric->getLibName() << " stopped." ;
489  } catch (...) {
490  TLOG(TLVL_ERROR) << "Exception caught in MetricManager::do_stop(), error stopping plugin with name "
491  << metric->getLibName();
492  }
493  }
494  TLOG(TLVL_DEBUG) << "MetricManager has been stopped." ;
495 }
void shutdown()
Call the destructors for all configured MetricPlugin instances.
void initialize(fhicl::ParameterSet const &pset, std::string const &prefix="")
Initialize the MetricPlugin instances.
void sendMetric(std::string const &name, std::string const &value, std::string const &unit, int level, MetricMode mode, std::string const &metricPrefix="", bool useNameOverride=false)
Send a metric with the given parameters to any MetricPlugins with a threshold level >= to level...
size_t metricQueueSize(std::string const &name="")
Return the size of the named metric queue
void reinitialize(fhicl::ParameterSet const &pset, std::string const &prefix="")
Reinitialize all MetricPlugin Instances.
MetricMode
The Mode of the metric indicates how multiple metric values should be combined within a reporting int...
Definition: MetricData.hh:30
MetricManager()
Construct an instance of the MetricManager class.
void do_start()
Perform startup actions for each configured MetricPlugin.
void do_stop()
Stop sending metrics to the MetricPlugin instances.
virtual ~MetricManager() noexcept
MetricManager destructor.
std::unique_ptr< MetricPlugin > makeMetricPlugin(std::string const &generator_plugin_spec, fhicl::ParameterSet const &ps, std::string const &app_name)
Load a given MetricPlugin and return a pointer to it.
Sends both the Accumulate mode and Rate mode metric. (Rate mode metric will append "/s" to metric uni...
Report only the last value recorded. Useful for event counters, run numbers, etc. ...
Default, invalid value.
void do_resume()
Resume metric sending. Currently a No-Op.
bool metricQueueEmpty()
Returns whether the metric queue is completely empty
void do_pause()
Pause metric sending. Currently a No-Op.