From 4f8a33b2ff8fc84c9b87744a58ba2fd522e7ebbc Mon Sep 17 00:00:00 2001 From: Rae McKelvey <633012+okdistribute@users.noreply.github.com> Date: Tue, 14 Apr 2026 16:53:28 -0700 Subject: [PATCH 1/7] update quickstart to be more accurate --- examples/quickstart.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/examples/quickstart.rs b/examples/quickstart.rs index 034ae4db..04473658 100644 --- a/examples/quickstart.rs +++ b/examples/quickstart.rs @@ -3,17 +3,29 @@ use iroh_services::Client; #[tokio::main] async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let endpoint = Endpoint::bind(presets::N0).await?; + // Wait for the endpoint to be online + endpoint.online().await; + // needs IROH_SERVICES_API_SECRET set to an environment variable // client will now push endpoint metrics to iroh-services let client = Client::builder(&endpoint) .api_secret_from_env()? + .name("quickstart-example")? .build() .await?; // we can also ping the service just to confirm everything is working client.ping().await?; + // keep the endpoint running so it continues pushing metrics. + // ctrl+c to exit. + println!("endpoint running. ctrl+c to exit."); + tokio::signal::ctrl_c().await?; + endpoint.close().await; + Ok(()) } From 11355859f8c1d4e1b4c3b8c0079db641343562fa Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 13 May 2026 15:23:56 +0200 Subject: [PATCH 2/7] feat(logs): opt-in log collection over iroh-services --- Cargo.toml | 3 + examples/logs.rs | 96 +++++++++ src/caps.rs | 26 +++ src/client.rs | 139 ++++++++++++- src/client_host.rs | 64 +++++- src/lib.rs | 1 + src/logs.rs | 472 +++++++++++++++++++++++++++++++++++++++++++++ src/protocol.rs | 85 +++++++- 8 files changed, 874 insertions(+), 12 deletions(-) create mode 100644 examples/logs.rs create mode 100644 src/logs.rs diff --git a/Cargo.toml b/Cargo.toml index e1a1d7e8..d44f84b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,3 +60,6 @@ default = [] [[example]] name = "net_diagnostics" + +[[example]] +name = "logs" diff --git a/examples/logs.rs b/examples/logs.rs new file mode 100644 index 00000000..6dae05db --- /dev/null +++ b/examples/logs.rs @@ -0,0 +1,96 @@ +//! Log collection example. +//! +//! Demonstrates how to install the iroh-services log collector alongside the +//! standard `tracing-subscriber` `fmt` layer, ship records to the cloud over +//! the iroh-services RPC channel, and let the cloud override the local +//! filter at runtime via `SetLogLevel`. +//! +//! The level filter starts at `off`. The cloud pushes a level after this +//! endpoint is opted into log collection from the dashboard or REST API; both +//! the buffered cloud-shipping layer and the stderr fmt layer respect that +//! level. Anything emitted before the cloud responds is silently dropped. +//! +//! Run with: cargo run --example logs + +use std::time::Duration; +use tracing_subscriber::prelude::*; + +use anyhow::Result; +use iroh::{Endpoint, endpoint::presets, protocol::Router}; +use iroh_services::{ + API_SECRET_ENV_VAR_NAME, ApiSecret, CLIENT_HOST_ALPN, Client, ClientHost, caps::LogsCap, logs, +}; +use tracing::info; + +#[tokio::main] +async fn main() -> Result<()> { + // 1. Build the buffer layer and compose it with a stderr fmt layer behind + // the same reloadable filter, so local console output mirrors what + // gets shipped. The cloud raises the filter from `off` after this + // endpoint is opted in via the dashboard. + let (collector, log_layer) = logs::layer(); + tracing_subscriber::registry() + .with(log_layer) + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .try_init() + .map_err(|e| anyhow::anyhow!("failed to install tracing subscriber: {e}"))?; + + // 2. Create the endpoint and parse the API secret so we know which + // cloud endpoint to grant SetLevel to. + let endpoint = Endpoint::bind(presets::N0).await?; + let secret = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?; + let cloud_id = secret.addr().id; + + let name = format!("logs-example-{}", &endpoint.id().to_string()[..8]); + + // 3. Build the client. `with_log_collection(collector.clone())` starts a + // background task that drains the buffer every second and ships the + // batch as a `PutLogs` RPC. + let client = Client::builder(&endpoint) + .api_secret(secret)? + .name(name)? + .with_log_collection(collector.clone()) + .build() + .await?; + + // 4. Grant `LogsCap::SetLevel` so the cloud can dial us back and apply a + // filter override. Spawned so a momentarily-down cloud does not block + // startup. + let client_for_grant = client.clone(); + let grant_task = tokio::spawn(async move { + if let Err(err) = client_for_grant + .grant_capability(cloud_id, [LogsCap::SetLevel]) + .await + { + eprintln!("failed to grant LogsCap::SetLevel: {err:?}"); + } + }); + + // 5. Accept the cloud's callback connections on `CLIENT_HOST_ALPN`. The + // `ClientHost` needs the same collector so the `SetLogLevel` request + // can hot-reload the filter. + let host = ClientHost::new(&endpoint).with_log_collector(collector); + let router = Router::builder(endpoint) + .accept(CLIENT_HOST_ALPN, host) + .spawn(); + + // 6. Emit an info log every other second forever. These will surface on + // the dashboard's Logs page (and on this process's stderr) once the + // endpoint is opted into log collection at `info` level or above. + println!("emitting logs; ctrl+c to exit."); + let mut tick = tokio::time::interval(Duration::from_secs(2)); + let mut counter: u64 = 0; + loop { + tokio::select! { + _ = tick.tick() => { + counter += 1; + info!(counter, "logs example heartbeat"); + } + _ = tokio::signal::ctrl_c() => break, + } + } + + grant_task.abort(); + router.endpoint().close().await; + Ok(()) +} diff --git a/src/caps.rs b/src/caps.rs index fcdcbf14..51a2c9e8 100644 --- a/src/caps.rs +++ b/src/caps.rs @@ -81,6 +81,8 @@ pub enum Cap { Metrics(MetricsCap), #[strum(to_string = "net-diagnostics:{0}")] NetDiagnostics(NetDiagnosticsCap), + #[strum(to_string = "logs:{0}")] + Logs(LogsCap), } impl FromStr for Cap { @@ -94,6 +96,7 @@ impl FromStr for Cap { "metrics" => Self::Metrics(MetricsCap::from_str(inner)?), "relay" => Self::Relay(RelayCap::from_str(inner)?), "net-diagnostics" => Self::NetDiagnostics(NetDiagnosticsCap::from_str(inner)?), + "logs" => Self::Logs(LogsCap::from_str(inner)?), _ => bail!("invalid cap domain"), }) } else { @@ -121,6 +124,16 @@ cap_enum!( } ); +cap_enum!( + /// Capabilities for the log collection feature. + pub enum LogsCap { + /// Permits the bearer to push log lines to the cloud. + Push, + /// Permits the bearer to set the log level filter on the issuer at runtime. + SetLevel, + } +); + impl Caps { pub fn new(caps: impl IntoIterator>) -> Self { Self::V0(CapSet::new(caps)) @@ -179,6 +192,7 @@ impl Capability for Cap { (Cap::Relay(slf), Cap::Relay(other)) => slf.permits(other), (Cap::Metrics(slf), Cap::Metrics(other)) => slf.permits(other), (Cap::NetDiagnostics(slf), Cap::NetDiagnostics(other)) => slf.permits(other), + (Cap::Logs(slf), Cap::Logs(other)) => slf.permits(other), (_, _) => false, } } @@ -192,6 +206,8 @@ fn client_capabilities(other: &Cap) -> bool { Cap::Metrics(MetricsCap::PutAny) => true, Cap::NetDiagnostics(NetDiagnosticsCap::PutAny) => true, Cap::NetDiagnostics(NetDiagnosticsCap::GetAny) => true, + Cap::Logs(LogsCap::Push) => true, + Cap::Logs(LogsCap::SetLevel) => true, } } @@ -221,6 +237,16 @@ impl Capability for NetDiagnosticsCap { } } +impl Capability for LogsCap { + fn permits(&self, other: &Self) -> bool { + match (self, other) { + (LogsCap::Push, LogsCap::Push) => true, + (LogsCap::SetLevel, LogsCap::SetLevel) => true, + (_, _) => false, + } + } +} + /// A set of capabilities #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)] pub struct CapSet(BTreeSet); diff --git a/src/client.rs b/src/client.rs index 062ab654..7c2cc814 100644 --- a/src/client.rs +++ b/src/client.rs @@ -17,9 +17,10 @@ use uuid::Uuid; use crate::{ api_secret::ApiSecret, caps::Caps, + logs::LogCollector, net_diagnostics::{DiagnosticsReport, checks::run_diagnostics}, protocol::{ - ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics, + ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutLogs, PutMetrics, PutNetworkDiagnostics, RemoteError, }, }; @@ -54,6 +55,7 @@ pub struct Client { endpoint: Endpoint, message_channel: tokio::sync::mpsc::Sender, _actor_task: Arc>, + _log_flush_task: Option>>, } /// ClientBuilder provides configures and builds a iroh-services client, typically @@ -67,11 +69,19 @@ pub struct ClientBuilder { metrics_interval: Option, remote: Option, registry: Registry, + log_collector: Option, + log_flush_interval: Duration, + log_max_batch: usize, } const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 1 month pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET"; +/// Default interval between log batch flushes when log collection is enabled. +pub const DEFAULT_LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(1); +/// Default maximum batch size pushed in a single PutLogs request. +pub const DEFAULT_LOG_MAX_BATCH: usize = 200; + impl ClientBuilder { pub fn new(endpoint: &Endpoint) -> Self { let mut registry = Registry::default(); @@ -85,9 +95,35 @@ impl ClientBuilder { metrics_interval: Some(Duration::from_secs(60)), remote: None, registry, + log_collector: None, + log_flush_interval: DEFAULT_LOG_FLUSH_INTERVAL, + log_max_batch: DEFAULT_LOG_MAX_BATCH, } } + /// Enables periodic shipment of buffered log lines to iroh-services. + /// + /// The collector is shared with [`crate::client_host::ClientHost`] when + /// runtime log-level overrides are needed; clone it before passing so both + /// sides hold a handle. + pub fn with_log_collection(mut self, collector: LogCollector) -> Self { + self.log_collector = Some(collector); + self + } + + /// Override the log batch flush interval. Defaults to one second. + pub fn log_flush_interval(mut self, interval: Duration) -> Self { + self.log_flush_interval = interval; + self + } + + /// Override the maximum number of lines included in a single PutLogs + /// request. Defaults to [`DEFAULT_LOG_MAX_BATCH`]. + pub fn log_max_batch(mut self, max: usize) -> Self { + self.log_max_batch = max; + self + } + /// Register a metrics group to forward to iroh-services /// /// The default registered metrics uses only the endpoint @@ -213,22 +249,37 @@ impl ClientBuilder { let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec()); let irpc_client = IrohServicesClient::boxed(conn); - let (tx, rx) = tokio::sync::mpsc::channel(1); + let session_id = Uuid::new_v4(); + // The actor mailbox is only used for control-plane messages (auth, + // ping, name, grant_cap) plus the periodic metrics + log flush. A + // small buffer is enough but `1` head-of-line-blocks log flushes + // behind metrics ticks, so leave a little room. + let (tx, rx) = tokio::sync::mpsc::channel(8); let actor_task = AbortOnDropHandle::new(n0_future::task::spawn( ClientActor { capabilities, client: irpc_client, name: self.name.clone(), - session_id: Uuid::new_v4(), + session_id, authorized: false, } .run(self.name, self.registry, self.metrics_interval, rx), )); + let log_flush_task = self.log_collector.map(|collector| { + let message_channel = tx.clone(); + let interval = self.log_flush_interval; + let max_batch = self.log_max_batch; + Arc::new(AbortOnDropHandle::new(n0_future::task::spawn( + run_log_flush(message_channel, collector, interval, max_batch, session_id), + ))) + }); + Ok(Client { endpoint: self.endpoint, message_channel: tx, _actor_task: Arc::new(actor_task), + _log_flush_task: log_flush_task, }) } } @@ -425,6 +476,10 @@ enum ClientActorMessage { report: Box, done: oneshot::Sender>, }, + PutLogs { + request: PutLogs, + done: oneshot::Sender>, + }, ReadName { done: oneshot::Sender>, }, @@ -505,6 +560,13 @@ impl ClientActor { warn!("failed to publish network diagnostics: {:#?}", err); } } + ClientActorMessage::PutLogs{ request, done } => { + let res = self.put_logs(request).await; + if let Err(err) = done.send(res) { + debug!("failed to publish logs: {:#?}", err); + self.authorized = false; + } + } } } _ = async { @@ -613,6 +675,77 @@ impl ClientActor { Ok(()) } + + async fn put_logs(&mut self, request: PutLogs) -> Result<(), Error> { + trace!( + lines = request.lines.len(), + dropped = request.dropped, + "client actor put logs" + ); + self.auth().await?; + + self.client + .rpc(request) + .await + .map_err(|_| RemoteError::InternalServerError)??; + + Ok(()) + } +} + +async fn run_log_flush( + message_channel: tokio::sync::mpsc::Sender, + collector: LogCollector, + interval: Duration, + max_batch: usize, + session_id: Uuid, +) { + const INITIAL_BACKOFF: Duration = Duration::from_millis(500); + const MAX_BACKOFF: Duration = Duration::from_secs(30); + + let mut ticker = n0_future::time::interval(interval); + // After a slow RPC the default `Burst` behavior would fire several + // ticks back-to-back; `Delay` waits a full interval from the previous + // completed tick. + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let mut backoff = INITIAL_BACKOFF; + loop { + ticker.tick().await; + let (lines, dropped) = collector.drain(max_batch); + if lines.is_empty() && dropped == 0 { + backoff = INITIAL_BACKOFF; + continue; + } + let request = PutLogs { + session_id, + lines, + dropped, + }; + let (tx, rx) = oneshot::channel(); + if message_channel + .send(ClientActorMessage::PutLogs { request, done: tx }) + .await + .is_err() + { + // Mailbox closed only when the actor task has terminated; that + // means the entire client is gone and there is nothing to do. + debug!("log flush stopped: client actor channel closed"); + return; + } + match rx.await { + Ok(Ok(())) => { + backoff = INITIAL_BACKOFF; + } + // Either the RPC failed (Ok(Err)) or the actor dropped the + // response sender mid-handoff (Err(_)). Both are transient: keep + // ticking and back off so the next attempt happens later. + other => { + debug!(?other, ?backoff, "log flush attempt failed; backing off"); + n0_future::time::sleep(backoff).await; + backoff = (backoff * 2).min(MAX_BACKOFF); + } + } + } } async fn set_name_inner( diff --git a/src/client_host.rs b/src/client_host.rs index d0535126..e9774a07 100644 --- a/src/client_host.rs +++ b/src/client_host.rs @@ -7,12 +7,14 @@ use iroh::{ use irpc::WithChannels; use irpc_iroh::read_request; use n0_error::AnyError; +use n0_future::time::Duration; use rcan::{Capability, CapabilityOrigin, Rcan}; use tracing::{debug, warn}; use crate::{ - caps::{Caps, NetDiagnosticsCap}, - protocol::{ClientHostProtocol, NetDiagnosticsMessage, RemoteError}, + caps::{Caps, LogsCap, NetDiagnosticsCap}, + logs::LogCollector, + protocol::{ClientHostMessage, ClientHostProtocol, RemoteError}, }; /// The ALPN for sending messages from the cloud node to the client. @@ -21,9 +23,10 @@ pub const CLIENT_HOST_ALPN: &[u8] = b"n0/n0des-client-host/1"; pub type ClientHostClient = irpc::Client; /// Protocol handler for cloud-to-endpoint connections. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ClientHost { endpoint: Endpoint, + log_collector: Option, } impl ProtocolHandler for ClientHost { @@ -39,16 +42,29 @@ impl ClientHost { pub fn new(endpoint: &Endpoint) -> Self { Self { endpoint: endpoint.clone(), + log_collector: None, } } + /// Enables the cloud to set the log level filter at runtime via the + /// [`SetLogLevel`] callback. + /// + /// Without a collector the handler still accepts the message but responds + /// with [`RemoteError::AuthError`] indicating the feature is disabled. + /// + /// [`SetLogLevel`]: crate::protocol::SetLogLevel + pub fn with_log_collector(mut self, collector: LogCollector) -> Self { + self.log_collector = Some(collector); + self + } + async fn handle_connection(&self, connection: Connection) -> Result<()> { let remote_node_id = connection.remote_id(); let Some(first_request) = read_request::(&connection).await? else { return Ok(()); }; - let NetDiagnosticsMessage::Auth(WithChannels { inner, tx, .. }) = first_request else { + let ClientHostMessage::Auth(WithChannels { inner, tx, .. }) = first_request else { debug!(remote_node_id = %remote_node_id.fmt_short(), "Expected initial auth message"); connection.close(400u32.into(), b"Expected initial auth message"); return Ok(()); @@ -66,17 +82,17 @@ impl ClientHost { } } - // Read exactly one RunNetworkDiagnostics request + // Read exactly one callback request let Some(request) = read_request::(&connection).await? else { return Ok(()); }; match request { - NetDiagnosticsMessage::Auth(_) => { + ClientHostMessage::Auth(_) => { connection.close(400u32.into(), b"Unexpected auth message"); anyhow::bail!("unexpected auth message"); } - NetDiagnosticsMessage::RunNetworkDiagnostics(msg) => { + ClientHostMessage::RunNetworkDiagnostics(msg) => { let WithChannels { tx, .. } = msg; let needed_caps = Caps::new([NetDiagnosticsCap::GetAny]); if !capability.permits(&needed_caps) { @@ -89,6 +105,40 @@ impl ClientHost { .await .inspect_err(|e| warn!("sending network diagnostics response: {:?}", e))?; } + ClientHostMessage::SetLogLevel(msg) => { + let WithChannels { inner, tx, .. } = msg; + let needed_caps = Caps::new([LogsCap::SetLevel]); + if !capability.permits(&needed_caps) { + return send_missing_caps(tx, needed_caps).await; + } + let Some(ref collector) = self.log_collector else { + tx.send(Err(RemoteError::AuthError( + "log collection is not enabled on this client".into(), + ))) + .await?; + return Ok(()); + }; + let expires_in = inner.expires_in_secs.map(Duration::from_secs); + match collector.set_filter( + &inner.directives, + expires_in, + inner.revert_to.as_deref(), + ) { + Ok(()) => { + debug!( + directives = %inner.directives, + expires_in_secs = ?inner.expires_in_secs, + "applied log level override" + ); + tx.send(Ok(())).await?; + } + Err(err) => { + warn!(?err, "failed to apply log level override"); + tx.send(Err(RemoteError::AuthError(err.to_string()))) + .await?; + } + } + } } connection.closed().await; diff --git a/src/lib.rs b/src/lib.rs index 517b1ac3..19598ea8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ mod client_host; pub mod api_secret; pub mod caps; +pub mod logs; pub mod net_diagnostics; pub mod protocol; diff --git a/src/logs.rs b/src/logs.rs new file mode 100644 index 00000000..4124f416 --- /dev/null +++ b/src/logs.rs @@ -0,0 +1,472 @@ +//! Client-side log collection: a `tracing-subscriber` layer that buffers +//! structured log records for shipment to iroh-services, plus a reload handle +//! that lets the cloud control the level filter at runtime. +//! +//! # The cloud is the source of truth +//! +//! The level filter starts at `off`. No tracing events are captured until the +//! cloud pushes a [`crate::protocol::SetLogLevel`] over the [`ClientHost`] +//! channel. The cloud sends one immediately after authenticating a connected +//! endpoint, derived from the per-endpoint +//! `endpoint_log_settings` row (when present) plus the project default. +//! +//! Concretely this means the install argument is empty: the client process +//! does not get to choose its own log level. The level you see is whatever +//! the dashboard or REST API has decided. +//! +//! [`ClientHost`]: crate::ClientHost +//! +//! # Typical usage +//! +//! ```no_run +//! use iroh_services::logs; +//! +//! # async fn run() -> anyhow::Result<()> { +//! // Buffer-only subscriber, filter starts at `off`. +//! let collector = logs::install()?; +//! +//! // Compose with a stderr fmt layer via `logs::layer()` to also render +//! // filtered events locally: +//! // +//! // use tracing_subscriber::prelude::*; +//! // let (collector, log_layer) = iroh_services::logs::layer(); +//! // tracing_subscriber::registry() +//! // .with(log_layer) +//! // .with(tracing_subscriber::fmt::layer()) +//! // .init(); +//! +//! // Hand the collector to the client builder so it pushes batches over RPC, +//! // and to the ClientHost so the cloud can override the level dynamically. +//! # Ok(()) +//! # } +//! ``` +//! +//! Backed by a bounded VecDeque of [`LogLine`]; the oldest entries are dropped +//! when the buffer fills, with the drop count reported on the next batch. + +use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, + time::Instant, +}; + +use n0_future::{ + task::{AbortOnDropHandle, JoinHandle}, + time::Duration, +}; +use tracing::{Event, Subscriber, debug, warn}; +use tracing_subscriber::{ + EnvFilter, Layer, Registry, + fmt::{ + format::Writer, + time::{FormatTime, SystemTime}, + }, + layer::{Context, SubscriberExt as _}, + registry::LookupSpan, + reload, + util::SubscriberInitExt as _, +}; + +use crate::protocol::{FieldValue, LogLine, SpanInfo}; + +/// Maximum number of buffered log lines awaiting cloud shipment. +/// +/// When the buffer is full, the oldest line is dropped to make room and the +/// drop counter is incremented. KISS default; tune from real usage. +pub const DEFAULT_BUFFER_CAPACITY: usize = 1000; + +/// Maximum log emission rate per second per process. +/// +/// Lines beyond this rate are dropped (counted in the drop counter). The +/// default rate is generous enough to capture useful debug-level traffic +/// without unbounded growth from a runaway log loop. +pub const DEFAULT_RATE_PER_SECOND: u32 = 100; + +/// Errors that can occur while installing the log collector. +#[derive(Debug, thiserror::Error)] +pub enum InstallError { + /// The default tracing dispatcher is already set; install once at startup. + #[error("global tracing dispatcher is already set")] + AlreadyInstalled, + /// The supplied directives string was rejected by `EnvFilter`. + #[error("invalid filter directives: {0}")] + InvalidDirectives(String), +} + +/// Errors that can occur while changing the active filter at runtime. +#[derive(Debug, thiserror::Error)] +pub enum SetFilterError { + /// The supplied directives string was rejected by `EnvFilter`. + #[error("invalid filter directives: {0}")] + InvalidDirectives(String), + /// Reloading the filter failed because the subscriber went away. + #[error("reload handle is no longer valid")] + ReloadFailed, +} + +/// Handle to the buffered log collector. Cheap to clone; all clones share the +/// same backing buffer and reload handle. +#[derive(Clone)] +pub struct LogCollector { + inner: Arc, +} + +struct CollectorInner { + buffer: Mutex, + reload_handle: reload::Handle, + revert_task: Mutex>>, +} + +/// Off-state directive. The buffer captures nothing until the cloud sends +/// a `SetLogLevel` with something more permissive. +const OFF_DIRECTIVES: &str = "off"; + +struct RingBuffer { + lines: VecDeque, + dropped: u32, + capacity: usize, + rate_per_second: u32, + window_start: Instant, + window_count: u32, +} + +impl RingBuffer { + fn new(capacity: usize, rate_per_second: u32) -> Self { + Self { + lines: VecDeque::with_capacity(capacity.min(64)), + dropped: 0, + capacity, + rate_per_second, + window_start: Instant::now(), + window_count: 0, + } + } + + fn push(&mut self, line: LogLine) { + let now = Instant::now(); + if now.duration_since(self.window_start) >= Duration::from_secs(1) { + self.window_start = now; + self.window_count = 0; + } + if self.window_count >= self.rate_per_second { + self.dropped = self.dropped.saturating_add(1); + return; + } + self.window_count += 1; + + if self.lines.len() == self.capacity { + self.lines.pop_front(); + self.dropped = self.dropped.saturating_add(1); + } + self.lines.push_back(line); + } + + fn drain(&mut self, max: usize) -> (Vec, u32) { + let take = self.lines.len().min(max); + let lines: Vec = self.lines.drain(..take).collect(); + let dropped = std::mem::take(&mut self.dropped); + (lines, dropped) + } +} + +impl LogCollector { + /// Returns the current number of buffered lines. + pub fn buffered(&self) -> usize { + self.inner.buffer.lock().expect("poisoned").lines.len() + } + + /// Drains up to `max` lines from the buffer, along with the count of lines + /// dropped since the last drain. + pub fn drain(&self, max: usize) -> (Vec, u32) { + self.inner.buffer.lock().expect("poisoned").drain(max) + } + + /// Sets the active filter directives. When `expires_in` is set, + /// schedules a revert after that duration. The revert target is + /// `revert_to` when supplied; `None` means revert to `off`. + pub fn set_filter( + &self, + directives: &str, + expires_in: Option, + revert_to: Option<&str>, + ) -> Result<(), SetFilterError> { + let filter = EnvFilter::try_new(directives) + .map_err(|e| SetFilterError::InvalidDirectives(e.to_string()))?; + self.inner + .reload_handle + .reload(filter) + .map_err(|_| SetFilterError::ReloadFailed)?; + + let mut guard = self.inner.revert_task.lock().expect("poisoned"); + *guard = None; + + if let Some(expires_in) = expires_in { + let collector = self.clone(); + let revert_to = revert_to.map(str::to_string); + let handle: JoinHandle<()> = n0_future::task::spawn(async move { + n0_future::time::sleep(expires_in).await; + let target = revert_to.as_deref(); + if let Err(err) = collector.revert(target) { + warn!(?err, "failed to revert log filter"); + } + }); + *guard = Some(AbortOnDropHandle::new(handle)); + } + Ok(()) + } + + /// Reverts the active filter to `to`, or to the off state when `to` is + /// `None`. + pub fn revert(&self, to: Option<&str>) -> Result<(), SetFilterError> { + let directives = to.unwrap_or(OFF_DIRECTIVES); + let filter = EnvFilter::try_new(directives) + .map_err(|e| SetFilterError::InvalidDirectives(e.to_string()))?; + self.inner + .reload_handle + .reload(filter) + .map_err(|_| SetFilterError::ReloadFailed) + } +} + +impl std::fmt::Debug for LogCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LogCollector") + .field("buffered", &self.buffered()) + .finish() + } +} + +/// Installs a global tracing subscriber whose only output is a JSON-buffering +/// layer that ships records to the cloud. The level filter starts at `off`; +/// the cloud must push a `SetLogLevel` for any events to be captured. +/// +/// Call exactly once at process start. For local console output in addition +/// to cloud shipping, use [`layer`] and compose your own subscriber. +pub fn install() -> Result { + let (collector, layer) = layer(); + tracing_subscriber::registry() + .with(layer) + .try_init() + .map_err(|_| InstallError::AlreadyInstalled)?; + debug!("iroh-services log collector installed"); + Ok(collector) +} + +/// Builds the buffer layer and its [`LogCollector`] handle without installing +/// a global subscriber. Use this when composing the collector with other +/// layers; it returns the layer pre-wrapped in the reloadable filter. +/// +/// Typical pattern for buffer + stderr fmt: +/// +/// ```no_run +/// use iroh_services::logs; +/// use tracing_subscriber::prelude::*; +/// +/// let (collector, log_layer) = logs::layer(); +/// tracing_subscriber::registry() +/// .with(log_layer) +/// .with(tracing_subscriber::fmt::layer()) +/// .try_init() +/// .ok(); +/// # let _ = collector; +/// ``` +pub fn layer() -> (LogCollector, impl Layer + Send + Sync + 'static) { + // `EnvFilter::try_new("off")` cannot fail; "off" is always valid. + let filter = EnvFilter::try_new(OFF_DIRECTIVES).expect("'off' is always a valid directive"); + let (filter, reload_handle) = reload::Layer::new(filter); + + let inner = Arc::new(CollectorInner { + buffer: Mutex::new(RingBuffer::new( + DEFAULT_BUFFER_CAPACITY, + DEFAULT_RATE_PER_SECOND, + )), + reload_handle, + revert_task: Mutex::new(None), + }); + let collector = LogCollector { + inner: inner.clone(), + }; + let buffer_layer = BufferLayer { inner }; + (collector, buffer_layer.with_filter(filter)) +} + +struct BufferLayer { + inner: Arc, +} + +impl Layer for BufferLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + let metadata = event.metadata(); + let mut timestamp = String::new(); + let _ = SystemTime.format_time(&mut Writer::new(&mut timestamp)); + + let mut field_visitor = FieldVisitor::default(); + event.record(&mut field_visitor); + + let mut spans: Vec = Vec::new(); + if let Some(scope) = ctx.event_scope(event) { + for span in scope.from_root() { + spans.push(SpanInfo { + name: span.name().to_string(), + fields: Vec::new(), + }); + } + } + + let line = LogLine { + timestamp, + level: metadata.level().to_string(), + target: metadata.target().to_string(), + fields: field_visitor.fields, + spans, + }; + + self.inner.buffer.lock().expect("poisoned").push(line); + } +} + +#[derive(Default)] +struct FieldVisitor { + fields: Vec<(String, FieldValue)>, +} + +impl FieldVisitor { + fn push(&mut self, field: &tracing::field::Field, value: FieldValue) { + self.fields.push((field.name().to_string(), value)); + } +} + +impl tracing::field::Visit for FieldVisitor { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.push(field, FieldValue::Str(value.to_string())); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.push(field, FieldValue::I64(value)); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.push(field, FieldValue::U64(value)); + } + + fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { + self.push(field, FieldValue::Other(value.to_string())); + } + + fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { + self.push(field, FieldValue::Other(value.to_string())); + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.push(field, FieldValue::Bool(value)); + } + + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + self.push(field, FieldValue::F64(value)); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + // The implicit `message` field arrives here when the producer used a + // bare format string (`info!("hello {x}")`). Store it as a plain + // string so the dashboard does not show it wrapped in quotes from a + // generic `Debug` formatter. + if field.name() == "message" { + self.push(field, FieldValue::Str(format!("{value:?}"))); + } else { + self.push(field, FieldValue::Other(format!("{value:?}"))); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn message_is(line: &LogLine, want: &str) -> bool { + line.fields + .iter() + .any(|(k, v)| k == "message" && matches!(v, FieldValue::Str(s) if s == want)) + } + + #[test] + fn ring_buffer_rolls_over_oldest() { + let mut buf = RingBuffer::new(2, 1000); + for i in 0..5 { + buf.push(LogLine { + timestamp: format!("{i}"), + level: "INFO".into(), + target: "test".into(), + fields: Vec::new(), + spans: Vec::new(), + }); + } + let (lines, dropped) = buf.drain(10); + assert_eq!(lines.len(), 2); + assert_eq!(lines[0].timestamp, "3"); + assert_eq!(lines[1].timestamp, "4"); + assert_eq!(dropped, 3); + } + + #[test] + fn ring_buffer_throttles_per_second() { + let mut buf = RingBuffer::new(1000, 2); + for i in 0..10 { + buf.push(LogLine { + timestamp: format!("{i}"), + level: "INFO".into(), + target: "test".into(), + fields: Vec::new(), + spans: Vec::new(), + }); + } + let (lines, dropped) = buf.drain(100); + assert_eq!(lines.len(), 2); + assert_eq!(dropped, 8); + } + + #[tokio::test] + async fn collector_reload_changes_filter_then_reverts() { + let collector = match install() { + Ok(c) => c, + Err(InstallError::AlreadyInstalled) => return, + Err(e) => panic!("install: {e}"), + }; + + // Filter starts at "off" — even info lines are dropped. + tracing::info!(target: "logtest", "should not appear yet"); + let (lines, _) = collector.drain(100); + assert!(!lines.iter().any(|l| message_is(l, "should not appear yet"))); + + // Cloud raises the level to info. + collector.set_filter("info", None, None).unwrap(); + tracing::info!(target: "logtest", "first info"); + tracing::trace!(target: "logtest", "should not appear"); + let (lines, _) = collector.drain(100); + assert!( + lines + .iter() + .any(|l| l.target == "logtest" && message_is(l, "first info")) + ); + assert!(!lines.iter().any(|l| message_is(l, "should not appear"))); + + // Cloud raises again to trace, with TTL and a revert target. + collector + .set_filter("trace", Some(Duration::from_millis(150)), Some("info")) + .unwrap(); + tracing::trace!(target: "logtest", "should appear"); + let (lines, _) = collector.drain(100); + assert!(lines.iter().any(|l| message_is(l, "should appear"))); + + n0_future::time::sleep(Duration::from_millis(300)).await; + tracing::trace!(target: "logtest", "should not appear after revert"); + let (lines, _) = collector.drain(100); + assert!( + !lines + .iter() + .any(|l| message_is(l, "should not appear after revert")) + ); + } +} diff --git a/src/protocol.rs b/src/protocol.rs index 40cbbf0e..42510e55 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -30,10 +30,14 @@ pub enum IrohServicesProtocol { #[rpc(tx=oneshot::Sender>)] NameEndpoint(NameEndpoint), + + #[rpc(tx=oneshot::Sender>)] + PutLogs(PutLogs), } -/// Dedicated protocol for cloud-to-endpoint net diagnostics connections. -#[rpc_requests(message = NetDiagnosticsMessage)] +/// Dedicated protocol for cloud-to-endpoint callbacks (net diagnostics, log +/// level overrides). +#[rpc_requests(message = ClientHostMessage)] #[derive(Debug, Serialize, Deserialize)] #[allow(clippy::large_enum_variant)] pub enum ClientHostProtocol { @@ -41,6 +45,8 @@ pub enum ClientHostProtocol { Auth(Auth), #[rpc(tx=oneshot::Sender>)] RunNetworkDiagnostics(RunNetworkDiagnostics), + #[rpc(tx=oneshot::Sender>)] + SetLogLevel(SetLogLevel), } pub type RemoteResult = Result; @@ -104,3 +110,78 @@ pub struct GrantCap { pub struct NameEndpoint { pub name: String, } + +/// A single structured log line emitted by a client process. +/// +/// The shape mirrors the JSON format produced by `tracing-subscriber`'s JSON +/// formatter, with the level, target, and timestamp lifted into top-level +/// fields so the cloud can index them as columns. The remaining structured +/// fields and the span stack travel as `Vec<(String, FieldValue)>` so the +/// schema is closed and `postcard` can encode and decode it without any +/// `deserialize_any` paths. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct LogLine { + /// RFC 3339 timestamp produced at log emission time. + pub timestamp: String, + /// Log level: TRACE, DEBUG, INFO, WARN, ERROR. + pub level: String, + /// Log target (typically the originating module path). + pub target: String, + /// Structured fields attached to the event. By convention, the + /// `message` field carries the human-readable text. + pub fields: Vec<(String, FieldValue)>, + /// Active span stack, outermost first. Empty when no span is in scope. + pub spans: Vec, +} + +/// A span recorded as part of a [`LogLine`]. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SpanInfo { + pub name: String, + pub fields: Vec<(String, FieldValue)>, +} + +/// Wire-safe representation of a structured tracing field value. +/// +/// Closed enum so `postcard` can round-trip it without `deserialize_any`. +/// Anything that is not one of the typed variants (a `Debug`-formatted +/// value, a non-finite float, a 128-bit integer) is rendered to a string +/// at the producer with [`FieldValue::Other`]. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum FieldValue { + Str(String), + I64(i64), + U64(u64), + F64(f64), + Bool(bool), + /// Fallback for values that do not fit the typed variants. Carries the + /// `Debug`-formatted text. + Other(String), +} + +/// A batch of log lines pushed from a client to the cloud. +#[derive(Debug, Serialize, Deserialize)] +pub struct PutLogs { + pub session_id: Uuid, + pub lines: Vec, + /// Number of lines dropped on the client since the last successful push, + /// either due to the buffer being full or the throttle being exceeded. + pub dropped: u32, +} + +/// Cloud-issued instruction to override the client's tracing filter. +#[derive(Debug, Serialize, Deserialize)] +pub struct SetLogLevel { + /// `EnvFilter`-compatible directive string (for example + /// `"info,iroh=trace,iroh_blobs=debug"`). + pub directives: String, + /// If `Some`, the client reverts after this many seconds. If `None`, the + /// override is permanent until the next call. + pub expires_in_secs: Option, + /// Directives to revert to when the TTL fires. When `None`, the client + /// reverts to its install-time default. The cloud sends the project-wide + /// default here so per-endpoint overrides decay back to project policy + /// rather than to the client's own startup setting. + #[serde(default)] + pub revert_to: Option, +} From 8db29023c36e01ede6ed4e9f1c58edbe1241835f Mon Sep 17 00:00:00 2001 From: Rae McKelvey <633012+okdistribute@users.noreply.github.com> Date: Wed, 13 May 2026 20:31:51 +0200 Subject: [PATCH 3/7] feat(logs): add file_layer for local rolling log files New opt-in API: iroh_services::logs::file_layer(config) returns a tracing Layer that writes JSON records to a rolling file plus a WorkerGuard the caller must hold for the lifetime of the process. The FileLoggerConfig builder takes the destination directory and tunes rotation, file name prefix, and retention. The existing buffer + shipper API is untouched and still works. This is the foundation for moving log persistence from cloud to device: PR 2 will switch the shipper off in favour of this layer. Adds tracing-appender 0.2 and tempfile (dev) deps. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 53 ++++++++++++++++ Cargo.toml | 2 + src/logs.rs | 180 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index a183ed0b..2f74647a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1835,9 +1835,11 @@ dependencies = [ "ssh-key", "strum", "temp_env_vars", + "tempfile", "thiserror 2.0.18", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "uuid", ] @@ -2017,6 +2019,12 @@ version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.2" @@ -3032,6 +3040,19 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.38" @@ -3492,6 +3513,12 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "symlink" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" + [[package]] name = "syn" version = "2.0.117" @@ -3569,6 +3596,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.4.2", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -3891,6 +3931,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c" +dependencies = [ + "crossbeam-channel", + "symlink", + "thiserror 2.0.18", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.31" diff --git a/Cargo.toml b/Cargo.toml index d44f84b1..b3e626a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ tracing-subscriber = { version = "0.3.20", features = [ "fmt", "json", ] } +tracing-appender = "0.2" serde_json = "1.0.140" bytes = { version = "1.10.1", features = ["serde"] } futures-buffered = "0.2.12" @@ -52,6 +53,7 @@ built = { version = "0.8", features = ["cargo-lock"] } [dev-dependencies] rand = { version = "0.10", features = ["chacha"] } temp_env_vars = "0.2.1" +tempfile = "3" tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "signal"] } diff --git a/src/logs.rs b/src/logs.rs index 4124f416..56d4bf3d 100644 --- a/src/logs.rs +++ b/src/logs.rs @@ -46,6 +46,7 @@ use std::{ collections::VecDeque, + path::{Path, PathBuf}, sync::{Arc, Mutex}, time::Instant, }; @@ -381,6 +382,145 @@ impl tracing::field::Visit for FieldVisitor { } } +/// How often the rolling file appender starts a new file. +/// +/// Re-exported from `tracing-appender` so callers don't need to depend on it +/// directly. +pub use tracing_appender::rolling::Rotation; + +/// Guard returned by [`file_layer`] that keeps the non-blocking writer's +/// worker thread alive. Drop this only at process shutdown; once dropped, +/// any buffered records still in flight are flushed and the file layer +/// stops accepting writes. +pub use tracing_appender::non_blocking::WorkerGuard; + +/// Errors raised when constructing the file logger. +#[derive(Debug, thiserror::Error)] +pub enum FileLoggerError { + /// Could not create the log directory or open the rolling appender. + #[error("file logger setup failed: {0}")] + Io(#[from] std::io::Error), + /// `tracing-appender`'s builder rejected the configuration (for example + /// an invalid filename prefix). + #[error("file logger builder rejected configuration: {0}")] + Builder(String), +} + +/// Configuration for the rolling file logger. +/// +/// Use [`FileLoggerConfig::new`] to set the destination directory and tune +/// the remaining fields with the with-style setters. The defaults are +/// daily rotation, a `iroh-services` filename prefix, and a 30-file +/// retention window. +#[derive(Debug, Clone)] +pub struct FileLoggerConfig { + dir: PathBuf, + rotation: Rotation, + file_name_prefix: String, + max_files: Option, +} + +impl FileLoggerConfig { + /// Build a config rooted at `dir`. The directory is created on first + /// write if it does not exist. + pub fn new>(dir: P) -> Self { + Self { + dir: dir.into(), + rotation: Rotation::DAILY, + file_name_prefix: "iroh-services".into(), + max_files: Some(30), + } + } + + /// Override the rotation cadence. Default: [`Rotation::DAILY`]. + pub fn with_rotation(mut self, rotation: Rotation) -> Self { + self.rotation = rotation; + self + } + + /// Override the file name stem. Rotation appends a date suffix to this. + /// Default: `iroh-services`. + pub fn with_file_name_prefix>(mut self, prefix: S) -> Self { + self.file_name_prefix = prefix.into(); + self + } + + /// Override the retention cap. `None` keeps every file forever; `Some(n)` + /// keeps at most `n` files and deletes the oldest on rotation. Default: + /// `Some(30)`. + pub fn with_max_files(mut self, max_files: Option) -> Self { + self.max_files = max_files; + self + } +} + +/// Builds a tracing layer that writes records to a rolling file under +/// `config.dir`. Returns the layer plus a [`WorkerGuard`] the caller must +/// hold for the lifetime of the process — drop it at shutdown so any +/// buffered records flush before exit. +/// +/// The layer is not filtered. Compose it with the rest of your subscriber +/// to control what reaches the file. A common pattern is to use the same +/// [`EnvFilter`] reload handle as the cloud-controlled buffer layer, so a +/// dashboard-pushed `SetLogLevel` adjusts file output too. +/// +/// # Example +/// +/// ```no_run +/// use iroh_services::logs::{FileLoggerConfig, Rotation}; +/// use tracing_subscriber::prelude::*; +/// +/// # fn main() -> anyhow::Result<()> { +/// let (file_layer, _guard) = iroh_services::logs::file_layer( +/// FileLoggerConfig::new("./logs") +/// .with_rotation(Rotation::HOURLY) +/// .with_max_files(Some(24)), +/// )?; +/// +/// tracing_subscriber::registry() +/// .with(file_layer) +/// .init(); +/// # // Keep `_guard` alive for the program lifetime. +/// # Ok(()) +/// # } +/// ``` +pub fn file_layer( + config: FileLoggerConfig, +) -> Result<(impl Layer + Send + Sync + 'static, WorkerGuard), FileLoggerError> +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + let FileLoggerConfig { + dir, + rotation, + file_name_prefix, + max_files, + } = config; + + create_dir_all(&dir)?; + + let mut builder = tracing_appender::rolling::RollingFileAppender::builder() + .rotation(rotation) + .filename_prefix(file_name_prefix); + if let Some(max) = max_files { + builder = builder.max_log_files(max); + } + let appender = builder + .build(&dir) + .map_err(|e| FileLoggerError::Builder(e.to_string()))?; + + let (writer, guard) = tracing_appender::non_blocking(appender); + let layer = tracing_subscriber::fmt::layer() + .with_writer(writer) + .with_ansi(false) + .json(); + Ok((layer, guard)) +} + +fn create_dir_all(dir: &Path) -> Result<(), FileLoggerError> { + std::fs::create_dir_all(dir).map_err(FileLoggerError::Io) +} + #[cfg(test)] mod tests { use super::*; @@ -469,4 +609,44 @@ mod tests { .any(|l| message_is(l, "should not appear after revert")) ); } + + /// `file_layer` writes records to a file in the configured directory, + /// and the WorkerGuard flushes pending writes on drop. + #[test] + fn file_layer_writes_to_disk() { + use tracing::Dispatch; + use tracing_subscriber::{Registry, layer::SubscriberExt}; + + let tmp = tempfile::tempdir().unwrap(); + let (layer, guard) = file_layer::( + FileLoggerConfig::new(tmp.path()) + .with_file_name_prefix("test") + .with_max_files(Some(2)), + ) + .expect("file_layer setup"); + + let subscriber = Registry::default().with(layer); + let dispatch = Dispatch::new(subscriber); + tracing::dispatcher::with_default(&dispatch, || { + tracing::info!(target: "file_layer_test", "hello from the file logger"); + }); + // Drop the guard so the non-blocking writer flushes its queue. + drop(guard); + + // Find a file produced by the rolling appender and confirm our line + // is in it. + let mut found = false; + for entry in std::fs::read_dir(tmp.path()).unwrap() { + let entry = entry.unwrap(); + if !entry.file_name().to_string_lossy().starts_with("test") { + continue; + } + let contents = std::fs::read_to_string(entry.path()).unwrap(); + if contents.contains("hello from the file logger") { + found = true; + break; + } + } + assert!(found, "expected log line to be written to a test.* file"); + } } From fbd4404374601dcc245aea3da734022f347a0029 Mon Sep 17 00:00:00 2001 From: Rae McKelvey <633012+okdistribute@users.noreply.github.com> Date: Wed, 13 May 2026 20:41:15 +0200 Subject: [PATCH 4/7] feat(logs)!: replace cloud shipper with local file logger [breaking] BREAKING: removes the buffer-and-ship path. Logs are now written to local rolling files on the device; the cloud no longer stores log content. The directive-override path (SetLogLevel) is unchanged and now controls what reaches the local file. logs.rs: drop LogCollector::buffered/drain, BufferLayer, RingBuffer, FieldVisitor, DEFAULT_BUFFER_CAPACITY, DEFAULT_RATE_PER_SECOND. The collector now wraps the reload handle only and applies the cloud- controlled EnvFilter to the file_layer. install(config) and layer(config) take FileLoggerConfig and return a WorkerGuard. client.rs: drop with_log_collection, log_flush_interval, log_max_batch builder methods; drop log_collector/log_flush_interval/log_max_batch fields; drop _log_flush_task; drop ClientActorMessage::PutLogs and ClientActor::put_logs; drop run_log_flush. protocol.rs: drop LogLine, SpanInfo, FieldValue, PutLogs and the IrohServicesProtocol::PutLogs variant. Examples updated; tests cover both the unfiltered file_layer and the cloud-filtered layer end-to-end. Bumps version 0.14 -> 0.15. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 2 +- Cargo.toml | 2 +- examples/logs.rs | 50 ++--- src/client.rs | 130 +----------- src/logs.rs | 510 +++++++++++++---------------------------------- src/protocol.rs | 61 ------ 6 files changed, 164 insertions(+), 591 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f74647a..c574ef9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1810,7 +1810,7 @@ dependencies = [ [[package]] name = "iroh-services" -version = "0.14.0" +version = "0.15.0" dependencies = [ "anyhow", "built", diff --git a/Cargo.toml b/Cargo.toml index b3e626a6..15662353 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iroh-services" -version = "0.14.0" +version = "0.15.0" edition = "2024" readme = "README.md" description = "p2p quic connections dialed by public key" diff --git a/examples/logs.rs b/examples/logs.rs index 6dae05db..00f718ac 100644 --- a/examples/logs.rs +++ b/examples/logs.rs @@ -1,39 +1,36 @@ //! Log collection example. //! -//! Demonstrates how to install the iroh-services log collector alongside the -//! standard `tracing-subscriber` `fmt` layer, ship records to the cloud over -//! the iroh-services RPC channel, and let the cloud override the local -//! filter at runtime via `SetLogLevel`. +//! Demonstrates installing the iroh-services file logger and letting the +//! cloud override the local filter at runtime via `SetLogLevel`. //! //! The level filter starts at `off`. The cloud pushes a level after this -//! endpoint is opted into log collection from the dashboard or REST API; both -//! the buffered cloud-shipping layer and the stderr fmt layer respect that -//! level. Anything emitted before the cloud responds is silently dropped. +//! endpoint is opted into log collection from the dashboard or REST API. +//! Anything emitted before the cloud responds is silently dropped. //! //! Run with: cargo run --example logs use std::time::Duration; -use tracing_subscriber::prelude::*; use anyhow::Result; use iroh::{Endpoint, endpoint::presets, protocol::Router}; use iroh_services::{ - API_SECRET_ENV_VAR_NAME, ApiSecret, CLIENT_HOST_ALPN, Client, ClientHost, caps::LogsCap, logs, + API_SECRET_ENV_VAR_NAME, ApiSecret, CLIENT_HOST_ALPN, Client, ClientHost, + caps::LogsCap, + logs::{self, FileLoggerConfig, Rotation}, }; use tracing::info; #[tokio::main] async fn main() -> Result<()> { - // 1. Build the buffer layer and compose it with a stderr fmt layer behind - // the same reloadable filter, so local console output mirrors what - // gets shipped. The cloud raises the filter from `off` after this - // endpoint is opted in via the dashboard. - let (collector, log_layer) = logs::layer(); - tracing_subscriber::registry() - .with(log_layer) - .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) - .try_init() - .map_err(|e| anyhow::anyhow!("failed to install tracing subscriber: {e}"))?; + // 1. Install the cloud-controlled file logger. Records land under + // `./logs/` and roll over hourly with up to 24 files kept. The + // WorkerGuard must outlive the process; drop it in `main`'s tail + // so any buffered records flush on exit. + let (collector, _guard) = logs::install( + FileLoggerConfig::new("./logs") + .with_rotation(Rotation::HOURLY) + .with_max_files(Some(24)), + )?; // 2. Create the endpoint and parse the API secret so we know which // cloud endpoint to grant SetLevel to. @@ -43,13 +40,10 @@ async fn main() -> Result<()> { let name = format!("logs-example-{}", &endpoint.id().to_string()[..8]); - // 3. Build the client. `with_log_collection(collector.clone())` starts a - // background task that drains the buffer every second and ships the - // batch as a `PutLogs` RPC. + // 3. Build the client. let client = Client::builder(&endpoint) .api_secret(secret)? .name(name)? - .with_log_collection(collector.clone()) .build() .await?; @@ -67,16 +61,16 @@ async fn main() -> Result<()> { }); // 5. Accept the cloud's callback connections on `CLIENT_HOST_ALPN`. The - // `ClientHost` needs the same collector so the `SetLogLevel` request - // can hot-reload the filter. + // `ClientHost` needs the collector so the `SetLogLevel` request can + // hot-reload the local filter. let host = ClientHost::new(&endpoint).with_log_collector(collector); let router = Router::builder(endpoint) .accept(CLIENT_HOST_ALPN, host) .spawn(); - // 6. Emit an info log every other second forever. These will surface on - // the dashboard's Logs page (and on this process's stderr) once the - // endpoint is opted into log collection at `info` level or above. + // 6. Emit an info log every other second forever. Records are written + // to the local rolling file once the cloud raises the level above + // `off`. println!("emitting logs; ctrl+c to exit."); let mut tick = tokio::time::interval(Duration::from_secs(2)); let mut counter: u64 = 0; diff --git a/src/client.rs b/src/client.rs index 7c2cc814..a127a70a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -17,10 +17,9 @@ use uuid::Uuid; use crate::{ api_secret::ApiSecret, caps::Caps, - logs::LogCollector, net_diagnostics::{DiagnosticsReport, checks::run_diagnostics}, protocol::{ - ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutLogs, PutMetrics, + ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics, PutNetworkDiagnostics, RemoteError, }, }; @@ -55,7 +54,6 @@ pub struct Client { endpoint: Endpoint, message_channel: tokio::sync::mpsc::Sender, _actor_task: Arc>, - _log_flush_task: Option>>, } /// ClientBuilder provides configures and builds a iroh-services client, typically @@ -69,19 +67,11 @@ pub struct ClientBuilder { metrics_interval: Option, remote: Option, registry: Registry, - log_collector: Option, - log_flush_interval: Duration, - log_max_batch: usize, } const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 1 month pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET"; -/// Default interval between log batch flushes when log collection is enabled. -pub const DEFAULT_LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(1); -/// Default maximum batch size pushed in a single PutLogs request. -pub const DEFAULT_LOG_MAX_BATCH: usize = 200; - impl ClientBuilder { pub fn new(endpoint: &Endpoint) -> Self { let mut registry = Registry::default(); @@ -95,35 +85,9 @@ impl ClientBuilder { metrics_interval: Some(Duration::from_secs(60)), remote: None, registry, - log_collector: None, - log_flush_interval: DEFAULT_LOG_FLUSH_INTERVAL, - log_max_batch: DEFAULT_LOG_MAX_BATCH, } } - /// Enables periodic shipment of buffered log lines to iroh-services. - /// - /// The collector is shared with [`crate::client_host::ClientHost`] when - /// runtime log-level overrides are needed; clone it before passing so both - /// sides hold a handle. - pub fn with_log_collection(mut self, collector: LogCollector) -> Self { - self.log_collector = Some(collector); - self - } - - /// Override the log batch flush interval. Defaults to one second. - pub fn log_flush_interval(mut self, interval: Duration) -> Self { - self.log_flush_interval = interval; - self - } - - /// Override the maximum number of lines included in a single PutLogs - /// request. Defaults to [`DEFAULT_LOG_MAX_BATCH`]. - pub fn log_max_batch(mut self, max: usize) -> Self { - self.log_max_batch = max; - self - } - /// Register a metrics group to forward to iroh-services /// /// The default registered metrics uses only the endpoint @@ -266,20 +230,10 @@ impl ClientBuilder { .run(self.name, self.registry, self.metrics_interval, rx), )); - let log_flush_task = self.log_collector.map(|collector| { - let message_channel = tx.clone(); - let interval = self.log_flush_interval; - let max_batch = self.log_max_batch; - Arc::new(AbortOnDropHandle::new(n0_future::task::spawn( - run_log_flush(message_channel, collector, interval, max_batch, session_id), - ))) - }); - Ok(Client { endpoint: self.endpoint, message_channel: tx, _actor_task: Arc::new(actor_task), - _log_flush_task: log_flush_task, }) } } @@ -476,10 +430,6 @@ enum ClientActorMessage { report: Box, done: oneshot::Sender>, }, - PutLogs { - request: PutLogs, - done: oneshot::Sender>, - }, ReadName { done: oneshot::Sender>, }, @@ -560,13 +510,6 @@ impl ClientActor { warn!("failed to publish network diagnostics: {:#?}", err); } } - ClientActorMessage::PutLogs{ request, done } => { - let res = self.put_logs(request).await; - if let Err(err) = done.send(res) { - debug!("failed to publish logs: {:#?}", err); - self.authorized = false; - } - } } } _ = async { @@ -675,77 +618,6 @@ impl ClientActor { Ok(()) } - - async fn put_logs(&mut self, request: PutLogs) -> Result<(), Error> { - trace!( - lines = request.lines.len(), - dropped = request.dropped, - "client actor put logs" - ); - self.auth().await?; - - self.client - .rpc(request) - .await - .map_err(|_| RemoteError::InternalServerError)??; - - Ok(()) - } -} - -async fn run_log_flush( - message_channel: tokio::sync::mpsc::Sender, - collector: LogCollector, - interval: Duration, - max_batch: usize, - session_id: Uuid, -) { - const INITIAL_BACKOFF: Duration = Duration::from_millis(500); - const MAX_BACKOFF: Duration = Duration::from_secs(30); - - let mut ticker = n0_future::time::interval(interval); - // After a slow RPC the default `Burst` behavior would fire several - // ticks back-to-back; `Delay` waits a full interval from the previous - // completed tick. - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - let mut backoff = INITIAL_BACKOFF; - loop { - ticker.tick().await; - let (lines, dropped) = collector.drain(max_batch); - if lines.is_empty() && dropped == 0 { - backoff = INITIAL_BACKOFF; - continue; - } - let request = PutLogs { - session_id, - lines, - dropped, - }; - let (tx, rx) = oneshot::channel(); - if message_channel - .send(ClientActorMessage::PutLogs { request, done: tx }) - .await - .is_err() - { - // Mailbox closed only when the actor task has terminated; that - // means the entire client is gone and there is nothing to do. - debug!("log flush stopped: client actor channel closed"); - return; - } - match rx.await { - Ok(Ok(())) => { - backoff = INITIAL_BACKOFF; - } - // Either the RPC failed (Ok(Err)) or the actor dropped the - // response sender mid-handoff (Err(_)). Both are transient: keep - // ticking and back off so the next attempt happens later. - other => { - debug!(?other, ?backoff, "log flush attempt failed; backing off"); - n0_future::time::sleep(backoff).await; - backoff = (backoff * 2).min(MAX_BACKOFF); - } - } - } } async fn set_name_inner( diff --git a/src/logs.rs b/src/logs.rs index 56d4bf3d..60b36886 100644 --- a/src/logs.rs +++ b/src/logs.rs @@ -1,88 +1,75 @@ -//! Client-side log collection: a `tracing-subscriber` layer that buffers -//! structured log records for shipment to iroh-services, plus a reload handle -//! that lets the cloud control the level filter at runtime. +//! Client-side log collection: a `tracing-subscriber` layer that writes +//! structured log records to rolling files on the local filesystem, plus a +//! reload handle that lets the cloud control the level filter at runtime. //! -//! # The cloud is the source of truth +//! # The cloud is the source of truth for the level //! //! The level filter starts at `off`. No tracing events are captured until the //! cloud pushes a [`crate::protocol::SetLogLevel`] over the [`ClientHost`] //! channel. The cloud sends one immediately after authenticating a connected -//! endpoint, derived from the per-endpoint -//! `endpoint_log_settings` row (when present) plus the project default. -//! -//! Concretely this means the install argument is empty: the client process -//! does not get to choose its own log level. The level you see is whatever -//! the dashboard or REST API has decided. +//! endpoint, derived from the per-endpoint `endpoint_log_settings` row (when +//! present) plus the project default. //! //! [`ClientHost`]: crate::ClientHost //! +//! # Files live on the device +//! +//! Records land in rolling JSON files under a caller-supplied directory. +//! Operators view, ship, or aggregate them with whatever tooling they +//! already use (`tail`, `journalctl`, `vector`, etc.). +//! //! # Typical usage //! //! ```no_run -//! use iroh_services::logs; -//! -//! # async fn run() -> anyhow::Result<()> { -//! // Buffer-only subscriber, filter starts at `off`. -//! let collector = logs::install()?; +//! use iroh_services::logs::{self, FileLoggerConfig}; //! -//! // Compose with a stderr fmt layer via `logs::layer()` to also render -//! // filtered events locally: -//! // -//! // use tracing_subscriber::prelude::*; -//! // let (collector, log_layer) = iroh_services::logs::layer(); -//! // tracing_subscriber::registry() -//! // .with(log_layer) -//! // .with(tracing_subscriber::fmt::layer()) -//! // .init(); +//! # fn main() -> anyhow::Result<()> { +//! // Installs a global subscriber. The filter starts at `off`; once a +//! // ClientHost connection is up, the cloud pushes a SetLogLevel that +//! // raises it to whatever the dashboard says. +//! let (collector, _guard) = logs::install(FileLoggerConfig::new("./logs"))?; //! -//! // Hand the collector to the client builder so it pushes batches over RPC, -//! // and to the ClientHost so the cloud can override the level dynamically. +//! // Hand the collector to ClientHost so it can call `collector.set_filter` +//! // when the cloud pushes overrides. +//! # let _ = collector; //! # Ok(()) //! # } //! ``` //! -//! Backed by a bounded VecDeque of [`LogLine`]; the oldest entries are dropped -//! when the buffer fills, with the drop count reported on the next batch. +//! Compose with additional layers (for example, a stderr fmt layer) via +//! [`layer`]: +//! +//! ```no_run +//! use iroh_services::logs::{self, FileLoggerConfig}; +//! use tracing_subscriber::prelude::*; +//! +//! # fn main() -> anyhow::Result<()> { +//! let (collector, file_layer, _guard) = logs::layer(FileLoggerConfig::new("./logs"))?; +//! tracing_subscriber::registry() +//! .with(file_layer) +//! .with(tracing_subscriber::fmt::layer()) +//! .try_init() +//! .ok(); +//! # let _ = collector; +//! # Ok(()) +//! # } +//! ``` use std::{ - collections::VecDeque, path::{Path, PathBuf}, sync::{Arc, Mutex}, - time::Instant, }; use n0_future::{ task::{AbortOnDropHandle, JoinHandle}, time::Duration, }; -use tracing::{Event, Subscriber, debug, warn}; +use tracing::{Subscriber, debug, warn}; use tracing_subscriber::{ - EnvFilter, Layer, Registry, - fmt::{ - format::Writer, - time::{FormatTime, SystemTime}, - }, - layer::{Context, SubscriberExt as _}, - registry::LookupSpan, - reload, + EnvFilter, Layer, Registry, layer::SubscriberExt as _, registry::LookupSpan, reload, util::SubscriberInitExt as _, }; -use crate::protocol::{FieldValue, LogLine, SpanInfo}; - -/// Maximum number of buffered log lines awaiting cloud shipment. -/// -/// When the buffer is full, the oldest line is dropped to make room and the -/// drop counter is incremented. KISS default; tune from real usage. -pub const DEFAULT_BUFFER_CAPACITY: usize = 1000; - -/// Maximum log emission rate per second per process. -/// -/// Lines beyond this rate are dropped (counted in the drop counter). The -/// default rate is generous enough to capture useful debug-level traffic -/// without unbounded growth from a runaway log loop. -pub const DEFAULT_RATE_PER_SECOND: u32 = 100; - /// Errors that can occur while installing the log collector. #[derive(Debug, thiserror::Error)] pub enum InstallError { @@ -92,6 +79,10 @@ pub enum InstallError { /// The supplied directives string was rejected by `EnvFilter`. #[error("invalid filter directives: {0}")] InvalidDirectives(String), + /// File logger setup failed (could not create directory, open appender, + /// etc.). + #[error("file logger setup failed: {0}")] + FileLogger(#[from] FileLoggerError), } /// Errors that can occur while changing the active filter at runtime. @@ -105,83 +96,23 @@ pub enum SetFilterError { ReloadFailed, } -/// Handle to the buffered log collector. Cheap to clone; all clones share the -/// same backing buffer and reload handle. +/// Handle to the cloud-controlled tracing filter. Cheap to clone; all clones +/// share the same backing reload handle. #[derive(Clone)] pub struct LogCollector { inner: Arc, } struct CollectorInner { - buffer: Mutex, reload_handle: reload::Handle, revert_task: Mutex>>, } -/// Off-state directive. The buffer captures nothing until the cloud sends -/// a `SetLogLevel` with something more permissive. +/// Off-state directive. Nothing is captured until the cloud sends a +/// `SetLogLevel` with something more permissive. const OFF_DIRECTIVES: &str = "off"; -struct RingBuffer { - lines: VecDeque, - dropped: u32, - capacity: usize, - rate_per_second: u32, - window_start: Instant, - window_count: u32, -} - -impl RingBuffer { - fn new(capacity: usize, rate_per_second: u32) -> Self { - Self { - lines: VecDeque::with_capacity(capacity.min(64)), - dropped: 0, - capacity, - rate_per_second, - window_start: Instant::now(), - window_count: 0, - } - } - - fn push(&mut self, line: LogLine) { - let now = Instant::now(); - if now.duration_since(self.window_start) >= Duration::from_secs(1) { - self.window_start = now; - self.window_count = 0; - } - if self.window_count >= self.rate_per_second { - self.dropped = self.dropped.saturating_add(1); - return; - } - self.window_count += 1; - - if self.lines.len() == self.capacity { - self.lines.pop_front(); - self.dropped = self.dropped.saturating_add(1); - } - self.lines.push_back(line); - } - - fn drain(&mut self, max: usize) -> (Vec, u32) { - let take = self.lines.len().min(max); - let lines: Vec = self.lines.drain(..take).collect(); - let dropped = std::mem::take(&mut self.dropped); - (lines, dropped) - } -} - impl LogCollector { - /// Returns the current number of buffered lines. - pub fn buffered(&self) -> usize { - self.inner.buffer.lock().expect("poisoned").lines.len() - } - - /// Drains up to `max` lines from the buffer, along with the count of lines - /// dropped since the last drain. - pub fn drain(&self, max: usize) -> (Vec, u32) { - self.inner.buffer.lock().expect("poisoned").drain(max) - } - /// Sets the active filter directives. When `expires_in` is set, /// schedules a revert after that duration. The revert target is /// `revert_to` when supplied; `None` means revert to `off`. @@ -231,155 +162,57 @@ impl LogCollector { impl std::fmt::Debug for LogCollector { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LogCollector") - .field("buffered", &self.buffered()) - .finish() + f.debug_struct("LogCollector").finish_non_exhaustive() } } -/// Installs a global tracing subscriber whose only output is a JSON-buffering -/// layer that ships records to the cloud. The level filter starts at `off`; -/// the cloud must push a `SetLogLevel` for any events to be captured. +/// Installs a global tracing subscriber whose only output is a rolling +/// file appender under `config.dir`. The level filter starts at `off`; the +/// cloud must push a `SetLogLevel` for any events to be captured. +/// +/// Returns the [`LogCollector`] to hand to [`crate::ClientHost`] so it can +/// apply runtime overrides, and a [`WorkerGuard`] the caller must hold for +/// the lifetime of the process so the non-blocking writer flushes on exit. /// -/// Call exactly once at process start. For local console output in addition -/// to cloud shipping, use [`layer`] and compose your own subscriber. -pub fn install() -> Result { - let (collector, layer) = layer(); +/// Call exactly once at process start. For composition with other layers, +/// use [`layer`]. +pub fn install(config: FileLoggerConfig) -> Result<(LogCollector, WorkerGuard), InstallError> { + let (collector, file_layer, guard) = layer(config)?; tracing_subscriber::registry() - .with(layer) + .with(file_layer) .try_init() .map_err(|_| InstallError::AlreadyInstalled)?; - debug!("iroh-services log collector installed"); - Ok(collector) + debug!("iroh-services file logger installed"); + Ok((collector, guard)) } -/// Builds the buffer layer and its [`LogCollector`] handle without installing -/// a global subscriber. Use this when composing the collector with other -/// layers; it returns the layer pre-wrapped in the reloadable filter. -/// -/// Typical pattern for buffer + stderr fmt: -/// -/// ```no_run -/// use iroh_services::logs; -/// use tracing_subscriber::prelude::*; -/// -/// let (collector, log_layer) = logs::layer(); -/// tracing_subscriber::registry() -/// .with(log_layer) -/// .with(tracing_subscriber::fmt::layer()) -/// .try_init() -/// .ok(); -/// # let _ = collector; -/// ``` -pub fn layer() -> (LogCollector, impl Layer + Send + Sync + 'static) { +/// Builds the cloud-controlled file layer and its [`LogCollector`] without +/// installing a global subscriber. Use this when composing the file layer +/// with other layers; the returned layer is pre-wrapped in the reloadable +/// filter so the cloud's `SetLogLevel` overrides take effect. +pub fn layer( + config: FileLoggerConfig, +) -> Result< + ( + LogCollector, + impl Layer + Send + Sync + 'static, + WorkerGuard, + ), + InstallError, +> { // `EnvFilter::try_new("off")` cannot fail; "off" is always valid. let filter = EnvFilter::try_new(OFF_DIRECTIVES).expect("'off' is always a valid directive"); let (filter, reload_handle) = reload::Layer::new(filter); + let (file_layer, guard) = file_layer::(config)?; + let layer = file_layer.with_filter(filter); + let inner = Arc::new(CollectorInner { - buffer: Mutex::new(RingBuffer::new( - DEFAULT_BUFFER_CAPACITY, - DEFAULT_RATE_PER_SECOND, - )), reload_handle, revert_task: Mutex::new(None), }); - let collector = LogCollector { - inner: inner.clone(), - }; - let buffer_layer = BufferLayer { inner }; - (collector, buffer_layer.with_filter(filter)) -} - -struct BufferLayer { - inner: Arc, -} - -impl Layer for BufferLayer -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { - let metadata = event.metadata(); - let mut timestamp = String::new(); - let _ = SystemTime.format_time(&mut Writer::new(&mut timestamp)); - - let mut field_visitor = FieldVisitor::default(); - event.record(&mut field_visitor); - - let mut spans: Vec = Vec::new(); - if let Some(scope) = ctx.event_scope(event) { - for span in scope.from_root() { - spans.push(SpanInfo { - name: span.name().to_string(), - fields: Vec::new(), - }); - } - } - - let line = LogLine { - timestamp, - level: metadata.level().to_string(), - target: metadata.target().to_string(), - fields: field_visitor.fields, - spans, - }; - - self.inner.buffer.lock().expect("poisoned").push(line); - } -} - -#[derive(Default)] -struct FieldVisitor { - fields: Vec<(String, FieldValue)>, -} - -impl FieldVisitor { - fn push(&mut self, field: &tracing::field::Field, value: FieldValue) { - self.fields.push((field.name().to_string(), value)); - } -} - -impl tracing::field::Visit for FieldVisitor { - fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - self.push(field, FieldValue::Str(value.to_string())); - } - - fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { - self.push(field, FieldValue::I64(value)); - } - - fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { - self.push(field, FieldValue::U64(value)); - } - - fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { - self.push(field, FieldValue::Other(value.to_string())); - } - - fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { - self.push(field, FieldValue::Other(value.to_string())); - } - - fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { - self.push(field, FieldValue::Bool(value)); - } - - fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { - self.push(field, FieldValue::F64(value)); - } - - fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - // The implicit `message` field arrives here when the producer used a - // bare format string (`info!("hello {x}")`). Store it as a plain - // string so the dashboard does not show it wrapped in quotes from a - // generic `Debug` formatter. - if field.name() == "message" { - self.push(field, FieldValue::Str(format!("{value:?}"))); - } else { - self.push(field, FieldValue::Other(format!("{value:?}"))); - } - } + let collector = LogCollector { inner }; + Ok((collector, layer, guard)) } /// How often the rolling file appender starts a new file. @@ -388,10 +221,10 @@ impl tracing::field::Visit for FieldVisitor { /// directly. pub use tracing_appender::rolling::Rotation; -/// Guard returned by [`file_layer`] that keeps the non-blocking writer's -/// worker thread alive. Drop this only at process shutdown; once dropped, -/// any buffered records still in flight are flushed and the file layer -/// stops accepting writes. +/// Guard returned by [`file_layer`] / [`layer`] / [`install`] that keeps the +/// non-blocking writer's worker thread alive. Drop this only at process +/// shutdown; once dropped, any buffered records still in flight are flushed +/// and the file layer stops accepting writes. pub use tracing_appender::non_blocking::WorkerGuard; /// Errors raised when constructing the file logger. @@ -454,36 +287,14 @@ impl FileLoggerConfig { } } -/// Builds a tracing layer that writes records to a rolling file under -/// `config.dir`. Returns the layer plus a [`WorkerGuard`] the caller must -/// hold for the lifetime of the process — drop it at shutdown so any -/// buffered records flush before exit. -/// -/// The layer is not filtered. Compose it with the rest of your subscriber -/// to control what reaches the file. A common pattern is to use the same -/// [`EnvFilter`] reload handle as the cloud-controlled buffer layer, so a -/// dashboard-pushed `SetLogLevel` adjusts file output too. -/// -/// # Example +/// Builds an unfiltered tracing layer that writes records to a rolling +/// file under `config.dir`. Returns the layer plus a [`WorkerGuard`] the +/// caller must hold for the lifetime of the process — drop it at shutdown +/// so any buffered records flush before exit. /// -/// ```no_run -/// use iroh_services::logs::{FileLoggerConfig, Rotation}; -/// use tracing_subscriber::prelude::*; -/// -/// # fn main() -> anyhow::Result<()> { -/// let (file_layer, _guard) = iroh_services::logs::file_layer( -/// FileLoggerConfig::new("./logs") -/// .with_rotation(Rotation::HOURLY) -/// .with_max_files(Some(24)), -/// )?; -/// -/// tracing_subscriber::registry() -/// .with(file_layer) -/// .init(); -/// # // Keep `_guard` alive for the program lifetime. -/// # Ok(()) -/// # } -/// ``` +/// Most callers want [`layer`] or [`install`] instead, which apply the +/// cloud-controlled `EnvFilter` reload handle. Use this when you want a +/// plain file appender with no cloud filter integration. pub fn file_layer( config: FileLoggerConfig, ) -> Result<(impl Layer + Send + Sync + 'static, WorkerGuard), FileLoggerError> @@ -525,91 +336,6 @@ fn create_dir_all(dir: &Path) -> Result<(), FileLoggerError> { mod tests { use super::*; - fn message_is(line: &LogLine, want: &str) -> bool { - line.fields - .iter() - .any(|(k, v)| k == "message" && matches!(v, FieldValue::Str(s) if s == want)) - } - - #[test] - fn ring_buffer_rolls_over_oldest() { - let mut buf = RingBuffer::new(2, 1000); - for i in 0..5 { - buf.push(LogLine { - timestamp: format!("{i}"), - level: "INFO".into(), - target: "test".into(), - fields: Vec::new(), - spans: Vec::new(), - }); - } - let (lines, dropped) = buf.drain(10); - assert_eq!(lines.len(), 2); - assert_eq!(lines[0].timestamp, "3"); - assert_eq!(lines[1].timestamp, "4"); - assert_eq!(dropped, 3); - } - - #[test] - fn ring_buffer_throttles_per_second() { - let mut buf = RingBuffer::new(1000, 2); - for i in 0..10 { - buf.push(LogLine { - timestamp: format!("{i}"), - level: "INFO".into(), - target: "test".into(), - fields: Vec::new(), - spans: Vec::new(), - }); - } - let (lines, dropped) = buf.drain(100); - assert_eq!(lines.len(), 2); - assert_eq!(dropped, 8); - } - - #[tokio::test] - async fn collector_reload_changes_filter_then_reverts() { - let collector = match install() { - Ok(c) => c, - Err(InstallError::AlreadyInstalled) => return, - Err(e) => panic!("install: {e}"), - }; - - // Filter starts at "off" — even info lines are dropped. - tracing::info!(target: "logtest", "should not appear yet"); - let (lines, _) = collector.drain(100); - assert!(!lines.iter().any(|l| message_is(l, "should not appear yet"))); - - // Cloud raises the level to info. - collector.set_filter("info", None, None).unwrap(); - tracing::info!(target: "logtest", "first info"); - tracing::trace!(target: "logtest", "should not appear"); - let (lines, _) = collector.drain(100); - assert!( - lines - .iter() - .any(|l| l.target == "logtest" && message_is(l, "first info")) - ); - assert!(!lines.iter().any(|l| message_is(l, "should not appear"))); - - // Cloud raises again to trace, with TTL and a revert target. - collector - .set_filter("trace", Some(Duration::from_millis(150)), Some("info")) - .unwrap(); - tracing::trace!(target: "logtest", "should appear"); - let (lines, _) = collector.drain(100); - assert!(lines.iter().any(|l| message_is(l, "should appear"))); - - n0_future::time::sleep(Duration::from_millis(300)).await; - tracing::trace!(target: "logtest", "should not appear after revert"); - let (lines, _) = collector.drain(100); - assert!( - !lines - .iter() - .any(|l| message_is(l, "should not appear after revert")) - ); - } - /// `file_layer` writes records to a file in the configured directory, /// and the WorkerGuard flushes pending writes on drop. #[test] @@ -630,11 +356,8 @@ mod tests { tracing::dispatcher::with_default(&dispatch, || { tracing::info!(target: "file_layer_test", "hello from the file logger"); }); - // Drop the guard so the non-blocking writer flushes its queue. drop(guard); - // Find a file produced by the rolling appender and confirm our line - // is in it. let mut found = false; for entry in std::fs::read_dir(tmp.path()).unwrap() { let entry = entry.unwrap(); @@ -649,4 +372,49 @@ mod tests { } assert!(found, "expected log line to be written to a test.* file"); } + + /// The cloud-controlled `layer` starts captured-nothing and only writes + /// after `set_filter` raises the level. Verifies the reload handle is + /// wired to the file layer end-to-end. + #[tokio::test(flavor = "current_thread")] + async fn cloud_filter_controls_file_writes() { + use tracing::Dispatch; + + let tmp = tempfile::tempdir().unwrap(); + let (collector, log_layer, guard) = + layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("controlled")).unwrap(); + + let subscriber = Registry::default().with(log_layer); + let dispatch = Dispatch::new(subscriber); + tracing::dispatcher::with_default(&dispatch, || { + // Captured nothing yet — filter is "off". + tracing::info!(target: "logtest", "before-set"); + + collector + .set_filter("info", None, None) + .expect("set_filter to info"); + tracing::info!(target: "logtest", "after-set"); + }); + drop(guard); + + let mut combined = String::new(); + for entry in std::fs::read_dir(tmp.path()).unwrap() { + let entry = entry.unwrap(); + if entry + .file_name() + .to_string_lossy() + .starts_with("controlled") + { + combined.push_str(&std::fs::read_to_string(entry.path()).unwrap()); + } + } + assert!( + !combined.contains("before-set"), + "before-set should be filtered out, got: {combined}" + ); + assert!( + combined.contains("after-set"), + "after-set should be written after set_filter, got: {combined}" + ); + } } diff --git a/src/protocol.rs b/src/protocol.rs index 42510e55..6401bcae 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -30,9 +30,6 @@ pub enum IrohServicesProtocol { #[rpc(tx=oneshot::Sender>)] NameEndpoint(NameEndpoint), - - #[rpc(tx=oneshot::Sender>)] - PutLogs(PutLogs), } /// Dedicated protocol for cloud-to-endpoint callbacks (net diagnostics, log @@ -111,64 +108,6 @@ pub struct NameEndpoint { pub name: String, } -/// A single structured log line emitted by a client process. -/// -/// The shape mirrors the JSON format produced by `tracing-subscriber`'s JSON -/// formatter, with the level, target, and timestamp lifted into top-level -/// fields so the cloud can index them as columns. The remaining structured -/// fields and the span stack travel as `Vec<(String, FieldValue)>` so the -/// schema is closed and `postcard` can encode and decode it without any -/// `deserialize_any` paths. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct LogLine { - /// RFC 3339 timestamp produced at log emission time. - pub timestamp: String, - /// Log level: TRACE, DEBUG, INFO, WARN, ERROR. - pub level: String, - /// Log target (typically the originating module path). - pub target: String, - /// Structured fields attached to the event. By convention, the - /// `message` field carries the human-readable text. - pub fields: Vec<(String, FieldValue)>, - /// Active span stack, outermost first. Empty when no span is in scope. - pub spans: Vec, -} - -/// A span recorded as part of a [`LogLine`]. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct SpanInfo { - pub name: String, - pub fields: Vec<(String, FieldValue)>, -} - -/// Wire-safe representation of a structured tracing field value. -/// -/// Closed enum so `postcard` can round-trip it without `deserialize_any`. -/// Anything that is not one of the typed variants (a `Debug`-formatted -/// value, a non-finite float, a 128-bit integer) is rendered to a string -/// at the producer with [`FieldValue::Other`]. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum FieldValue { - Str(String), - I64(i64), - U64(u64), - F64(f64), - Bool(bool), - /// Fallback for values that do not fit the typed variants. Carries the - /// `Debug`-formatted text. - Other(String), -} - -/// A batch of log lines pushed from a client to the cloud. -#[derive(Debug, Serialize, Deserialize)] -pub struct PutLogs { - pub session_id: Uuid, - pub lines: Vec, - /// Number of lines dropped on the client since the last successful push, - /// either due to the buffer being full or the throttle being exceeded. - pub dropped: u32, -} - /// Cloud-issued instruction to override the client's tracing filter. #[derive(Debug, Serialize, Deserialize)] pub struct SetLogLevel { From 01c2294c54fbebcd75c08fda93e1518757a07d60 Mon Sep 17 00:00:00 2001 From: Rae McKelvey <633012+okdistribute@users.noreply.github.com> Date: Thu, 14 May 2026 10:48:47 +0200 Subject: [PATCH 5/7] feat(logs): FetchLogs RPC streams current rolling file to cloud Adds a new cloud-to-endpoint streaming RPC on ClientHostProtocol: the cloud asks for the contents of the endpoint's currently-active rolling log file, the SDK reads it in 64 KiB chunks and streams the bytes back over an mpsc channel. Capped by an optional max_bytes on the request. New LogsCap::Fetch capability gates the call; missing caps surface as a terminal MissingCapability item on the stream. With no log_collector configured the endpoint returns an AuthError chunk. LogCollector now carries its rolling appender's log_dir and file_name_prefix so client_host can locate the newest file (mtime- ordered, prefix match) without extra config. Two integration tests exercise (a) end-to-end streaming via QUIC over the actual ClientHostProtocol, and (b) the missing-cap rejection. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/caps.rs | 5 + src/client_host.rs | 227 ++++++++++++++++++++++++++++++++++++++++++++- src/logs.rs | 54 +++++++++++ src/protocol.rs | 19 +++- 4 files changed, 302 insertions(+), 3 deletions(-) diff --git a/src/caps.rs b/src/caps.rs index 51a2c9e8..5aee5dac 100644 --- a/src/caps.rs +++ b/src/caps.rs @@ -131,6 +131,9 @@ cap_enum!( Push, /// Permits the bearer to set the log level filter on the issuer at runtime. SetLevel, + /// Permits the bearer to ask the issuer for the contents of its + /// local rolling log file. + Fetch, } ); @@ -208,6 +211,7 @@ fn client_capabilities(other: &Cap) -> bool { Cap::NetDiagnostics(NetDiagnosticsCap::GetAny) => true, Cap::Logs(LogsCap::Push) => true, Cap::Logs(LogsCap::SetLevel) => true, + Cap::Logs(LogsCap::Fetch) => true, } } @@ -242,6 +246,7 @@ impl Capability for LogsCap { match (self, other) { (LogsCap::Push, LogsCap::Push) => true, (LogsCap::SetLevel, LogsCap::SetLevel) => true, + (LogsCap::Fetch, LogsCap::Fetch) => true, (_, _) => false, } } diff --git a/src/client_host.rs b/src/client_host.rs index e9774a07..5a1116ed 100644 --- a/src/client_host.rs +++ b/src/client_host.rs @@ -14,7 +14,7 @@ use tracing::{debug, warn}; use crate::{ caps::{Caps, LogsCap, NetDiagnosticsCap}, logs::LogCollector, - protocol::{ClientHostMessage, ClientHostProtocol, RemoteError}, + protocol::{ClientHostMessage, ClientHostProtocol, FetchLogs, RemoteError}, }; /// The ALPN for sending messages from the cloud node to the client. @@ -139,6 +139,23 @@ impl ClientHost { } } } + ClientHostMessage::FetchLogs(msg) => { + let WithChannels { inner, tx, .. } = msg; + let needed_caps = Caps::new([LogsCap::Fetch]); + if !capability.permits(&needed_caps) { + let _ = tx + .send(Err(RemoteError::MissingCapability(needed_caps))) + .await; + } else if let Some(collector) = self.log_collector.clone() { + stream_current_log_file(collector, inner, tx).await; + } else { + let _ = tx + .send(Err(RemoteError::AuthError( + "log collection is not enabled on this client".into(), + ))) + .await; + } + } } connection.closed().await; @@ -181,6 +198,80 @@ async fn send_missing_caps( Ok(()) } +/// Chunk size for streaming the rolling file back. 64 KiB is large enough +/// to amortize the round-trip overhead and small enough that a tight +/// `max_bytes` clamp still produces granular cut-off points. +const FETCH_LOGS_CHUNK_BYTES: usize = 64 * 1024; + +/// Open the collector's currently-active rolling file and stream it back +/// over `tx` in 64 KiB chunks. Stops at end-of-file or when +/// `request.max_bytes` is reached. Errors during read are reported as a +/// terminal `Err` chunk; the receiver should treat the stream's end as +/// success. +async fn stream_current_log_file( + collector: LogCollector, + request: FetchLogs, + tx: irpc::channel::mpsc::Sender, RemoteError>>, +) { + use tokio::io::AsyncReadExt; + + let path = match collector.current_log_file() { + Ok(Some(p)) => p, + Ok(None) => { + let _ = tx + .send(Err(RemoteError::AuthError( + "no log file is present on this client".into(), + ))) + .await; + return; + } + Err(err) => { + warn!(?err, "failed to locate current log file"); + let _ = tx.send(Err(RemoteError::InternalServerError)).await; + return; + } + }; + + let mut file = match tokio::fs::File::open(&path).await { + Ok(f) => f, + Err(err) => { + warn!(?err, path = %path.display(), "failed to open log file"); + let _ = tx.send(Err(RemoteError::InternalServerError)).await; + return; + } + }; + + let max_bytes = request.max_bytes.unwrap_or(u64::MAX); + let mut sent: u64 = 0; + let mut buf = vec![0u8; FETCH_LOGS_CHUNK_BYTES]; + loop { + let remaining = max_bytes.saturating_sub(sent); + if remaining == 0 { + break; + } + let take = (remaining.min(buf.len() as u64)) as usize; + let n = match file.read(&mut buf[..take]).await { + Ok(0) => break, + Ok(n) => n, + Err(err) => { + warn!(?err, "log file read failed"); + let _ = tx.send(Err(RemoteError::InternalServerError)).await; + return; + } + }; + if tx.send(Ok(buf[..n].to_vec())).await.is_err() { + // Receiver hung up; nothing more to do. + return; + } + sent = sent.saturating_add(n as u64); + } + debug!( + path = %path.display(), + bytes = sent, + "streamed log file to remote" + ); +} + #[cfg(test)] mod tests { use iroh::{address_lookup::MemoryLookup, endpoint::presets, protocol::Router}; @@ -191,7 +282,8 @@ mod tests { use crate::{ ALPN, caps::create_grant_token, - protocol::{Auth, IrohServicesClient, RunNetworkDiagnostics}, + logs::{self, FileLoggerConfig}, + protocol::{Auth, FetchLogs as FetchLogsReq, IrohServicesClient, RunNetworkDiagnostics}, }; #[tokio::test] @@ -286,4 +378,135 @@ mod tests { router.shutdown().await.unwrap(); client_ep.close().await; } + + /// FetchLogs streams the currently-active rolling file from the + /// endpoint back to the cloud caller in chunks. + #[tokio::test] + async fn test_fetch_logs_streams_current_file() { + let tmp = tempfile::tempdir().unwrap(); + // Stand up a LogCollector pointing at the tempdir but skip the + // global subscriber init: we want a controlled file we wrote + // directly, not whatever the appender buffers. + let (collector, _layer, guard) = + logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("fetch-test")) + .unwrap(); + drop(guard); + + // Write a known payload to a file matching the prefix so + // `current_log_file` picks it up. + let payload: Vec = (0..200_000u32).flat_map(|i| i.to_le_bytes()).collect(); + let file_path = tmp.path().join("fetch-test.2026-05-14"); + std::fs::write(&file_path, &payload).unwrap(); + + let lookup = MemoryLookup::new(); + let server_ep = iroh::Endpoint::builder(presets::Minimal) + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + let client_ep = iroh::Endpoint::builder(presets::Minimal) + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + + let host = ClientHost::new(&server_ep).with_log_collector(collector); + let router = Router::builder(server_ep.clone()) + .accept(CLIENT_HOST_ALPN, host) + .spawn(); + + // Issue a grant that includes LogsCap::Fetch. + let rcan = create_grant_token( + server_ep.secret_key().clone(), + client_ep.id(), + Duration::from_secs(3600), + Caps::new([LogsCap::Fetch]), + ) + .unwrap(); + let conn = IrohLazyRemoteConnection::new( + client_ep.clone(), + server_ep.addr(), + CLIENT_HOST_ALPN.to_vec(), + ); + let client = ClientHostClient::boxed(conn); + client.rpc(Auth { caps: rcan }).await.unwrap(); + + let mut rx = client + .server_streaming(FetchLogsReq { max_bytes: None }, 16) + .await + .unwrap(); + + let mut got: Vec = Vec::new(); + while let Some(chunk) = rx.recv().await.expect("server stream irpc error") { + let bytes = chunk.expect("server returned RemoteError"); + got.extend_from_slice(&bytes); + } + assert_eq!(got, payload, "streamed bytes should match the file"); + + router.shutdown().await.unwrap(); + client_ep.close().await; + } + + /// Endpoints without `LogsCap::Fetch` get a `MissingCapability` error + /// on the stream and no data. + #[tokio::test] + async fn test_fetch_logs_rejects_missing_cap() { + let tmp = tempfile::tempdir().unwrap(); + let (collector, _layer, guard) = + logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("noaccess")) + .unwrap(); + drop(guard); + + let lookup = MemoryLookup::new(); + let server_ep = iroh::Endpoint::builder(presets::Minimal) + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + let client_ep = iroh::Endpoint::builder(presets::Minimal) + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + + let host = ClientHost::new(&server_ep).with_log_collector(collector); + let router = Router::builder(server_ep.clone()) + .accept(CLIENT_HOST_ALPN, host) + .spawn(); + + // Grant SetLevel, not Fetch. + let rcan = create_grant_token( + server_ep.secret_key().clone(), + client_ep.id(), + Duration::from_secs(3600), + Caps::new([LogsCap::SetLevel]), + ) + .unwrap(); + let conn = IrohLazyRemoteConnection::new( + client_ep.clone(), + server_ep.addr(), + CLIENT_HOST_ALPN.to_vec(), + ); + let client = ClientHostClient::boxed(conn); + client.rpc(Auth { caps: rcan }).await.unwrap(); + + let mut rx = client + .server_streaming(FetchLogsReq { max_bytes: None }, 4) + .await + .unwrap(); + + let first = rx + .recv() + .await + .expect("server stream irpc error") + .expect("stream should produce one error"); + assert!(matches!(first, Err(RemoteError::MissingCapability(_)))); + assert!( + rx.recv().await.expect("server stream irpc error").is_none(), + "stream should close after error", + ); + + router.shutdown().await.unwrap(); + client_ep.close().await; + } } diff --git a/src/logs.rs b/src/logs.rs index 60b36886..1018b91e 100644 --- a/src/logs.rs +++ b/src/logs.rs @@ -106,6 +106,12 @@ pub struct LogCollector { struct CollectorInner { reload_handle: reload::Handle, revert_task: Mutex>>, + /// Directory where the rolling file appender writes. Used by + /// [`LogCollector::serve_fetch_logs`] to locate the current file. + log_dir: PathBuf, + /// Filename prefix the rolling appender uses; the date suffix is + /// appended for each rolled-over file. + file_name_prefix: String, } /// Off-state directive. Nothing is captured until the cloud sends a @@ -158,6 +164,50 @@ impl LogCollector { .reload(filter) .map_err(|_| SetFilterError::ReloadFailed) } + + /// Directory the rolling appender writes into. Exposed so that + /// [`crate::ClientHost`] can stream files back to the cloud on + /// request. + pub fn log_dir(&self) -> &std::path::Path { + &self.inner.log_dir + } + + /// Filename prefix the rolling appender uses. Combined with the + /// rotation suffix to identify the current file. + pub fn file_name_prefix(&self) -> &str { + &self.inner.file_name_prefix + } + + /// Locate the newest rolling file under [`Self::log_dir`] that starts + /// with [`Self::file_name_prefix`]. Returns `Ok(None)` when the + /// directory exists but no matching file is present. + pub fn current_log_file(&self) -> std::io::Result> { + let dir = self.log_dir(); + let prefix = self.file_name_prefix(); + let entries = match std::fs::read_dir(dir) { + Ok(e) => e, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(err), + }; + let mut best: Option<(PathBuf, std::time::SystemTime)> = None; + for entry in entries.flatten() { + let name = entry.file_name(); + let name = name.to_string_lossy(); + if !name.starts_with(prefix) { + continue; + } + let Ok(meta) = entry.metadata() else { continue }; + if !meta.is_file() { + continue; + } + let mtime = meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH); + match &best { + Some((_, current)) if *current >= mtime => {} + _ => best = Some((entry.path(), mtime)), + } + } + Ok(best.map(|(p, _)| p)) + } } impl std::fmt::Debug for LogCollector { @@ -204,12 +254,16 @@ pub fn layer( let filter = EnvFilter::try_new(OFF_DIRECTIVES).expect("'off' is always a valid directive"); let (filter, reload_handle) = reload::Layer::new(filter); + let log_dir = config.dir.clone(); + let file_name_prefix = config.file_name_prefix.clone(); let (file_layer, guard) = file_layer::(config)?; let layer = file_layer.with_filter(filter); let inner = Arc::new(CollectorInner { reload_handle, revert_task: Mutex::new(None), + log_dir, + file_name_prefix, }); let collector = LogCollector { inner }; Ok((collector, layer, guard)) diff --git a/src/protocol.rs b/src/protocol.rs index 6401bcae..04e17afa 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,5 +1,8 @@ use anyhow::Result; -use irpc::{channel::oneshot, rpc_requests}; +use irpc::{ + channel::{mpsc, oneshot}, + rpc_requests, +}; use rcan::Rcan; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -44,6 +47,8 @@ pub enum ClientHostProtocol { RunNetworkDiagnostics(RunNetworkDiagnostics), #[rpc(tx=oneshot::Sender>)] SetLogLevel(SetLogLevel), + #[rpc(tx=mpsc::Sender>>)] + FetchLogs(FetchLogs), } pub type RemoteResult = Result; @@ -108,6 +113,18 @@ pub struct NameEndpoint { pub name: String, } +/// Ask the client to stream the contents of its currently-active rolling +/// log file. The client picks the newest file under its configured +/// log directory matching the configured filename prefix. +#[derive(Debug, Serialize, Deserialize)] +pub struct FetchLogs { + /// Stop after this many bytes have been streamed. `None` means stream + /// the whole current file. The cloud caller is expected to enforce its + /// own plan-tier cap on top of this. + #[serde(default)] + pub max_bytes: Option, +} + /// Cloud-issued instruction to override the client's tracing filter. #[derive(Debug, Serialize, Deserialize)] pub struct SetLogLevel { From 68751902dbc578d64a7128c98fc3f8a9c33f8234 Mon Sep 17 00:00:00 2001 From: Rae McKelvey <633012+okdistribute@users.noreply.github.com> Date: Thu, 14 May 2026 11:39:48 +0200 Subject: [PATCH 6/7] feat(logs): client-side pull for initial directives via GetLogLevel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the cloud's push-with-sleep dance for restoring per-endpoint log directives on reconnect. The cloud's spawn_initial_set_log_level existed because the dial-back over CLIENT_HOST_ALPN couldn't run until the client had landed its GrantCap(LogsCap::SetLevel), so the cloud slept 2s and hoped the grant beat it. Race-prone and slow. Inverted: the client now pulls. Right after Auth succeeds in ClientActor, it RPCs the cloud with GetLogLevel and applies the result to its LogCollector. No sleeps, no race, no LogsCap::SetLevel needed for initial state — the call is the client reading its own setting. Re-introduces ClientBuilder::with_log_collector(collector) so consumers opt into the pull behaviour. Updates the logs example. The dashboard-triggered live override path (SetLogLevel pushed via ClientHost) is unchanged. It still requires LogsCap::SetLevel, but by the time the operator clicks Apply the connection is long up and the grant is on file, so there is no race to avoid. New protocol types: - IrohServicesProtocol::GetLogLevel — client-to-cloud RPC. - LogLevelSettings { directives, expires_in_secs?, revert_to? } — same shape as SetLogLevel so the client applies through the same set_filter code path. Co-Authored-By: Claude Opus 4.7 (1M context) --- examples/logs.rs | 17 ++++++++++------- src/client.rs | 47 +++++++++++++++++++++++++++++++++++++++++++++++ src/protocol.rs | 20 ++++++++++++++++++++ 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/examples/logs.rs b/examples/logs.rs index 00f718ac..ae4a15ab 100644 --- a/examples/logs.rs +++ b/examples/logs.rs @@ -40,16 +40,19 @@ async fn main() -> Result<()> { let name = format!("logs-example-{}", &endpoint.id().to_string()[..8]); - // 3. Build the client. + // 3. Build the client. `with_log_collector(collector.clone())` makes + // the client pull the cloud's persisted directives right after + // auth and apply them locally — no waiting for the cloud to push. let client = Client::builder(&endpoint) .api_secret(secret)? .name(name)? + .with_log_collector(collector.clone()) .build() .await?; - // 4. Grant `LogsCap::SetLevel` so the cloud can dial us back and apply a - // filter override. Spawned so a momentarily-down cloud does not block - // startup. + // 4. Grant `LogsCap::SetLevel` so the dashboard can dial us back with + // live updates after the initial pull. Spawned so a momentarily- + // down cloud does not block startup. let client_for_grant = client.clone(); let grant_task = tokio::spawn(async move { if let Err(err) = client_for_grant @@ -60,9 +63,9 @@ async fn main() -> Result<()> { } }); - // 5. Accept the cloud's callback connections on `CLIENT_HOST_ALPN`. The - // `ClientHost` needs the collector so the `SetLogLevel` request can - // hot-reload the local filter. + // 5. Accept the cloud's callback connections on `CLIENT_HOST_ALPN`. + // The `ClientHost` needs the same collector so dashboard-triggered + // overrides hot-reload the local filter. let host = ClientHost::new(&endpoint).with_log_collector(collector); let router = Router::builder(endpoint) .accept(CLIENT_HOST_ALPN, host) diff --git a/src/client.rs b/src/client.rs index a127a70a..54e7d80c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -67,6 +67,7 @@ pub struct ClientBuilder { metrics_interval: Option, remote: Option, registry: Registry, + log_collector: Option, } const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 1 month @@ -85,9 +86,23 @@ impl ClientBuilder { metrics_interval: Some(Duration::from_secs(60)), remote: None, registry, + log_collector: None, } } + /// Enables initial-state pull of log-level directives on every + /// (re-)authentication. Right after `Auth` succeeds the client RPCs + /// the cloud for the currently-persisted setting and applies it via + /// [`crate::logs::LogCollector::set_filter`]. + /// + /// The collector handle is also what + /// [`crate::ClientHost::with_log_collector`] uses to apply + /// dashboard-triggered overrides, so it's typically the same one. + pub fn with_log_collector(mut self, collector: crate::logs::LogCollector) -> Self { + self.log_collector = Some(collector); + self + } + /// Register a metrics group to forward to iroh-services /// /// The default registered metrics uses only the endpoint @@ -226,6 +241,7 @@ impl ClientBuilder { name: self.name.clone(), session_id, authorized: false, + log_collector: self.log_collector, } .run(self.name, self.registry, self.metrics_interval, rx), )); @@ -445,6 +461,7 @@ struct ClientActor { name: Option, session_id: Uuid, authorized: bool, + log_collector: Option, } impl ClientActor { @@ -543,6 +560,36 @@ impl ClientActor { .inspect_err(|e| debug!("authorization failed: {:?}", e)) .map_err(|e| RemoteError::AuthError(e.to_string()))?; self.authorized = true; + + // Initial pull: ask the cloud for whatever directive override is + // on file for this endpoint and apply it locally. Best-effort — + // a failure here logs but does not block authentication. The + // dashboard-triggered live override path still works + // independently for in-session changes. + if let Some(collector) = self.log_collector.as_ref() { + match self.client.rpc(crate::protocol::GetLogLevel).await { + Ok(Ok(Some(settings))) => { + let expires_in = settings.expires_in_secs.map(Duration::from_secs); + if let Err(err) = collector.set_filter( + &settings.directives, + expires_in, + settings.revert_to.as_deref(), + ) { + warn!(?err, "failed to apply initial log level"); + } + } + Ok(Ok(None)) => { + // Endpoint not opted in — leave the filter at `off`. + } + Ok(Err(err)) => { + debug!(?err, "cloud rejected initial GetLogLevel"); + } + Err(err) => { + debug!(?err, "initial GetLogLevel rpc failed"); + } + } + } + Ok(()) } diff --git a/src/protocol.rs b/src/protocol.rs index 04e17afa..635ba599 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -33,6 +33,8 @@ pub enum IrohServicesProtocol { #[rpc(tx=oneshot::Sender>)] NameEndpoint(NameEndpoint), + #[rpc(tx=oneshot::Sender>>)] + GetLogLevel(GetLogLevel), } /// Dedicated protocol for cloud-to-endpoint callbacks (net diagnostics, log @@ -141,3 +143,21 @@ pub struct SetLogLevel { #[serde(default)] pub revert_to: Option, } + +/// Client-initiated request for the cloud's current log-level settings. +/// Sent right after auth so the client lands on the correct filter +/// without waiting for the cloud to push. +#[derive(Debug, Serialize, Deserialize)] +pub struct GetLogLevel; + +/// Settings returned by [`GetLogLevel`]. Mirrors [`SetLogLevel`] so the +/// client applies the result through the same [`crate::logs::LogCollector::set_filter`] +/// code path. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LogLevelSettings { + pub directives: String, + #[serde(default)] + pub expires_in_secs: Option, + #[serde(default)] + pub revert_to: Option, +} From f5c881414a6c32aede7e9c0a23ecf41c0a7a9ae1 Mon Sep 17 00:00:00 2001 From: Rae McKelvey <633012+okdistribute@users.noreply.github.com> Date: Thu, 14 May 2026 12:36:59 +0200 Subject: [PATCH 7/7] refactor(logs): unify GetLogLevel response with SetLogLevel; tidy surface A pass of review cleanups: - LogLevelSettings was a structural copy of SetLogLevel (same three fields, same client-side application via set_filter). Dropped the duplicate; GetLogLevel now returns Option and the doc-comment on SetLogLevel notes it carries both directions. - LogCollector::log_dir() / file_name_prefix() were public but only used internally to compute current_log_file(). Narrowed current_log_file() to pub(crate) (only the ClientHost handler reads it) and inlined the two getters into its body. - Removed the dead InstallError::InvalidDirectives variant left over from the deleted shipper. - Fixed a stale comment in the install doctest that still described the pre-pull push semantics. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/logs.rs | 41 +++++++++++++---------------------------- src/protocol.rs | 22 +++++++--------------- 2 files changed, 20 insertions(+), 43 deletions(-) diff --git a/src/logs.rs b/src/logs.rs index 1018b91e..02a7756f 100644 --- a/src/logs.rs +++ b/src/logs.rs @@ -24,13 +24,11 @@ //! use iroh_services::logs::{self, FileLoggerConfig}; //! //! # fn main() -> anyhow::Result<()> { -//! // Installs a global subscriber. The filter starts at `off`; once a -//! // ClientHost connection is up, the cloud pushes a SetLogLevel that -//! // raises it to whatever the dashboard says. +//! // Installs a global subscriber. The filter starts at `off`; the +//! // Client pulls the cloud-persisted directive right after Auth and +//! // applies it via the collector. The dashboard can also push live +//! // overrides via ClientHost::set_log_level after that. //! let (collector, _guard) = logs::install(FileLoggerConfig::new("./logs"))?; -//! -//! // Hand the collector to ClientHost so it can call `collector.set_filter` -//! // when the cloud pushes overrides. //! # let _ = collector; //! # Ok(()) //! # } @@ -76,9 +74,6 @@ pub enum InstallError { /// The default tracing dispatcher is already set; install once at startup. #[error("global tracing dispatcher is already set")] AlreadyInstalled, - /// The supplied directives string was rejected by `EnvFilter`. - #[error("invalid filter directives: {0}")] - InvalidDirectives(String), /// File logger setup failed (could not create directory, open appender, /// etc.). #[error("file logger setup failed: {0}")] @@ -165,25 +160,15 @@ impl LogCollector { .map_err(|_| SetFilterError::ReloadFailed) } - /// Directory the rolling appender writes into. Exposed so that - /// [`crate::ClientHost`] can stream files back to the cloud on - /// request. - pub fn log_dir(&self) -> &std::path::Path { - &self.inner.log_dir - } - - /// Filename prefix the rolling appender uses. Combined with the - /// rotation suffix to identify the current file. - pub fn file_name_prefix(&self) -> &str { - &self.inner.file_name_prefix - } - - /// Locate the newest rolling file under [`Self::log_dir`] that starts - /// with [`Self::file_name_prefix`]. Returns `Ok(None)` when the - /// directory exists but no matching file is present. - pub fn current_log_file(&self) -> std::io::Result> { - let dir = self.log_dir(); - let prefix = self.file_name_prefix(); + /// Locate the newest rolling file in the configured log directory + /// whose name starts with the configured filename prefix. Returns + /// `Ok(None)` when the directory exists but no matching file is + /// present. Used by [`crate::ClientHost`] to serve [`FetchLogs`]. + /// + /// [`FetchLogs`]: crate::protocol::FetchLogs + pub(crate) fn current_log_file(&self) -> std::io::Result> { + let dir = &self.inner.log_dir; + let prefix = &self.inner.file_name_prefix; let entries = match std::fs::read_dir(dir) { Ok(e) => e, Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), diff --git a/src/protocol.rs b/src/protocol.rs index 635ba599..5a4c7dc9 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -33,7 +33,7 @@ pub enum IrohServicesProtocol { #[rpc(tx=oneshot::Sender>)] NameEndpoint(NameEndpoint), - #[rpc(tx=oneshot::Sender>>)] + #[rpc(tx=oneshot::Sender>>)] GetLogLevel(GetLogLevel), } @@ -127,8 +127,12 @@ pub struct FetchLogs { pub max_bytes: Option, } -/// Cloud-issued instruction to override the client's tracing filter. -#[derive(Debug, Serialize, Deserialize)] +/// Log-level filter settings. Used in two directions: +/// - As a cloud-to-client push (via the [`crate::ClientHost`] callback) +/// to apply a new override mid-session. +/// - As the response payload to [`GetLogLevel`] so the client can pull +/// the persisted setting on connect. +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct SetLogLevel { /// `EnvFilter`-compatible directive string (for example /// `"info,iroh=trace,iroh_blobs=debug"`). @@ -149,15 +153,3 @@ pub struct SetLogLevel { /// without waiting for the cloud to push. #[derive(Debug, Serialize, Deserialize)] pub struct GetLogLevel; - -/// Settings returned by [`GetLogLevel`]. Mirrors [`SetLogLevel`] so the -/// client applies the result through the same [`crate::logs::LogCollector::set_filter`] -/// code path. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct LogLevelSettings { - pub directives: String, - #[serde(default)] - pub expires_in_secs: Option, - #[serde(default)] - pub revert_to: Option, -}