diff --git a/examples/endpoint_meta.rs b/examples/endpoint_meta.rs new file mode 100644 index 00000000..f9cdef43 --- /dev/null +++ b/examples/endpoint_meta.rs @@ -0,0 +1,56 @@ +//! Endpoint metadata example. +//! +//! Demonstrates how to associate metadata with an endpoint via the iroh-services +//! Client: a human-readable `name`, a single `group`, and arbitrary key-value +//! `attributes`. Each can be set at build time via the [`ClientBuilder`], and +//! updated later through the `Client::set_*` methods. +//! +//! Run with: `IROH_SERVICES_API_SECRET=... cargo run --example endpoint_meta` +use iroh::{Endpoint, endpoint::presets}; +use iroh_services::Client; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let endpoint = Endpoint::bind(presets::N0).await?; + + // Derive a unique name from the endpoint id so repeated runs don't collide + // in dashboards. In a real app this is typically a user id, machine name, + // or other stable identifier from your application. + let id = endpoint.id().to_string(); + let name = format!("endpoint-meta-example-{}", &id[..8]); + + // Set name, group, and attributes at build time. The client sends these + // immediately after authenticating with iroh-services. Validation errors + // (e.g. name too long) surface here; transport errors during startup are + // logged at `warn` level rather than failing the build. + let client = Client::builder(&endpoint) + .api_secret_from_env()? + .name(name)? + .group("examples")? + .attributes([ + ("env", "dev"), + ("region", "us-west"), + ("role", "endpoint-meta-example"), + ])? + .build() + .await?; + + client.ping().await?; + println!("endpoint registered with initial metadata"); + + // Each metadata field can also be updated after construction. These calls + // return explicit errors, unlike the builder which logs and continues. + client.set_name("endpoint-meta-example-renamed").await?; + client.set_group("staging").await?; + + // set_attributes fully replaces the prior set on each call. Pass an empty + // iterator to clear all attributes. + client + .set_attributes([("env", "staging"), ("region", "eu-central")]) + .await?; + + println!("metadata updated"); + Ok(()) +} diff --git a/src/client.rs b/src/client.rs index 44ef8ac2..bad7049c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,5 @@ use std::{ + collections::BTreeMap, str::FromStr, sync::{Arc, RwLock}, }; @@ -20,7 +21,7 @@ use crate::{ net_diagnostics::{DiagnosticsReport, checks::run_diagnostics}, protocol::{ ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics, - PutNetworkDiagnostics, RemoteError, + PutNetworkDiagnostics, RemoteError, SetAttributes, SetGroup, }, }; @@ -64,6 +65,8 @@ pub struct ClientBuilder { cap: Option>, endpoint: Endpoint, name: Option, + group: Option, + attributes: Option>, metrics_interval: Option, remote: Option, registry: Registry, @@ -82,6 +85,8 @@ impl ClientBuilder { cap_expiry: DEFAULT_CAP_EXPIRY, endpoint: endpoint.clone(), name: None, + group: None, + attributes: None, metrics_interval: Some(Duration::from_secs(60)), remote: None, registry, @@ -131,6 +136,47 @@ impl ClientBuilder { Ok(self) } + /// Attach the endpoint to a single named group when the client first + /// authenticates. Group names follow the same rules as endpoint names + /// (2–128 bytes UTF-8). Errors during startup propagate as warn-level + /// logs; for explicit error handling use [`Client::set_group`]. + pub fn group(mut self, group: impl Into) -> Result { + let group = group.into(); + validate_name(&group).map_err(BuildError::InvalidGroup)?; + self.group = Some(group); + Ok(self) + } + + /// Attach arbitrary key-value attributes to the endpoint when the client + /// first authenticates. Accepts any iterable of `(key, value)` pairs: + /// + /// ```no_run + /// # use iroh::{Endpoint, endpoint::presets}; + /// # use iroh_services::Client; + /// # async fn example(endpoint: &Endpoint) -> anyhow::Result<()> { + /// let _ = Client::builder(endpoint).attributes([("env", "prod"), ("region", "us-west")])?; + /// # Ok(()) } + /// ``` + /// + /// Keys follow the same length rules as endpoint names (2–128 bytes); + /// values may be empty and are capped at 128 bytes; the map is limited + /// to 128 entries. Errors during startup propagate as warn-level logs; + /// for explicit error handling use [`Client::set_attributes`]. + pub fn attributes(mut self, attrs: I) -> Result + where + I: IntoIterator, + K: Into, + V: Into, + { + let collected: BTreeMap = attrs + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + validate_attributes(&collected).map_err(BuildError::InvalidAttributes)?; + self.attributes = Some(collected); + Ok(self) + } + /// Check IROH_SERVICES_API_SECRET environment variable for a valid API secret pub fn api_secret_from_env(self) -> Result { let ticket = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?; @@ -219,10 +265,19 @@ impl ClientBuilder { capabilities, client: irpc_client, name: self.name.clone(), + group: self.group.clone(), + attributes: self.attributes.clone().unwrap_or_default(), session_id: Uuid::new_v4(), authorized: false, } - .run(self.name, self.registry, self.metrics_interval, rx), + .run( + self.name, + self.group, + self.attributes, + self.registry, + self.metrics_interval, + rx, + ), )); Ok(Client { @@ -249,6 +304,10 @@ pub enum BuildError { Connect(ConnectError), #[error("Invalid endpoint name: {0}")] InvalidName(#[from] ValidateNameError), + #[error("Invalid endpoint group: {0}")] + InvalidGroup(ValidateNameError), + #[error("Invalid endpoint attributes: {0}")] + InvalidAttributes(#[from] ValidateAttributesError), } impl From for BuildError { @@ -291,10 +350,45 @@ fn validate_name(name: &str) -> Result<(), ValidateNameError> { } } +/// Maximum length in bytes for an attribute value. Values may be empty. +pub const CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH: usize = 128; +/// Maximum number of entries allowed in the attributes map. +pub const CLIENT_ATTRIBUTES_MAX_COUNT: usize = 128; + +/// Error returned when an attributes map fails validation. +#[derive(Debug, thiserror::Error)] +pub enum ValidateAttributesError { + #[error("Too many attributes (must be no more than {CLIENT_ATTRIBUTES_MAX_COUNT}).")] + TooManyEntries, + #[error("Invalid attribute key: {0}")] + InvalidKey(#[from] ValidateNameError), + #[error( + "Attribute value too long (must be no more than {CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH} bytes)." + )] + ValueTooLong, +} + +fn validate_attributes(attrs: &BTreeMap) -> Result<(), ValidateAttributesError> { + if attrs.len() > CLIENT_ATTRIBUTES_MAX_COUNT { + return Err(ValidateAttributesError::TooManyEntries); + } + for (k, v) in attrs { + validate_name(k)?; + if v.len() > CLIENT_ATTRIBUTE_VALUE_MAX_LENGTH { + return Err(ValidateAttributesError::ValueTooLong); + } + } + Ok(()) +} + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Invalid endpoint name: {0}")] InvalidName(#[from] ValidateNameError), + #[error("Invalid endpoint group: {0}")] + InvalidGroup(ValidateNameError), + #[error("Invalid endpoint attributes: {0}")] + InvalidAttributes(#[from] ValidateAttributesError), #[error("Remote error: {0}")] Remote(#[from] RemoteError), #[error("Connection error: {0}")] @@ -328,6 +422,45 @@ impl Client { set_name_inner(self.message_channel.clone(), name.into()).await } + /// Attach the active endpoint to a single named group cloud-side. + /// + /// Group names follow the same rules as endpoint names: any UTF-8 string, + /// minimum 2 bytes, maximum 128 bytes. **group uniqueness is not enforced.** + pub async fn set_group(&self, group: impl Into) -> Result<(), Error> { + set_group_inner(self.message_channel.clone(), group.into()).await + } + + /// Replace the arbitrary key-value attributes on the active endpoint cloud-side. + /// + /// Accepts any iterable of `(key, value)` pairs (arrays of tuples, `Vec`s, + /// `HashMap`s, `BTreeMap`s, etc.), so most calls fit on a single line: + /// + /// ```no_run + /// # use iroh_services::Client; + /// # async fn example(client: Client) -> anyhow::Result<()> { + /// client + /// .set_attributes([("env", "prod"), ("region", "us-west")]) + /// .await?; + /// # Ok(()) } + /// ``` + /// + /// Keys follow the same rules as endpoint names (2–128 bytes). Values may + /// be empty and are limited to 128 bytes. The map is limited to 128 + /// entries. Each call fully replaces the prior set; passing an empty + /// iterator clears all attributes. + pub async fn set_attributes(&self, attrs: I) -> Result<(), Error> + where + I: IntoIterator, + K: Into, + V: Into, + { + let collected: BTreeMap = attrs + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + set_attributes_inner(self.message_channel.clone(), collected).await + } + /// Pings the remote node. pub async fn ping(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -432,12 +565,22 @@ enum ClientActorMessage { name: String, done: oneshot::Sender>, }, + SetGroup { + group: String, + done: oneshot::Sender>, + }, + SetAttributes { + attributes: BTreeMap, + done: oneshot::Sender>, + }, } struct ClientActor { capabilities: Rcan, client: IrohServicesClient, name: Option, + group: Option, + attributes: BTreeMap, session_id: Uuid, authorized: bool, } @@ -446,6 +589,8 @@ impl ClientActor { async fn run( mut self, initial_name: Option, + initial_group: Option, + initial_attributes: Option>, registry: Registry, interval: Option, mut inbox: tokio::sync::mpsc::Receiver, @@ -461,6 +606,18 @@ impl ClientActor { warn!(err = %err, "failed setting endpoint name on startup"); } + if let Some(group) = initial_group + && let Err(err) = self.send_set_group(group).await + { + warn!(err = %err, "failed setting endpoint group on startup"); + } + + if let Some(attributes) = initial_attributes + && let Err(err) = self.send_set_attributes(attributes).await + { + warn!(err = %err, "failed setting endpoint attributes on startup"); + } + loop { trace!("client actor tick"); tokio::select! { @@ -499,6 +656,18 @@ impl ClientActor { warn!("failed to name endpoint: {:#?}", err); } } + ClientActorMessage::SetGroup{ group, done } => { + let res = self.send_set_group(group).await; + if let Err(err) = done.send(res) { + warn!("failed to set group: {:#?}", err); + } + } + ClientActorMessage::SetAttributes{ attributes, done } => { + let res = self.send_set_attributes(attributes).await; + if let Err(err) = done.send(res) { + warn!("failed to set attributes: {:#?}", err); + } + } ClientActorMessage::PutNetworkDiagnostics{ report, done } => { let res = self.put_network_diagnostics(*report).await; if let Err(err) = done.send(res) { @@ -566,6 +735,39 @@ impl ClientActor { Ok(()) } + async fn send_set_group(&mut self, group: String) -> Result<(), RemoteError> { + trace!("client sending set group request"); + self.auth().await?; + + self.client + .rpc(SetGroup { + group: group.clone(), + }) + .await + .inspect_err(|e| debug!("set group error: {e}")) + .map_err(|_| RemoteError::InternalServerError)??; + self.group = Some(group); + Ok(()) + } + + async fn send_set_attributes( + &mut self, + attributes: BTreeMap, + ) -> Result<(), RemoteError> { + trace!("client sending set attributes request"); + self.auth().await?; + + self.client + .rpc(SetAttributes { + attributes: attributes.clone(), + }) + .await + .inspect_err(|e| debug!("set attributes error: {e}")) + .map_err(|_| RemoteError::InternalServerError)??; + self.attributes = attributes; + Ok(()) + } + async fn send_metrics(&mut self, encoder: &mut Encoder) -> Result<(), RemoteError> { trace!("client actor send metrics"); self.auth().await?; @@ -631,8 +833,45 @@ async fn set_name_inner( .map_err(Error::Remote) } +async fn set_group_inner( + message_channel: tokio::sync::mpsc::Sender, + group: String, +) -> Result<(), Error> { + validate_name(&group).map_err(Error::InvalidGroup)?; + debug!(group_len = group.len(), "calling set group"); + let (tx, rx) = oneshot::channel(); + message_channel + .send(ClientActorMessage::SetGroup { group, done: tx }) + .await + .map_err(|_| Error::Other(anyhow!("sending set group request")))?; + rx.await + .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))? + .map_err(Error::Remote) +} + +async fn set_attributes_inner( + message_channel: tokio::sync::mpsc::Sender, + attributes: BTreeMap, +) -> Result<(), Error> { + validate_attributes(&attributes)?; + debug!(attr_count = attributes.len(), "calling set attributes"); + let (tx, rx) = oneshot::channel(); + message_channel + .send(ClientActorMessage::SetAttributes { + attributes, + done: tx, + }) + .await + .map_err(|_| Error::Other(anyhow!("sending set attributes request")))?; + rx.await + .map_err(|e| Error::Other(anyhow!("response on internal channel: {:?}", e)))? + .map_err(Error::Remote) +} + #[cfg(test)] mod tests { + use std::collections::HashMap; + use iroh::{Endpoint, EndpointAddr, SecretKey, endpoint::presets}; use rand::{RngExt, SeedableRng}; use temp_env_vars::temp_env_vars; @@ -641,7 +880,10 @@ mod tests { Client, api_secret::ApiSecret, caps::{Cap, Caps}, - client::{API_SECRET_ENV_VAR_NAME, BuildError, ValidateNameError}, + client::{ + API_SECRET_ENV_VAR_NAME, BuildError, CLIENT_ATTRIBUTES_MAX_COUNT, + ValidateAttributesError, ValidateNameError, + }, }; #[tokio::test] @@ -728,4 +970,104 @@ mod tests { Some(BuildError::InvalidName(ValidateNameError::TooLong)) )); } + + #[tokio::test] + async fn test_group() { + let mut rng = rand::rngs::ChaCha8Rng::seed_from_u64(0); + let shared_secret = SecretKey::from_bytes(&rng.random()); + let fake_endpoint_id = SecretKey::from_bytes(&rng.random()).public(); + let api_secret = ApiSecret::new(shared_secret.clone(), fake_endpoint_id); + + let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap(); + + let builder = Client::builder(&endpoint) + .group("staging") + .unwrap() + .api_secret(api_secret) + .unwrap(); + + assert_eq!(builder.group, Some("staging".to_string())); + + let Err(err) = Client::builder(&endpoint).group("a") else { + panic!("group should fail for strings under 2 bytes"); + }; + assert!(matches!( + err.downcast_ref::(), + Some(BuildError::InvalidGroup(ValidateNameError::TooShort)) + )); + + let too_long_group = "👋".repeat(129); + let Err(err) = Client::builder(&endpoint).group(&too_long_group) else { + panic!("group should fail for strings over 128 bytes"); + }; + assert!(matches!( + err.downcast_ref::(), + Some(BuildError::InvalidGroup(ValidateNameError::TooLong)) + )); + } + + #[tokio::test] + async fn test_attributes() { + let endpoint = Endpoint::builder(presets::Minimal).bind().await.unwrap(); + + // empty iterator is accepted (clears attributes server-side) + let builder = Client::builder(&endpoint) + .attributes(std::iter::empty::<(String, String)>()) + .unwrap(); + assert_eq!(builder.attributes.as_ref().map(|m| m.len()), Some(0)); + + // array literal of `&str` tuples — the one-liner ergonomics + let builder = Client::builder(&endpoint) + .attributes([("env", "prod"), ("region", "us-west")]) + .unwrap(); + let attrs = builder.attributes.as_ref().expect("attributes set"); + assert_eq!(attrs.get("env").map(String::as_str), Some("prod")); + assert_eq!(attrs.get("region").map(String::as_str), Some("us-west")); + + // HashMap also works + let mut map: HashMap = HashMap::new(); + map.insert("k1".into(), "v1".into()); + map.insert("k2".into(), "".into()); // empty value is allowed + let builder = Client::builder(&endpoint).attributes(map).unwrap(); + let attrs = builder.attributes.as_ref().expect("attributes set"); + assert_eq!(attrs.get("k2").map(String::as_str), Some("")); + + // value over 128 bytes errors + let too_long_value = "x".repeat(129); + let Err(err) = Client::builder(&endpoint).attributes([("ok", too_long_value.as_str())]) + else { + panic!("attributes should fail for value over 128 bytes"); + }; + assert!(matches!( + err.downcast_ref::(), + Some(BuildError::InvalidAttributes( + ValidateAttributesError::ValueTooLong + )) + )); + + // key under 2 bytes errors + let Err(err) = Client::builder(&endpoint).attributes([("a", "v")]) else { + panic!("attributes should fail for key under 2 bytes"); + }; + assert!(matches!( + err.downcast_ref::(), + Some(BuildError::InvalidAttributes( + ValidateAttributesError::InvalidKey(ValidateNameError::TooShort) + )) + )); + + // more than 128 entries errors + let big: Vec<(String, String)> = (0..(CLIENT_ATTRIBUTES_MAX_COUNT + 1)) + .map(|i| (format!("key_{i:04}"), format!("val_{i}"))) + .collect(); + let Err(err) = Client::builder(&endpoint).attributes(big) else { + panic!("attributes should fail for more than 128 entries"); + }; + assert!(matches!( + err.downcast_ref::(), + Some(BuildError::InvalidAttributes( + ValidateAttributesError::TooManyEntries + )) + )); + } } diff --git a/src/protocol.rs b/src/protocol.rs index 40cbbf0e..cc9c1a93 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; + use anyhow::Result; use irpc::{channel::oneshot, rpc_requests}; use rcan::Rcan; @@ -30,6 +32,12 @@ pub enum IrohServicesProtocol { #[rpc(tx=oneshot::Sender>)] NameEndpoint(NameEndpoint), + + #[rpc(tx=oneshot::Sender>)] + SetGroup(SetGroup), + + #[rpc(tx=oneshot::Sender>)] + SetAttributes(SetAttributes), } /// Dedicated protocol for cloud-to-endpoint net diagnostics connections. @@ -104,3 +112,15 @@ pub struct GrantCap { pub struct NameEndpoint { pub name: String, } + +/// Attach the client endpoint to a single named group cloud-side. +#[derive(Debug, Serialize, Deserialize)] +pub struct SetGroup { + pub group: String, +} + +/// Replace the arbitrary key-value attributes on the client endpoint cloud-side. +#[derive(Debug, Serialize, Deserialize)] +pub struct SetAttributes { + pub attributes: BTreeMap, +}