diff --git a/Cargo.lock b/Cargo.lock index 54997446..10546bcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5821,6 +5821,27 @@ dependencies = [ "zeroize", ] +[[package]] +name = "pluto-infosync" +version = "1.7.1" +dependencies = [ + "async-trait", + "chrono", + "futures", + "libp2p", + "pluto-consensus", + "pluto-core", + "pluto-featureset", + "pluto-p2p", + "pluto-parsigex", + "pluto-peerinfo", + "pluto-priority", + "pluto-testutil", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "pluto-k1util" version = "1.7.1" diff --git a/Cargo.toml b/Cargo.toml index 641e28d1..4264df4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "crates/peerinfo", "crates/frost", "crates/priority", + "crates/infosync", ] # Vendored fork consumed only via [patch.crates-io]; excluded so it builds/tests # standalone (its upstream code isn't written to this workspace's lints) without @@ -139,6 +140,8 @@ pluto-tracing = { path = "crates/tracing" } pluto-p2p = { path = "crates/p2p" } pluto-peerinfo = { path = "crates/peerinfo" } pluto-frost = { path = "crates/frost" } +pluto-priority = { path = "crates/priority" } +pluto-infosync = { path = "crates/infosync" } [workspace.lints.rust] missing_docs = "deny" diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index b7af6f3c..710a1fba 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -303,8 +303,15 @@ impl TryFrom<&pbcore::Duty> for Duty { } /// The type of proposal. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] +/// +/// An open set: values not recognised by this binary are preserved as +/// [`ProposalType::Unknown`] rather than dropped, so cluster-agreed proposal +/// types from newer peers survive round-trips. +/// +/// (De)serialized as its wire-format string via the `String` conversions below, +/// so unknown values round-trip verbatim. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(from = "String", into = "String")] pub enum ProposalType { /// Full proposal type. Full, @@ -312,6 +319,55 @@ pub enum ProposalType { Builder, /// Synthetic proposal type. Synthetic, + /// A proposal type not recognised by this binary, holding its raw wire + /// string. + Unknown(String), +} + +impl ProposalType { + /// Returns the wire-format string for this proposal type. + /// + /// The strings for the known variants MUST NOT change: they are exchanged + /// on the wire (e.g. by the priority/infosync protocols) and changing + /// them breaks compatibility. + pub fn as_str(&self) -> &str { + match self { + ProposalType::Full => "full", + ProposalType::Builder => "builder", + ProposalType::Synthetic => "synthetic", + ProposalType::Unknown(s) => s, + } + } +} + +impl From for ProposalType { + /// Parses a wire string, mapping unrecognised values to + /// [`ProposalType::Unknown`] and reusing the allocation. + fn from(value: String) -> Self { + match value.as_str() { + "full" => ProposalType::Full, + "builder" => ProposalType::Builder, + "synthetic" => ProposalType::Synthetic, + _ => ProposalType::Unknown(value), + } + } +} + +impl From<&str> for ProposalType { + fn from(value: &str) -> Self { + ProposalType::from(value.to_owned()) + } +} + +impl From for String { + /// Returns the wire-format string, reusing the [`ProposalType::Unknown`] + /// allocation. + fn from(value: ProposalType) -> Self { + match value { + ProposalType::Unknown(s) => s, + other => other.as_str().to_owned(), + } + } } // In golang implementation they use pk_len = 98, which is 0x + [48 bytes] @@ -975,6 +1031,35 @@ mod tests { assert_eq!(DutyType::InfoSync.to_string(), "info_sync"); } + #[test] + fn proposal_type_wire_round_trip() { + for (pt, s) in [ + (ProposalType::Full, "full"), + (ProposalType::Builder, "builder"), + (ProposalType::Synthetic, "synthetic"), + ] { + assert_eq!(pt.as_str(), s); + assert_eq!(ProposalType::from(s), pt); + } + + // Unrecognised wire strings are preserved as Unknown, not dropped. + let unknown = ProposalType::from("future_type"); + assert_eq!(unknown, ProposalType::Unknown("future_type".to_owned())); + assert_eq!(unknown.as_str(), "future_type"); + } + + #[test] + fn proposal_type_serde_is_wire_string() { + assert_eq!( + serde_json::to_string(&ProposalType::Builder).expect("serialize"), + "\"builder\"" + ); + assert_eq!( + serde_json::from_str::("\"future_type\"").expect("deserialize"), + ProposalType::Unknown("future_type".to_owned()) + ); + } + #[test] fn duty_type_is_valid() { assert!(!DutyType::Unknown.is_valid()); diff --git a/crates/infosync/Cargo.toml b/crates/infosync/Cargo.toml new file mode 100644 index 00000000..1f72ea94 --- /dev/null +++ b/crates/infosync/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "pluto-infosync" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +pluto-core.workspace = true +pluto-featureset.workspace = true +pluto-priority.workspace = true +tokio-util.workspace = true +tracing.workspace = true + +[dev-dependencies] +async-trait.workspace = true +chrono.workspace = true +futures.workspace = true +libp2p.workspace = true +pluto-consensus.workspace = true +pluto-p2p.workspace = true +pluto-parsigex.workspace = true +pluto-peerinfo.workspace = true +pluto-testutil.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "time", "sync"] } + +[lints] +workspace = true diff --git a/crates/infosync/src/lib.rs b/crates/infosync/src/lib.rs new file mode 100644 index 00000000..8d07288e --- /dev/null +++ b/crates/infosync/src/lib.rs @@ -0,0 +1,506 @@ +//! # Infosync +//! +//! A simple use-case of the [priority protocol](pluto_priority) that +//! prioritises cluster-wide supported versions, protocols, and proposal types. +//! +//! Each epoch the node triggers a prioritisation across the cluster (via +//! [`Component::trigger`]); the resulting cluster-agreed values are stored per +//! slot and queried with [`Component::protocols`] and [`Component::proposals`]. + +use std::sync::{Arc, Mutex}; + +use pluto_core::{ + types::{Duty, ProposalType, SlotNumber}, + version::SemVer, +}; +use pluto_featureset::{Feature, GLOBAL_STATE}; +use pluto_priority::{Component as Prioritiser, TopicProposal, TopicResult}; +use tokio_util::sync::CancellationToken; + +/// Priority topic carrying the cluster's supported [`SemVer`] versions. +const TOPIC_VERSION: &str = "version"; +/// Priority topic carrying the cluster's supported protocol ids. +/// +/// Exported so callers (e.g. consensus-protocol selection) can match results by +/// topic. +pub const TOPIC_PROTOCOL: &str = "protocol"; +/// Priority topic carrying the cluster's supported [`ProposalType`]s. +const TOPIC_PROPOSAL: &str = "proposal"; + +/// Eviction threshold for stored results. The oldest entry is dropped once the +/// stored count reaches this value (a `>=` check), so the retained history is +/// effectively capped at `MAX_RESULTS - 1` (99). +const MAX_RESULTS: usize = 100; + +/// Mock alpha protocol appended when the `MockAlpha` feature is enabled, used +/// to exercise infosync in production. +const MOCK_ALPHA_PROTOCOL: &str = "/charon/mock_alpha/1.0.0"; + +/// A cluster-wide agreed-upon infosync result for a single slot. +#[derive(Clone, Debug, PartialEq, Eq)] +struct InfoResult { + slot: SlotNumber, + versions: Vec, + protocols: Vec, + proposals: Vec, +} + +/// Shared store of agreed-upon results, accessed by both the public getters and +/// the priority subscribe callback. Holds the local protocol list so +/// [`ResultStore::protocols`] can fall back to it when no result applies. +struct ResultStore { + local_protocols: Vec, + results: Mutex>, +} + +impl ResultStore { + fn new(local_protocols: Vec) -> Self { + Self { + local_protocols, + results: Mutex::new(Vec::new()), + } + } + + fn local_protocols(&self) -> &[String] { + &self.local_protocols + } + + /// Folds a decided priority result into a stored [`InfoResult`]. + /// + /// Versions and protocols are stored as their raw agreed wire strings; + /// proposals are parsed into [`ProposalType`], with unrecognised values + /// preserved as [`ProposalType::Unknown`] rather than dropped. The result + /// is only stored when at least one version was agreed upon. + fn handle_results(&self, duty: &Duty, results: &[TopicResult]) { + let mut res = InfoResult { + slot: duty.slot, + versions: Vec::new(), + protocols: Vec::new(), + proposals: Vec::new(), + }; + + for result in results { + for prio in result.priorities_only() { + match result.topic.as_str() { + TOPIC_VERSION => res.versions.push(prio), + TOPIC_PROTOCOL => res.protocols.push(prio), + TOPIC_PROPOSAL => res.proposals.push(ProposalType::from(prio)), + _ => {} + } + } + } + + tracing::debug!(slot = %duty.slot, ?results, "Infosync completed"); + + if !res.versions.is_empty() { + self.add_result(res); + } + } + + /// Adds a result unless it is identical to the most recent one. Once the + /// stored count reaches [`MAX_RESULTS`] the oldest entry is dropped (see + /// that constant for the resulting cap). + fn add_result(&self, result: InfoResult) { + let mut results = self + .results + .lock() + .expect("infosync results mutex poisoned"); + + if results.last() == Some(&result) { + // Identical to previous, so don't add. + return; + } + + results.push(result); + + if results.len() >= MAX_RESULTS { + results.remove(0); + } + } + + /// Latest cluster-wide supported protocols at or before `slot`, falling + /// back to the local protocols when no earlier result exists. + fn protocols(&self, slot: SlotNumber) -> Vec { + let results = self + .results + .lock() + .expect("infosync results mutex poisoned"); + + let mut resp = self.local_protocols.clone(); + for result in results.iter() { + if result.slot > slot { + break; + } + resp = result.protocols.clone(); + } + + resp + } + + /// Latest cluster-wide supported proposal types at or before `slot`, + /// falling back to the default `[ProposalType::Full]` when no earlier + /// result exists. + fn proposals(&self, slot: SlotNumber) -> Vec { + let results = self + .results + .lock() + .expect("infosync results mutex poisoned"); + + let mut resp = vec![ProposalType::Full]; + for result in results.iter() { + if result.slot > slot { + break; + } + resp = result.proposals.clone(); + } + + resp + } +} + +/// Infosync component: prioritises and tracks cluster-wide supported versions, +/// protocols, and proposal types. +pub struct Component { + versions: Vec, + proposals: Vec, + store: Arc, + prioritiser: Arc, +} + +impl Component { + /// Returns a new infosync component. + /// + /// Registers a subscriber on `prioritiser` that records decided results. + /// The local `protocols` are augmented with a mock alpha protocol when the + /// `MockAlpha` feature is enabled, to exercise infosync in production. + pub fn new( + prioritiser: Arc, + versions: Vec, + protocols: Vec, + proposals: Vec, + ) -> Self { + let store = Arc::new(ResultStore::new(augment_protocols( + protocols, + mock_alpha_enabled(), + ))); + + let cb_store = Arc::clone(&store); + prioritiser.subscribe(Box::new(move |duty, results| { + cb_store.handle_results(&duty, &results); + Ok(()) + })); + + Self { + versions, + proposals, + store, + prioritiser, + } + } + + /// Returns the latest cluster-wide supported protocols at or before `slot`. + /// + /// Returns the local protocols if no earlier results are available. + pub fn protocols(&self, slot: SlotNumber) -> Vec { + self.store.protocols(slot) + } + + /// Returns the latest cluster-wide supported proposal types at or before + /// `slot`. + /// + /// Returns the default `[ProposalType::Full]` if no earlier results are + /// available. Values this binary does not recognise are preserved as + /// [`ProposalType::Unknown`] rather than dropped. + pub fn proposals(&self, slot: SlotNumber) -> Vec { + self.store.proposals(slot) + } + + /// Triggers a cluster-wide prioritisation of the local versions, protocols, + /// and proposal types for `slot`. + pub async fn trigger( + &self, + ctx: CancellationToken, + slot: SlotNumber, + ) -> pluto_priority::Result<()> { + let (duty, proposals) = build_request( + &self.versions, + self.store.local_protocols(), + &self.proposals, + slot, + ); + + self.prioritiser.prioritise(duty, &proposals, ctx).await + } +} + +/// Returns the versions as their string representations. +fn versions_to_strings(versions: &[SemVer]) -> Vec { + versions.iter().map(|v| v.to_string()).collect() +} + +/// Returns the proposal types as their wire-format strings. +fn proposals_to_strings(proposals: &[ProposalType]) -> Vec { + proposals.iter().map(|p| p.as_str().to_owned()).collect() +} + +/// Builds the info-sync duty and topic proposals sent by +/// [`Component::trigger`]. +/// +/// Split out as a free function so the wire payload — topics, priority +/// ordering, and the info-sync duty — is testable without a live prioritiser. +fn build_request( + versions: &[SemVer], + protocols: &[String], + proposals: &[ProposalType], + slot: SlotNumber, +) -> (Duty, Vec) { + let topics = vec![ + TopicProposal { + topic: TOPIC_VERSION.to_owned(), + priorities: versions_to_strings(versions), + }, + TopicProposal { + topic: TOPIC_PROTOCOL.to_owned(), + priorities: protocols.to_vec(), + }, + TopicProposal { + topic: TOPIC_PROPOSAL.to_owned(), + priorities: proposals_to_strings(proposals), + }, + ]; + + (Duty::new_info_sync_duty(slot), topics) +} + +/// Appends the mock alpha protocol when `mock_alpha` is enabled, used to +/// exercise infosync in production. +fn augment_protocols(mut protocols: Vec, mock_alpha: bool) -> Vec { + if mock_alpha { + protocols.push(MOCK_ALPHA_PROTOCOL.to_owned()); + } + + protocols +} + +/// Returns whether the `MockAlpha` feature is globally enabled. +fn mock_alpha_enabled() -> bool { + GLOBAL_STATE + .read() + .expect("global feature set lock poisoned") + .enabled(Feature::MockAlpha) +} + +#[cfg(test)] +mod tests { + use pluto_core::types::DutyType; + use pluto_priority::ScoredPriority; + + use super::*; + + fn scored(values: &[&str]) -> Vec { + values + .iter() + .enumerate() + .map(|(i, v)| ScoredPriority { + priority: (*v).to_owned(), + score: i64::try_from(i).expect("test index fits i64"), + }) + .collect() + } + + fn topic_result(topic: &str, values: &[&str]) -> TopicResult { + TopicResult { + topic: topic.to_owned(), + priorities: scored(values), + } + } + + fn slot(n: u64) -> SlotNumber { + SlotNumber::new(n) + } + + fn info_result( + s: u64, + versions: &[&str], + protocols: &[&str], + proposals: &[&str], + ) -> InfoResult { + InfoResult { + slot: slot(s), + versions: versions.iter().map(|v| (*v).to_owned()).collect(), + protocols: protocols.iter().map(|p| (*p).to_owned()).collect(), + proposals: proposals.iter().map(|p| ProposalType::from(*p)).collect(), + } + } + + #[test] + fn versions_to_strings_maps_display() { + let versions = vec![ + SemVer::parse("v1.7").expect("valid"), + SemVer::parse("v1.6").expect("valid"), + ]; + assert_eq!(versions_to_strings(&versions), vec!["v1.7", "v1.6"]); + } + + #[test] + fn proposals_to_strings_maps_wire_format() { + let proposals = vec![ProposalType::Builder, ProposalType::Full]; + assert_eq!(proposals_to_strings(&proposals), vec!["builder", "full"]); + } + + #[test] + fn augment_protocols_appends_mock_alpha_when_enabled() { + let base = vec!["proto-a".to_owned()]; + assert_eq!(augment_protocols(base.clone(), false), vec!["proto-a"]); + assert_eq!( + augment_protocols(base, true), + vec!["proto-a", MOCK_ALPHA_PROTOCOL] + ); + } + + #[test] + fn build_request_builds_infosync_duty_and_topic_proposals() { + let versions = vec![SemVer::parse("v1.7").expect("valid")]; + let protocols = vec!["proto-a".to_owned(), "proto-b".to_owned()]; + let proposals = vec![ProposalType::Builder, ProposalType::Full]; + + let (duty, topics) = build_request(&versions, &protocols, &proposals, slot(42)); + + // The duty is the info-sync duty for the requested slot. + assert_eq!(duty, Duty::new_info_sync_duty(slot(42))); + assert_eq!(duty.duty_type, DutyType::InfoSync); + + // One topic proposal per dimension, in order, carrying wire strings. + assert_eq!(topics.len(), 3); + assert_eq!(topics[0].topic, TOPIC_VERSION); + assert_eq!(topics[0].priorities, vec!["v1.7"]); + assert_eq!(topics[1].topic, TOPIC_PROTOCOL); + assert_eq!(topics[1].priorities, vec!["proto-a", "proto-b"]); + assert_eq!(topics[2].topic, TOPIC_PROPOSAL); + assert_eq!(topics[2].priorities, vec!["builder", "full"]); + } + + #[test] + fn protocols_defaults_to_local() { + let store = ResultStore::new(vec!["a".to_owned(), "b".to_owned()]); + assert_eq!(store.protocols(slot(10)), vec!["a", "b"]); + } + + #[test] + fn proposals_defaults_to_full() { + let store = ResultStore::new(Vec::new()); + assert_eq!(store.proposals(slot(10)), vec![ProposalType::Full]); + } + + #[test] + fn getters_select_latest_result_at_or_before_slot() { + let store = ResultStore::new(vec!["local".to_owned()]); + store.add_result(info_result(5, &["v1.7"], &["p5"], &["builder"])); + store.add_result(info_result(10, &["v1.7"], &["p10"], &["synthetic"])); + + // Before any result: local default / full default. + assert_eq!(store.protocols(slot(4)), vec!["local"]); + assert_eq!(store.proposals(slot(4)), vec![ProposalType::Full]); + + // At/after slot 5 but before 10: the slot-5 result. + assert_eq!(store.protocols(slot(5)), vec!["p5"]); + assert_eq!(store.proposals(slot(9)), vec![ProposalType::Builder]); + + // At/after slot 10: the slot-10 result. + assert_eq!(store.protocols(slot(10)), vec!["p10"]); + assert_eq!(store.proposals(slot(100)), vec![ProposalType::Synthetic]); + } + + #[test] + fn add_result_dedups_consecutive_identical() { + let store = ResultStore::new(Vec::new()); + let r = info_result(1, &["v1.7"], &["p"], &["full"]); + + store.add_result(r.clone()); + store.add_result(r.clone()); + assert_eq!(store.results.lock().expect("lock").len(), 1); + + // A different result is appended. + store.add_result(info_result(2, &["v1.7"], &["p"], &["full"])); + assert_eq!(store.results.lock().expect("lock").len(), 2); + + // The same content as the last is again deduped. + store.add_result(info_result(2, &["v1.7"], &["p"], &["full"])); + assert_eq!(store.results.lock().expect("lock").len(), 2); + } + + #[test] + fn add_result_caps_history() { + // Push well past MAX_RESULTS (100) with distinct slots. + const PUSH_COUNT: u64 = 150; + let store = ResultStore::new(Vec::new()); + for i in 0..PUSH_COUNT { + store.add_result(info_result(i, &["v1.7"], &["p"], &["full"])); + } + + let results = store.results.lock().expect("lock"); + assert!( + results.len() < MAX_RESULTS, + "history capped below MAX_RESULTS" + ); + // Oldest entries were dropped; the newest slot (149) is retained. + assert_eq!(results.last().expect("non-empty").slot, slot(149)); + } + + #[test] + fn handle_results_routes_topics_and_stores() { + let store = ResultStore::new(vec!["local".to_owned()]); + let duty = Duty::new_info_sync_duty(slot(7)); + let results = vec![ + topic_result(TOPIC_VERSION, &["v1.7", "v1.6"]), + topic_result(TOPIC_PROTOCOL, &["proto-a", "proto-b"]), + topic_result(TOPIC_PROPOSAL, &["builder", "full"]), + ]; + + store.handle_results(&duty, &results); + + assert_eq!(store.protocols(slot(7)), vec!["proto-a", "proto-b"]); + assert_eq!( + store.proposals(slot(7)), + vec![ProposalType::Builder, ProposalType::Full] + ); + } + + #[test] + fn handle_results_preserves_unknown_proposal_and_skips_unknown_topic() { + let store = ResultStore::new(Vec::new()); + let duty = Duty::new_info_sync_duty(slot(1)); + let results = vec![ + topic_result(TOPIC_VERSION, &["v1.7"]), + topic_result(TOPIC_PROPOSAL, &["builder", "future_type", "full"]), + topic_result("unknown-topic", &["ignored"]), + ]; + + store.handle_results(&duty, &results); + + // Unknown proposal types are preserved as `Unknown` (not dropped); only + // the unrecognised topic is ignored. + assert_eq!( + store.proposals(slot(1)), + vec![ + ProposalType::Builder, + ProposalType::Unknown("future_type".to_owned()), + ProposalType::Full, + ] + ); + } + + #[test] + fn handle_results_without_versions_is_not_stored() { + let store = ResultStore::new(vec!["local".to_owned()]); + let duty = Duty::new_info_sync_duty(slot(3)); + // No version topic → no agreed version → result discarded. + let results = vec![topic_result(TOPIC_PROTOCOL, &["proto-a"])]; + + store.handle_results(&duty, &results); + + assert!(store.results.lock().expect("lock").is_empty()); + // Falls back to local protocols since nothing was stored. + assert_eq!(store.protocols(slot(3)), vec!["local"]); + } +} diff --git a/crates/infosync/tests/infosync_integration.rs b/crates/infosync/tests/infosync_integration.rs new file mode 100644 index 00000000..f315e734 --- /dev/null +++ b/crates/infosync/tests/infosync_integration.rs @@ -0,0 +1,356 @@ +//! Three-host integration test for infosync. +//! +//! Rather than booting full multi-node apps (infosync is not yet wired into the +//! app), it drives three in-process libp2p hosts at the infosync+priority +//! layer. Each host runs a real priority exchange over a shared +//! "decide-on-first" consensus and triggers infosync for the same slot with +//! identical inputs; the test asserts every host converges on the same +//! cluster-wide version/protocol/proposal priorities. + +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, Mutex}, + time::Duration, +}; + +use async_trait::async_trait; +use futures::{FutureExt as _, StreamExt as _, future::select_all}; +use libp2p::{ + Multiaddr, PeerId, Swarm, SwarmBuilder, + core::{Transport as _, transport::MemoryTransport, upgrade::Version}, + multiaddr::Protocol, + swarm::SwarmEvent, +}; +use pluto_core::{ + corepb::v1::priority::PriorityResult, + deadline::{DeadlineCalculator, DeadlineError}, + types::{Duty, DutyType, ProposalType, SlotNumber}, + version::SUPPORTED, +}; +use pluto_infosync::Component as InfoSync; +use pluto_p2p::{p2p_context::P2PContext, peer::peer_id_from_key, utils::keypair_from_secret_key}; +use pluto_priority::{ + Consensus, ConsensusError, PrioritySubscriber, TopicResult, new_component, p2p::Behaviour, +}; +use pluto_testutil::random::generate_insecure_k1_key; +use tokio::{sync::mpsc, time::timeout}; +use tokio_util::sync::CancellationToken; + +/// Calculator that schedules every duty one hour out, so triggered infosync +/// duties never expire mid-test. +struct FutureCalculator; + +impl DeadlineCalculator for FutureCalculator { + fn deadline( + &self, + _duty: &Duty, + ) -> Result>, DeadlineError> { + Ok(Some( + chrono::Utc::now() + .checked_add_signed(chrono::Duration::hours(1)) + .expect("deadline in range"), + )) + } +} + +/// Mock consensus that decides on the first proposal per duty by invoking its +/// subscribers, and asserts every subsequent proposal for that duty is +/// identical. Shared across all hosts so they all observe the same decision. +#[derive(Default)] +struct TestConsensus { + subs: Mutex>, + proposed: Mutex>, +} + +#[async_trait] +impl Consensus for TestConsensus { + async fn propose_priority( + &self, + duty: Duty, + result: PriorityResult, + _ct: &CancellationToken, + ) -> Result<(), ConsensusError> { + let slot = duty.slot.inner(); + + // Claim the decision atomically before fanning out: hold the lock across + // the check and the insert so two concurrent first proposals for the + // same duty cannot both notify subscribers. Later proposals see the + // recorded result and assert it is identical. + { + let mut proposed = self.proposed.lock().expect("proposed mutex"); + if let Some(prev) = proposed.get(&slot) { + assert_eq!( + prev.topics, result.topics, + "all proposals for a duty must be identical" + ); + return Ok(()); + } + proposed.insert(slot, result.clone()); + } + + let subs = self.subs.lock().expect("subs mutex"); + for sub in subs.iter() { + sub(duty.clone(), result.clone())?; + } + Ok(()) + } + + fn subscribe_priority(&self, callback: PrioritySubscriber) { + self.subs.lock().expect("subs mutex").push(callback); + } +} + +/// In-process `/memory/` address (non-zero so the kernel does not +/// auto-assign). +fn memory_addr(seed: u8) -> Multiaddr { + Multiaddr::empty().with(Protocol::Memory(u64::from(seed) + 1)) +} + +/// The full set of protocol ids a node advertises, aggregated across components +/// (consensus, parsigex, peerinfo, priority) in that order — the same set and +/// order a node would prioritise via infosync, built without standing up an +/// app. +fn app_protocols() -> Vec { + let mut resp: Vec = Vec::new(); + resp.extend( + pluto_consensus::protocols::protocols() + .iter() + .map(ToString::to_string), + ); + resp.extend(pluto_parsigex::protocols().iter().map(ToString::to_string)); + resp.extend(pluto_peerinfo::protocols().iter().map(ToString::to_string)); + resp.extend(pluto_priority::protocols().iter().map(ToString::to_string)); + resp +} + +/// Decided topic results captured per host, alongside the host index and duty. +type Captured = (usize, Duty, Vec); + +/// A built host: its swarm and the infosync component driving it. +struct Host { + swarm: Swarm, + infosync: Arc, + addr: Multiaddr, +} + +/// Builds one host: a priority [`Prioritiser`] + [`Behaviour`] over an +/// in-process [`MemoryTransport`], wrapped by an infosync [`InfoSync`] +/// component. A capture subscriber is registered *after* infosync's own, so +/// receiving a capture message guarantees infosync's store is already updated. +#[allow(clippy::too_many_arguments)] +fn build_host( + seed: u8, + idx: usize, + peers: Vec, + consensus: Arc, + ct: &CancellationToken, + versions: Vec, + protocols: Vec, + proposals: Vec, + capture: mpsc::UnboundedSender, +) -> Host { + let key = generate_insecure_k1_key(seed); + let keypair = keypair_from_secret_key(key.clone()).expect("keypair"); + + let (prio, behaviour, expired) = new_component( + peers.clone(), + i64::try_from(peers.len()).expect("peer count fits i64"), + consensus, + Duration::from_secs(3600), + key, + FutureCalculator, + P2PContext::new(peers), + ct.clone(), + ) + .expect("new_component"); + let prio = Arc::new(prio); + + // infosync subscribes to the prioritiser inside `new`. + let infosync = Arc::new(InfoSync::new(prio.clone(), versions, protocols, proposals)); + + // Capture subscriber registered after infosync's (fan-out runs in + // registration order, so a capture message implies infosync is updated). + prio.subscribe(Box::new(move |duty, results| { + let _ = capture.send((idx, duty, results)); + Ok(()) + })); + + let swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_other_transport(|key| { + MemoryTransport::default() + .upgrade(Version::V1) + .authenticate(libp2p::noise::Config::new(key).expect("noise config")) + .multiplex(libp2p::yamux::Config::default()) + }) + .expect("transport") + .with_behaviour(|_key| behaviour) + .expect("behaviour") + .build(); + + prio.start(expired, ct.clone()); + + Host { + swarm, + infosync, + addr: memory_addr(seed), + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn three_host_infosync() { + const N: usize = 3; + const SEEDS: [u8; N] = [0, 1, 2]; + let slot = SlotNumber::new(99); + + // Identical inputs on every host: the supported versions, the aggregated + // protocol set, and the default proposal types (builder/synthetic disabled + // leaves the single `Full` fallback). + let versions = SUPPORTED.to_vec(); + let protocols = app_protocols(); + let proposals = vec![ProposalType::Full]; + + // Deterministic peer set from per-host seeds. + let keys: Vec<_> = SEEDS.into_iter().map(generate_insecure_k1_key).collect(); + let peers: Vec = keys + .iter() + .map(|k| peer_id_from_key(k.public_key()).expect("peer id")) + .collect(); + + let ct = CancellationToken::new(); + let consensus = Arc::new(TestConsensus::default()); + let (cap_tx, mut cap_rx) = mpsc::unbounded_channel::(); + + let mut hosts: Vec = Vec::with_capacity(N); + for (idx, &seed) in SEEDS.iter().enumerate() { + hosts.push(build_host( + seed, + idx, + peers.clone(), + consensus.clone(), + &ct, + versions.clone(), + protocols.clone(), + proposals.clone(), + cap_tx.clone(), + )); + } + drop(cap_tx); + + // Begin listening, then full-mesh dial. + for host in &mut hosts { + host.swarm.listen_on(host.addr.clone()).expect("listen"); + } + for host in &mut hosts { + loop { + if matches!( + host.swarm.select_next_some().await, + SwarmEvent::NewListenAddr { .. } + ) { + break; + } + } + } + let addrs: Vec = hosts.iter().map(|h| h.addr.clone()).collect(); + for (i, host) in hosts.iter_mut().enumerate() { + for (j, addr) in addrs.iter().enumerate() { + if i != j { + host.swarm.dial(addr.clone()).expect("dial"); + } + } + } + + // Wait until every host is connected to all peers before triggering, so the + // priority exchange reuses established connections rather than racing dials. + { + let mut connected: Vec> = vec![HashSet::new(); N]; + let mesh = async { + while connected.iter().any(|peers| peers.len() < N - 1) { + let next = hosts + .iter_mut() + .map(|h| h.swarm.select_next_some().boxed()) + .collect::>(); + let (event, idx, _) = select_all(next).await; + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event { + connected[idx].insert(peer_id); + } + } + }; + timeout(Duration::from_secs(30), mesh) + .await + .expect("full connection mesh within timeout"); + } + + // Drive each swarm in the background; keep the infosync handles. + let mut infosyncs: Vec> = Vec::with_capacity(N); + let mut drivers = Vec::with_capacity(N); + for host in hosts { + infosyncs.push(host.infosync); + let mut swarm = host.swarm; + drivers.push(tokio::spawn(async move { + loop { + let _ = swarm.select_next_some().await; + } + })); + } + + // Trigger infosync on every host for the same slot. `trigger` blocks until + // the duty deadline / cancellation, so it runs in the background while the + // decision is observed via the capture channel. + let mut triggers = Vec::with_capacity(N); + for isync in &infosyncs { + let isync = isync.clone(); + let ct = ct.clone(); + triggers.push(tokio::spawn(async move { isync.trigger(ct, slot).await })); + } + + // Expect one decided result per host (the single decision fans out to all). + let expected_versions: Vec = versions.iter().map(|v| v.to_string()).collect(); + let expected_proposals: Vec = proposals.iter().map(|p| p.as_str().to_owned()).collect(); + + let mut seen_hosts: HashSet = HashSet::new(); + for _ in 0..N { + let (idx, duty, results) = timeout(Duration::from_secs(30), cap_rx.recv()) + .await + .expect("decided result within timeout") + .expect("result delivered"); + + assert!(seen_hosts.insert(idx), "one decision per host"); + assert_eq!(duty.slot, slot, "decided duty is for the triggered slot"); + assert_eq!( + duty.duty_type, + DutyType::InfoSync, + "decided duty is the info-sync duty" + ); + assert_eq!( + results.len(), + 3, + "three topics: version, protocol, proposal" + ); + + for tr in &results { + let got = tr.priorities_only(); + match tr.topic.as_str() { + "version" => assert_eq!(got, expected_versions, "agreed versions"), + "protocol" => assert_eq!(got, protocols, "agreed protocols"), + "proposal" => assert_eq!(got, expected_proposals, "agreed proposals"), + other => panic!("unexpected topic: {other}"), + } + } + } + + // Every host's infosync recorded the agreed protocols and proposals. + for isync in &infosyncs { + assert_eq!(isync.protocols(slot), protocols, "infosync protocols"); + assert_eq!(isync.proposals(slot), proposals, "infosync proposals"); + } + + // Teardown. + ct.cancel(); + for t in triggers { + t.abort(); + } + for d in drivers { + d.abort(); + } +}