Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ x25519-dalek = "2.0.1"
zeroize = "1.5.7"
zstd = "0.13.2"

[patch.crates-io]
prometheus-client = { git = "https://github.com/patrick-ogrady/client_rust", rev = "0dcb4a74b922158350c8af25552e228b79cda477" }

[profile.bench]
# Because we enable overflow checks in "release," we should benchmark with them.
overflow-checks = true
Expand Down
154 changes: 59 additions & 95 deletions runtime/src/telemetry/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ pub mod raw {
}

use commonware_utils::sync::Mutex;
use prometheus_client::encoding::{
text::{encode, encode_eof},
MetricEncoder as PromMetricEncoder,
};
use prometheus_client::encoding::text::{encode_descriptor, encode_eof, encode_metric};
pub use registration::Registration;
use std::{
any::Any,
Expand Down Expand Up @@ -230,31 +227,6 @@ pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize
.sum()
}

// Adaptation of client_rust's internal descriptor encoder.
//
// Source:
// https://github.com/prometheus/client_rust/blob/4a6d40a55443d5b18f5be311d246c03e56f417d6/src/encoding/text.rs#L218-L275
//
// Emits the two descriptor lines for a metric family:
//
//   # HELP <name> <help>
//   # TYPE <name> <metric type>
//
// `name` and `help` are written verbatim (no escaping is applied here), and
// the first write error reported by `writer` is returned to the caller.
fn encode_descriptor<W>(
    writer: &mut W,
    name: &str,
    help: &str,
    metric_type: MetricType,
) -> Result<(), std::fmt::Error>
where
    W: std::fmt::Write,
{
    // The descriptor is a fixed sequence of fragments; emit them in order and
    // stop at the first failure.
    let fragments = [
        "# HELP ",
        name,
        " ",
        help,
        "\n# TYPE ",
        name,
        " ",
        metric_type.as_str(),
        "\n",
    ];
    fragments
        .iter()
        .try_for_each(|fragment| writer.write_str(fragment))
}

/// Join a metric or label prefix with a child name using Prometheus' `_` separator.
pub(crate) fn prefixed_name(prefix: &str, name: &str) -> String {
if prefix.is_empty() {
Expand Down Expand Up @@ -372,61 +344,14 @@ impl<M: std::fmt::Debug> std::fmt::Debug for Registered<M> {

/// Label pairs attached to a registered metric; each element of a pair may be
/// a borrowed `'static` string or an owned one.
type MetricAttributes = Vec<(Cow<'static, str>, Cow<'static, str>)>;
/// Lookup key for a registration: the metric name together with its attributes.
type MetricKey = (String, MetricAttributes);
/// Type-erased closure that appends a metric's encoded sample lines to the
/// provided `String`.
type SampleEncoder = dyn Fn(&mut String) -> Result<(), std::fmt::Error> + Send + Sync;

/// Bundle of per-metric data handed to `insert_metric_entry` when a metric is
/// registered for the first time.
struct PendingMetricEntry {
/// Name of the metric family the metric is registered under.
family_name: String,
/// Label pairs attached to this particular metric.
attributes: MetricAttributes,
/// Closure that renders this metric's sample lines into a buffer.
encode_samples: Box<SampleEncoder>,
/// Type-erased handle to the registered metric, used when encoding.
metric: Arc<dyn Metric>,
/// The same metric as an `Any` handle.
/// NOTE(review): presumably kept for downcasting back to the concrete type — confirm against callers.
metric_any: Arc<dyn Any + Send + Sync>,
}

/// Thin newtype around an `Arc<M>` so a shared metric can be passed around as
/// a single value while trait implementations forward to the inner `M`.
pub(crate) struct SharedMetric<M>(pub(crate) Arc<M>);

/// Debug output is exactly that of the wrapped metric; the wrapper itself adds
/// nothing to the formatted text.
impl<M: std::fmt::Debug> std::fmt::Debug for SharedMetric<M> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &M = &self.0;
        <M as std::fmt::Debug>::fmt(inner, f)
    }
}

/// Forwards metric encoding to the shared inner metric, so a `SharedMetric`
/// wrapping an `Arc<M>` can be used wherever an `EncodeMetric` is expected.
impl<M: EncodeMetric> EncodeMetric for SharedMetric<M> {
    /// Encodes the inner metric with the supplied encoder.
    fn encode(&self, encoder: PromMetricEncoder<'_>) -> Result<(), std::fmt::Error> {
        let inner: &M = &self.0;
        <M as EncodeMetric>::encode(inner, encoder)
    }

    /// Reports the inner metric's type.
    fn metric_type(&self) -> MetricType {
        let inner: &M = &self.0;
        <M as EncodeMetric>::metric_type(inner)
    }
}

/// Builds a closure that appends the sample lines of `metric` to a
/// caller-provided buffer.
///
/// The metric is registered under `name` (with empty help text) in a private
/// upstream registry whose constant labels are `labels`. Every invocation of
/// the returned closure encodes that registry and copies each line that does
/// not start with `#` into the buffer, terminated by a newline.
///
/// # Panics
///
/// The returned closure panics if encoding the temporary registry fails.
fn create_sample_encoder<M>(
    name: String,
    labels: MetricAttributes,
    metric: Arc<M>,
) -> Box<SampleEncoder>
where
    M: Metric,
{
    // TODO (#3659): Avoid allocating an upstream registry per metric once
    // `prometheus-client` exposes a public sample-only `MetricEncoder` path
    // for encoding one metric with const labels.
    let mut scratch = registry::Registry::with_labels(labels.into_iter());
    scratch.register(name, "", SharedMetric(metric));

    Box::new(move |samples| {
        let mut rendered = String::new();
        encode(&mut rendered, &scratch).expect("encoding temporary metric registry failed");
        // Lines starting with `#` (HELP/TYPE/EOF and similar) are dropped;
        // only the sample lines are forwarded.
        for sample_line in rendered.lines().filter(|line| !line.starts_with('#')) {
            samples.push_str(sample_line);
            samples.push('\n');
        }
        Ok(())
    })
}

fn owned_attributes(attributes: Vec<(String, String)>) -> MetricAttributes {
attributes
.into_iter()
Expand All @@ -445,7 +370,7 @@ fn normalize_help(help: String) -> String {
struct MetricEntry {
family_name: String,
attributes: MetricAttributes,
encode_samples: Box<SampleEncoder>,
metric: Arc<dyn Metric>,
metric_any: Arc<dyn Any + Send + Sync>,
claims: usize,
family_index: usize,
Expand Down Expand Up @@ -535,8 +460,6 @@ impl RegistryInner {
let attributes = owned_attributes(attributes);
let help = normalize_help(help);
let metric_type = metric.metric_type();
let encode_samples =
create_sample_encoder(name.clone(), attributes.clone(), metric.clone());
let key = (name.clone(), attributes.clone());
if let Some(existing_id) = self.keys.get(&key).copied() {
let entry = self.metric_ref(existing_id);
Expand Down Expand Up @@ -569,14 +492,15 @@ impl RegistryInner {
let id = self.allocate_metric_id();
let registration = Registration::from(RegistryGuard { id, registry });
let metric_any: Arc<dyn Any + Send + Sync> = metric.clone();
let metric_erased: Arc<dyn Metric> = metric.clone();
self.insert_metric_entry(
id,
help,
metric_type,
PendingMetricEntry {
family_name: name,
attributes,
encode_samples,
metric: metric_erased,
metric_any,
},
);
Expand Down Expand Up @@ -645,7 +569,7 @@ impl RegistryInner {
let PendingMetricEntry {
family_name,
attributes,
encode_samples,
metric,
metric_any,
} = entry;
self.keys
Expand All @@ -654,7 +578,7 @@ impl RegistryInner {
std::collections::btree_map::Entry::Occupied(entry) => entry.into_mut(),
std::collections::btree_map::Entry::Vacant(entry) => {
let mut descriptor = String::new();
encode_descriptor(&mut descriptor, &family_name, &help, metric_type)
encode_descriptor(&mut descriptor, &family_name, &help, None, metric_type)
.expect("encoding cached descriptor failed");
entry.insert(MetricFamily {
help,
Expand All @@ -669,7 +593,7 @@ impl RegistryInner {
self.metric_slot_mut(id).replace(MetricEntry {
family_name,
attributes,
encode_samples,
metric,
metric_any,
claims: 1,
family_index,
Expand Down Expand Up @@ -734,21 +658,31 @@ impl RegistryInner {

pub fn encode(&self) -> String {
let mut output = String::new();
let mut samples = String::new();
for family in self.families.values() {
samples.clear();
let mut encoded_descriptor = false;
for metric_id in &family.metric_ids {
let metric = self.metric_ref(*metric_id);
(metric.encode_samples)(&mut samples).expect("encoding live metric samples failed");
}
// Suppress the HELP/TYPE descriptor when the family produced no
// samples (e.g. a `Family<S, M>` with no child entries). Matches
// upstream prometheus-client's empty-metric filtering.
if samples.is_empty() {
continue;

// Suppress the HELP/TYPE descriptor when the family would
// produce no samples (e.g. a `Family<S, M>` with no child
// entries). Matches upstream prometheus-client's empty-metric
// filtering.
if metric.metric.is_empty() {
continue;
}
if !encoded_descriptor {
output.push_str(&family.descriptor);
encoded_descriptor = true;
}
encode_metric(
&mut output,
&metric.family_name,
None,
&metric.attributes,
metric.metric.as_ref(),
)
.expect("encoding live metric samples failed");
}
output.push_str(&family.descriptor);
output.push_str(&samples);
}

encode_eof(&mut output).expect("encoding EOF failed");
Expand Down Expand Up @@ -814,6 +748,7 @@ mod tests {
use crate::{deterministic, Metrics, Runner, Spawner};
use commonware_macros::test_traced;
use futures::future;
use prometheus_client::encoding::text::encode;
use std::sync::mpsc::{self, TryRecvError};

#[test_traced]
Expand Down Expand Up @@ -1195,6 +1130,35 @@ mod tests {
assert_eq!(encoded.matches("# EOF").count(), 1);
}

// A family that had a child during one scrape and is cleared afterwards must
// disappear entirely from the next scrape, leaving only the EOF marker.
#[test]
fn test_encode_suppresses_family_cleared_after_scrape() {
    let registry = Registry::default();
    let family = Arc::new(raw::Family::<Vec<(String, String)>, raw::Counter>::default());

    // Create a single child and bump it so the family has one sample.
    let labels = vec![(String::from("epoch"), String::from("1"))];
    family.get_or_create(&labels).inc();

    // Keep the registration handle alive for the duration of the test.
    let _registration = registry.register(
        String::from("votes"),
        String::from("vote count"),
        Vec::new(),
        Arc::clone(&family),
    );

    // First scrape: the populated child is rendered.
    let populated = registry.encode();
    assert!(
        populated.contains("votes_total{epoch=\"1\"} 1"),
        "populated family missing: {populated}"
    );

    // Second scrape after clearing: no trace of the family may remain.
    family.clear();
    let cleared = registry.encode();
    assert!(
        !cleared.contains("votes"),
        "cleared family leaked: {cleared}"
    );
    assert_eq!(cleared, "# EOF\n");
}

#[test]
fn test_encode_matches_upstream_registry() {
// Byte-for-byte parity between our `Registry::encode` and upstream
Expand Down
Loading