Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 31 additions & 102 deletions crates/consensus/src/consensus/application/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::{

use alloy_consensus::BlockHeader;
use alloy_primitives::{B256, Bytes};
use alloy_rpc_types_engine::PayloadId;
use commonware_codec::{Encode as _, EncodeSize as _, ReadExt as _};
use commonware_consensus::{
Heightable as _,
Expand All @@ -34,13 +33,9 @@ use prometheus_client::metrics::counter::Counter;

use commonware_utils::SystemTimeExt;
use eyre::{OptionExt as _, WrapErr as _, bail, ensure, eyre};
use futures::{
StreamExt as _,
channel::{mpsc, oneshot},
future::try_join,
};
use futures::{StreamExt as _, channel::mpsc, future::try_join};
use rand_08::{CryptoRng, Rng};
use reth_node_builder::{Block as _, ConsensusEngineHandle, PayloadKind};
use reth_node_builder::{Block as _, ConsensusEngineHandle};
use reth_primitives_traits::BlockBody as _;
use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
use tempo_node::{TempoExecutionData, TempoFullNode, TempoPayloadTypes};
Expand Down Expand Up @@ -72,12 +67,11 @@ pub(in crate::consensus) struct Actor<TContext, TState = Uninit> {
inner: Inner<TState>,
}

struct BuildProposalArgs<'a> {
struct BuildProposalArgs {
propose_start: Instant,
parent_view: View,
parent_digest: Digest,
round: Round,
payload_id_rx: &'a mut Option<oneshot::Receiver<eyre::Result<PayloadId>>>,
leader: PublicKey,
}

Expand Down Expand Up @@ -331,7 +325,6 @@ impl Inner<Init> {
} = request;

let proposal_digest = {
let mut payload_id_rx: Option<oneshot::Receiver<eyre::Result<PayloadId>>> = None;
let mut proposal = Box::pin(async {
// Follow the commonware marshal::standard::inline application:
//
Expand All @@ -357,45 +350,41 @@ impl Inner<Init> {
let already_verified = OptionFuture::some(self.marshal.get_verified(round));
futures::pin_mut!(already_verified);

let mut proposal = Box::pin(self.clone().build_proposal(
let mut proposal = Box::pin(self.clone().propose(
context.clone(),
BuildProposalArgs {
propose_start,
parent_view,
parent_digest,
round,
payload_id_rx: &mut payload_id_rx,
leader,
},
));

let (block, proposal_return) = tokio::select! {
let proposal_result = tokio::select! {
biased;

Some(block) = &mut already_verified => {
drop(proposal);
self.cancel_payload_build(&mut payload_id_rx).await;
debug!("skipping proposal: verified block already exists for round on restart");
(block, None)
Ok((block, None))
},

res = &mut proposal => {
let proposal = res.wrap_err("failed creating a proposal")?;

// Make sure that we get a response from the already_verified future before proposing.
if already_verified.is_none() {
proposal
} else {
if let Some(block) = already_verified.await {
debug!("skipping proposal: verified block already exists for round on restart");
(block, None)
} else {
proposal
}
}
res.wrap_err("failed creating a proposal")

@SuperFluffy SuperFluffy Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix to not interfere with already verified blocks if for whatever reason the build failed.

},
};

// already_verified blocks are always preferred, even if
// building a block failed.
let (block, proposal_return) = if already_verified.is_some()
&& let Some(block) = already_verified.await
{
debug!("skipping proposal: verified block already exists for round on restart");
(block, None)
} else {
proposal_result?
};

let digest = block.digest();
if let Some(proposal_return) = proposal_return {
let persist_start = Instant::now();
Expand All @@ -416,9 +405,6 @@ impl Inner<Init> {

tokio::select! {
() = response.closed() => {
drop(proposal);
self.cancel_payload_build(&mut payload_id_rx).await;

return Err(eyre!(
"proposal return channel was closed by consensus \
engine before block could be proposed; aborting"
Expand Down Expand Up @@ -500,52 +486,16 @@ impl Inner<Init> {
Ok(())
}

async fn cancel_payload_build(
&self,
payload_id_rx: &mut Option<oneshot::Receiver<eyre::Result<PayloadId>>>,
) {
let Some(rx) = payload_id_rx.take() else {
return;
};

let payload_id = match rx.await {
Ok(Ok(payload_id)) => payload_id,
Ok(Err(error)) => {
warn!(%error, "payload build was not started before cancellation");
return;
}
Err(_) => {
warn!("executor dropped response before payload build could be cancelled");
return;
}
};

let fut = match self
.execution_node
.payload_builder_handle
.resolve_kind_fut(payload_id, PayloadKind::WaitForPending)
.await
{
Ok(fut) => fut,
Err(error) => {
warn!(%error, %payload_id, "failed resolving payload while cancelling build");
return;
}
};
drop(fut);
}

async fn build_proposal<TContext: Pacer>(
async fn propose<TContext: Pacer>(
self,
context: TContext,
args: BuildProposalArgs<'_>,
args: BuildProposalArgs,
) -> eyre::Result<(Block, Option<ProposalReturn>)> {
let BuildProposalArgs {
propose_start,
parent_view,
parent_digest,
round,
payload_id_rx,
leader,
} = args;

Expand Down Expand Up @@ -724,41 +674,20 @@ impl Inner<Init> {
.with_payload_build_budget(build_budget)
.with_validation_latency_estimate(validation_latency_estimate);

// Share the dispatch receiver with the cancel branch so that, if cancellation
// hits between dispatch send and receiving `payload_id`, the cancel branch can
// still drain the rx, learn `payload_id`, and cancel the now-registered job.
// Subscribe to the payload build. The executor owns the build job
// and runs it to completion; dropping the receiver (for example
// because the proposal was cancelled) tells it that the payload is
// no longer wanted.
let payload_build_start = Instant::now();
*payload_id_rx = Some(self.state.executor.canonicalize_and_build(
parent.height(),
parent.digest(),
attrs,
)?);

let payload_id = payload_id_rx
.as_mut()
.expect("just set")
.await
.wrap_err("executor dropped response")?
.wrap_err("failed requesting a new payload build")?;

// Replace the slot with a pre-filled oneshot so the cancel branch can keep
// unconditionally awaiting `payload_id_rx` and immediately get back `payload_id`.
let (tx, rx) = oneshot::channel();
let _ = tx.send(Ok(payload_id));
*payload_id_rx = Some(rx);

let payload = self
.execution_node
.payload_builder_handle
.resolve_kind(payload_id, reth_node_builder::PayloadKind::WaitForPending)
.pace(&context, Duration::from_millis(20))
.state
.executor
.canonicalize_and_build(parent.height(), parent.digest(), attrs)?
.await
// XXX: this returns Option<Result<_, _>>; drilling into
// resolve_kind this really seems to resolve to None if no
// payload_id was found.
.ok_or_eyre("no payload found under provided id")
.and_then(|rsp| rsp.map_err(Into::<eyre::Report>::into))
.wrap_err_with(|| format!("failed getting payload for payload ID `{payload_id}`"))?;
.wrap_err(
"executor dropped the payload channel: the build failed (the \
executor logs the cause) or the executor shut down",
)?;

let payload_build_elapsed = payload_build_start.elapsed();
let payload_validation_work_elapsed = payload.validation_work_duration();
Expand Down
Loading
Loading