diff --git a/crates/consensus/src/consensus/application/actor.rs b/crates/consensus/src/consensus/application/actor.rs index 48faee8d84..2c7d456df8 100644 --- a/crates/consensus/src/consensus/application/actor.rs +++ b/crates/consensus/src/consensus/application/actor.rs @@ -17,7 +17,6 @@ use std::{ use alloy_consensus::BlockHeader; use alloy_primitives::{B256, Bytes}; -use alloy_rpc_types_engine::PayloadId; use commonware_codec::{Encode as _, EncodeSize as _, ReadExt as _}; use commonware_consensus::{ Heightable as _, @@ -34,13 +33,9 @@ use prometheus_client::metrics::counter::Counter; use commonware_utils::SystemTimeExt; use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre}; -use futures::{ - StreamExt as _, - channel::{mpsc, oneshot}, - future::try_join, -}; +use futures::{StreamExt as _, channel::mpsc, future::try_join}; use rand_08::{CryptoRng, Rng}; -use reth_node_builder::{Block as _, ConsensusEngineHandle, PayloadKind}; +use reth_node_builder::{Block as _, ConsensusEngineHandle}; use reth_primitives_traits::BlockBody as _; use tempo_dkg_onchain_artifacts::OnchainDkgOutcome; use tempo_node::{TempoExecutionData, TempoFullNode, TempoPayloadTypes}; @@ -72,12 +67,11 @@ pub(in crate::consensus) struct Actor { inner: Inner, } -struct BuildProposalArgs<'a> { +struct BuildProposalArgs { propose_start: Instant, parent_view: View, parent_digest: Digest, round: Round, - payload_id_rx: &'a mut Option>>, leader: PublicKey, } @@ -331,7 +325,6 @@ impl Inner { } = request; let proposal_digest = { - let mut payload_id_rx: Option>> = None; let mut proposal = Box::pin(async { // Follow the commonware marshal::standard::inline application: // @@ -357,45 +350,41 @@ impl Inner { let already_verified = OptionFuture::some(self.marshal.get_verified(round)); futures::pin_mut!(already_verified); - let mut proposal = Box::pin(self.clone().build_proposal( + let mut proposal = Box::pin(self.clone().propose( context.clone(), BuildProposalArgs { propose_start, parent_view, parent_digest, round, - payload_id_rx: &mut payload_id_rx, leader, }, )); - let (block, proposal_return) = tokio::select! { + let proposal_result = tokio::select! { biased; Some(block) = &mut already_verified => { - drop(proposal); - self.cancel_payload_build(&mut payload_id_rx).await; debug!("skipping proposal: verified block already exists for round on restart"); - (block, None) + Ok((block, None)) }, res = &mut proposal => { - let proposal = res.wrap_err("failed creating a proposal")?; - - // Make sure that we get a response from the already_verified future before proposing. - if already_verified.is_none() { - proposal - } else { - if let Some(block) = already_verified.await { - debug!("skipping proposal: verified block already exists for round on restart"); - (block, None) - } else { - proposal - } - } + res.wrap_err("failed creating a proposal") }, }; + // already_verified blocks are always preferred, even if + // building a block failed. + let (block, proposal_return) = if already_verified.is_some() + && let Some(block) = already_verified.await + { + debug!("skipping proposal: verified block already exists for round on restart"); + (block, None) + } else { + proposal_result? + }; + let digest = block.digest(); if let Some(proposal_return) = proposal_return { let persist_start = Instant::now(); @@ -416,9 +405,6 @@ impl Inner { tokio::select! { () = response.closed() => { - drop(proposal); - self.cancel_payload_build(&mut payload_id_rx).await; - return Err(eyre!( "proposal return channel was closed by consensus \ engine before block could be proposed; aborting" @@ -500,52 +486,16 @@ impl Inner { Ok(()) } - async fn cancel_payload_build( - &self, - payload_id_rx: &mut Option>>, - ) { - let Some(rx) = payload_id_rx.take() else { - return; - }; - - let payload_id = match rx.await { - Ok(Ok(payload_id)) => payload_id, - Ok(Err(error)) => { - warn!(%error, "payload build was not started before cancellation"); - return; - } - Err(_) => { - warn!("executor dropped response before payload build could be cancelled"); - return; - } - }; - - let fut = match self - .execution_node - .payload_builder_handle - .resolve_kind_fut(payload_id, PayloadKind::WaitForPending) - .await - { - Ok(fut) => fut, - Err(error) => { - warn!(%error, %payload_id, "failed resolving payload while cancelling build"); - return; - } - }; - drop(fut); - } - - async fn build_proposal( + async fn propose( self, context: TContext, - args: BuildProposalArgs<'_>, + args: BuildProposalArgs, ) -> eyre::Result<(Block, Option)> { let BuildProposalArgs { propose_start, parent_view, parent_digest, round, - payload_id_rx, leader, } = args; @@ -724,41 +674,20 @@ impl Inner { .with_payload_build_budget(build_budget) .with_validation_latency_estimate(validation_latency_estimate); - // Share the dispatch receiver with the cancel branch so that, if cancellation - // hits between dispatch send and receiving `payload_id`, the cancel branch can - // still drain the rx, learn `payload_id`, and cancel the now-registered job. + // Subscribe to the payload build. The executor owns the build job + // and runs it to completion; dropping the receiver (for example + // because the proposal was cancelled) tells it that the payload is + // no longer wanted. let payload_build_start = Instant::now(); - *payload_id_rx = Some(self.state.executor.canonicalize_and_build( - parent.height(), - parent.digest(), - attrs, - )?); - - let payload_id = payload_id_rx - .as_mut() - .expect("just set") - .await - .wrap_err("executor dropped response")? - .wrap_err("failed requesting a new payload build")?; - - // Replace the slot with a pre-filled oneshot so the cancel branch can keep - // unconditionally awaiting `payload_id_rx` and immediately get back `payload_id`. - let (tx, rx) = oneshot::channel(); - let _ = tx.send(Ok(payload_id)); - *payload_id_rx = Some(rx); - let payload = self - .execution_node - .payload_builder_handle - .resolve_kind(payload_id, reth_node_builder::PayloadKind::WaitForPending) - .pace(&context, Duration::from_millis(20)) + .state + .executor + .canonicalize_and_build(parent.height(), parent.digest(), attrs)? .await - // XXX: this returns Option>; drilling into - // resolve_kind this really seems to resolve to None if no - // payload_id was found. - .ok_or_eyre("no payload found under provided id") - .and_then(|rsp| rsp.map_err(Into::::into)) - .wrap_err_with(|| format!("failed getting payload for payload ID `{payload_id}`"))?; + .wrap_err( + "executor dropped the payload channel: the build failed (the \ + executor logs the cause) or the executor shut down", + )?; let payload_build_elapsed = payload_build_start.elapsed(); let payload_validation_work_elapsed = payload.validation_work_duration(); diff --git a/crates/consensus/src/executor/actor.rs b/crates/consensus/src/executor/actor.rs index 7063b5aa04..d4c579916e 100644 --- a/crates/consensus/src/executor/actor.rs +++ b/crates/consensus/src/executor/actor.rs @@ -13,7 +13,7 @@ use commonware_runtime::{ Clock, ContextCell, FutureExt, Handle, Metrics as RuntimeMetrics, Pacer, Spawner, spawn_cell, }; use commonware_utils::{Acknowledgement, acknowledgement::Exact}; -use eyre::{Report, WrapErr as _, ensure, eyre}; +use eyre::{Report, WrapErr as _, ensure}; use futures::{ FutureExt as _, StreamExt as _, channel::{ @@ -21,11 +21,13 @@ use futures::{ oneshot, }, future::BoxFuture, + stream::FuturesUnordered, }; use prometheus_client::metrics::counter::Counter; use reth_ethereum::{chainspec::EthChainSpec, rpc::eth::primitives::BlockNumHash}; +use reth_node_builder::PayloadKind; use tempo_node::{TempoExecutionData, TempoFullNode}; -use tempo_payload_types::TempoPayloadAttributes; +use tempo_payload_types::{TempoBuiltPayload, TempoPayloadAttributes}; use tokio::select; use tracing::{ Level, Span, debug, error, error_span, info, info_span, instrument, warn, warn_span, @@ -33,11 +35,10 @@ use tracing::{ use super::{ Config, - ingress::{CanonicalizeHead, Command, Message}, + ingress::{CanonicalizeAndBuild, CanonicalizeHead, Command, Message}, }; use crate::{ consensus::{Digest, block::Block}, - executor::ingress::CanonicalizeAndBuild, utils::OptionFuture, }; @@ -137,6 +138,14 @@ pub(crate) struct Actor { /// The single execution-layer request currently being driven in the background. execution_task: OptionFuture>, + /// Payload build jobs currently being driven to completion. + /// + /// Each job resolves a payload from the execution layer's payload builder + /// and delivers it to the subscriber that requested the build. If the + /// subscriber dropped its receiver in the meantime, the built payload is + /// discarded. + payload_jobs: FuturesUnordered>, + latest_observed_finalized_tip: Option<(Height, Digest)>, /// The node's ed25519 public key if the node is participating in @@ -219,6 +228,7 @@ where pending_backfill: OptionFuture::none(), execution_queue: VecDeque::new(), execution_task: OptionFuture::none(), + payload_jobs: FuturesUnordered::new(), latest_observed_finalized_tip: None, @@ -259,13 +269,23 @@ where task_result = &mut self.execution_task => { match task_result { - ExecutionTaskResult::Completed { canonicalized } => { + ExecutionTaskResult::Completed { canonicalized, payload_job } => { if let Some(canonicalized) = canonicalized { // There is only one execution task running at // a time, and `last_canonicalized` is only // mutated here to keep a consistent view. self.last_canonicalized = canonicalized; } + if let Some(job) = payload_job { + self.payload_jobs.push( + run_payload_job( + self.context.clone(), + self.execution_node.clone(), + job, + ) + .boxed(), + ); + } } ExecutionTaskResult::Fatal { error } => { error_span!("shutdown").in_scope(|| error!( @@ -302,6 +322,8 @@ where } } + Some(()) = self.payload_jobs.next() => {} + msg = self.mailbox.next() => { let Some(msg) = msg else { break; }; if let Err(error) = self.handle_message(msg) { @@ -355,37 +377,47 @@ where let cause = message.cause; let is_backfilling = self.is_backfilling(); match message.command { - Command::CanonicalizeHead(request) => { + Command::CanonicalizeHead(CanonicalizeHead { + height, + digest, + response, + }) => { if is_backfilling { info_span!("handle_message") .in_scope(|| info!("request to canonicalize deferred while backfilling")); } - let CanonicalizeHead { - height, - digest, - response, - } = request; - self.enqueue_execution_request(ExecutionRequest::CanonicalizeHead(Box::new( - ForkchoiceUpdateTask { + self.enqueue_execution_request(ExecutionRequest::Canonicalize(Box::new( + Canonicalize { cause, head_or_finalized: HeadOrFinalized::Head, height, digest, - attrs: None, - response: ForkchoiceUpdateResponse::Canonicalize { response }, + fcu_response: Some(response), + build_attributes: None, }, ))); } - Command::CanonicalizeAndBuild(request) => { + Command::CanonicalizeAndBuild(CanonicalizeAndBuild { + height, + digest, + attributes, + response, + }) => { if is_backfilling { info_span!("handle_message").in_scope(|| { info!("request to canonicalize and build deferred while backfilling") }); } - self.enqueue_execution_request(ExecutionRequest::CanonicalizeAndBuild { - cause, - request, - }); + self.enqueue_execution_request(ExecutionRequest::Canonicalize(Box::new( + Canonicalize { + cause, + head_or_finalized: HeadOrFinalized::Head, + height, + digest, + fcu_response: None, + build_attributes: Some((*attributes, response)), + }, + ))); } Command::Finalize(finalized) => match *finalized { Update::Tip(_, height, digest) => { @@ -441,16 +473,14 @@ where && new_canonicalized != self.last_canonicalized { self.execution_queue - .push_back(ExecutionRequest::CanonicalizeHead(Box::new( - ForkchoiceUpdateTask { - cause: Span::current(), - head_or_finalized: HeadOrFinalized::Finalized, - height, - digest, - attrs: None, - response: ForkchoiceUpdateResponse::None, - }, - ))); + .push_back(ExecutionRequest::Canonicalize(Box::new(Canonicalize { + cause: Span::current(), + head_or_finalized: HeadOrFinalized::Finalized, + height, + digest, + fcu_response: None, + build_attributes: None, + }))); } let Some(request) = self.execution_queue.front() else { @@ -478,14 +508,8 @@ where } enum ExecutionRequest { - Heartbeat { - cause: Span, - }, - CanonicalizeHead(Box), - CanonicalizeAndBuild { - cause: Span, - request: CanonicalizeAndBuild, - }, + Heartbeat { cause: Span }, + Canonicalize(Box), FinalizeBlock(Box), } @@ -498,60 +522,6 @@ impl ExecutionRequest { } } -enum ForkchoiceUpdateResponse { - None, - Canonicalize { - response: oneshot::Sender>, - }, - Build { - response: oneshot::Sender>, - }, -} - -impl ForkchoiceUpdateResponse { - fn send_ok_without_payload_id(self) { - match self { - Self::None => {} - Self::Canonicalize { response } => { - let _ = response.send(Ok(())); - } - Self::Build { response } => { - let _ = response.send(Err(eyre!("no payload id for the build request"))); - } - } - } - - fn send_payload_id(self, payload_id: Option) { - match self { - Self::None => {} - Self::Canonicalize { response } => { - let _ = response.send(Ok(())); - } - Self::Build { response } => { - if let Some(payload_id) = payload_id { - let _ = response.send(Ok(payload_id)); - } else { - let _ = response.send(Err(eyre!("no payload id for the build request"))); - } - } - } - } - - fn send_error(self, error: eyre::Report) { - match self { - Self::None => { - warn!(%error, "queued forkchoice update failed"); - } - Self::Canonicalize { response } => { - let _ = response.send(Err(error)); - } - Self::Build { response } => { - let _ = response.send(Err(error)); - } - } - } -} - struct FinalizedBlockRequest { cause: Span, block: Block, @@ -568,19 +538,34 @@ enum ForkchoiceUpdateKind { enum ExecutionTaskResult { Completed { canonicalized: Option, + /// A payload build that the forkchoice update kicked off on the + /// execution layer and that still needs to be driven to completion. + payload_job: Option, }, Fatal { error: Report, }, } -struct ForkchoiceUpdateTask { +struct Canonicalize { cause: Span, head_or_finalized: HeadOrFinalized, height: Height, digest: Digest, - attrs: Option, - response: ForkchoiceUpdateResponse, + /// Acknowledges to the requester that the execution layer accepted the + /// forkchoice update. + fcu_response: Option>, + /// Payload attributes to register a build job with the forkchoice + /// update, paired with the subscriber awaiting the built payload. + build_attributes: Option<(TempoPayloadAttributes, oneshot::Sender)>, +} + +/// A payload build registered on the execution layer whose result still needs +/// to be delivered to the subscriber that requested it. +struct StartPayloadJob { + cause: Span, + payload_id: PayloadId, + response: oneshot::Sender, } async fn execute_request( @@ -610,35 +595,16 @@ where } ExecutionTaskResult::Completed { canonicalized: None, + payload_job: None, } } - ExecutionRequest::CanonicalizeHead(request) => { - let canonicalized = - run_forkchoice_update_task(&context, execution_node, canonicalized, *request).await; - ExecutionTaskResult::Completed { canonicalized } - } - ExecutionRequest::CanonicalizeAndBuild { cause, request } => { - let CanonicalizeAndBuild { - height, - digest, - attributes, - response, - } = request; - let canonicalized = run_forkchoice_update_task( - &context, - execution_node, + ExecutionRequest::Canonicalize(request) => { + let (canonicalized, payload_job) = + run_canonicalize_task(&context, execution_node, canonicalized, *request).await; + ExecutionTaskResult::Completed { canonicalized, - ForkchoiceUpdateTask { - cause, - head_or_finalized: HeadOrFinalized::Head, - height, - digest, - attrs: Some(*attributes), - response: ForkchoiceUpdateResponse::Build { response }, - }, - ) - .await; - ExecutionTaskResult::Completed { canonicalized } + payload_job, + } } ExecutionRequest::FinalizeBlock(request) => { let fatal_on_error = !request.is_backfill; @@ -652,12 +618,16 @@ where ) .await { - Ok(canonicalized) => ExecutionTaskResult::Completed { canonicalized }, + Ok(canonicalized) => ExecutionTaskResult::Completed { + canonicalized, + payload_job: None, + }, Err(error) if fatal_on_error => ExecutionTaskResult::Fatal { error }, Err(error) => { warn!(%error, "failed forwarding backfilled finalized block to execution layer"); ExecutionTaskResult::Completed { canonicalized: None, + payload_job: None, } } } @@ -667,56 +637,146 @@ where #[instrument( skip_all, - parent = &request.cause, + parent = &cause, fields( - head.height = %request.height, - head.digest = %request.digest, - head_or_finalized = %request.head_or_finalized, + %height, + %digest, + head_or_finalized = %head_or_finalized, ), )] -async fn run_forkchoice_update_task( +async fn run_canonicalize_task( context: &TContext, execution_node: Arc, canonicalized: LastCanonicalized, - request: ForkchoiceUpdateTask, -) -> Option { - let ForkchoiceUpdateTask { + Canonicalize { cause, head_or_finalized, height, digest, - attrs, - response, - } = request; - + fcu_response, + mut build_attributes, + }: Canonicalize, +) -> (Option, Option) { let new_canonicalized = match head_or_finalized { HeadOrFinalized::Head => canonicalized.update_head(height, digest), HeadOrFinalized::Finalized => canonicalized.update_finalized(height, digest), }; - if new_canonicalized == canonicalized && attrs.is_none() { - debug!("would not change forkchoice state; not sending it to the execution layer"); - response.send_ok_without_payload_id(); - return None; + if build_attributes + .as_ref() + .is_some_and(|(_, response)| response.is_canceled()) + { + info!("dropping payload build request: the subscriber went away while it was queued"); + build_attributes.take(); + } + + // Only build on top of the most recent head. If the requested parent + // could not be made the head (because a block above it was already + // finalized), the build is stale, and submitting its attributes anyway + // would register a build on top of the wrong block. Taking the + // attributes drops the response channel, which signals the failure to + // the subscriber. + if build_attributes.is_some() && new_canonicalized.forkchoice.head_block_hash != digest.0 { + info!("dropping payload build request: its parent cannot be made the head"); + build_attributes.take(); } + let (attributes, payload_response) = build_attributes.unzip(); + + // The forkchoice update is submitted even if it would not change the + // forkchoice state: the execution layer treats it as a no-op (the FCU + // heartbeat relies on this). match submit_forkchoice_update( &execution_node, context, - cause, + cause.clone(), new_canonicalized, - attrs, + attributes, ForkchoiceUpdateKind::Canonicalize { head_or_finalized }, ) .await { Ok(payload_id) => { - response.send_payload_id(payload_id); - Some(new_canonicalized) + if let Some(response) = fcu_response { + let _ = response.send(()); + } + let payload_job = match (payload_response, payload_id) { + (Some(response), Some(payload_id)) => Some(StartPayloadJob { + cause, + payload_id, + response, + }), + (Some(_dropped_to_signal_failure), None) => { + warn!("execution layer did not return a payload id for the build request"); + None + } + (None, _) => None, + }; + (Some(new_canonicalized), payload_job) } Err(error) => { - response.send_error(error); - None + // Dropping the response channels signals the failure to the + // subscribers; the cause is only logged here. + warn!(%error, "forkchoice update failed"); + (None, None) + } + } +} + +/// Drives a payload build on the execution layer to completion. +/// +/// Resolves the payload registered under `payload_id` from the execution +/// layer's payload builder and delivers it on `response`. If the subscriber +/// goes away before the payload is resolved (for example because the +/// consensus engine cancelled the proposal request that triggered the +/// build), the in-flight resolve future is dropped, which deregisters the +/// build job from the payload builder and aborts the build. +#[instrument( + skip_all, + parent = &cause, + fields(%payload_id), +)] +async fn run_payload_job( + context: TContext, + execution_node: Arc, + StartPayloadJob { + cause, + payload_id, + mut response, + }: StartPayloadJob, +) { + let payload = select! { + payload = execution_node + .payload_builder_handle + .resolve_kind(payload_id, PayloadKind::WaitForPending) + .pace(&context, Duration::from_millis(20)) + => payload, + + // Drops the in-flight payload-resolution, killing payload build. + () = response.cancellation() => { + info!("payload subscriber went away before the payload was resolved; killing the payload build"); + return; + } + }; + + // In the failure branches, dropping the response channel signals the + // failure to the subscriber; the cause is only logged here. + match payload { + Some(Ok(payload)) => { + if response.send(payload).is_err() { + info!( + "payload subscriber went away before the payload could be delivered; discarding it" + ); + } + } + Some(Err(error)) => { + warn!( + error = %eyre::Report::new(error), + "payload build job failed", + ); + } + None => { + warn!("no payload build job found under the payload ID"); } } } @@ -740,11 +800,6 @@ async fn submit_forkchoice_update( attrs: Option, kind: ForkchoiceUpdateKind, ) -> eyre::Result> { - match kind { - ForkchoiceUpdateKind::Heartbeat => info!("sending FCU"), - ForkchoiceUpdateKind::Canonicalize { .. } => info!("sending forkchoice-update"), - } - let fcu_response = execution_node .add_ons_handle .beacon_engine_handle diff --git a/crates/consensus/src/executor/ingress.rs b/crates/consensus/src/executor/ingress.rs index ff35f2d2b2..f295c18e56 100644 --- a/crates/consensus/src/executor/ingress.rs +++ b/crates/consensus/src/executor/ingress.rs @@ -1,11 +1,10 @@ -use alloy_rpc_types_engine::PayloadId; use commonware_consensus::{Reporter, marshal::Update, types::Height}; use eyre::WrapErr as _; use futures::{ SinkExt as _, channel::{mpsc, oneshot}, }; -use tempo_payload_types::TempoPayloadAttributes; +use tempo_payload_types::{TempoBuiltPayload, TempoPayloadAttributes}; use tracing::Span; use crate::consensus::{Digest, block::Block}; @@ -30,18 +29,27 @@ impl Mailbox { response, })) .wrap_err("failed sending canonicalize request to agent, this means it exited")?; - rx.await - .wrap_err("executor dropped response") - .and_then(|res| res) + rx.await.wrap_err( + "executor dropped the response channel: the forkchoice update \ + failed (the executor logs the cause) or the executor shut down", + ) } /// Canonicalizes the given head and requests a new payload to be built. + /// + /// The built payload is delivered on the returned channel once the + /// execution layer finishes constructing it. The receiver may be dropped + /// to signal that the payload is no longer wanted; the executor still + /// runs the build to completion and then discards the payload. + /// + /// Conversely, the executor dropping its sender means the build failed; + /// the executor logs the cause. pub(crate) fn canonicalize_and_build( &self, height: Height, digest: Digest, attributes: TempoPayloadAttributes, - ) -> eyre::Result>> { + ) -> eyre::Result> { let (response, rx) = oneshot::channel(); self.inner .unbounded_send(Message::in_current_span(CanonicalizeAndBuild { @@ -86,7 +94,7 @@ pub(super) enum Command { pub(super) struct CanonicalizeHead { pub(super) height: Height, pub(super) digest: Digest, - pub(super) response: oneshot::Sender>, + pub(super) response: oneshot::Sender<()>, } #[derive(Debug)] @@ -94,7 +102,7 @@ pub(super) struct CanonicalizeAndBuild { pub(super) height: Height, pub(super) digest: Digest, pub(super) attributes: Box, - pub(super) response: oneshot::Sender>, + pub(super) response: oneshot::Sender, } impl From for Command {