Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 5 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ kanidm_client = "1.6.2"
kanidm_lib_crypto = "1.6.2"
kanidm_proto = "1.6.2"
kube = { version = "2.0", default-features = true, features = ["client", "derive", "unstable-runtime"] }
prometheus-client = "0.24.0"
opentelemetry = "0.31"
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "metrics"] }
opentelemetry-otlp = { version = "0.31", features = ["tokio"] }
schemars = "1.0"
serde = "1.0"
serde_json = "1.0"
Expand Down
3 changes: 2 additions & 1 deletion cmd/operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ clap = { workspace = true, features = ["cargo", "env"] }
futures = { workspace = true }
k8s-openapi = { workspace = true }
kube = { workspace = true }
prometheus-client = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
anyhow = "1.0"
Expand Down
13 changes: 8 additions & 5 deletions cmd/operator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use axum::response::{IntoResponse, Json};
use axum::routing::{Router, get};
use clap::{Parser, crate_authors, crate_description, crate_version};
use kube::Config;
use prometheus_client::registry::Registry;
use tokio::net::TcpListener;
use tokio::signal::unix::{SignalKind, signal};

Expand Down Expand Up @@ -86,9 +85,12 @@ async fn main() -> anyhow::Result<()> {
)
.await?;

let mut registry = Registry::with_prefix("kaniop");
let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder().build();
opentelemetry::global::set_meter_provider(provider.clone());
let meter = opentelemetry::global::meter("kaniop");

let config = Config::infer().await?;
let client = new_client_with_metrics(config, &mut registry).await?;
let client = new_client_with_metrics(config, &meter).await?;
let controllers = [
kaniop_group::controller::CONTROLLER_ID,
kaniop_operator::kanidm::controller::CONTROLLER_ID,
Expand All @@ -101,9 +103,10 @@ async fn main() -> anyhow::Result<()> {
let kanidm = check_api_queryable::<Kanidm>(client.clone()).await;
let kanidm_r = create_subscriber::<Kanidm>(SUBSCRIBE_BUFFER_SIZE);

let controller_metrics = kaniop_operator::metrics::Metrics::new(&meter, &controllers);

let state = KaniopState::new(
registry,
&controllers,
controller_metrics,
namespace_r.store.clone(),
kanidm_r.store.clone(),
);
Expand Down
4 changes: 3 additions & 1 deletion libs/k8s-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ http = { workspace = true }
json-patch = { workspace = true }
k8s-openapi = { workspace = true }
kube = { workspace = true, features = ["ws"] }
prometheus-client = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
pin-project = "1.1"
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-util = "0.7.12"
Expand Down
6 changes: 3 additions & 3 deletions libs/k8s-util/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use hyper_util::rt::TokioExecutor;
use kube::Result;
use kube::api::AttachedProcess;
use kube::{Client, Config, client::ConfigExt};
use prometheus_client::registry::Registry;
use opentelemetry::metrics::Meter;
use tower::{BoxError, ServiceBuilder};

pub async fn new_client_with_metrics(config: Config, registry: &mut Registry) -> Result<Client> {
let metrics_layer = MetricsLayer::new(registry);
pub async fn new_client_with_metrics(config: Config, meter: &Meter) -> Result<Client> {
let metrics_layer = MetricsLayer::new(meter);
let https = config.rustls_https_connector()?;
let service = ServiceBuilder::new()
.layer(metrics_layer)
Expand Down
169 changes: 90 additions & 79 deletions libs/k8s-util/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,114 +1,125 @@
use crate::url::template_path;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::future::FutureExt;
use http::Request;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::{counter::Counter, family::Family, histogram::Histogram};
use prometheus_client::registry::Registry;
use tokio::time::Instant;
use std::{
task::{Context, Poll},
time::Instant,
};

use http::{Request, Response};
use opentelemetry::KeyValue;
use opentelemetry::metrics::{Counter, Histogram, Meter};
use tower::{Layer, Service};

/// Label set carrying the `endpoint` of a Kubernetes client request.
///
/// Used as the key of the request-duration histogram family; the value is the
/// templated request path after path-escaping.
#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug, Default)]
pub struct EndpointLabel {
/// Templated, path-escaped request path.
pub endpoint: String,
}

/// Label set carrying the `status_code` of a Kubernetes client response.
///
/// Used as the key of the request counter family.
#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug, Default)]
pub struct StatusCodeLabel {
/// Numeric HTTP status code rendered as a string.
pub status_code: String,
}

/// Metrics layer for monitoring HTTP requests
#[derive(Clone)]
pub struct MetricsLayer {
request_histogram: Family<EndpointLabel, Histogram>,
requests_total: Family<StatusCodeLabel, Counter>,
meter: Meter,
}

impl MetricsLayer {
pub fn new(registry: &mut Registry) -> Self {
// TODO: remove bucket, implement summary (without quantiles):
// https://github.com/prometheus/client_rust/pull/254
let request_histogram =
Family::<EndpointLabel, Histogram>::new_with_constructor(|| Histogram::new([]));

let requests_total = Family::<StatusCodeLabel, Counter>::default();
registry.register(
"kubernetes_client_http_request_duration",
"Summary of latencies for the Kubernetes client's requests by endpoint",
request_histogram.clone(),
);

registry.register(
"kubernetes_client_http_requests",
"Total number of Kubernetes's client requests by status code",
requests_total.clone(),
);

pub fn new(meter: &Meter) -> Self {
Self {
request_histogram,
requests_total,
meter: meter.clone(),
}
}
}

impl<S> Layer<S> for MetricsLayer {
type Service = MetricsService<S>;

fn layer(&self, inner: S) -> Self::Service {
MetricsService {
inner,
request_histogram: self.request_histogram.clone(),
requests_total: self.requests_total.clone(),
}
fn layer(&self, service: S) -> Self::Service {
MetricsService::new(service, &self.meter)
}
}

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct MetricsService<S> {
inner: S,
request_histogram: Family<EndpointLabel, Histogram>,
requests_total: Family<StatusCodeLabel, Counter>,
request_count: Counter<u64>,
request_duration: Histogram<f64>,
}

impl<S> MetricsService<S> {
    /// Wraps `service` and creates the two instruments it reports to on `meter`:
    /// a `u64` request counter and an `f64` request-duration histogram.
    fn new(service: S, meter: &Meter) -> Self {
        Self {
            inner: service,
            // Counter bumped once per completed request.
            request_count: meter
                .u64_counter("http_requests_total")
                .with_description("Total number of HTTP requests")
                .build(),
            // Histogram fed with the elapsed time of each request, in seconds.
            request_duration: meter
                .f64_histogram("http_request_duration_seconds")
                .with_description("HTTP request duration in seconds")
                .build(),
        }
    }
}

impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for MetricsService<S>
where
S: Service<Request<ReqBody>, Response = http::Response<ResBody>>,
S::Future: Send + 'static,
S: Service<Request<ReqBody>, Response = Response<ResBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
type Future = MetricsFuture<S::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
let path_template = template_path(req.uri().path(), None);
let labels = EndpointLabel {
endpoint: url_escape::encode_path(&path_template).to_string(),
};

let start_time = Instant::now();

let fut = self.inner.call(req);
let request_histogram = self.request_histogram.clone();
let requests_total = self.requests_total.clone();
async move {
let result = fut.await;
let duration = start_time.elapsed().as_secs_f64();
request_histogram.get_or_create(&labels).observe(duration);
if let Ok(ref response) = result {
let status_code = response.status().as_u16().to_string();
requests_total
.get_or_create(&StatusCodeLabel { status_code })
.inc();
}
result
let method = req.method().to_string();
let start = Instant::now();

let future = self.inner.call(req);

MetricsFuture {
future,
method,
start,
request_count: self.request_count.clone(),
request_duration: self.request_duration.clone(),
}
}
}

/// Future returned by `MetricsService::call`.
///
/// Wraps the inner service's response future together with the request data
/// and instruments needed to record metrics once that future completes.
#[pin_project::pin_project]
pub struct MetricsFuture<F> {
/// Inner response future; structurally pinned so it can be polled via the projection.
#[pin]
future: F,
/// HTTP method of the request, captured as a string before the inner call.
method: String,
/// Instant taken just before the inner service was called; used to compute the elapsed time.
start: Instant,
/// Counter incremented for completed requests.
request_count: Counter<u64>,
/// Histogram receiving the request duration in seconds.
request_duration: Histogram<f64>,
}

impl<F, ResBody, E> std::future::Future for MetricsFuture<F>
where
F: std::future::Future<Output = Result<Response<ResBody>, E>>,
{
type Output = F::Output;

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let poll_result = this.future.poll(cx);

if let Poll::Ready(Ok(response)) = &poll_result {
let duration = this.start.elapsed().as_secs_f64();
let status = response.status().as_str().to_string();

this.request_count.add(
1,
&[
KeyValue::new("method", this.method.clone()),
KeyValue::new("status", status),
],
);
this.request_duration
.record(duration, &[KeyValue::new("method", this.method.clone())]);
}
.boxed()

poll_result
}
}
1 change: 1 addition & 0 deletions libs/k8s-util/src/url.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Adapted from: https://github.com/kubernetes/client-go/blob/ca4a13f6dec7cb79cfd85df0ab3d7cfd05c5c5e9/rest/request.go#L526C1-L605C2
#[allow(dead_code)]
pub fn template_path(path: &str, base_path: Option<&str>) -> String {
let mut segments: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
let mut trimmed_base_path = String::new();
Expand Down
7 changes: 3 additions & 4 deletions libs/operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ clap = { workspace = true }
futures = { workspace = true }
k8s-openapi = { workspace = true }
kube = { workspace = true }
prometheus-client = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-otlp = { workspace = true }
serde = { workspace = true }
serde_plain = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -38,9 +40,6 @@ tracing = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
axum = "0.8"
backon = "1.3"
opentelemetry = { version = "0.31", features = ["trace"] }
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.31", features = ["tokio"] }
thiserror = "2.0"
time = "0.3"
tonic = "0.14"
Expand Down
Loading
Loading