diff --git a/Cargo.lock b/Cargo.lock index a183ed0b..c574ef9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1810,7 +1810,7 @@ dependencies = [ [[package]] name = "iroh-services" -version = "0.14.0" +version = "0.15.0" dependencies = [ "anyhow", "built", @@ -1835,9 +1835,11 @@ dependencies = [ "ssh-key", "strum", "temp_env_vars", + "tempfile", "thiserror 2.0.18", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "uuid", ] @@ -2017,6 +2019,12 @@ version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.2" @@ -3032,6 +3040,19 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.38" @@ -3492,6 +3513,12 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "symlink" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" + [[package]] name = "syn" version = "2.0.117" @@ -3569,6 +3596,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.4.2", + 
"once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -3891,6 +3931,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c" +dependencies = [ + "crossbeam-channel", + "symlink", + "thiserror 2.0.18", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.31" diff --git a/Cargo.toml b/Cargo.toml index e1a1d7e8..15662353 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iroh-services" -version = "0.14.0" +version = "0.15.0" edition = "2024" readme = "README.md" description = "p2p quic connections dialed by public key" @@ -33,6 +33,7 @@ tracing-subscriber = { version = "0.3.20", features = [ "fmt", "json", ] } +tracing-appender = "0.2" serde_json = "1.0.140" bytes = { version = "1.10.1", features = ["serde"] } futures-buffered = "0.2.12" @@ -52,6 +53,7 @@ built = { version = "0.8", features = ["cargo-lock"] } [dev-dependencies] rand = { version = "0.10", features = ["chacha"] } temp_env_vars = "0.2.1" +tempfile = "3" tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "signal"] } @@ -60,3 +62,6 @@ default = [] [[example]] name = "net_diagnostics" + +[[example]] +name = "logs" diff --git a/examples/logs.rs b/examples/logs.rs new file mode 100644 index 00000000..ae4a15ab --- /dev/null +++ b/examples/logs.rs @@ -0,0 +1,93 @@ +//! Log collection example. +//! +//! Demonstrates installing the iroh-services file logger and letting the +//! cloud override the local filter at runtime via `SetLogLevel`. +//! +//! The level filter starts at `off`. The cloud pushes a level after this +//! endpoint is opted into log collection from the dashboard or REST API. +//! Anything emitted before the cloud responds is silently dropped. +//! +//! 
Run with: cargo run --example logs + +use std::time::Duration; + +use anyhow::Result; +use iroh::{Endpoint, endpoint::presets, protocol::Router}; +use iroh_services::{ + API_SECRET_ENV_VAR_NAME, ApiSecret, CLIENT_HOST_ALPN, Client, ClientHost, + caps::LogsCap, + logs::{self, FileLoggerConfig, Rotation}, +}; +use tracing::info; + +#[tokio::main] +async fn main() -> Result<()> { + // 1. Install the cloud-controlled file logger. Records land under + // `./logs/` and roll over hourly with up to 24 files kept. The + // WorkerGuard must stay alive until shutdown; `_guard` is dropped when + // `main` returns so any buffered records flush on exit. + let (collector, _guard) = logs::install( + FileLoggerConfig::new("./logs") + .with_rotation(Rotation::HOURLY) + .with_max_files(Some(24)), + )?; + + // 2. Create the endpoint and parse the API secret so we know which + // cloud endpoint to grant SetLevel to. + let endpoint = Endpoint::bind(presets::N0).await?; + let secret = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?; + let cloud_id = secret.addr().id; + + let name = format!("logs-example-{}", &endpoint.id().to_string()[..8]); + + // 3. Build the client. `with_log_collector(collector.clone())` makes + // the client pull the cloud's persisted directives right after + // auth and apply them locally — no waiting for the cloud to push. + let client = Client::builder(&endpoint) + .api_secret(secret)? + .name(name)? + .with_log_collector(collector.clone()) + .build() + .await?; + + // 4. Grant `LogsCap::SetLevel` so the dashboard can dial us back with + // live updates after the initial pull. Spawned so a momentarily- + // down cloud does not block startup. + let client_for_grant = client.clone(); + let grant_task = tokio::spawn(async move { + if let Err(err) = client_for_grant + .grant_capability(cloud_id, [LogsCap::SetLevel]) + .await + { + eprintln!("failed to grant LogsCap::SetLevel: {err:?}"); + } + }); + + // 5. Accept the cloud's callback connections on `CLIENT_HOST_ALPN`.
+ // The `ClientHost` needs the same collector so dashboard-triggered + // overrides hot-reload the local filter. + let host = ClientHost::new(&endpoint).with_log_collector(collector); + let router = Router::builder(endpoint) + .accept(CLIENT_HOST_ALPN, host) + .spawn(); + + // 6. Emit an info log every other second forever. Records are written + // to the local rolling file once the cloud raises the level above + // `off`. + println!("emitting logs; ctrl+c to exit."); + let mut tick = tokio::time::interval(Duration::from_secs(2)); + let mut counter: u64 = 0; + loop { + tokio::select! { + _ = tick.tick() => { + counter += 1; + info!(counter, "logs example heartbeat"); + } + _ = tokio::signal::ctrl_c() => break, + } + } + + grant_task.abort(); + router.endpoint().close().await; + Ok(()) +} diff --git a/examples/quickstart.rs b/examples/quickstart.rs index 034ae4db..04473658 100644 --- a/examples/quickstart.rs +++ b/examples/quickstart.rs @@ -3,17 +3,29 @@ use iroh_services::Client; #[tokio::main] async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let endpoint = Endpoint::bind(presets::N0).await?; + // Wait for the endpoint to be online + endpoint.online().await; + // needs IROH_SERVICES_API_SECRET set to an environment variable // client will now push endpoint metrics to iroh-services let client = Client::builder(&endpoint) .api_secret_from_env()? + .name("quickstart-example")? .build() .await?; // we can also ping the service just to confirm everything is working client.ping().await?; + // keep the endpoint running so it continues pushing metrics. + // ctrl+c to exit. + println!("endpoint running. 
ctrl+c to exit."); + tokio::signal::ctrl_c().await?; + endpoint.close().await; + Ok(()) } diff --git a/src/caps.rs b/src/caps.rs index fcdcbf14..5aee5dac 100644 --- a/src/caps.rs +++ b/src/caps.rs @@ -81,6 +81,8 @@ pub enum Cap { Metrics(MetricsCap), #[strum(to_string = "net-diagnostics:{0}")] NetDiagnostics(NetDiagnosticsCap), + #[strum(to_string = "logs:{0}")] + Logs(LogsCap), } impl FromStr for Cap { @@ -94,6 +96,7 @@ impl FromStr for Cap { "metrics" => Self::Metrics(MetricsCap::from_str(inner)?), "relay" => Self::Relay(RelayCap::from_str(inner)?), "net-diagnostics" => Self::NetDiagnostics(NetDiagnosticsCap::from_str(inner)?), + "logs" => Self::Logs(LogsCap::from_str(inner)?), _ => bail!("invalid cap domain"), }) } else { @@ -121,6 +124,19 @@ cap_enum!( } ); +cap_enum!( + /// Capabilities for the log collection feature. + pub enum LogsCap { + /// Permits the bearer to push log lines to the cloud. + Push, + /// Permits the bearer to set the log level filter on the issuer at runtime. + SetLevel, + /// Permits the bearer to ask the issuer for the contents of its + /// local rolling log file. 
+ Fetch, + } +); + impl Caps { pub fn new(caps: impl IntoIterator>) -> Self { Self::V0(CapSet::new(caps)) @@ -179,6 +195,7 @@ impl Capability for Cap { (Cap::Relay(slf), Cap::Relay(other)) => slf.permits(other), (Cap::Metrics(slf), Cap::Metrics(other)) => slf.permits(other), (Cap::NetDiagnostics(slf), Cap::NetDiagnostics(other)) => slf.permits(other), + (Cap::Logs(slf), Cap::Logs(other)) => slf.permits(other), (_, _) => false, } } @@ -192,6 +209,9 @@ fn client_capabilities(other: &Cap) -> bool { Cap::Metrics(MetricsCap::PutAny) => true, Cap::NetDiagnostics(NetDiagnosticsCap::PutAny) => true, Cap::NetDiagnostics(NetDiagnosticsCap::GetAny) => true, + Cap::Logs(LogsCap::Push) => true, + Cap::Logs(LogsCap::SetLevel) => true, + Cap::Logs(LogsCap::Fetch) => true, } } @@ -221,6 +241,17 @@ impl Capability for NetDiagnosticsCap { } } +impl Capability for LogsCap { + fn permits(&self, other: &Self) -> bool { + match (self, other) { + (LogsCap::Push, LogsCap::Push) => true, + (LogsCap::SetLevel, LogsCap::SetLevel) => true, + (LogsCap::Fetch, LogsCap::Fetch) => true, + (_, _) => false, + } + } +} + /// A set of capabilities #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)] pub struct CapSet(BTreeSet); diff --git a/src/client.rs b/src/client.rs index 062ab654..54e7d80c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -67,6 +67,7 @@ pub struct ClientBuilder { metrics_interval: Option, remote: Option, registry: Registry, + log_collector: Option, } const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 1 month @@ -85,9 +86,23 @@ impl ClientBuilder { metrics_interval: Some(Duration::from_secs(60)), remote: None, registry, + log_collector: None, } } + /// Enables initial-state pull of log-level directives on every + /// (re-)authentication. Right after `Auth` succeeds the client RPCs + /// the cloud for the currently-persisted setting and applies it via + /// [`crate::logs::LogCollector::set_filter`]. 
+ /// + /// The collector handle is also what + /// [`crate::ClientHost::with_log_collector`] uses to apply + /// dashboard-triggered overrides, so it's typically the same one. + pub fn with_log_collector(mut self, collector: crate::logs::LogCollector) -> Self { + self.log_collector = Some(collector); + self + } + /// Register a metrics group to forward to iroh-services /// /// The default registered metrics uses only the endpoint @@ -213,14 +228,20 @@ impl ClientBuilder { let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec()); let irpc_client = IrohServicesClient::boxed(conn); - let (tx, rx) = tokio::sync::mpsc::channel(1); + let session_id = Uuid::new_v4(); + // The actor mailbox is only used for control-plane messages (auth, + // ping, name, grant_cap) plus the periodic metrics + log flush. A + // small buffer is enough but `1` head-of-line-blocks log flushes + // behind metrics ticks, so leave a little room. + let (tx, rx) = tokio::sync::mpsc::channel(8); let actor_task = AbortOnDropHandle::new(n0_future::task::spawn( ClientActor { capabilities, client: irpc_client, name: self.name.clone(), - session_id: Uuid::new_v4(), + session_id, authorized: false, + log_collector: self.log_collector, } .run(self.name, self.registry, self.metrics_interval, rx), )); @@ -440,6 +461,7 @@ struct ClientActor { name: Option, session_id: Uuid, authorized: bool, + log_collector: Option, } impl ClientActor { @@ -538,6 +560,36 @@ impl ClientActor { .inspect_err(|e| debug!("authorization failed: {:?}", e)) .map_err(|e| RemoteError::AuthError(e.to_string()))?; self.authorized = true; + + // Initial pull: ask the cloud for whatever directive override is + // on file for this endpoint and apply it locally. Best-effort — + // a failure here logs but does not block authentication. The + // dashboard-triggered live override path still works + // independently for in-session changes. 
+ if let Some(collector) = self.log_collector.as_ref() { + match self.client.rpc(crate::protocol::GetLogLevel).await { + Ok(Ok(Some(settings))) => { + let expires_in = settings.expires_in_secs.map(Duration::from_secs); + if let Err(err) = collector.set_filter( + &settings.directives, + expires_in, + settings.revert_to.as_deref(), + ) { + warn!(?err, "failed to apply initial log level"); + } + } + Ok(Ok(None)) => { + // Endpoint not opted in — leave the current filter untouched. + } + Ok(Err(err)) => { + debug!(?err, "cloud rejected initial GetLogLevel"); + } + Err(err) => { + debug!(?err, "initial GetLogLevel rpc failed"); + } + } + } + Ok(()) } diff --git a/src/client_host.rs b/src/client_host.rs index d0535126..5a1116ed 100644 --- a/src/client_host.rs +++ b/src/client_host.rs @@ -7,12 +7,14 @@ use iroh::{ use irpc::WithChannels; use irpc_iroh::read_request; use n0_error::AnyError; +use n0_future::time::Duration; use rcan::{Capability, CapabilityOrigin, Rcan}; use tracing::{debug, warn}; use crate::{ - caps::{Caps, NetDiagnosticsCap}, - protocol::{ClientHostProtocol, NetDiagnosticsMessage, RemoteError}, + caps::{Caps, LogsCap, NetDiagnosticsCap}, + logs::LogCollector, + protocol::{ClientHostMessage, ClientHostProtocol, FetchLogs, RemoteError}, }; /// The ALPN for sending messages from the cloud node to the client. @@ -21,9 +23,10 @@ pub const CLIENT_HOST_ALPN: &[u8] = b"n0/n0des-client-host/1"; pub type ClientHostClient = irpc::Client; /// Protocol handler for cloud-to-endpoint connections. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ClientHost { endpoint: Endpoint, + log_collector: Option, } impl ProtocolHandler for ClientHost { @@ -39,16 +42,29 @@ impl ClientHost { pub fn new(endpoint: &Endpoint) -> Self { Self { endpoint: endpoint.clone(), + log_collector: None, } } + /// Enables the cloud to set the log level filter and fetch the current log + /// file at runtime via the [`SetLogLevel`] and `FetchLogs` callbacks.
+ /// + /// Without a collector the handler still accepts the message but responds + /// with [`RemoteError::AuthError`] indicating the feature is disabled. + /// + /// [`SetLogLevel`]: crate::protocol::SetLogLevel + pub fn with_log_collector(mut self, collector: LogCollector) -> Self { + self.log_collector = Some(collector); + self + } + async fn handle_connection(&self, connection: Connection) -> Result<()> { let remote_node_id = connection.remote_id(); let Some(first_request) = read_request::(&connection).await? else { return Ok(()); }; - let NetDiagnosticsMessage::Auth(WithChannels { inner, tx, .. }) = first_request else { + let ClientHostMessage::Auth(WithChannels { inner, tx, .. }) = first_request else { debug!(remote_node_id = %remote_node_id.fmt_short(), "Expected initial auth message"); connection.close(400u32.into(), b"Expected initial auth message"); return Ok(()); @@ -66,17 +82,17 @@ impl ClientHost { } } - // Read exactly one RunNetworkDiagnostics request + // Read exactly one callback request let Some(request) = read_request::(&connection).await? else { return Ok(()); }; match request { - NetDiagnosticsMessage::Auth(_) => { + ClientHostMessage::Auth(_) => { connection.close(400u32.into(), b"Unexpected auth message"); anyhow::bail!("unexpected auth message"); } - NetDiagnosticsMessage::RunNetworkDiagnostics(msg) => { + ClientHostMessage::RunNetworkDiagnostics(msg) => { let WithChannels { tx, .. } = msg; let needed_caps = Caps::new([NetDiagnosticsCap::GetAny]); if !capability.permits(&needed_caps) { @@ -89,6 +105,57 @@ impl ClientHost { .await .inspect_err(|e| warn!("sending network diagnostics response: {:?}", e))?; } + ClientHostMessage::SetLogLevel(msg) => { + let WithChannels { inner, tx, .. 
} = msg; + let needed_caps = Caps::new([LogsCap::SetLevel]); + if !capability.permits(&needed_caps) { + return send_missing_caps(tx, needed_caps).await; + } + let Some(ref collector) = self.log_collector else { + tx.send(Err(RemoteError::AuthError( + "log collection is not enabled on this client".into(), + ))) + .await?; + return Ok(()); + }; + let expires_in = inner.expires_in_secs.map(Duration::from_secs); + match collector.set_filter( + &inner.directives, + expires_in, + inner.revert_to.as_deref(), + ) { + Ok(()) => { + debug!( + directives = %inner.directives, + expires_in_secs = ?inner.expires_in_secs, + "applied log level override" + ); + tx.send(Ok(())).await?; + } + Err(err) => { + warn!(?err, "failed to apply log level override"); + tx.send(Err(RemoteError::AuthError(err.to_string()))) + .await?; + } + } + } + ClientHostMessage::FetchLogs(msg) => { + let WithChannels { inner, tx, .. } = msg; + let needed_caps = Caps::new([LogsCap::Fetch]); + if !capability.permits(&needed_caps) { + let _ = tx + .send(Err(RemoteError::MissingCapability(needed_caps))) + .await; + } else if let Some(collector) = self.log_collector.clone() { + stream_current_log_file(collector, inner, tx).await; + } else { + let _ = tx + .send(Err(RemoteError::AuthError( + "log collection is not enabled on this client".into(), + ))) + .await; + } + } } connection.closed().await; @@ -131,6 +198,80 @@ async fn send_missing_caps( Ok(()) } +/// Chunk size for streaming the rolling file back. 64 KiB is large enough +/// to amortize the round-trip overhead and small enough that a tight +/// `max_bytes` clamp still produces granular cut-off points. +const FETCH_LOGS_CHUNK_BYTES: usize = 64 * 1024; + +/// Open the collector's currently-active rolling file and stream it back +/// over `tx` in 64 KiB chunks. Stops at end-of-file or when +/// `request.max_bytes` is reached. Errors during read are reported as a +/// terminal `Err` chunk; the receiver should treat the stream's end as +/// success. 
+async fn stream_current_log_file( + collector: LogCollector, + request: FetchLogs, + tx: irpc::channel::mpsc::Sender, RemoteError>>, +) { + use tokio::io::AsyncReadExt; + + let path = match collector.current_log_file() { + Ok(Some(p)) => p, + Ok(None) => { + let _ = tx + .send(Err(RemoteError::AuthError( + "no log file is present on this client".into(), + ))) + .await; + return; + } + Err(err) => { + warn!(?err, "failed to locate current log file"); + let _ = tx.send(Err(RemoteError::InternalServerError)).await; + return; + } + }; + + let mut file = match tokio::fs::File::open(&path).await { + Ok(f) => f, + Err(err) => { + warn!(?err, path = %path.display(), "failed to open log file"); + let _ = tx.send(Err(RemoteError::InternalServerError)).await; + return; + } + }; + + let max_bytes = request.max_bytes.unwrap_or(u64::MAX); + let mut sent: u64 = 0; + let mut buf = vec![0u8; FETCH_LOGS_CHUNK_BYTES]; + loop { + let remaining = max_bytes.saturating_sub(sent); + if remaining == 0 { + break; + } + let take = (remaining.min(buf.len() as u64)) as usize; + let n = match file.read(&mut buf[..take]).await { + Ok(0) => break, + Ok(n) => n, + Err(err) => { + warn!(?err, "log file read failed"); + let _ = tx.send(Err(RemoteError::InternalServerError)).await; + return; + } + }; + if tx.send(Ok(buf[..n].to_vec())).await.is_err() { + // Receiver hung up; nothing more to do. 
+ return; + } + sent = sent.saturating_add(n as u64); + } + debug!( + path = %path.display(), + bytes = sent, + "streamed log file to remote" + ); +} + #[cfg(test)] mod tests { use iroh::{address_lookup::MemoryLookup, endpoint::presets, protocol::Router}; @@ -141,7 +282,8 @@ mod tests { use crate::{ ALPN, caps::create_grant_token, - protocol::{Auth, IrohServicesClient, RunNetworkDiagnostics}, + logs::{self, FileLoggerConfig}, + protocol::{Auth, FetchLogs as FetchLogsReq, IrohServicesClient, RunNetworkDiagnostics}, }; #[tokio::test] @@ -236,4 +378,135 @@ mod tests { router.shutdown().await.unwrap(); client_ep.close().await; } + + /// FetchLogs streams the currently-active rolling file from the + /// endpoint back to the cloud caller in chunks. + #[tokio::test] + async fn test_fetch_logs_streams_current_file() { + let tmp = tempfile::tempdir().unwrap(); + // Stand up a LogCollector pointing at the tempdir but skip the + // global subscriber init: we want a controlled file we wrote + // directly, not whatever the appender buffers. + let (collector, _layer, guard) = + logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("fetch-test")) + .unwrap(); + drop(guard); + + // Write a known payload to a file matching the prefix so + // `current_log_file` picks it up. + let payload: Vec = (0..200_000u32).flat_map(|i| i.to_le_bytes()).collect(); + let file_path = tmp.path().join("fetch-test.2026-05-14"); + std::fs::write(&file_path, &payload).unwrap(); + + let lookup = MemoryLookup::new(); + let server_ep = iroh::Endpoint::builder(presets::Minimal) + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + let client_ep = iroh::Endpoint::builder(presets::Minimal) + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + + let host = ClientHost::new(&server_ep).with_log_collector(collector); + let router = Router::builder(server_ep.clone()) + .accept(CLIENT_HOST_ALPN, host) + .spawn(); + + // Issue a grant that includes LogsCap::Fetch. 
+ let rcan = create_grant_token( + server_ep.secret_key().clone(), + client_ep.id(), + Duration::from_secs(3600), + Caps::new([LogsCap::Fetch]), + ) + .unwrap(); + let conn = IrohLazyRemoteConnection::new( + client_ep.clone(), + server_ep.addr(), + CLIENT_HOST_ALPN.to_vec(), + ); + let client = ClientHostClient::boxed(conn); + client.rpc(Auth { caps: rcan }).await.unwrap(); + + let mut rx = client + .server_streaming(FetchLogsReq { max_bytes: None }, 16) + .await + .unwrap(); + + let mut got: Vec = Vec::new(); + while let Some(chunk) = rx.recv().await.expect("server stream irpc error") { + let bytes = chunk.expect("server returned RemoteError"); + got.extend_from_slice(&bytes); + } + assert_eq!(got, payload, "streamed bytes should match the file"); + + router.shutdown().await.unwrap(); + client_ep.close().await; + } + + /// Endpoints without `LogsCap::Fetch` get a `MissingCapability` error + /// on the stream and no data. + #[tokio::test] + async fn test_fetch_logs_rejects_missing_cap() { + let tmp = tempfile::tempdir().unwrap(); + let (collector, _layer, guard) = + logs::layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("noaccess")) + .unwrap(); + drop(guard); + + let lookup = MemoryLookup::new(); + let server_ep = iroh::Endpoint::builder(presets::Minimal) + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + let client_ep = iroh::Endpoint::builder(presets::Minimal) + .address_lookup(lookup.clone()) + .bind() + .await + .unwrap(); + + let host = ClientHost::new(&server_ep).with_log_collector(collector); + let router = Router::builder(server_ep.clone()) + .accept(CLIENT_HOST_ALPN, host) + .spawn(); + + // Grant SetLevel, not Fetch. 
+ let rcan = create_grant_token( + server_ep.secret_key().clone(), + client_ep.id(), + Duration::from_secs(3600), + Caps::new([LogsCap::SetLevel]), + ) + .unwrap(); + let conn = IrohLazyRemoteConnection::new( + client_ep.clone(), + server_ep.addr(), + CLIENT_HOST_ALPN.to_vec(), + ); + let client = ClientHostClient::boxed(conn); + client.rpc(Auth { caps: rcan }).await.unwrap(); + + let mut rx = client + .server_streaming(FetchLogsReq { max_bytes: None }, 4) + .await + .unwrap(); + + let first = rx + .recv() + .await + .expect("server stream irpc error") + .expect("stream should produce one error"); + assert!(matches!(first, Err(RemoteError::MissingCapability(_)))); + assert!( + rx.recv().await.expect("server stream irpc error").is_none(), + "stream should close after error", + ); + + router.shutdown().await.unwrap(); + client_ep.close().await; + } } diff --git a/src/lib.rs b/src/lib.rs index 517b1ac3..19598ea8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ mod client_host; pub mod api_secret; pub mod caps; +pub mod logs; pub mod net_diagnostics; pub mod protocol; diff --git a/src/logs.rs b/src/logs.rs new file mode 100644 index 00000000..02a7756f --- /dev/null +++ b/src/logs.rs @@ -0,0 +1,459 @@ +//! Client-side log collection: a `tracing-subscriber` layer that writes +//! structured log records to rolling files on the local filesystem, plus a +//! reload handle that lets the cloud control the level filter at runtime. +//! +//! # The cloud is the source of truth for the level +//! +//! The level filter starts at `off`. No tracing events are captured until the +//! cloud pushes a [`crate::protocol::SetLogLevel`] over the [`ClientHost`] +//! channel. The cloud sends one immediately after authenticating a connected +//! endpoint, derived from the per-endpoint `endpoint_log_settings` row (when +//! present) plus the project default. +//! +//! [`ClientHost`]: crate::ClientHost +//! +//! # Files live on the device +//! +//! 
Records land in rolling JSON files under a caller-supplied directory. +//! Operators view, ship, or aggregate them with whatever tooling they +//! already use (`tail`, `journalctl`, `vector`, etc.). +//! +//! # Typical usage +//! +//! ```no_run +//! use iroh_services::logs::{self, FileLoggerConfig}; +//! +//! # fn main() -> anyhow::Result<()> { +//! // Installs a global subscriber. The filter starts at `off`; the +//! // Client pulls the cloud-persisted directive right after Auth and +//! // applies it via the collector. The dashboard can also push live +//! // overrides via ClientHost::set_log_level after that. +//! let (collector, _guard) = logs::install(FileLoggerConfig::new("./logs"))?; +//! # let _ = collector; +//! # Ok(()) +//! # } +//! ``` +//! +//! Compose with additional layers (for example, a stderr fmt layer) via +//! [`layer`]: +//! +//! ```no_run +//! use iroh_services::logs::{self, FileLoggerConfig}; +//! use tracing_subscriber::prelude::*; +//! +//! # fn main() -> anyhow::Result<()> { +//! let (collector, file_layer, _guard) = logs::layer(FileLoggerConfig::new("./logs"))?; +//! tracing_subscriber::registry() +//! .with(file_layer) +//! .with(tracing_subscriber::fmt::layer()) +//! .try_init() +//! .ok(); +//! # let _ = collector; +//! # Ok(()) +//! # } +//! ``` + +use std::{ + path::{Path, PathBuf}, + sync::{Arc, Mutex}, +}; + +use n0_future::{ + task::{AbortOnDropHandle, JoinHandle}, + time::Duration, +}; +use tracing::{Subscriber, debug, warn}; +use tracing_subscriber::{ + EnvFilter, Layer, Registry, layer::SubscriberExt as _, registry::LookupSpan, reload, + util::SubscriberInitExt as _, +}; + +/// Errors that can occur while installing the log collector. +#[derive(Debug, thiserror::Error)] +pub enum InstallError { + /// The default tracing dispatcher is already set; install once at startup. 
+ #[error("global tracing dispatcher is already set")] + AlreadyInstalled, + /// File logger setup failed (could not create directory, open appender, + /// etc.). + #[error("file logger setup failed: {0}")] + FileLogger(#[from] FileLoggerError), +} + +/// Errors that can occur while changing the active filter at runtime. +#[derive(Debug, thiserror::Error)] +pub enum SetFilterError { + /// The supplied directives string was rejected by `EnvFilter`. + #[error("invalid filter directives: {0}")] + InvalidDirectives(String), + /// Reloading the filter failed because the subscriber went away. + #[error("reload handle is no longer valid")] + ReloadFailed, +} + +/// Handle to the cloud-controlled tracing filter. Cheap to clone; all clones +/// share the same backing reload handle. +#[derive(Clone)] +pub struct LogCollector { + inner: Arc, +} + +struct CollectorInner { + reload_handle: reload::Handle, + revert_task: Mutex>>, + /// Directory where the rolling file appender writes. Used by + /// [`LogCollector::current_log_file`] to locate the current file. + log_dir: PathBuf, + /// Filename prefix the rolling appender uses; the date suffix is + /// appended for each rolled-over file. + file_name_prefix: String, +} + +/// Off-state directive. Nothing is captured until [`LogCollector::set_filter`] +/// installs something more permissive. +const OFF_DIRECTIVES: &str = "off"; + +impl LogCollector { + /// Sets the active filter directives, dropping any previously scheduled + /// revert. When `expires_in` is set, schedules a new revert after that + /// duration, targeting `revert_to` when supplied or `off` when `None`.
+ pub fn set_filter( + &self, + directives: &str, + expires_in: Option, + revert_to: Option<&str>, + ) -> Result<(), SetFilterError> { + let filter = EnvFilter::try_new(directives) + .map_err(|e| SetFilterError::InvalidDirectives(e.to_string()))?; + self.inner + .reload_handle + .reload(filter) + .map_err(|_| SetFilterError::ReloadFailed)?; + + let mut guard = self.inner.revert_task.lock().expect("poisoned"); + *guard = None; + + if let Some(expires_in) = expires_in { + let collector = self.clone(); + let revert_to = revert_to.map(str::to_string); + let handle: JoinHandle<()> = n0_future::task::spawn(async move { + n0_future::time::sleep(expires_in).await; + let target = revert_to.as_deref(); + if let Err(err) = collector.revert(target) { + warn!(?err, "failed to revert log filter"); + } + }); + *guard = Some(AbortOnDropHandle::new(handle)); + } + Ok(()) + } + + /// Reverts the active filter to `to`, or to the off state when `to` is + /// `None`. + pub fn revert(&self, to: Option<&str>) -> Result<(), SetFilterError> { + let directives = to.unwrap_or(OFF_DIRECTIVES); + let filter = EnvFilter::try_new(directives) + .map_err(|e| SetFilterError::InvalidDirectives(e.to_string()))?; + self.inner + .reload_handle + .reload(filter) + .map_err(|_| SetFilterError::ReloadFailed) + } + + /// Locate the most recently modified regular file in the configured log + /// directory whose name starts with the configured filename prefix. + /// Returns `Ok(None)` when the directory is missing or holds no matching + /// file. Used by [`crate::ClientHost`] to serve [`FetchLogs`].
+ /// + /// [`FetchLogs`]: crate::protocol::FetchLogs + pub(crate) fn current_log_file(&self) -> std::io::Result> { + let dir = &self.inner.log_dir; + let prefix = &self.inner.file_name_prefix; + let entries = match std::fs::read_dir(dir) { + Ok(e) => e, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(err), + }; + let mut best: Option<(PathBuf, std::time::SystemTime)> = None; + for entry in entries.flatten() { + let name = entry.file_name(); + let name = name.to_string_lossy(); + if !name.starts_with(prefix) { + continue; + } + let Ok(meta) = entry.metadata() else { continue }; + if !meta.is_file() { + continue; + } + let mtime = meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH); + match &best { + Some((_, current)) if *current >= mtime => {} + _ => best = Some((entry.path(), mtime)), + } + } + Ok(best.map(|(p, _)| p)) + } +} + +impl std::fmt::Debug for LogCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LogCollector").finish_non_exhaustive() + } +} + +/// Installs a global tracing subscriber whose only output is a rolling +/// file appender under `config.dir`. The level filter starts at `off`; no +/// events are captured until [`LogCollector::set_filter`] applies a filter. +/// +/// Returns the [`LogCollector`] to hand to [`crate::ClientHost`] so it can +/// apply runtime overrides, and a [`WorkerGuard`] the caller must hold for +/// the lifetime of the process so the non-blocking writer flushes on exit. +/// +/// Call exactly once at process start. For composition with other layers, +/// use [`layer`].
+pub fn install(config: FileLoggerConfig) -> Result<(LogCollector, WorkerGuard), InstallError> { + let (collector, file_layer, guard) = layer(config)?; + tracing_subscriber::registry() + .with(file_layer) + .try_init() + .map_err(|_| InstallError::AlreadyInstalled)?; + debug!("iroh-services file logger installed"); + Ok((collector, guard)) +} + +/// Builds the cloud-controlled file layer and its [`LogCollector`] without +/// installing a global subscriber. Use this when composing the file layer +/// with other layers; the returned layer is pre-wrapped in the reloadable +/// filter so the cloud's `SetLogLevel` overrides take effect. +pub fn layer( + config: FileLoggerConfig, +) -> Result< + ( + LogCollector, + impl Layer + Send + Sync + 'static, + WorkerGuard, + ), + InstallError, +> { + // `EnvFilter::try_new("off")` cannot fail; "off" is always valid. + let filter = EnvFilter::try_new(OFF_DIRECTIVES).expect("'off' is always a valid directive"); + let (filter, reload_handle) = reload::Layer::new(filter); + + let log_dir = config.dir.clone(); + let file_name_prefix = config.file_name_prefix.clone(); + let (file_layer, guard) = file_layer::(config)?; + let layer = file_layer.with_filter(filter); + + let inner = Arc::new(CollectorInner { + reload_handle, + revert_task: Mutex::new(None), + log_dir, + file_name_prefix, + }); + let collector = LogCollector { inner }; + Ok((collector, layer, guard)) +} + +/// How often the rolling file appender starts a new file. +/// +/// Re-exported from `tracing-appender` so callers don't need to depend on it +/// directly. +pub use tracing_appender::rolling::Rotation; + +/// Guard returned by [`file_layer`] / [`layer`] / [`install`] that keeps the +/// non-blocking writer's worker thread alive. Drop this only at process +/// shutdown; once dropped, any buffered records still in flight are flushed +/// and the file layer stops accepting writes. 
+pub use tracing_appender::non_blocking::WorkerGuard; + +/// Errors raised when constructing the file logger. +#[derive(Debug, thiserror::Error)] +pub enum FileLoggerError { + /// Could not create the log directory or open the rolling appender. + #[error("file logger setup failed: {0}")] + Io(#[from] std::io::Error), + /// `tracing-appender`'s builder rejected the configuration (for example + /// an invalid filename prefix). + #[error("file logger builder rejected configuration: {0}")] + Builder(String), +} + +/// Configuration for the rolling file logger. +/// +/// Use [`FileLoggerConfig::new`] to set the destination directory and tune +/// the remaining fields with the with-style setters. The defaults are +/// daily rotation, a `iroh-services` filename prefix, and a 30-file +/// retention window. +#[derive(Debug, Clone)] +pub struct FileLoggerConfig { + dir: PathBuf, + rotation: Rotation, + file_name_prefix: String, + max_files: Option, +} + +impl FileLoggerConfig { + /// Build a config rooted at `dir`. The directory is created on first + /// write if it does not exist. + pub fn new>(dir: P) -> Self { + Self { + dir: dir.into(), + rotation: Rotation::DAILY, + file_name_prefix: "iroh-services".into(), + max_files: Some(30), + } + } + + /// Override the rotation cadence. Default: [`Rotation::DAILY`]. + pub fn with_rotation(mut self, rotation: Rotation) -> Self { + self.rotation = rotation; + self + } + + /// Override the file name stem. Rotation appends a date suffix to this. + /// Default: `iroh-services`. + pub fn with_file_name_prefix>(mut self, prefix: S) -> Self { + self.file_name_prefix = prefix.into(); + self + } + + /// Override the retention cap. `None` keeps every file forever; `Some(n)` + /// keeps at most `n` files and deletes the oldest on rotation. Default: + /// `Some(30)`. 
+ pub fn with_max_files(mut self, max_files: Option) -> Self { + self.max_files = max_files; + self + } +} + +/// Builds an unfiltered tracing layer that writes records to a rolling +/// file under `config.dir`. Returns the layer plus a [`WorkerGuard`] the +/// caller must hold for the lifetime of the process — drop it at shutdown +/// so any buffered records flush before exit. +/// +/// Most callers want [`layer`] or [`install`] instead, which apply the +/// cloud-controlled `EnvFilter` reload handle. Use this when you want a +/// plain file appender with no cloud filter integration. +pub fn file_layer( + config: FileLoggerConfig, +) -> Result<(impl Layer + Send + Sync + 'static, WorkerGuard), FileLoggerError> +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + let FileLoggerConfig { + dir, + rotation, + file_name_prefix, + max_files, + } = config; + + create_dir_all(&dir)?; + + let mut builder = tracing_appender::rolling::RollingFileAppender::builder() + .rotation(rotation) + .filename_prefix(file_name_prefix); + if let Some(max) = max_files { + builder = builder.max_log_files(max); + } + let appender = builder + .build(&dir) + .map_err(|e| FileLoggerError::Builder(e.to_string()))?; + + let (writer, guard) = tracing_appender::non_blocking(appender); + let layer = tracing_subscriber::fmt::layer() + .with_writer(writer) + .with_ansi(false) + .json(); + Ok((layer, guard)) +} + +fn create_dir_all(dir: &Path) -> Result<(), FileLoggerError> { + std::fs::create_dir_all(dir).map_err(FileLoggerError::Io) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// `file_layer` writes records to a file in the configured directory, + /// and the WorkerGuard flushes pending writes on drop. 
+ #[test] + fn file_layer_writes_to_disk() { + use tracing::Dispatch; + use tracing_subscriber::{Registry, layer::SubscriberExt}; + + let tmp = tempfile::tempdir().unwrap(); + let (layer, guard) = file_layer::( + FileLoggerConfig::new(tmp.path()) + .with_file_name_prefix("test") + .with_max_files(Some(2)), + ) + .expect("file_layer setup"); + + let subscriber = Registry::default().with(layer); + let dispatch = Dispatch::new(subscriber); + tracing::dispatcher::with_default(&dispatch, || { + tracing::info!(target: "file_layer_test", "hello from the file logger"); + }); + drop(guard); + + let mut found = false; + for entry in std::fs::read_dir(tmp.path()).unwrap() { + let entry = entry.unwrap(); + if !entry.file_name().to_string_lossy().starts_with("test") { + continue; + } + let contents = std::fs::read_to_string(entry.path()).unwrap(); + if contents.contains("hello from the file logger") { + found = true; + break; + } + } + assert!(found, "expected log line to be written to a test.* file"); + } + + /// The cloud-controlled `layer` starts captured-nothing and only writes + /// after `set_filter` raises the level. Verifies the reload handle is + /// wired to the file layer end-to-end. + #[tokio::test(flavor = "current_thread")] + async fn cloud_filter_controls_file_writes() { + use tracing::Dispatch; + + let tmp = tempfile::tempdir().unwrap(); + let (collector, log_layer, guard) = + layer(FileLoggerConfig::new(tmp.path()).with_file_name_prefix("controlled")).unwrap(); + + let subscriber = Registry::default().with(log_layer); + let dispatch = Dispatch::new(subscriber); + tracing::dispatcher::with_default(&dispatch, || { + // Captured nothing yet — filter is "off". 
+ tracing::info!(target: "logtest", "before-set"); + + collector + .set_filter("info", None, None) + .expect("set_filter to info"); + tracing::info!(target: "logtest", "after-set"); + }); + drop(guard); + + let mut combined = String::new(); + for entry in std::fs::read_dir(tmp.path()).unwrap() { + let entry = entry.unwrap(); + if entry + .file_name() + .to_string_lossy() + .starts_with("controlled") + { + combined.push_str(&std::fs::read_to_string(entry.path()).unwrap()); + } + } + assert!( + !combined.contains("before-set"), + "before-set should be filtered out, got: {combined}" + ); + assert!( + combined.contains("after-set"), + "after-set should be written after set_filter, got: {combined}" + ); + } +} diff --git a/src/protocol.rs b/src/protocol.rs index 40cbbf0e..5a4c7dc9 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,5 +1,8 @@ use anyhow::Result; -use irpc::{channel::oneshot, rpc_requests}; +use irpc::{ + channel::{mpsc, oneshot}, + rpc_requests, +}; use rcan::Rcan; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -30,10 +33,13 @@ pub enum IrohServicesProtocol { #[rpc(tx=oneshot::Sender>)] NameEndpoint(NameEndpoint), + #[rpc(tx=oneshot::Sender>>)] + GetLogLevel(GetLogLevel), } -/// Dedicated protocol for cloud-to-endpoint net diagnostics connections. -#[rpc_requests(message = NetDiagnosticsMessage)] +/// Dedicated protocol for cloud-to-endpoint callbacks (net diagnostics, log +/// level overrides). 
+#[rpc_requests(message = ClientHostMessage)] #[derive(Debug, Serialize, Deserialize)] #[allow(clippy::large_enum_variant)] pub enum ClientHostProtocol { @@ -41,6 +47,10 @@ pub enum ClientHostProtocol { Auth(Auth), #[rpc(tx=oneshot::Sender>)] RunNetworkDiagnostics(RunNetworkDiagnostics), + #[rpc(tx=oneshot::Sender>)] + SetLogLevel(SetLogLevel), + #[rpc(tx=mpsc::Sender>>)] + FetchLogs(FetchLogs), } pub type RemoteResult = Result; @@ -104,3 +114,42 @@ pub struct GrantCap { pub struct NameEndpoint { pub name: String, } + +/// Ask the client to stream the contents of its currently-active rolling +/// log file. The client picks the newest file under its configured +/// log directory matching the configured filename prefix. +#[derive(Debug, Serialize, Deserialize)] +pub struct FetchLogs { + /// Stop after this many bytes have been streamed. `None` means stream + /// the whole current file. The cloud caller is expected to enforce its + /// own plan-tier cap on top of this. + #[serde(default)] + pub max_bytes: Option, +} + +/// Log-level filter settings. Used in two directions: +/// - As a cloud-to-client push (via the [`crate::ClientHost`] callback) +/// to apply a new override mid-session. +/// - As the response payload to [`GetLogLevel`] so the client can pull +/// the persisted setting on connect. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SetLogLevel { + /// `EnvFilter`-compatible directive string (for example + /// `"info,iroh=trace,iroh_blobs=debug"`). + pub directives: String, + /// If `Some`, the client reverts after this many seconds. If `None`, the + /// override is permanent until the next call. + pub expires_in_secs: Option, + /// Directives to revert to when the TTL fires. When `None`, the client + /// reverts to its install-time default. The cloud sends the project-wide + /// default here so per-endpoint overrides decay back to project policy + /// rather than to the client's own startup setting. 
+ #[serde(default)] + pub revert_to: Option, +} + +/// Client-initiated request for the cloud's current log-level settings. +/// Sent right after auth so the client lands on the correct filter +/// without waiting for the cloud to push. +#[derive(Debug, Serialize, Deserialize)] +pub struct GetLogLevel;