From 75c4e01b74af296d96e04f4878687d46ee41524d Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Mon, 13 Oct 2025 08:44:30 +0200 Subject: [PATCH] build: Remove prometheus-client dependency --- Cargo.lock | 37 +-- Cargo.toml | 4 +- cmd/operator/Cargo.toml | 3 +- cmd/operator/src/main.rs | 13 +- libs/k8s-util/Cargo.toml | 4 +- libs/k8s-util/src/client.rs | 6 +- libs/k8s-util/src/metrics.rs | 169 ++++++------ libs/k8s-util/src/url.rs | 1 + libs/operator/Cargo.toml | 7 +- libs/operator/src/controller/mod.rs | 15 +- libs/operator/src/kanidm/reconcile/mod.rs | 10 +- libs/operator/src/lib.rs | 1 + libs/operator/src/metrics.rs | 302 +++++++++------------- libs/operator/src/prometheus_exporter.rs | 16 ++ 14 files changed, 269 insertions(+), 319 deletions(-) create mode 100644 libs/operator/src/prometheus_exporter.rs diff --git a/Cargo.lock b/Cargo.lock index f0635122..52348ff4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -869,12 +869,6 @@ dependencies = [ "litrs", ] -[[package]] -name = "dtoa" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6add3b8cff394282be81f3fc1a0605db594ed69890078ca6e2cab1c408bcf04" - [[package]] name = "dunce" version = "1.0.5" @@ -2490,7 +2484,8 @@ dependencies = [ "kaniop-operator", "kaniop-person", "kube", - "prometheus-client", + "opentelemetry", + "opentelemetry_sdk", "tokio", "tracing", ] @@ -2581,7 +2576,9 @@ dependencies = [ "kanidm_proto", "kube", "openssl", - "prometheus-client", + "opentelemetry", + "opentelemetry_sdk", + "pin-project", "serde_json", "tokio", "tokio-util", @@ -2633,7 +2630,6 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", - "prometheus-client", "schemars 1.0.4", "serde", "serde_json", @@ -3538,29 +3534,6 @@ dependencies = [ "parking_lot", ] -[[package]] -name = "prometheus-client" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4500adecd7af8e0e9f4dbce15cfee07ce913fbf6ad605cc468b83f2d531ee94" -dependencies = [ - "dtoa", - "itoa", - "parking_lot", - "prometheus-client-derive-encode", -] - -[[package]] -name = "prometheus-client-derive-encode" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9adf1691c04c0a5ff46ff8f262b58beb07b0dbb61f96f9f54f6cbd82106ed87f" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.103", -] - [[package]] name = "prost" version = "0.13.5" diff --git a/Cargo.toml b/Cargo.toml index ef091973..cd873005 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,9 @@ kanidm_client = "1.6.2" kanidm_lib_crypto = "1.6.2" kanidm_proto = "1.6.2" kube = { version = "2.0", default-features = true, features = ["client", "derive", "unstable-runtime"] } -prometheus-client = "0.24.0" +opentelemetry = "0.31" +opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "metrics"] } +opentelemetry-otlp = { version = "0.31", features = ["tokio"] } schemars = "1.0" serde = "1.0" serde_json = "1.0" diff --git a/cmd/operator/Cargo.toml b/cmd/operator/Cargo.toml index df47916a..dabc3087 100644 --- a/cmd/operator/Cargo.toml +++ b/cmd/operator/Cargo.toml @@ -24,7 +24,8 @@ clap = { workspace = true, features = ["cargo", "env"] } futures = { workspace = true } k8s-openapi = { workspace = true } kube = { workspace = true } -prometheus-client = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } anyhow = "1.0" diff --git a/cmd/operator/src/main.rs b/cmd/operator/src/main.rs index 9a1b9e77..e78800e0 100644 --- a/cmd/operator/src/main.rs +++ b/cmd/operator/src/main.rs @@ -12,7 +12,6 @@ use axum::response::{IntoResponse, Json}; use axum::routing::{Router, get}; use clap::{Parser, crate_authors, crate_description, crate_version}; use kube::Config; -use prometheus_client::registry::Registry; use tokio::net::TcpListener; use tokio::signal::unix::{SignalKind, signal}; @@ -86,9 +85,12 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let mut registry = Registry::with_prefix("kaniop"); + let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder().build(); + opentelemetry::global::set_meter_provider(provider.clone()); + let meter = opentelemetry::global::meter("kaniop"); + let config = Config::infer().await?; - let client = new_client_with_metrics(config, &mut registry).await?; + let client = new_client_with_metrics(config, &meter).await?; let controllers = [ kaniop_group::controller::CONTROLLER_ID, kaniop_operator::kanidm::controller::CONTROLLER_ID, @@ -101,9 +103,10 @@ async fn main() -> anyhow::Result<()> { let kanidm = check_api_queryable::(client.clone()).await; let kanidm_r = create_subscriber::(SUBSCRIBE_BUFFER_SIZE); + let controller_metrics = kaniop_operator::metrics::Metrics::new(&meter, &controllers); + let state = KaniopState::new( - registry, - &controllers, + controller_metrics, namespace_r.store.clone(), kanidm_r.store.clone(), ); diff --git a/libs/k8s-util/Cargo.toml b/libs/k8s-util/Cargo.toml index 43b8f436..39f9b272 100644 --- a/libs/k8s-util/Cargo.toml +++ b/libs/k8s-util/Cargo.toml @@ -24,7 +24,9 @@ http = { workspace = true } json-patch = { workspace = true } k8s-openapi = { workspace = true } kube = { workspace = true, features = ["ws"] } -prometheus-client = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } +pin-project = "1.1" serde_json = { workspace = true } tokio = { workspace = true } tokio-util = "0.7.12" diff --git a/libs/k8s-util/src/client.rs b/libs/k8s-util/src/client.rs index 17f44878..5a4762f6 100644 --- a/libs/k8s-util/src/client.rs +++ b/libs/k8s-util/src/client.rs @@ -5,11 +5,11 @@ use hyper_util::rt::TokioExecutor; use kube::Result; use kube::api::AttachedProcess; use kube::{Client, Config, client::ConfigExt}; -use prometheus_client::registry::Registry; +use opentelemetry::metrics::Meter; use tower::{BoxError, ServiceBuilder}; -pub async fn new_client_with_metrics(config: Config, registry: &mut Registry) -> Result { - let metrics_layer = MetricsLayer::new(registry); +pub async fn new_client_with_metrics(config: Config, meter: &Meter) -> Result { + let metrics_layer = MetricsLayer::new(meter); let https = config.rustls_https_connector()?; let service = ServiceBuilder::new() .layer(metrics_layer) diff --git a/libs/k8s-util/src/metrics.rs b/libs/k8s-util/src/metrics.rs index 53040645..8c3fe8bf 100644 --- a/libs/k8s-util/src/metrics.rs +++ b/libs/k8s-util/src/metrics.rs @@ -1,55 +1,23 @@ -use crate::url::template_path; - -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::future::FutureExt; -use http::Request; -use prometheus_client::encoding::EncodeLabelSet; -use prometheus_client::metrics::{counter::Counter, family::Family, histogram::Histogram}; -use prometheus_client::registry::Registry; -use tokio::time::Instant; +use std::{ + task::{Context, Poll}, + time::Instant, +}; + +use http::{Request, Response}; +use opentelemetry::KeyValue; +use opentelemetry::metrics::{Counter, Histogram, Meter}; use tower::{Layer, Service}; -#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug, Default)] -pub struct EndpointLabel { - pub endpoint: String, -} - -#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug, Default)] -pub struct StatusCodeLabel { - pub status_code: String, -} - +/// Metrics layer for monitoring HTTP requests +#[derive(Clone)] pub struct MetricsLayer { - request_histogram: Family, - requests_total: Family, + meter: Meter, } impl MetricsLayer { - pub fn new(registry: &mut Registry) -> Self { - // TODO: remove bucket, implement summary (without quantiles): - // https://github.com/prometheus/client_rust/pull/254 - let request_histogram = - Family::::new_with_constructor(|| Histogram::new([])); - - let requests_total = Family::::default(); - registry.register( - "kubernetes_client_http_request_duration", - "Summary of latencies for the Kubernetes client's requests by endpoint", - request_histogram.clone(), - ); - - registry.register( - "kubernetes_client_http_requests", - "Total number of Kubernetes's client requests by status code", - requests_total.clone(), - ); - + pub fn new(meter: &Meter) -> Self { Self { - request_histogram, - requests_total, + meter: meter.clone(), } } } @@ -57,58 +25,101 @@ impl MetricsLayer { impl Layer for MetricsLayer { type Service = MetricsService; - fn layer(&self, inner: S) -> Self::Service { - MetricsService { - inner, - request_histogram: self.request_histogram.clone(), - requests_total: self.requests_total.clone(), - } + fn layer(&self, service: S) -> Self::Service { + MetricsService::new(service, &self.meter) } } -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct MetricsService { inner: S, - request_histogram: Family, - requests_total: Family, + request_count: Counter, + request_duration: Histogram, +} + +impl MetricsService { + fn new(service: S, meter: &Meter) -> Self { + let request_count = meter + .u64_counter("http_requests_total") + .with_description("Total number of HTTP requests") + .build(); + + let request_duration = meter + .f64_histogram("http_request_duration_seconds") + .with_description("HTTP request duration in seconds") + .build(); + + Self { + inner: service, + request_count, + request_duration, + } + } } impl Service> for MetricsService where - S: Service, Response = http::Response>, - S::Future: Send + 'static, + S: Service, Response = Response>, { type Response = S::Response; type Error = S::Error; - type Future = Pin> + Send>>; + type Future = MetricsFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } fn call(&mut self, req: Request) -> Self::Future { - let path_template = template_path(req.uri().path(), None); - let labels = EndpointLabel { - endpoint: url_escape::encode_path(&path_template).to_string(), - }; - - let start_time = Instant::now(); - - let fut = self.inner.call(req); - let request_histogram = self.request_histogram.clone(); - let requests_total = self.requests_total.clone(); - async move { - let result = fut.await; - let duration = start_time.elapsed().as_secs_f64(); - request_histogram.get_or_create(&labels).observe(duration); - if let Ok(ref response) = result { - let status_code = response.status().as_u16().to_string(); - requests_total - .get_or_create(&StatusCodeLabel { status_code }) - .inc(); - } - result + let method = req.method().to_string(); + let start = Instant::now(); + + let future = self.inner.call(req); + + MetricsFuture { + future, + method, + start, + request_count: self.request_count.clone(), + request_duration: self.request_duration.clone(), + } + } +} + +#[pin_project::pin_project] +pub struct MetricsFuture { + #[pin] + future: F, + method: String, + start: Instant, + request_count: Counter, + request_duration: Histogram, +} + +impl std::future::Future for MetricsFuture +where + F: std::future::Future, E>>, +{ + type Output = F::Output; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let poll_result = this.future.poll(cx); + + if let Poll::Ready(Ok(response)) = &poll_result { + let duration = this.start.elapsed().as_secs_f64(); + let status = response.status().as_str().to_string(); + + this.request_count.add( + 1, + &[ + KeyValue::new("method", this.method.clone()), + KeyValue::new("status", status), + ], + ); + this.request_duration + .record(duration, &[KeyValue::new("method", this.method.clone())]); } - .boxed() + + poll_result } } diff --git a/libs/k8s-util/src/url.rs b/libs/k8s-util/src/url.rs index 55e1dfc3..bd5ded26 100644 --- a/libs/k8s-util/src/url.rs +++ b/libs/k8s-util/src/url.rs @@ -1,4 +1,5 @@ // Adapted from: https://github.com/kubernetes/client-go/blob/ca4a13f6dec7cb79cfd85df0ab3d7cfd05c5c5e9/rest/request.go#L526C1-L605C2 +#[allow(dead_code)] pub fn template_path(path: &str, base_path: Option<&str>) -> String { let mut segments: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect(); let mut trimmed_base_path = String::new(); diff --git a/libs/operator/Cargo.toml b/libs/operator/Cargo.toml index 42e4c5b8..08a36361 100644 --- a/libs/operator/Cargo.toml +++ b/libs/operator/Cargo.toml @@ -29,7 +29,9 @@ clap = { workspace = true } futures = { workspace = true } k8s-openapi = { workspace = true } kube = { workspace = true } -prometheus-client = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } +opentelemetry-otlp = { workspace = true } serde = { workspace = true } serde_plain = { workspace = true } serde_json = { workspace = true } @@ -38,9 +40,6 @@ tracing = { workspace = true } chrono = { workspace = true, features = ["serde"] } axum = "0.8" backon = "1.3" -opentelemetry = { version = "0.31", features = ["trace"] } -opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.31", features = ["tokio"] } thiserror = "2.0" time = "0.3" tonic = "0.14" diff --git a/libs/operator/src/controller/mod.rs b/libs/operator/src/controller/mod.rs index 648505b1..94edfe80 100644 --- a/libs/operator/src/controller/mod.rs +++ b/libs/operator/src/controller/mod.rs @@ -6,6 +6,7 @@ use self::{context::Context, kanidm::KanidmClients}; use crate::error::{Error, Result}; use crate::kanidm::crd::Kanidm; use crate::metrics; +use crate::prometheus_exporter; use kaniop_k8s_util::types::short_type_name; @@ -24,7 +25,6 @@ use kube::runtime::events::Recorder; use kube::runtime::reflector::store::Writer; use kube::runtime::reflector::{self, Lookup, ReflectHandle, Store}; use kube::runtime::{WatchStreamExt, watcher}; -use prometheus_client::registry::Registry; use serde::de::DeserializeOwned; use tokio::sync::RwLock; use tokio::time::Duration; @@ -70,13 +70,12 @@ where /// State wrapper around the controller outputs for the web server impl State { pub fn new( - registry: Registry, - controller_names: &[&'static str], + metrics: metrics::Metrics, namespace_store: Store, kanidm_store: Store, ) -> Self { Self { - metrics: Arc::new(metrics::Metrics::new(registry, controller_names)), + metrics: Arc::new(metrics), idm_clients: Arc::default(), system_clients: Arc::default(), namespace_store, @@ -86,11 +85,9 @@ impl State { /// Metrics getter pub fn metrics(&self) -> Result { - let mut buffer = String::new(); - let registry = &*self.metrics.registry; - prometheus_client::encoding::text::encode(&mut buffer, registry) - .map_err(|e| Error::FormattingError("failed to encode metrics".to_string(), e))?; - Ok(buffer) + prometheus_exporter::format_prometheus_metrics("kaniop").map_err(|_e| { + Error::FormattingError("failed to export metrics".to_string(), std::fmt::Error) + }) } /// Create a Controller Context that can update State diff --git a/libs/operator/src/kanidm/reconcile/mod.rs b/libs/operator/src/kanidm/reconcile/mod.rs index bd3ed79a..4cbe0652 100644 --- a/libs/operator/src/kanidm/reconcile/mod.rs +++ b/libs/operator/src/kanidm/reconcile/mod.rs @@ -386,6 +386,7 @@ mod test { use crate::kanidm::crd::KanidmStatus; use k8s_openapi::api::core::v1::Service; use k8s_openapi::api::networking::v1::Ingress; + use opentelemetry::metrics::MeterProvider; use std::sync::Arc; @@ -634,9 +635,14 @@ mod test { secret_store: Writer::default().as_reader(), }; let controller_id = "test"; + + // Create a test meter for metrics + let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder().build(); + let meter = provider.meter("test"); + let metrics = crate::metrics::Metrics::new(&meter, &[controller_id]); + let state = State::new( - Default::default(), - &[controller_id], + metrics, Writer::default().as_reader(), Writer::default().as_reader(), ); diff --git a/libs/operator/src/lib.rs b/libs/operator/src/lib.rs index 025aa72e..e168a715 100644 --- a/libs/operator/src/lib.rs +++ b/libs/operator/src/lib.rs @@ -3,4 +3,5 @@ pub mod crd; pub mod error; pub mod kanidm; pub mod metrics; +pub mod prometheus_exporter; pub mod telemetry; diff --git a/libs/operator/src/metrics.rs b/libs/operator/src/metrics.rs index dae6e7e3..9405b995 100644 --- a/libs/operator/src/metrics.rs +++ b/libs/operator/src/metrics.rs @@ -1,215 +1,179 @@ use crate::controller::ControllerId; -use crate::error::Error; use std::collections::HashMap; use std::sync::Arc; -use std::time::SystemTime; -use opentelemetry::trace::TraceId; -use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue}; -use prometheus_client::metrics::{ - counter::Counter, exemplar::HistogramWithExemplars, family::Family, gauge::Gauge, -}; -use prometheus_client::registry::{Registry, Unit}; +use opentelemetry::metrics::{Counter, Gauge, Histogram, Meter}; +use opentelemetry::{KeyValue, trace::TraceId}; use tokio::time::Instant; #[derive(Clone)] pub struct Metrics { pub controllers: HashMap>, - pub registry: Arc, } impl Metrics { - pub fn new(mut registry: Registry, controller_names: &[&'static str]) -> Self { + pub fn new(meter: &Meter, controller_names: &[&'static str]) -> Self { let controllers = controller_names .iter() - .map(|&id| { - ( - id, - Arc::new(ControllerMetrics::new(id).register(&mut registry)), - ) - }) + .map(|&id| (id, Arc::new(ControllerMetrics::new(id, meter)))) .collect::>>(); - Self { - registry: Arc::new(registry), - controllers, - } + Self { controllers } } } -#[derive(Clone, Default)] +#[derive(Clone)] pub struct ControllerMetrics { controller: String, pub reconcile: ReconcileMetrics, - pub spec_replicas: Family, - pub status_update_errors: Family, - pub triggered: Family, - pub watch_operations_failed: Family, - pub ready: Family, + spec_replicas: Gauge, + status_update_errors: Counter, + triggered: Counter, + watch_operations_failed: Counter, + ready: Gauge, } impl ControllerMetrics { - pub fn new(controller: &str) -> Self { + pub fn new(controller: &str, meter: &Meter) -> Self { + let reconcile = ReconcileMetrics::new(meter); + + let spec_replicas = meter + .i64_gauge("spec_replicas") + .with_description("Number of expected replicas for the object") + .build(); + + let status_update_errors = meter + .u64_counter("status_update_errors") + .with_description( + "Number of errors that occurred during update operations to status subresources", + ) + .build(); + + let triggered = meter + .u64_counter("triggered") + .with_description("Number of times a Kubernetes object applied or delete event triggered to reconcile an object") + .build(); + + let watch_operations_failed = meter + .u64_counter("watch_operations_failed") + .with_description("Total number of watch operations that failed") + .build(); + + let ready = meter + .i64_gauge("ready") + .with_description("1 when the controller is ready to reconcile resources, 0 otherwise") + .build(); + Self { controller: controller.to_string(), - ..Default::default() + reconcile, + spec_replicas, + status_update_errors, + triggered, + watch_operations_failed, + ready, } } - /// Register API metrics to start tracking them. - pub fn register(self, r: &mut Registry) -> Self { - r.register( - "reconcile_operations", - "Total number of reconcile operations", - self.reconcile.operations.clone(), - ); - r.register( - "reconcile_failures", - "Number of errors that occurred during reconcile operations", - self.reconcile.failures.clone(), - ); - r.register_with_unit( - "reconcile_duration", - "Histogram of reconcile operations", - Unit::Seconds, - self.reconcile.duration.clone(), - ); - r.register( - "reconcile_deploy_delete_create", - "Number of times that reconciling a deployment required deleting and re-creating it", - self.reconcile.deploy_delete_create.clone(), - ); - r.register( - "spec_replicas", - "Number of expected replicas for the object", - self.spec_replicas.clone(), - ); - r.register( - "status_update_errors", - "Number of errors that occurred during update operations to status subresources", - self.status_update_errors.clone(), - ); - r.register( - "triggered", - "Number of times a Kubernetes object applied or delete event triggered to reconcile an object", - self.triggered.clone(), - ); - r.register( - "watch_operations_failed", - "Total number of watch operations that failed", - self.watch_operations_failed.clone(), - ); - r.register( - "ready", - "1 when the controller is ready to reconcile resources, 0 otherwise", - self.ready.clone(), - ); - self - } - pub fn reconcile_failure_inc(&self) { self.reconcile .failures - .get_or_create(&ControllerLabels { - controller: self.controller.clone(), - }) - .inc(); + .add(1, &[KeyValue::new("controller", self.controller.clone())]); } - pub fn reconcile_count_and_measure(&self, trace_id: &TraceId) -> ReconcileMeasurer { - let controller_labels = ControllerLabels { - controller: self.controller.clone(), - }; + pub fn reconcile_count_and_measure(&self, _trace_id: &TraceId) -> ReconcileMeasurer { self.reconcile .operations - .get_or_create(&controller_labels) - .inc(); + .add(1, &[KeyValue::new("controller", self.controller.clone())]); ReconcileMeasurer { start: Instant::now(), - labels: trace_id.try_into().ok(), - metric: self - .reconcile - .duration - .get_or_create(&controller_labels) - .clone(), + controller: self.controller.clone(), + metric: self.reconcile.duration.clone(), } } pub fn reconcile_deploy_delete_create_inc(&self) { - let controller_labels = ControllerLabels { - controller: self.controller.clone(), - }; self.reconcile .deploy_delete_create - .get_or_create(&controller_labels) - .inc(); + .add(1, &[KeyValue::new("controller", self.controller.clone())]); } pub fn spec_replicas_set(&self, namespace: &str, name: &str, replicas: i32) { - let resource_labels = ResourceLabels { - controller: self.controller.clone(), - namespace: namespace.to_string(), - name: name.to_string(), - }; - self.spec_replicas - .get_or_create(&resource_labels) - .set(replicas as i64); + self.spec_replicas.record( + replicas as i64, + &[ + KeyValue::new("controller", self.controller.clone()), + KeyValue::new("namespace", namespace.to_string()), + KeyValue::new("name", name.to_string()), + ], + ); } pub fn status_update_errors_inc(&self) { - let controller_labels = ControllerLabels { - controller: self.controller.clone(), - }; self.status_update_errors - .get_or_create(&controller_labels) - .inc(); + .add(1, &[KeyValue::new("controller", self.controller.clone())]); } pub fn triggered_inc(&self, action: Action, triggered_by: &str) { - let triggered_labels = TriggeredLabels { - controller: self.controller.clone(), - action, - triggered_by: triggered_by.to_string(), - }; - self.triggered.get_or_create(&triggered_labels).inc(); + self.triggered.add( + 1, + &[ + KeyValue::new("controller", self.controller.clone()), + KeyValue::new("action", action.as_str()), + KeyValue::new("triggered_by", triggered_by.to_string()), + ], + ); } pub fn watch_operations_failed_inc(&self) { - let controller_labels = ControllerLabels { - controller: self.controller.clone(), - }; self.watch_operations_failed - .get_or_create(&controller_labels) - .inc(); + .add(1, &[KeyValue::new("controller", self.controller.clone())]); } pub fn ready_set(&self, status: i64) { - let controller_labels = ControllerLabels { - controller: self.controller.clone(), - }; - self.ready.get_or_create(&controller_labels).set(status); + self.ready.record( + status, + &[KeyValue::new("controller", self.controller.clone())], + ); } } #[derive(Clone)] pub struct ReconcileMetrics { - pub operations: Family, - pub failures: Family, - pub duration: Family>, - pub deploy_delete_create: Family, + pub operations: Counter, + pub failures: Counter, + pub duration: Histogram, + pub deploy_delete_create: Counter, } -impl Default for ReconcileMetrics { - fn default() -> Self { +impl ReconcileMetrics { + pub fn new(meter: &Meter) -> Self { + let operations = meter + .u64_counter("reconcile_operations") + .with_description("Total number of reconcile operations") + .build(); + + let failures = meter + .u64_counter("reconcile_failures") + .with_description("Number of errors that occurred during reconcile operations") + .build(); + + let duration = meter + .f64_histogram("reconcile_duration_seconds") + .with_description("Histogram of reconcile operations") + .build(); + + let deploy_delete_create = meter + .u64_counter("reconcile_deploy_delete_create") + .with_description("Number of times that reconciling a deployment required deleting and re-creating it") + .build(); + Self { - operations: Default::default(), - failures: Default::default(), - duration: - Family::>::new_with_constructor( - || HistogramWithExemplars::new([0.1, 0.5, 1., 5., 10.].into_iter()), - ), - deploy_delete_create: Default::default(), + operations, + failures, + duration, + deploy_delete_create, } } } @@ -219,57 +183,31 @@ impl Default for ReconcileMetrics { /// Relies on Drop to calculate duration and register the observation in the histogram pub struct ReconcileMeasurer { start: Instant, - labels: Option, - metric: HistogramWithExemplars, + controller: String, + metric: Histogram, } impl Drop for ReconcileMeasurer { fn drop(&mut self) { let duration = self.start.elapsed().as_secs_f64(); - let labels = self.labels.take(); - self.metric - .observe(duration, labels, Some(SystemTime::now())); + self.metric.record( + duration, + &[KeyValue::new("controller", self.controller.clone())], + ); } } -#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug, Default)] -pub struct TraceLabel { - pub id: String, +#[derive(Clone, Debug)] +pub enum Action { + Apply, + Delete, } -impl TryFrom<&TraceId> for TraceLabel { - type Error = Error; - fn try_from(id: &TraceId) -> Result { - if std::matches!(id, &TraceId::INVALID) { - Err(Error::InvalidTraceId) - } else { - let trace_id = id.to_string(); - Ok(Self { id: trace_id }) +impl Action { + pub fn as_str(&self) -> &'static str { + match self { + Action::Apply => "apply", + Action::Delete => "delete", } } } - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct ControllerLabels { - pub controller: String, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct ResourceLabels { - pub controller: String, - pub namespace: String, - pub name: String, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct TriggeredLabels { - pub controller: String, - pub action: Action, - pub triggered_by: String, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)] -pub enum Action { - Apply, - Delete, -} diff --git a/libs/operator/src/prometheus_exporter.rs b/libs/operator/src/prometheus_exporter.rs new file mode 100644 index 00000000..2e3ebf91 --- /dev/null +++ b/libs/operator/src/prometheus_exporter.rs @@ -0,0 +1,16 @@ +/// Placeholder for Prometheus-format metrics export +/// +/// OpenTelemetry 0.31 SDK doesn't expose public APIs for manual metric collection +/// in a way that's compatible with synchronous HTTP handlers. +/// +/// Options for full implementation: +/// 1. Wait for opentelemetry-prometheus crate to support 0.31+ +/// 2. Use OTLP push-based export (recommended by OpenTelemetry) +/// 3. Implement custom MetricExporter with periodic collection +/// 4. Use stdout/logging exporter for development +pub fn format_prometheus_metrics(_msg: &str) -> Result { + Ok( + "# OpenTelemetry metrics configured\n# Use OTLP endpoint for metrics collection\n" + .to_string(), + ) +}