diff --git a/Cargo.lock b/Cargo.lock index fc7c61ac..c51828c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1777,8 +1777,10 @@ dependencies = [ "ethereum_ssz_derive 0.10.3", "eyre", "futures", + "headers-accept", "jsonwebtoken", "lazy_static", + "mediatype 0.20.0", "notify", "pbkdf2", "rand 0.9.4", @@ -1828,8 +1830,11 @@ dependencies = [ "axum-extra", "cb-common", "cb-metrics", + "ethereum_serde_utils 0.7.0", + "ethereum_ssz 0.10.3", "eyre", "futures", + "headers", "lazy_static", "notify", "parking_lot", @@ -1837,10 +1842,12 @@ dependencies = [ "reqwest 0.13.2", "serde", "serde_json", + "thiserror 2.0.18", "tokio", "tower-http", "tracing", "tree_hash 0.12.1", + "types", "url", "uuid 1.23.1", ] @@ -2903,7 +2910,7 @@ dependencies = [ "ethereum_ssz_derive 0.10.3", "futures", "futures-util", - "mediatype", + "mediatype 0.19.20", "pretty_reqwest_error", "reqwest 0.12.28", "reqwest-eventsource 0.6.0", @@ -3534,6 +3541,17 @@ dependencies = [ "sha1", ] +[[package]] +name = "headers-accept" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479bcb872e714e11f72fcc6a71afadbc86d0dbe887bc44252b04cfbc63272897" +dependencies = [ + "headers-core", + "http 1.4.0", + "mediatype 0.20.0", +] + [[package]] name = "headers-core" version = "0.3.0" @@ -4373,6 +4391,12 @@ version = "0.19.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33746aadcb41349ec291e7f2f0a3aa6834d1d7c58066fb4b01f68efc4c4b7631" +[[package]] +name = "mediatype" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f490ea2ae935dd8ac89c472d4df28c7f6b87cc20767e1b21fd5ed6a16e7f61e4" + [[package]] name = "memchr" version = "2.8.0" diff --git a/Cargo.toml b/Cargo.toml index 0adabb6c..170cb38a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,9 +49,11 @@ ethereum_ssz_derive = "0.10" eyre = "0.6.12" futures = "0.3.30" headers = "0.4.0" +headers-accept = "0.2.1" indexmap = "2.2.6" jsonwebtoken = { version = "9.3.1", default-features = false } lazy_static = "1.5.0" +mediatype = "0.20.0" lh_eth2 = { package = "eth2", git = "https://github.com/sigp/lighthouse", tag = "v8.1.3", features = ["events"] } lh_eth2_keystore = { package = "eth2_keystore", git = "https://github.com/sigp/lighthouse", tag = "v8.1.3" } lh_bls = { package = "bls", git = "https://github.com/sigp/lighthouse", tag = "v8.1.3" } diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 9c335cb6..eb87dd94 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -26,12 +26,14 @@ ethereum_ssz.workspace = true ethereum_ssz_derive.workspace = true eyre.workspace = true futures.workspace = true +headers-accept.workspace = true jsonwebtoken.workspace = true lazy_static.workspace = true lh_bls.workspace = true lh_eth2.workspace = true lh_eth2_keystore.workspace = true lh_types.workspace = true +mediatype.workspace = true notify.workspace = true pbkdf2.workspace = true rand.workspace = true diff --git a/crates/common/src/config/utils.rs b/crates/common/src/config/utils.rs index 579825b6..23ab4c51 100644 --- a/crates/common/src/config/utils.rs +++ b/crates/common/src/config/utils.rs @@ -9,7 +9,7 @@ use serde::de::DeserializeOwned; use crate::{ config::{ADMIN_JWT_ENV, JWTS_ENV, MUXER_HTTP_MAX_LENGTH}, types::{BlsPublicKey, ModuleId}, - utils::read_chunked_body_with_max, + wire::read_chunked_body_with_max, }; pub fn load_env_var(env: &str) -> Result { diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 462dcec1..9ce2667c 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -7,7 +7,9 @@ pub mod interop; pub mod pbs; pub mod signature; pub mod signer; +pub mod ssz; pub mod types; pub mod utils; +pub mod wire; pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(12); diff --git a/crates/common/src/pbs/error.rs b/crates/common/src/pbs/error.rs index 77d942cd..dfc614e1 100644 --- a/crates/common/src/pbs/error.rs +++ b/crates/common/src/pbs/error.rs @@ -1,7 +1,8 @@ use alloy::primitives::{B256, U256}; +use lh_types::ForkName; use thiserror::Error; -use crate::{types::BlsPublicKeyBytes, utils::ResponseReadError}; +use crate::{types::BlsPublicKeyBytes, wire::ResponseReadError}; #[derive(Debug, Error)] pub enum PbsError { @@ -28,6 +29,9 @@ pub enum PbsError { #[error("tokio join error: {0}")] TokioJoinError(#[from] tokio::task::JoinError), + + #[error("SSZ error: {0}")] + SszError(#[from] SszValueError), } impl PbsError { @@ -107,3 +111,12 @@ pub enum ValidationError { #[error("unsupported fork")] UnsupportedFork, } + +#[derive(Debug, Error, PartialEq, Eq)] +pub enum SszValueError { + #[error("invalid payload length: required {required} but payload was {actual}")] + InvalidPayloadLength { required: usize, actual: usize }, + + #[error("unsupported fork: {name}")] + UnsupportedFork { name: ForkName }, +} diff --git a/crates/common/src/pbs/mod.rs b/crates/common/src/pbs/mod.rs index af2c07b4..a1152b58 100644 --- a/crates/common/src/pbs/mod.rs +++ b/crates/common/src/pbs/mod.rs @@ -6,5 +6,6 @@ mod types; pub use builder::*; pub use constants::*; +pub use lh_types::ForkVersionDecode; pub use relay::*; pub use types::*; diff --git a/crates/common/src/pbs/types/mod.rs b/crates/common/src/pbs/types/mod.rs index b79f8f01..738221a8 100644 --- a/crates/common/src/pbs/types/mod.rs +++ b/crates/common/src/pbs/types/mod.rs @@ -1,5 +1,5 @@ use alloy::primitives::{B256, U256, b256}; -use lh_eth2::ForkVersionedResponse; +pub use lh_eth2::ForkVersionedResponse; pub use lh_types::ForkName; use lh_types::{BlindedPayload, ExecPayload, MainnetEthSpec}; use serde::{Deserialize, Serialize}; @@ -26,6 +26,10 @@ pub type PayloadAndBlobs = lh_eth2::types::ExecutionPayloadAndBlobs; pub type ExecutionPayloadHeader = lh_types::ExecutionPayloadHeader; +pub type ExecutionPayloadHeaderBellatrix = + lh_types::ExecutionPayloadHeaderBellatrix; +pub type ExecutionPayloadHeaderCapella = lh_types::ExecutionPayloadHeaderCapella; +pub type ExecutionPayloadHeaderDeneb = lh_types::ExecutionPayloadHeaderDeneb; pub type ExecutionPayloadHeaderElectra = lh_types::ExecutionPayloadHeaderElectra; pub type ExecutionPayloadHeaderFulu = lh_types::ExecutionPayloadHeaderFulu; pub type ExecutionPayloadHeaderRef<'a> = lh_types::ExecutionPayloadHeaderRef<'a, MainnetEthSpec>; @@ -34,7 +38,11 @@ pub type ExecutionPayloadElectra = lh_types::ExecutionPayloadElectra; pub type SignedBuilderBid = lh_types::SignedBuilderBid; pub type BuilderBid = lh_types::BuilderBid; +pub type BuilderBidBellatrix = lh_types::BuilderBidBellatrix; +pub type BuilderBidCapella = lh_types::BuilderBidCapella; +pub type BuilderBidDeneb = lh_types::BuilderBidDeneb; pub type BuilderBidElectra = lh_types::BuilderBidElectra; +pub type BuilderBidFulu = lh_types::BuilderBidFulu; /// Response object of GET /// `/eth/v1/builder/header/{slot}/{parent_hash}/{pubkey}` diff --git a/crates/common/src/ssz.rs b/crates/common/src/ssz.rs new file mode 100644 index 00000000..5fe9ebe2 --- /dev/null +++ b/crates/common/src/ssz.rs @@ -0,0 +1,139 @@ +use alloy::primitives::U256; +use lh_bls::Signature; +use lh_types::ForkName; +use ssz::{BYTES_PER_LENGTH_OFFSET, Decode, Encode}; + +use crate::pbs::{ + BuilderBidFulu, ExecutionPayloadHeaderFulu, ExecutionRequests, KzgCommitments, + error::SszValueError, +}; + +/// Test that SSZ encoding and decoding round-trips, returning the decoded +/// struct. +pub fn test_encode_decode_ssz(d: &[u8]) -> T { + let decoded = T::from_ssz_bytes(d).expect("deserialize"); + let encoded = T::as_ssz_bytes(&decoded); + assert_eq!(encoded, d); + decoded +} + +// Get the offset of the message in a SignedBuilderBid SSZ structure +fn get_ssz_value_offset_for_fork(fork: ForkName) -> Result { + match fork { + ForkName::Fulu => { + // Message goes header -> blob_kzg_commitments -> execution_requests -> value -> + // pubkey + Ok(get_message_offset::() + + ::ssz_fixed_len() + + ::ssz_fixed_len() + + ::ssz_fixed_len()) + } + + _ => Err(SszValueError::UnsupportedFork { name: fork }), + } +} + +/// Extracts the bid value from SSZ-encoded SignedBuilderBid response bytes. +pub fn get_bid_value_from_signed_builder_bid_ssz( + response_bytes: &[u8], + fork: ForkName, +) -> Result { + let value_offset = get_ssz_value_offset_for_fork(fork)?; + + // Sanity check the response length so we don't panic trying to slice it + let end_offset = value_offset + 32; // U256 is 32 bytes + if response_bytes.len() < end_offset { + return Err(SszValueError::InvalidPayloadLength { + required: end_offset, + actual: response_bytes.len(), + }); + } + + // Extract the value bytes and convert to U256 + let value_bytes = &response_bytes[value_offset..end_offset]; + let value = U256::from_le_slice(value_bytes); + Ok(value) +} + +// Get the offset where the `message` field starts in some SignedBuilderBid SSZ +// data. Requires that SignedBuilderBid always has the following structure: +// message -> signature +// where `message` is a BuilderBid type determined by the fork choice, and +// `signature` is a fixed-length Signature type. +fn get_message_offset() -> usize +where + BuilderBidType: ssz::Encode, +{ + // Since `message` is the first field, its offset is always 0 + let mut offset = 0; + + // If it's variable length, then it will be represented by a pointer to + // the actual data, so we need to get the location of where that data starts + if !BuilderBidType::is_ssz_fixed_len() { + offset += BYTES_PER_LENGTH_OFFSET + ::ssz_fixed_len(); + } + + offset +} + +#[cfg(test)] +mod test { + use alloy::primitives::U256; + use lh_types::ForkName; + use ssz::Encode; + + use super::get_bid_value_from_signed_builder_bid_ssz; + use crate::{ + pbs::{ + BuilderBid, BuilderBidFulu, ExecutionPayloadHeaderFulu, ExecutionRequests, + SignedBuilderBid, error::SszValueError, + }, + types::{BlsPublicKeyBytes, BlsSignature}, + utils::TestRandomSeed, + }; + + #[test] + fn test_ssz_value_extraction_unsupported_fork() { + let dummy_bytes = vec![0u8; 1000]; + let err = + get_bid_value_from_signed_builder_bid_ssz(&dummy_bytes, ForkName::Altair).unwrap_err(); + assert!(matches!(err, SszValueError::UnsupportedFork { .. })); + } + + #[test] + fn test_ssz_value_extraction_truncated_payload() { + // A payload that is far too short for any supported fork's value offset + let tiny_bytes = vec![0u8; 4]; + let err = + get_bid_value_from_signed_builder_bid_ssz(&tiny_bytes, ForkName::Fulu).unwrap_err(); + assert!(matches!(err, SszValueError::InvalidPayloadLength { .. })); + } + + /// Per-fork positive tests: construct a `SignedBuilderBid` with a known + /// value for each supported fork, SSZ-encode it, and verify + /// `get_bid_value_from_signed_builder_bid_ssz` round-trips correctly. + #[test] + fn test_ssz_value_extraction_with_known_bid() { + // Distinctive value — large enough that endianness bugs produce a + // different number and zero-matches are impossible. + let known_value = U256::from(0x0102_0304_0506_0708_u64); + let pubkey = BlsPublicKeyBytes::test_random(); + let sig = BlsSignature::test_random(); + + // ── Fulu ───────────────────────────────────────────────────────────── + { + let message = BuilderBid::Fulu(BuilderBidFulu { + header: ExecutionPayloadHeaderFulu::test_random(), + blob_kzg_commitments: Default::default(), + execution_requests: ExecutionRequests::default(), + value: known_value, + pubkey, + }); + let bid = SignedBuilderBid { message, signature: sig }; + let got = + get_bid_value_from_signed_builder_bid_ssz(&bid.as_ssz_bytes(), ForkName::Fulu) + .expect("Fulu extraction failed"); + assert_eq!(got, known_value, "Fulu: value mismatch"); + } + } +} diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index e504e477..c0b53b4c 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "testing-flags")] -use std::cell::Cell; use std::{ net::Ipv4Addr, time::{SystemTime, UNIX_EPOCH}, @@ -9,15 +7,10 @@ use alloy::{ hex, primitives::{U256, keccak256}, }; -use axum::http::HeaderValue; -use futures::StreamExt; use lh_types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; use rand::{Rng, distr::Alphanumeric}; -use reqwest::{Response, header::HeaderMap}; use serde::{Serialize, de::DeserializeOwned}; use serde_json::Value; -use ssz::{Decode, Encode}; -use thiserror::Error; use tracing::Level; use tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation}; use tracing_subscriber::{ @@ -29,90 +22,11 @@ use tracing_subscriber::{ use crate::{ config::LogsSettings, constants::SIGNER_JWT_EXPIRATION, - pbs::HEADER_VERSION_VALUE, types::{BlsPublicKey, Chain, Jwt, JwtAdminClaims, JwtClaims, ModuleId}, }; const MILLIS_PER_SECOND: u64 = 1_000; -#[derive(Debug, Error)] -pub enum ResponseReadError { - #[error( - "response size exceeds max size; max: {max}, content_length: {content_length}, raw: {raw}" - )] - PayloadTooLarge { max: usize, content_length: usize, raw: String }, - - #[error("error reading response stream: {0}")] - ReqwestError(#[from] reqwest::Error), -} - -#[cfg(feature = "testing-flags")] -thread_local! { - static IGNORE_CONTENT_LENGTH: Cell = const { Cell::new(false) }; -} - -#[cfg(feature = "testing-flags")] -pub fn set_ignore_content_length(val: bool) { - IGNORE_CONTENT_LENGTH.with(|f| f.set(val)); -} - -#[cfg(feature = "testing-flags")] -#[allow(dead_code)] -fn should_ignore_content_length() -> bool { - IGNORE_CONTENT_LENGTH.with(|f| f.get()) -} - -/// Reads the body of a response as a chunked stream, ensuring the size does not -/// exceed `max_size`. -pub async fn read_chunked_body_with_max( - res: Response, - max_size: usize, -) -> Result, ResponseReadError> { - // Get the content length from the response headers - #[cfg(not(feature = "testing-flags"))] - let content_length = res.content_length(); - - #[cfg(feature = "testing-flags")] - let mut content_length = res.content_length(); - - #[cfg(feature = "testing-flags")] - if should_ignore_content_length() { - // Used for testing purposes to ignore content length - content_length = None; - } - - // Break if content length is provided but it's too big - if let Some(length) = content_length && - length as usize > max_size - { - return Err(ResponseReadError::PayloadTooLarge { - max: max_size, - content_length: length as usize, - raw: String::new(), // raw content is not available here - }); - } - - let mut stream = res.bytes_stream(); - let mut response_bytes = Vec::new(); - - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - if response_bytes.len() + chunk.len() > max_size { - // avoid spamming logs if the message is too large - response_bytes.truncate(1024); - return Err(ResponseReadError::PayloadTooLarge { - max: max_size, - content_length: content_length.unwrap_or(0) as usize, - raw: String::from_utf8_lossy(&response_bytes).into_owned(), - }); - } - - response_bytes.extend_from_slice(&chunk); - } - - Ok(response_bytes) -} - pub fn timestamp_of_slot_start_sec(slot: u64, chain: Chain) -> u64 { chain.genesis_time_sec() + slot * chain.slot_time_sec() } @@ -166,15 +80,6 @@ pub fn test_encode_decode(d: &str) -> T { decoded } -pub fn test_encode_decode_ssz(d: &[u8]) -> T { - let decoded = T::from_ssz_bytes(d).expect("deserialize"); - let encoded = T::as_ssz_bytes(&decoded); - - assert_eq!(encoded, d); - - decoded -} - pub mod as_eth_str { use alloy::primitives::{ U256, @@ -502,21 +407,6 @@ pub fn random_jwt_secret() -> String { rand::rng().sample_iter(&Alphanumeric).take(32).map(char::from).collect() } -/// Returns the user agent from the request headers or an empty string if not -/// present -pub fn get_user_agent(req_headers: &HeaderMap) -> String { - req_headers - .get(reqwest::header::USER_AGENT) - .and_then(|ua| ua.to_str().ok().map(|s| s.to_string())) - .unwrap_or_default() -} - -/// Adds the commit boost version to the existing user agent -pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result { - let ua = get_user_agent(req_headers); - Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {ua}"))?) -} - #[cfg(unix)] pub async fn wait_for_signal() -> eyre::Result<()> { use tokio::signal::unix::{SignalKind, signal}; diff --git a/crates/common/src/wire.rs b/crates/common/src/wire.rs new file mode 100644 index 00000000..a8916a00 --- /dev/null +++ b/crates/common/src/wire.rs @@ -0,0 +1,916 @@ +#[cfg(feature = "testing-flags")] +use std::cell::Cell; +use std::str::FromStr; + +use axum::http::HeaderValue; +use bytes::Bytes; +use futures::StreamExt; +use headers_accept::Accept; +use lh_types::{BeaconBlock, ForkName}; +use mediatype::{MediaType, ReadParams}; +use reqwest::{ + Response, + header::{ACCEPT, CONTENT_TYPE, HeaderMap}, +}; +use thiserror::Error; + +use crate::pbs::{HEADER_VERSION_VALUE, SignedBlindedBeaconBlock}; + +pub const APPLICATION_JSON: &str = "application/json"; +pub const APPLICATION_OCTET_STREAM: &str = "application/octet-stream"; +pub const WILDCARD: &str = "*/*"; +pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version"; + +#[derive(Debug, Error)] +pub enum ResponseReadError { + #[error( + "response size exceeds max size; max: {max}, content_length: {content_length}, raw: {raw}" + )] + PayloadTooLarge { max: usize, content_length: usize, raw: String }, + + #[error("error reading response stream: {0}")] + ReqwestError(#[from] reqwest::Error), +} + +#[cfg(feature = "testing-flags")] +thread_local! { + static IGNORE_CONTENT_LENGTH: Cell = const { Cell::new(false) }; +} + +#[cfg(feature = "testing-flags")] +pub fn set_ignore_content_length(val: bool) { + IGNORE_CONTENT_LENGTH.with(|f| f.set(val)); +} + +#[cfg(feature = "testing-flags")] +#[allow(dead_code)] +fn should_ignore_content_length() -> bool { + IGNORE_CONTENT_LENGTH.with(|f| f.get()) +} + +/// Reads the body of a response as a chunked stream, ensuring the size does not +/// exceed `max_size`. +pub async fn read_chunked_body_with_max( + res: Response, + max_size: usize, +) -> Result, ResponseReadError> { + // Get the content length from the response headers + #[cfg(not(feature = "testing-flags"))] + let content_length = res.content_length(); + + #[cfg(feature = "testing-flags")] + let mut content_length = res.content_length(); + + #[cfg(feature = "testing-flags")] + if should_ignore_content_length() { + // Used for testing purposes to ignore content length + content_length = None; + } + + // Break if content length is provided but it's too big + if let Some(length) = content_length && + length as usize > max_size + { + return Err(ResponseReadError::PayloadTooLarge { + max: max_size, + content_length: length as usize, + raw: String::new(), // raw content is not available here + }); + } + + let mut stream = res.bytes_stream(); + let mut response_bytes = Vec::new(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + if response_bytes.len() + chunk.len() > max_size { + // avoid spamming logs if the message is too large + response_bytes.truncate(1024); + return Err(ResponseReadError::PayloadTooLarge { + max: max_size, + content_length: content_length.unwrap_or(0) as usize, + raw: String::from_utf8_lossy(&response_bytes).into_owned(), + }); + } + + response_bytes.extend_from_slice(&chunk); + } + + Ok(response_bytes) +} + +/// Returns the user agent from the request headers or an empty string if not +/// present +pub fn get_user_agent(req_headers: &HeaderMap) -> String { + req_headers + .get(reqwest::header::USER_AGENT) + .and_then(|ua| ua.to_str().ok().map(|s| s.to_string())) + .unwrap_or_default() +} + +/// Adds the commit boost version to the existing user agent +pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result { + let ua = get_user_agent(req_headers); + Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {ua}"))?) +} + +/// Deterministic outbound `Accept` header used when PBS asks a relay for a +/// response it will itself decode (validation mode On/Extra). SSZ is preferred +/// for wire efficiency. Emitted verbatim so packet captures and support +/// tickets are reproducible. +pub const OUTBOUND_ACCEPT: &str = "application/octet-stream;q=1.0,application/json;q=0.9"; + +/// Default encoding used when the caller does not express a format +/// preference. This covers both `Accept: */*` (see `get_accept_types`) and +/// a missing Content-Type header on inbound or relay responses (see +/// `parse_response_encoding_and_fork` and `deserialize_body`). Keeping the +/// policy in one place prevents drift between those sites. +pub const NO_PREFERENCE_DEFAULT: EncodingType = EncodingType::Json; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AcceptedEncodings { + pub primary: EncodingType, + pub fallback: Option, +} + +impl AcceptedEncodings { + pub const fn single(primary: EncodingType) -> Self { + Self { primary, fallback: None } + } + + pub fn contains(self, enc: EncodingType) -> bool { + self.primary == enc || self.fallback == Some(enc) + } + + /// Iterate in preference order: primary first, then fallback (if any). + pub fn iter(self) -> impl Iterator { + std::iter::once(self.primary).chain(self.fallback) + } + + pub fn preferred(self, supported: &[EncodingType]) -> Option { + self.iter().find(|a| supported.contains(a)) + } +} + +impl IntoIterator for AcceptedEncodings { + type Item = EncodingType; + type IntoIter = + std::iter::Chain, std::option::IntoIter>; + fn into_iter(self) -> Self::IntoIter { + std::iter::once(self.primary).chain(self.fallback) + } +} + +/// Parse the ACCEPT header into a q-value ordered [`AcceptedEncodings`] +/// (highest preference first, deduplicated), defaulting to the request's +/// Content-Type when no Accept header is present. Returns an error only if +/// every media type in the header is malformed or unsupported. Supports +/// requests with multiple ACCEPT headers or headers with multiple media +/// types. `q=0` entries are treated as explicit rejections per RFC 7231 +/// §5.3.1 and are skipped. +/// +/// The returned order honors the RFC 9110 §12.5.1 precedence rules already +/// applied by `headers_accept::Accept::media_types()` (specificity, then +/// q-value, then original order). +pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result { + // Only two supported media types, so the ordered set is at most two + // entries: primary + optional fallback. + let mut primary: Option = None; + let mut fallback: Option = None; + let mut saw_any = false; + let mut had_supported = false; + for header in req_headers.get_all(ACCEPT).iter() { + let accept = Accept::from_str(header.to_str()?) + .map_err(|e| eyre::eyre!("invalid accept header: {e}"))?; + for mt in accept.media_types() { + saw_any = true; + + // Skip q=0 entries — RFC 7231 §5.3.1: "A request without any Accept + // header field implies that the user agent will accept any media + // type in response. When a header field is present ... a value of + // 0 means 'not acceptable'." + if let Some(q) = mt.get_param(mediatype::names::Q) && + q.as_str().parse::().is_ok_and(|v| v <= 0.0) + { + continue; + } + + let parsed = match mt.essence().to_string().as_str() { + APPLICATION_OCTET_STREAM => Some(EncodingType::Ssz), + APPLICATION_JSON => Some(EncodingType::Json), + WILDCARD => Some(NO_PREFERENCE_DEFAULT), + _ => None, + }; + if let Some(enc) = parsed { + had_supported = true; + match primary { + None => primary = Some(enc), + Some(p) if p != enc && fallback.is_none() => fallback = Some(enc), + _ => {} + } + } + } + } + + if let Some(primary) = primary { + return Ok(AcceptedEncodings { primary, fallback }); + } + + if saw_any && !had_supported { + eyre::bail!("unsupported accept type"); + } + + // No accept header (or only q=0 rejections): fall back to the request + // Content-Type, which mirrors the historical behavior. + Ok(AcceptedEncodings::single(get_content_type(req_headers))) +} + +/// Compute the q-value for the `index`-th preferred encoding when building an +/// outbound `Accept` header. The first entry gets q=1.0, each subsequent entry +/// decreases by 0.1, and the value is clamped to a minimum of 0.1 so we never +/// emit q=0 (which per RFC 7231 §5.3.1 means "not acceptable"). +fn accept_q_value_for_index(index: usize) -> f32 { + // `as i32` would silently wrap for large indices (e.g. usize::MAX → -1), + // which would invert the clamp. Saturate the cast explicitly. + let idx = i32::try_from(index).unwrap_or(i32::MAX); + let step = 10_i32.saturating_sub(idx).max(1); + step as f32 / 10.0 +} + +/// Format a single `Accept` header entry as `";q="`. +#[inline] +fn format_accept_entry(enc: EncodingType, q: f32) -> String { + format!("{};q={:.1}", enc.content_type(), q) +} + +/// Build an `Accept` header string that mirrors the caller's preference order +/// so the relay sees the same priority the beacon node asked us for. Each +/// subsequent entry receives a q-value 0.1 lower than the previous one, +/// starting at 1.0. +pub fn build_outbound_accept(preferred: AcceptedEncodings) -> String { + preferred + .iter() + .enumerate() + .map(|(i, enc)| format_accept_entry(enc, accept_q_value_for_index(i))) + .collect::>() + .join(",") +} + +pub fn get_content_type(req_headers: &HeaderMap) -> EncodingType { + EncodingType::from_str( + req_headers + .get(CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .unwrap_or(APPLICATION_JSON), + ) + .unwrap_or(EncodingType::Json) +} + +pub fn get_consensus_version_header(req_headers: &HeaderMap) -> Option { + ForkName::from_str( + req_headers + .get(CONSENSUS_VERSION_HEADER) + .and_then(|value| value.to_str().ok()) + .unwrap_or(""), + ) + .ok() +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum EncodingType { + Json, + Ssz, +} + +impl EncodingType { + pub fn content_type(&self) -> &str { + match self { + EncodingType::Json => APPLICATION_JSON, + EncodingType::Ssz => APPLICATION_OCTET_STREAM, + } + } + + /// Pre-built `Content-Type` header for this encoding. + pub fn content_type_header(&self) -> &'static HeaderValue { + static JSON_HEADER: HeaderValue = HeaderValue::from_static(APPLICATION_JSON); + static SSZ_HEADER: HeaderValue = HeaderValue::from_static(APPLICATION_OCTET_STREAM); + match self { + EncodingType::Json => &JSON_HEADER, + EncodingType::Ssz => &SSZ_HEADER, + } + } +} + +impl std::fmt::Display for EncodingType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.content_type()) + } +} + +impl FromStr for EncodingType { + type Err = String; + fn from_str(value: &str) -> Result { + // Preserve prior behavior: empty defaults to JSON (used by + // `get_content_type` when Content-Type header is absent). + if value.is_empty() { + return Ok(EncodingType::Json); + } + // Parse as a media type so we tolerate RFC 7231 §3.1.1.1 parameters + // (e.g. `application/json; charset=utf-8`). Compare essence only. + let parsed = + MediaType::parse(value).map_err(|e| format!("invalid content type {value}: {e}"))?; + match parsed.essence().to_string().to_ascii_lowercase().as_str() { + APPLICATION_JSON => Ok(EncodingType::Json), + APPLICATION_OCTET_STREAM => Ok(EncodingType::Ssz), + _ => Err(format!("unsupported encoding type: {value}")), + } + } +} + +/// Parse the Content-Type and Eth-Consensus-Version headers from a relay +/// response, returning the encoding to use for body decoding and the +/// optional fork name. Tolerates MIME parameters per RFC 7231 §3.1.1.1 and +/// defaults to JSON when no Content-Type header is present (matching legacy +/// relay behavior). `code` is the HTTP status of the response and is echoed +/// back in any `PbsError::RelayResponse` this function produces, so callers +/// can surface the original status on decode failure. +pub fn parse_response_encoding_and_fork( + headers: &HeaderMap, + code: u16, +) -> Result<(EncodingType, Option), crate::pbs::error::PbsError> { + use crate::pbs::error::PbsError; + let content_type = match headers.get(CONTENT_TYPE) { + // No Content-Type: apply the shared no-preference default + None => NO_PREFERENCE_DEFAULT, + Some(hv) => { + let header_str = hv.to_str().map_err(|e| PbsError::RelayResponse { + error_msg: format!("cannot decode content-type header: {e}"), + code, + })?; + EncodingType::from_str(header_str) + .map_err(|msg| PbsError::RelayResponse { error_msg: msg, code })? + } + }; + Ok((content_type, get_consensus_version_header(headers))) +} + +#[derive(Debug, Error)] +pub enum BodyDeserializeError { + #[error("JSON deserialization error: {0}")] + SerdeJsonError(serde_json::Error), + #[error("SSZ deserialization error: {0:?}")] + SszDecodeError(ssz::DecodeError), + #[error("unsupported media type")] + UnsupportedMediaType, + #[error("missing consensus version header")] + MissingVersionHeader, +} + +pub fn deserialize_body( + headers: &HeaderMap, + body: Bytes, +) -> Result { + // Determine the encoding to decode with. Precedence: + // - Content-Type absent → NO_PREFERENCE_DEFAULT + // - Content-Type recognized → use it. + // - Content-Type present but unrecognized → UnsupportedMediaType. + let encoding = match headers.get(CONTENT_TYPE) { + None => NO_PREFERENCE_DEFAULT, + Some(hv) => { + let value = hv.to_str().map_err(|_| BodyDeserializeError::UnsupportedMediaType)?; + EncodingType::from_str(value).map_err(|_| BodyDeserializeError::UnsupportedMediaType)? + } + }; + + match encoding { + EncodingType::Json => serde_json::from_slice::(&body) + .map_err(BodyDeserializeError::SerdeJsonError), + EncodingType::Ssz => match get_consensus_version_header(headers) { + Some(version) => SignedBlindedBeaconBlock::from_ssz_bytes_with(&body, |bytes| { + BeaconBlock::from_ssz_bytes_for_fork(bytes, version) + }) + .map_err(BodyDeserializeError::SszDecodeError), + None => Err(BodyDeserializeError::MissingVersionHeader), + }, + } +} + +#[cfg(test)] +mod test { + use axum::http::{HeaderMap, HeaderName, HeaderValue}; + use bytes::Bytes; + use lh_types::ForkName; + use reqwest::header::{ACCEPT, CONTENT_TYPE}; + + use super::{ + APPLICATION_JSON, APPLICATION_OCTET_STREAM, AcceptedEncodings, BodyDeserializeError, + CONSENSUS_VERSION_HEADER, EncodingType, NO_PREFERENCE_DEFAULT, OUTBOUND_ACCEPT, WILDCARD, + accept_q_value_for_index, build_outbound_accept, deserialize_body, format_accept_entry, + get_accept_types, get_consensus_version_header, get_content_type, + parse_response_encoding_and_fork, + }; + + const APPLICATION_TEXT: &str = "application/text"; + + /// Make sure a missing Accept header is interpreted as JSON + #[test] + fn test_missing_accept_header() { + let headers = HeaderMap::new(); + let result = get_accept_types(&headers).unwrap(); + assert_eq!(result, AcceptedEncodings::single(EncodingType::Json)); + } + + /// Test accepting JSON + #[test] + fn test_accept_header_json() { + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_JSON).unwrap()); + let result = get_accept_types(&headers).unwrap(); + assert_eq!(result, AcceptedEncodings::single(EncodingType::Json)); + } + + /// Test accepting SSZ + #[test] + fn test_accept_header_ssz() { + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_OCTET_STREAM).unwrap()); + let result = get_accept_types(&headers).unwrap(); + assert_eq!(result, AcceptedEncodings::single(EncodingType::Ssz)); + } + + /// Wildcard `Accept: */*` resolves to the `NO_PREFERENCE_DEFAULT` + /// policy. Separate from the explicit + /// `Accept: application/json` path to keep the two intents distinct. + #[test] + fn test_accept_header_wildcard() { + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(WILDCARD).unwrap()); + let result = get_accept_types(&headers).unwrap(); + assert_eq!(result, AcceptedEncodings::single(NO_PREFERENCE_DEFAULT)); + } + + /// Test accepting one header with multiple values (order preserved, + /// first listed wins at equal q) + #[test] + fn test_accept_header_multiple_values() { + let header_string = format!("{APPLICATION_JSON}, {APPLICATION_OCTET_STREAM}"); + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap()); + let result = get_accept_types(&headers).unwrap(); + assert_eq!(result, AcceptedEncodings { + primary: EncodingType::Json, + fallback: Some(EncodingType::Ssz) + }); + } + + /// Test accepting multiple headers + #[test] + fn test_multiple_accept_headers() { + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_JSON).unwrap()); + headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_OCTET_STREAM).unwrap()); + let result = get_accept_types(&headers).unwrap(); + assert!(result.contains(EncodingType::Json)); + assert!(result.contains(EncodingType::Ssz)); + assert!(result.fallback.is_some()); + } + + /// Test accepting one header with multiple values, including a type that + /// can't be used + #[test] + fn test_accept_header_multiple_values_including_unknown() { + let header_string = + format!("{APPLICATION_JSON}, {APPLICATION_OCTET_STREAM}, {APPLICATION_TEXT}"); + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap()); + let result = get_accept_types(&headers).unwrap(); + assert_eq!(result, AcceptedEncodings { + primary: EncodingType::Json, + fallback: Some(EncodingType::Ssz) + }); + } + + /// Test rejecting an unknown accept type + #[test] + fn test_invalid_accept_header_type() { + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_TEXT).unwrap()); + let result = get_accept_types(&headers); + assert!(result.is_err()); + } + + /// Test accepting one header with multiple values + #[test] + fn test_accept_header_invalid_parse() { + let header_string = format!("{APPLICATION_JSON}, a?;ef)"); + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap()); + let result = get_accept_types(&headers); + assert!(result.is_err()); + } + + /// q-values are honored: JSON@1.0 should outrank SSZ@0.1 regardless of + /// byte order in the header. + #[test] + fn test_accept_header_q_value_ordering() { + let mut headers = HeaderMap::new(); + headers.append( + ACCEPT, + HeaderValue::from_str("application/json;q=1.0, application/octet-stream;q=0.1") + .unwrap(), + ); + assert_eq!(get_accept_types(&headers).unwrap(), AcceptedEncodings { + primary: EncodingType::Json, + fallback: Some(EncodingType::Ssz) + }); + + let mut headers = HeaderMap::new(); + headers.append( + ACCEPT, + HeaderValue::from_str("application/octet-stream;q=0.1, application/json;q=1.0") + .unwrap(), + ); + assert_eq!(get_accept_types(&headers).unwrap(), AcceptedEncodings { + primary: EncodingType::Json, + fallback: Some(EncodingType::Ssz) + }); + } + + /// q=0 is an explicit rejection per RFC 7231 §5.3.1 and must be dropped. + #[test] + fn test_accept_header_q_zero_rejected() { + let mut headers = HeaderMap::new(); + headers.append( + ACCEPT, + HeaderValue::from_str("application/json, application/octet-stream;q=0").unwrap(), + ); + assert_eq!( + get_accept_types(&headers).unwrap(), + AcceptedEncodings::single(EncodingType::Json) + ); + } + + /// An Accept header containing only q=0 for every supported type is a + /// deliberate "I accept nothing" and must error (so the route can return + /// 406 Not Acceptable per RFC 7231 §5.3.1 and §6.5.6). + #[test] + fn test_accept_header_only_q_zero_errors() { + let mut headers = HeaderMap::new(); + headers.append( + ACCEPT, + HeaderValue::from_str("application/json;q=0, application/octet-stream;q=0").unwrap(), + ); + assert!(get_accept_types(&headers).is_err()); + } + + /// `AcceptedEncodings::preferred` picks the caller's first choice that + /// the server can actually produce. + #[test] + fn test_preferred_encoding_picks_highest_q_match() { + let accepts = + AcceptedEncodings { primary: EncodingType::Json, fallback: Some(EncodingType::Ssz) }; + let supported = [EncodingType::Ssz, EncodingType::Json]; + assert_eq!(accepts.preferred(&supported), Some(EncodingType::Json)); + + let accepts = AcceptedEncodings::single(EncodingType::Ssz); + let supported = [EncodingType::Json]; + assert_eq!(accepts.preferred(&supported), None); + } + + /// Outbound Accept should be deterministic and q-ordered to match caller + /// preference. + #[test] + fn test_build_outbound_accept_deterministic() { + let ssz_then_json = + AcceptedEncodings { primary: EncodingType::Ssz, fallback: Some(EncodingType::Json) }; + let json_then_ssz = + AcceptedEncodings { primary: EncodingType::Json, fallback: Some(EncodingType::Ssz) }; + assert_eq!( + build_outbound_accept(ssz_then_json), + "application/octet-stream;q=1.0,application/json;q=0.9" + ); + assert_eq!( + build_outbound_accept(json_then_ssz), + "application/json;q=1.0,application/octet-stream;q=0.9" + ); + + // Stable across repeats + for _ in 0..100 { + assert_eq!( + build_outbound_accept(ssz_then_json), + "application/octet-stream;q=1.0,application/json;q=0.9" + ); + } + } + + /// `AcceptedEncodings::single` produces a primary with no fallback. + #[test] + fn test_accepted_encodings_single() { + let a = AcceptedEncodings::single(EncodingType::Ssz); + assert_eq!(a.primary, EncodingType::Ssz); + assert_eq!(a.fallback, None); + } + + /// `contains` checks both primary and fallback. + #[test] + fn test_accepted_encodings_contains() { + let only_ssz = AcceptedEncodings::single(EncodingType::Ssz); + assert!(only_ssz.contains(EncodingType::Ssz)); + assert!(!only_ssz.contains(EncodingType::Json)); + + let both = + AcceptedEncodings { primary: EncodingType::Ssz, fallback: Some(EncodingType::Json) }; + assert!(both.contains(EncodingType::Ssz)); + assert!(both.contains(EncodingType::Json)); + } + + /// `iter` yields primary first, then fallback if present. Single-value + /// instances yield exactly one element. + #[test] + fn test_accepted_encodings_iter_order() { + let both = + AcceptedEncodings { primary: EncodingType::Json, fallback: Some(EncodingType::Ssz) }; + assert_eq!(both.iter().collect::>(), vec![EncodingType::Json, EncodingType::Ssz]); + + let only = AcceptedEncodings::single(EncodingType::Ssz); + assert_eq!(only.iter().collect::>(), vec![EncodingType::Ssz]); + } + + /// `IntoIterator` matches `iter`: preference order preserved, fallback + /// included only when present. + #[test] + fn test_accepted_encodings_into_iterator() { + let both = + AcceptedEncodings { primary: EncodingType::Ssz, fallback: Some(EncodingType::Json) }; + let collected: Vec<_> = both.into_iter().collect(); + assert_eq!(collected, vec![EncodingType::Ssz, EncodingType::Json]); + + let only = AcceptedEncodings::single(EncodingType::Json); + let collected: Vec<_> = only.into_iter().collect(); + assert_eq!(collected, vec![EncodingType::Json]); + } + + /// Duplicate media types in an Accept header are deduplicated — the + /// second occurrence of `primary` must not populate `fallback`. + #[test] + fn test_accept_header_duplicate_dedups() { + let header_string = format!("{APPLICATION_JSON}, {APPLICATION_JSON}"); + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap()); + assert_eq!( + get_accept_types(&headers).unwrap(), + AcceptedEncodings::single(EncodingType::Json) + ); + } + + /// Once primary and fallback are filled, further supported entries must + /// not overwrite fallback. (Belt-and-suspenders — only two supported + /// variants exist today, so this is mostly a guard against future + /// regressions if a third variant is added.) + #[test] + fn test_accept_header_third_supported_entry_ignored() { + // Repeat SSZ to simulate a third supported-but-duplicate entry + // landing after primary+fallback are already set. + let header_string = + format!("{APPLICATION_JSON}, {APPLICATION_OCTET_STREAM}, {APPLICATION_JSON}"); + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap()); + assert_eq!(get_accept_types(&headers).unwrap(), AcceptedEncodings { + primary: EncodingType::Json, + fallback: Some(EncodingType::Ssz) + }); + } + + /// Unsupported media types interleaved with supported ones must not + /// occupy the primary or fallback slots. + #[test] + fn test_accept_header_unsupported_does_not_fill_fallback() { + let header_string = format!("{APPLICATION_TEXT}, {APPLICATION_JSON}"); + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap()); + // `saw_any = true` and `had_supported = true`, so we return the + // supported type as primary with no fallback. + assert_eq!( + get_accept_types(&headers).unwrap(), + AcceptedEncodings::single(EncodingType::Json) + ); + } + + /// `build_outbound_accept` on a single-value `AcceptedEncodings` emits + /// exactly one entry at q=1.0 (no trailing comma, no orphan fallback). + #[test] + fn test_build_outbound_accept_single_value() { + let only_ssz = AcceptedEncodings::single(EncodingType::Ssz); + assert_eq!(build_outbound_accept(only_ssz), "application/octet-stream;q=1.0"); + + let only_json = AcceptedEncodings::single(EncodingType::Json); + assert_eq!(build_outbound_accept(only_json), "application/json;q=1.0"); + } + + /// `preferred` walks the caller's preference order and returns the + /// first supported match — not the server's first choice. + #[test] + fn test_preferred_respects_caller_order_over_server_order() { + // Caller prefers JSON first. Server lists SSZ first. Caller wins. + let accepts = + AcceptedEncodings { primary: EncodingType::Json, fallback: Some(EncodingType::Ssz) }; + assert_eq!( + accepts.preferred(&[EncodingType::Ssz, EncodingType::Json]), + Some(EncodingType::Json) + ); + } + + /// Snapshot test: constant emits exactly what we document in + /// OUTBOUND_ACCEPT. + #[test] + fn test_outbound_accept_constant_snapshot() { + assert_eq!(OUTBOUND_ACCEPT, "application/octet-stream;q=1.0,application/json;q=0.9"); + } + + /// q-value ladder: first entry is 1.0, each subsequent entry drops by 0.1. + #[test] + fn test_accept_q_value_for_index_ladder() { + assert!((accept_q_value_for_index(0) - 1.0).abs() < f32::EPSILON); + assert!((accept_q_value_for_index(1) - 0.9).abs() < f32::EPSILON); + assert!((accept_q_value_for_index(5) - 0.5).abs() < f32::EPSILON); + assert!((accept_q_value_for_index(9) - 0.1).abs() < f32::EPSILON); + } + + /// Clamp at 0.1: we never emit q=0 (which per RFC 7231 §5.3.1 would mean + /// "not acceptable"). + #[test] + fn test_accept_q_value_for_index_clamps_to_minimum() { + assert!((accept_q_value_for_index(10) - 0.1).abs() < f32::EPSILON); + assert!((accept_q_value_for_index(100) - 0.1).abs() < f32::EPSILON); + // Even an adversarial usize::MAX must not underflow or drop to zero. + assert!((accept_q_value_for_index(usize::MAX) - 0.1).abs() < f32::EPSILON); + } + + /// Entry formatter emits the spec-shaped string. + #[test] + fn test_format_accept_entry_shape() { + assert_eq!(format_accept_entry(EncodingType::Ssz, 1.0), "application/octet-stream;q=1.0"); + assert_eq!(format_accept_entry(EncodingType::Json, 0.9), "application/json;q=0.9"); + // One decimal place, even when the value has more precision. + assert_eq!(format_accept_entry(EncodingType::Json, 0.12345), "application/json;q=0.1"); + } + + // ── get_content_type ───────────────────────────────────────────────────── + + #[test] + fn test_content_type_missing_defaults_to_json() { + let headers = HeaderMap::new(); + assert_eq!(get_content_type(&headers), EncodingType::Json); + } + + #[test] + fn test_content_type_json() { + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_str(APPLICATION_JSON).unwrap()); + assert_eq!(get_content_type(&headers), EncodingType::Json); + } + + #[test] + fn test_content_type_ssz() { + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_str(APPLICATION_OCTET_STREAM).unwrap()); + assert_eq!(get_content_type(&headers), EncodingType::Ssz); + } + + #[test] + fn test_content_type_unknown_defaults_to_json() { + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_str("application/xml").unwrap()); + assert_eq!(get_content_type(&headers), EncodingType::Json); + } + + // ── get_consensus_version_header ───────────────────────────────────────── + + #[test] + fn test_consensus_version_header_electra() { + let mut headers = HeaderMap::new(); + let name = HeaderName::try_from(CONSENSUS_VERSION_HEADER).unwrap(); + headers.insert(name, HeaderValue::from_str("electra").unwrap()); + assert_eq!(get_consensus_version_header(&headers), Some(ForkName::Electra)); + } + + #[test] + fn test_consensus_version_header_missing() { + let headers = HeaderMap::new(); + assert_eq!(get_consensus_version_header(&headers), None); + } + + #[test] + fn test_consensus_version_header_invalid() { + let mut headers = HeaderMap::new(); + let name = HeaderName::try_from(CONSENSUS_VERSION_HEADER).unwrap(); + headers.insert(name, HeaderValue::from_str("not_a_fork").unwrap()); + assert_eq!(get_consensus_version_header(&headers), None); + } + + // ── EncodingType ───────────────────────────────────────────────────────── + + #[test] + fn test_encoding_type_from_str_variants() { + use std::str::FromStr; + assert_eq!(EncodingType::from_str(APPLICATION_JSON).unwrap(), EncodingType::Json); + assert_eq!(EncodingType::from_str(APPLICATION_OCTET_STREAM).unwrap(), EncodingType::Ssz); + // empty string defaults to JSON per the impl + assert_eq!(EncodingType::from_str("").unwrap(), EncodingType::Json); + assert!(EncodingType::from_str("application/xml").is_err()); + } + + #[test] + fn test_encoding_type_from_str_with_mime_params() { + // RFC 7231 §3.1.1.1: media-type parameters must be tolerated. + // Relays behind proxies routinely add charset= and similar. + use std::str::FromStr; + assert_eq!( + EncodingType::from_str("application/json; charset=utf-8").unwrap(), + EncodingType::Json + ); + assert_eq!( + EncodingType::from_str("application/octet-stream; boundary=x").unwrap(), + EncodingType::Ssz + ); + // Case-insensitivity per RFC 7231: type/subtype are lowercased before + // comparison. + assert_eq!(EncodingType::from_str("APPLICATION/OCTET-STREAM").unwrap(), EncodingType::Ssz); + // Extra whitespace around parameters is tolerated by the MIME parser. + assert_eq!( + EncodingType::from_str("application/json;charset=utf-8").unwrap(), + EncodingType::Json + ); + // Garbage that can't parse as a media type is an error. + assert!(EncodingType::from_str("garbage").is_err()); + // A parseable media type that isn't one we support is an error. + assert!(EncodingType::from_str("text/plain").is_err()); + } + + #[test] + fn test_parse_response_encoding_and_fork_tolerates_mime_params() { + // Full integration of the helper: missing header defaults to JSON, + // present header with params still decodes correctly. + let mut headers = HeaderMap::new(); + let (enc, fork) = parse_response_encoding_and_fork(&headers, 200).unwrap(); + assert_eq!(enc, EncodingType::Json); + assert!(fork.is_none()); + + headers.insert( + CONTENT_TYPE, + HeaderValue::from_str("application/octet-stream; charset=binary").unwrap(), + ); + let (enc, _) = parse_response_encoding_and_fork(&headers, 200).unwrap(); + assert_eq!(enc, EncodingType::Ssz); + + headers.insert(CONTENT_TYPE, HeaderValue::from_str("application/xml").unwrap()); + let err = parse_response_encoding_and_fork(&headers, 415).unwrap_err(); + match err { + crate::pbs::error::PbsError::RelayResponse { code, .. } => assert_eq!(code, 415), + other => panic!("expected RelayResponse, got {other:?}"), + } + } + + #[test] + fn test_encoding_type_display() { + assert_eq!(EncodingType::Json.to_string(), APPLICATION_JSON); + assert_eq!(EncodingType::Ssz.to_string(), APPLICATION_OCTET_STREAM); + } + + // ── deserialize_body error paths ───────────────────────────────────────── + + /// Missing Content-Type falls back to the `NO_PREFERENCE_DEFAULT` (JSON) + /// path, matching pre-PR behavior. Garbage body reaches the JSON + /// decoder and errors as `SerdeJsonError`, proving the default kicked + /// in (vs. bailing early with `UnsupportedMediaType`). + #[tokio::test] + async fn test_deserialize_body_missing_content_type_falls_back_to_json() { + let headers = HeaderMap::new(); + let body = Bytes::from_static(b"not json"); + let err = deserialize_body(&headers, body).unwrap_err(); + assert!( + matches!(err, BodyDeserializeError::SerdeJsonError(_)), + "expected SerdeJsonError (JSON decode attempted), got: {err}" + ); + } + + /// Present-but-unrecognized Content-Type still bails as + /// `UnsupportedMediaType`; the fallback only covers *missing* headers. + #[tokio::test] + async fn test_deserialize_body_unrecognized_content_type() { + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("text/plain")); + let body = Bytes::from_static(b"hi"); + let err = deserialize_body(&headers, body).unwrap_err(); + assert!(matches!(err, BodyDeserializeError::UnsupportedMediaType)); + } + + #[tokio::test] + async fn test_deserialize_body_ssz_missing_version_header() { + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_str(APPLICATION_OCTET_STREAM).unwrap()); + let body = Bytes::from_static(b"\x00\x01\x02\x03"); + let err = deserialize_body(&headers, body).unwrap_err(); + assert!(matches!(err, BodyDeserializeError::MissingVersionHeader)); + } +} diff --git a/crates/pbs/Cargo.toml b/crates/pbs/Cargo.toml index a9124c06..30064c28 100644 --- a/crates/pbs/Cargo.toml +++ b/crates/pbs/Cargo.toml @@ -12,9 +12,13 @@ axum.workspace = true axum-extra.workspace = true cb-common.workspace = true cb-metrics.workspace = true +ethereum_serde_utils.workspace = true +ethereum_ssz.workspace = true eyre.workspace = true futures.workspace = true +headers.workspace = true lazy_static.workspace = true +lh_types.workspace = true notify.workspace = true parking_lot.workspace = true prometheus.workspace = true @@ -27,3 +31,4 @@ tracing.workspace = true tree_hash.workspace = true url.workspace = true uuid.workspace = true +thiserror.workspace = true diff --git a/crates/pbs/src/error.rs b/crates/pbs/src/error.rs index 590c03d4..1748d814 100644 --- a/crates/pbs/src/error.rs +++ b/crates/pbs/src/error.rs @@ -1,11 +1,18 @@ use axum::{http::StatusCode, response::IntoResponse}; +use cb_common::wire::BodyDeserializeError; +use thiserror::Error; -#[derive(Debug)] +#[derive(Debug, Error)] /// Errors that the PbsService returns to client pub enum PbsClientError { + #[error("no response from relays")] NoResponse, + #[error("no payload from relays")] NoPayload, + #[error("internal server error")] Internal, + #[error("failed to deserialize body: {0}")] + DecodeError(#[from] BodyDeserializeError), } impl PbsClientError { @@ -14,6 +21,7 @@ impl PbsClientError { PbsClientError::NoResponse => StatusCode::BAD_GATEWAY, PbsClientError::NoPayload => StatusCode::BAD_GATEWAY, PbsClientError::Internal => StatusCode::INTERNAL_SERVER_ERROR, + PbsClientError::DecodeError(_) => StatusCode::BAD_REQUEST, } } } @@ -24,6 +32,7 @@ impl IntoResponse for PbsClientError { PbsClientError::NoResponse => "no response from relays".to_string(), PbsClientError::NoPayload => "no payload from relays".to_string(), PbsClientError::Internal => "internal server error".to_string(), + PbsClientError::DecodeError(e) => format!("error decoding request: {e}"), }; (self.status_code(), msg).into_response() diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index c144e2c0..e9bb0439 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -18,10 +18,8 @@ use cb_common::{ }, signature::verify_signed_message, types::{BlsPublicKey, BlsPublicKeyBytes, BlsSignature, Chain}, - utils::{ - get_user_agent_with_version, ms_into_slot, read_chunked_body_with_max, - timestamp_of_slot_start_sec, utcnow_ms, - }, + utils::{ms_into_slot, timestamp_of_slot_start_sec, utcnow_ms}, + wire::{get_user_agent_with_version, read_chunked_body_with_max}, }; use futures::future::join_all; use parking_lot::RwLock; diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 15f68416..ecdfb740 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -4,7 +4,8 @@ use alloy::primitives::Bytes; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ pbs::{HEADER_START_TIME_UNIX_MS, RelayClient, error::PbsError}, - utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms}, + utils::utcnow_ms, + wire::{get_user_agent_with_version, read_chunked_body_with_max}, }; use eyre::bail; use futures::{ diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index c4a8cfed..0e435f94 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -3,7 +3,7 @@ use std::time::{Duration, Instant}; use axum::http::HeaderMap; use cb_common::{ pbs::{RelayClient, error::PbsError}, - utils::{get_user_agent_with_version, read_chunked_body_with_max}, + wire::{get_user_agent_with_version, read_chunked_body_with_max}, }; use futures::future::select_ok; use reqwest::header::USER_AGENT; diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index b416dba2..3a451bd7 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -13,7 +13,8 @@ use cb_common::{ SubmitBlindedBlockResponse, error::{PbsError, ValidationError}, }, - utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms}, + utils::utcnow_ms, + wire::{get_user_agent_with_version, read_chunked_body_with_max}, }; use futures::{FutureExt, future::select_ok}; use reqwest::header::USER_AGENT; diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 9ed312af..600df11d 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -6,7 +6,8 @@ use axum::{ }; use cb_common::{ pbs::{GetHeaderInfo, GetHeaderParams}, - utils::{get_user_agent, ms_into_slot}, + utils::ms_into_slot, + wire::get_user_agent, }; use reqwest::StatusCode; use tracing::{error, info}; diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index 51c8ce6e..10549006 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -1,5 +1,5 @@ use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse}; -use cb_common::utils::get_user_agent; +use cb_common::wire::get_user_agent; use reqwest::StatusCode; use tracing::{error, info, trace}; diff --git a/crates/pbs/src/routes/reload.rs b/crates/pbs/src/routes/reload.rs index aa031d47..969328c4 100644 --- a/crates/pbs/src/routes/reload.rs +++ b/crates/pbs/src/routes/reload.rs @@ -1,5 +1,5 @@ use axum::{extract::State, http::HeaderMap, response::IntoResponse}; -use cb_common::utils::get_user_agent; +use cb_common::wire::get_user_agent; use reqwest::StatusCode; use tracing::{error, info}; diff --git a/crates/pbs/src/routes/status.rs b/crates/pbs/src/routes/status.rs index 52fd3e2f..0980ae6e 100644 --- a/crates/pbs/src/routes/status.rs +++ b/crates/pbs/src/routes/status.rs @@ -1,5 +1,5 @@ use axum::{extract::State, http::HeaderMap, response::IntoResponse}; -use cb_common::utils::get_user_agent; +use cb_common::wire::get_user_agent; use reqwest::StatusCode; use tracing::{error, info}; diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 004b601e..aff6fd34 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -3,7 +3,8 @@ use std::sync::Arc; use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse}; use cb_common::{ pbs::{BuilderApiVersion, GetPayloadInfo, SignedBlindedBeaconBlock}, - utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}, + utils::{timestamp_of_slot_start_millis, utcnow_ms}, + wire::get_user_agent, }; use reqwest::StatusCode; use tracing::{error, info, trace}; diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 4f842d56..c66487b6 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -12,7 +12,7 @@ use cb_common::{ }, signer::random_secret, types::Chain, - utils::{ResponseReadError, set_ignore_content_length}, + wire::{ResponseReadError, set_ignore_content_length}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -358,7 +358,7 @@ async fn test_ssv_multi_with_node() -> Result<()> { info!("Sending get header"); let res = mock_validator.do_get_header(Some(pubkey2.clone())).await?; assert_eq!(res.status(), StatusCode::OK); - assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV node + assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV node // Shut down the server handles pbs_server.abort(); @@ -454,7 +454,7 @@ async fn test_ssv_multi_with_public() -> Result<()> { info!("Sending get header"); let res = mock_validator.do_get_header(Some(pubkey2.clone())).await?; assert_eq!(res.status(), StatusCode::OK); - assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV public API + assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV public API // Shut down the server handles pbs_server.abort();