diff --git a/Cargo.lock b/Cargo.lock index eadee1e21d7..cf092dc5362 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3935,8 +3935,7 @@ dependencies = [ [[package]] name = "prometheus-client" version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cca3d75b4566b9a29fe1ed623587fb058e826eb329a0be4b7c4da1ebb2d7a6ca" +source = "git+https://github.com/patrick-ogrady/client_rust?rev=0dcb4a74b922158350c8af25552e228b79cda477#0dcb4a74b922158350c8af25552e228b79cda477" dependencies = [ "dtoa", "itoa", @@ -3947,8 +3946,7 @@ dependencies = [ [[package]] name = "prometheus-client-derive-encode" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9adf1691c04c0a5ff46ff8f262b58beb07b0dbb61f96f9f54f6cbd82106ed87f" +source = "git+https://github.com/patrick-ogrady/client_rust?rev=0dcb4a74b922158350c8af25552e228b79cda477#0dcb4a74b922158350c8af25552e228b79cda477" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 3ca0f33a594..d3ba07e37e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -172,6 +172,9 @@ x25519-dalek = "2.0.1" zeroize = "1.5.7" zstd = "0.13.2" +[patch.crates-io] +prometheus-client = { git = "https://github.com/patrick-ogrady/client_rust", rev = "0dcb4a74b922158350c8af25552e228b79cda477" } + [profile.bench] # Because we enable overflow checks in "release," we should benchmark with them. 
overflow-checks = true diff --git a/runtime/src/telemetry/metrics/mod.rs b/runtime/src/telemetry/metrics/mod.rs index 91cbd57d2eb..2747c3f6f9d 100644 --- a/runtime/src/telemetry/metrics/mod.rs +++ b/runtime/src/telemetry/metrics/mod.rs @@ -35,10 +35,7 @@ pub mod raw { } use commonware_utils::sync::Mutex; -use prometheus_client::encoding::{ - text::{encode, encode_eof}, - MetricEncoder as PromMetricEncoder, -}; +use prometheus_client::encoding::text::{encode_descriptor, encode_eof, encode_metric}; pub use registration::Registration; use std::{ any::Any, @@ -230,31 +227,6 @@ pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize .sum() } -// Adaptation of client_rust's internal descriptor encoder. -// -// Source: -// https://github.com/prometheus/client_rust/blob/4a6d40a55443d5b18f5be311d246c03e56f417d6/src/encoding/text.rs#L218-L275 -fn encode_descriptor( - writer: &mut W, - name: &str, - help: &str, - metric_type: MetricType, -) -> Result<(), std::fmt::Error> -where - W: std::fmt::Write, -{ - writer.write_str("# HELP ")?; - writer.write_str(name)?; - writer.write_str(" ")?; - writer.write_str(help)?; - writer.write_str("\n# TYPE ")?; - writer.write_str(name)?; - writer.write_str(" ")?; - writer.write_str(metric_type.as_str())?; - writer.write_str("\n")?; - Ok(()) -} - /// Join a metric or label prefix with a child name using Prometheus' `_` separator. 
pub(crate) fn prefixed_name(prefix: &str, name: &str) -> String { if prefix.is_empty() { @@ -372,61 +344,14 @@ impl std::fmt::Debug for Registered { type MetricAttributes = Vec<(Cow<'static, str>, Cow<'static, str>)>; type MetricKey = (String, MetricAttributes); -type SampleEncoder = dyn Fn(&mut String) -> Result<(), std::fmt::Error> + Send + Sync; struct PendingMetricEntry { family_name: String, attributes: MetricAttributes, - encode_samples: Box, + metric: Arc, metric_any: Arc, } -pub(crate) struct SharedMetric(pub(crate) Arc); - -impl std::fmt::Debug for SharedMetric { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl EncodeMetric for SharedMetric { - fn encode(&self, encoder: PromMetricEncoder<'_>) -> Result<(), std::fmt::Error> { - self.0.encode(encoder) - } - - fn metric_type(&self) -> MetricType { - self.0.metric_type() - } -} - -fn create_sample_encoder( - name: String, - labels: MetricAttributes, - metric: Arc, -) -> Box -where - M: Metric, -{ - // TODO (#3659): Avoid allocating an upstream registry per metric once - // `prometheus-client` exposes a public sample-only `MetricEncoder` path - // for encoding one metric with const labels. 
- let mut registry = registry::Registry::with_labels(labels.into_iter()); - registry.register(name, "", SharedMetric(metric)); - - Box::new(move |samples| { - let mut encoded = String::new(); - encode(&mut encoded, &registry).expect("encoding temporary metric registry failed"); - for line in encoded.lines() { - if line.starts_with('#') { - continue; - } - samples.push_str(line); - samples.push('\n'); - } - Ok(()) - }) -} - fn owned_attributes(attributes: Vec<(String, String)>) -> MetricAttributes { attributes .into_iter() @@ -445,7 +370,7 @@ fn normalize_help(help: String) -> String { struct MetricEntry { family_name: String, attributes: MetricAttributes, - encode_samples: Box, + metric: Arc, metric_any: Arc, claims: usize, family_index: usize, @@ -535,8 +460,6 @@ impl RegistryInner { let attributes = owned_attributes(attributes); let help = normalize_help(help); let metric_type = metric.metric_type(); - let encode_samples = - create_sample_encoder(name.clone(), attributes.clone(), metric.clone()); let key = (name.clone(), attributes.clone()); if let Some(existing_id) = self.keys.get(&key).copied() { let entry = self.metric_ref(existing_id); @@ -569,6 +492,7 @@ impl RegistryInner { let id = self.allocate_metric_id(); let registration = Registration::from(RegistryGuard { id, registry }); let metric_any: Arc = metric.clone(); + let metric_erased: Arc = metric.clone(); self.insert_metric_entry( id, help, @@ -576,7 +500,7 @@ impl RegistryInner { PendingMetricEntry { family_name: name, attributes, - encode_samples, + metric: metric_erased, metric_any, }, ); @@ -645,7 +569,7 @@ impl RegistryInner { let PendingMetricEntry { family_name, attributes, - encode_samples, + metric, metric_any, } = entry; self.keys @@ -654,7 +578,7 @@ impl RegistryInner { std::collections::btree_map::Entry::Occupied(entry) => entry.into_mut(), std::collections::btree_map::Entry::Vacant(entry) => { let mut descriptor = String::new(); - encode_descriptor(&mut descriptor, &family_name, &help, 
metric_type) + encode_descriptor(&mut descriptor, &family_name, &help, None, metric_type) .expect("encoding cached descriptor failed"); entry.insert(MetricFamily { help, @@ -669,7 +593,7 @@ impl RegistryInner { self.metric_slot_mut(id).replace(MetricEntry { family_name, attributes, - encode_samples, + metric, metric_any, claims: 1, family_index, @@ -734,21 +658,31 @@ impl RegistryInner { pub fn encode(&self) -> String { let mut output = String::new(); - let mut samples = String::new(); for family in self.families.values() { - samples.clear(); + let mut encoded_descriptor = false; for metric_id in &family.metric_ids { let metric = self.metric_ref(*metric_id); - (metric.encode_samples)(&mut samples).expect("encoding live metric samples failed"); - } - // Suppress the HELP/TYPE descriptor when the family produced no - // samples (e.g. a `Family` with no child entries). Matches - // upstream prometheus-client's empty-metric filtering. - if samples.is_empty() { - continue; + + // Suppress the HELP/TYPE descriptor when the family would + // produce no samples (e.g. a `Family` with no child + // entries). Matches upstream prometheus-client's empty-metric + // filtering. 
+ if metric.metric.is_empty() { + continue; + } + if !encoded_descriptor { + output.push_str(&family.descriptor); + encoded_descriptor = true; + } + encode_metric( + &mut output, + &metric.family_name, + None, + &metric.attributes, + metric.metric.as_ref(), + ) + .expect("encoding live metric samples failed"); } - output.push_str(&family.descriptor); - output.push_str(&samples); } encode_eof(&mut output).expect("encoding EOF failed"); @@ -814,6 +748,7 @@ mod tests { use crate::{deterministic, Metrics, Runner, Spawner}; use commonware_macros::test_traced; use futures::future; + use prometheus_client::encoding::text::encode; use std::sync::mpsc::{self, TryRecvError}; #[test_traced] @@ -1195,6 +1130,35 @@ mod tests { assert_eq!(encoded.matches("# EOF").count(), 1); } + #[test] + fn test_encode_suppresses_family_cleared_after_scrape() { + let registry = Registry::default(); + let family = Arc::new(raw::Family::, raw::Counter>::default()); + family + .get_or_create(&vec![("epoch".to_string(), "1".to_string())]) + .inc(); + let _registered = registry.register( + "votes".to_string(), + "vote count".to_string(), + Vec::new(), + family.clone(), + ); + + let populated = registry.encode(); + assert!( + populated.contains("votes_total{epoch=\"1\"} 1"), + "populated family missing: {populated}" + ); + + family.clear(); + let cleared = registry.encode(); + assert!( + !cleared.contains("votes"), + "cleared family leaked: {cleared}" + ); + assert_eq!(cleared, "# EOF\n"); + } + #[test] fn test_encode_matches_upstream_registry() { // Byte-for-byte parity between our `Registry::encode` and upstream