-
-
Notifications
You must be signed in to change notification settings - Fork 459
feat: ssz engine API transport #8994
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
de2fba7
36529c5
f10047c
66b6604
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,9 @@ | ||
| import {Logger} from "@lodestar/logger"; | ||
| import {ForkName, ForkPostFulu, ForkPreFulu, ForkSeq, SLOTS_PER_EPOCH, isForkPostFulu} from "@lodestar/params"; | ||
| import {BlobsBundle, ExecutionPayload, ExecutionRequests, Root, RootHex, Wei} from "@lodestar/types"; | ||
| import {BlobsBundle, ExecutionPayload, ExecutionRequests, Root, RootHex, Wei, ssz as sszCodecs} from "@lodestar/types"; | ||
| import {BlobAndProof} from "@lodestar/types/deneb"; | ||
| import {BlobAndProofV2} from "@lodestar/types/fulu"; | ||
| import {strip0xPrefix} from "@lodestar/utils"; | ||
| import {fromHex, strip0xPrefix} from "@lodestar/utils"; | ||
| import {Metrics} from "../../metrics/index.js"; | ||
| import {EPOCHS_PER_BATCH} from "../../sync/constants.js"; | ||
| import {getLodestarClientVersion} from "../../util/metadata.js"; | ||
|
|
@@ -27,6 +27,17 @@ import { | |
| ReqOpts, | ||
| } from "./jsonRpcHttpClient.js"; | ||
| import {PayloadIdCache} from "./payloadIdCache.js"; | ||
| import {SszRestClient, isSszRestNetworkError} from "./sszRestClient.js"; | ||
| import { | ||
| decodeForkchoiceUpdatedResponse, | ||
| decodeGetBlobsResponse, | ||
| decodeGetPayloadResponse, | ||
| decodePayloadStatus, | ||
| encodeForkchoiceUpdatedRequest, | ||
| encodeGetBlobsRequest, | ||
| encodeGetPayloadRequest, | ||
| encodeNewPayloadRequest, | ||
| } from "./sszRestEncoding.js"; | ||
| import { | ||
| BLOB_AND_PROOF_V2_RPC_BYTES, | ||
| EngineApiRpcParamTypes, | ||
|
|
@@ -139,6 +150,10 @@ export class ExecutionEngineHttp implements IExecutionEngine { | |
| clientVersion?: ClientVersion | null; | ||
|
|
||
| readonly payloadIdCache = new PayloadIdCache(); | ||
|
|
||
| /** EIP-8161: SSZ-REST client, null if not configured */ | ||
| private readonly sszRestClient: SszRestClient | null; | ||
|
|
||
| /** | ||
| * A queue to serialize the fcUs and newPayloads calls: | ||
| * | ||
|
|
@@ -170,6 +185,21 @@ export class ExecutionEngineHttp implements IExecutionEngine { | |
| this.logger = logger; | ||
| this.metrics = metrics ?? null; | ||
|
|
||
| // EIP-8161: Initialize SSZ-REST client using the same Engine API URL. | ||
| // SSZ-REST routes are served on the same port under /engine/* paths. | ||
| { | ||
| const engineUrl = opts?.urls?.[0] ?? "http://localhost:8551"; | ||
| const baseUrl = engineUrl.replace(/\/+$/, ""); | ||
| this.sszRestClient = new SszRestClient({ | ||
| baseUrl, | ||
| jwtSecretHex: opts?.jwtSecretHex, | ||
| jwtId: opts?.jwtId, | ||
| jwtVersion: opts?.jwtVersion, | ||
| timeout: opts?.timeout, | ||
| }); | ||
| this.logger.info("SSZ-REST Engine API transport enabled (EIP-8161)", {url: baseUrl}); | ||
| } | ||
|
|
||
| this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => { | ||
| this.updateEngineState(getExecutionEngineState({payloadError: error, oldState: this.state})); | ||
| }); | ||
|
|
@@ -215,6 +245,44 @@ export class ExecutionEngineHttp implements IExecutionEngine { | |
| parentBlockRoot?: Root, | ||
| executionRequests?: ExecutionRequests | ||
| ): Promise<ExecutePayloadResponse> { | ||
| // EIP-8161: Try SSZ-REST first, fall back to JSON-RPC on network errors | ||
| if (this.sszRestClient) { | ||
| try { | ||
| const version = | ||
| ForkSeq[fork] >= ForkSeq.fulu ? 5 : ForkSeq[fork] >= ForkSeq.electra ? 4 : ForkSeq[fork] >= ForkSeq.deneb ? 3 : ForkSeq[fork] >= ForkSeq.capella ? 2 : 1; | ||
| const path = `/engine/v${version}/payloads`; | ||
| const body = encodeNewPayloadRequest(fork, executionPayload, versionedHashes, parentBlockRoot, executionRequests); | ||
| const resp = await this.sszRestClient.doRequest(path, body); | ||
| const decoded = decodePayloadStatus(resp); | ||
| const status = decoded.status as ExecutionPayloadStatus; | ||
| this.updateEngineState(getExecutionEngineState({payloadStatus: status, oldState: this.state})); | ||
|
|
||
| switch (status) { | ||
| case ExecutionPayloadStatus.VALID: | ||
| return {status, latestValidHash: decoded.latestValidHash ?? "0x0", validationError: null}; | ||
| case ExecutionPayloadStatus.INVALID: | ||
| return {status, latestValidHash: decoded.latestValidHash, validationError: decoded.validationError}; | ||
| case ExecutionPayloadStatus.SYNCING: | ||
| case ExecutionPayloadStatus.ACCEPTED: | ||
| return {status, latestValidHash: null, validationError: null}; | ||
| case ExecutionPayloadStatus.INVALID_BLOCK_HASH: | ||
| return {status, latestValidHash: null, validationError: decoded.validationError ?? "Malformed block"}; | ||
| default: | ||
| return { | ||
| status: ExecutionPayloadStatus.ELERROR, | ||
| latestValidHash: null, | ||
| validationError: `Invalid EL status on executePayload: ${status}`, | ||
| }; | ||
| } | ||
| } catch (e) { | ||
| if (isSszRestNetworkError(e)) { | ||
| this.logger.debug("SSZ-REST newPayload failed, falling back to JSON-RPC", {error: (e as Error).message}); | ||
| } else { | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| const method = | ||
| ForkSeq[fork] >= ForkSeq.electra | ||
| ? "engine_newPayloadV4" | ||
|
|
@@ -345,6 +413,60 @@ export class ExecutionEngineHttp implements IExecutionEngine { | |
| finalizedBlockHash: RootHex, | ||
| payloadAttributes?: PayloadAttributes | ||
| ): Promise<PayloadId | null> { | ||
| // EIP-8161: Try SSZ-REST first, fall back to JSON-RPC on network errors | ||
| if (this.sszRestClient) { | ||
| try { | ||
| const version = ForkSeq[fork] >= ForkSeq.deneb ? 3 : ForkSeq[fork] >= ForkSeq.capella ? 2 : 1; | ||
| const path = `/engine/v${version}/forkchoice`; | ||
| const headBytes = fromHex(headBlockHash); | ||
| const safeBytes = fromHex(safeBlockHash); | ||
| const finalizedBytes = fromHex(finalizedBlockHash); | ||
| const body = encodeForkchoiceUpdatedRequest(headBytes, safeBytes, finalizedBytes, payloadAttributes); | ||
| const resp = await this.sszRestClient.doRequest(path, body); | ||
| const decoded = decodeForkchoiceUpdatedResponse(resp); | ||
| const status = decoded.payloadStatus.status as ExecutionPayloadStatus; | ||
|
|
||
| this.updateEngineState(getExecutionEngineState({payloadStatus: status, oldState: this.state})); | ||
| this.metrics?.engineNotifyForkchoiceUpdateResult.inc({result: status}); | ||
|
|
||
| const payloadAttributesRpc = payloadAttributes ? serializePayloadAttributes(payloadAttributes) : undefined; | ||
|
|
||
| switch (status) { | ||
| case ExecutionPayloadStatus.VALID: | ||
| if (payloadAttributesRpc) { | ||
| if (!decoded.payloadId || decoded.payloadId === "0x") { | ||
| throw Error(`Received invalid payloadId=${decoded.payloadId}`); | ||
| } | ||
| this.payloadIdCache.add({headBlockHash, finalizedBlockHash, ...payloadAttributesRpc}, decoded.payloadId); | ||
| void this.prunePayloadIdCache(); | ||
| } | ||
| return decoded.payloadId !== "0x" ? decoded.payloadId : null; | ||
|
|
||
| case ExecutionPayloadStatus.SYNCING: | ||
| if (payloadAttributes) { | ||
| throw Error("Execution Layer Syncing"); | ||
| } | ||
| return null; | ||
|
|
||
| case ExecutionPayloadStatus.INVALID: | ||
| throw Error( | ||
| `Invalid ${payloadAttributes ? "prepare payload" : "forkchoice request"}, validationError=${ | ||
| decoded.payloadStatus.validationError ?? "" | ||
| }` | ||
| ); | ||
|
|
||
| default: | ||
| throw Error(`Unknown status ${status}`); | ||
| } | ||
| } catch (e) { | ||
| if (isSszRestNetworkError(e)) { | ||
| this.logger.debug("SSZ-REST forkchoiceUpdate failed, falling back to JSON-RPC", {error: (e as Error).message}); | ||
| } else { | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Once on capella, should this need to be permanently switched to v2 when payload attrs | ||
| // not provided | ||
| const method = | ||
|
|
@@ -422,6 +544,42 @@ export class ExecutionEngineHttp implements IExecutionEngine { | |
| executionRequests?: ExecutionRequests; | ||
| shouldOverrideBuilder?: boolean; | ||
| }> { | ||
| // EIP-8161: Try SSZ-REST first, fall back to JSON-RPC on network errors | ||
| if (this.sszRestClient) { | ||
| try { | ||
| const version = | ||
| ForkSeq[fork] >= ForkSeq.fulu ? 5 : ForkSeq[fork] >= ForkSeq.electra ? 4 : ForkSeq[fork] >= ForkSeq.deneb ? 3 : ForkSeq[fork] >= ForkSeq.capella ? 2 : 1; | ||
| const path = `/engine/v${version}/payloads/${payloadId}`; | ||
| const resp = await this.sszRestClient.doGetRequest(path); | ||
| const decoded = decodeGetPayloadResponse(resp); | ||
|
|
||
| // The executionPayloadSsz needs to be parsed back through the JSON-RPC parseExecutionPayload path | ||
| // For now, we return the raw SSZ and let the caller handle it. | ||
| // Actually, we can deserialize the SSZ payload using @lodestar/types | ||
| const executionPayload = deserializeExecutionPayloadSsz(fork, decoded.executionPayloadSsz); | ||
| const executionPayloadValue = decoded.blockValue; | ||
| const blobsBundle = | ||
| decoded.blobsBundleSsz.length > 0 ? deserializeBlobsBundleSsz(decoded.blobsBundleSsz) : undefined; | ||
| const executionRequests = | ||
| decoded.executionRequestsSsz.length > 0 | ||
| ? deserializeExecutionRequestsSsz(decoded.executionRequestsSsz) | ||
| : undefined; | ||
| return { | ||
| executionPayload, | ||
| executionPayloadValue, | ||
| blobsBundle, | ||
| executionRequests, | ||
| shouldOverrideBuilder: decoded.shouldOverrideBuilder, | ||
| }; | ||
| } catch (e) { | ||
| if (isSszRestNetworkError(e)) { | ||
| this.logger.debug("SSZ-REST getPayload failed, falling back to JSON-RPC", {error: (e as Error).message}); | ||
| } else { | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let method: keyof EngineApiRpcReturnTypes; | ||
| switch (fork) { | ||
| case ForkName.phase0: | ||
|
|
@@ -500,6 +658,28 @@ export class ExecutionEngineHttp implements IExecutionEngine { | |
| versionedHashes: VersionedHashes | ||
| ): Promise<BlobAndProofV2[] | (BlobAndProof | null)[] | null> { | ||
| assertReqSizeLimit(versionedHashes.length, MAX_VERSIONED_HASHES); | ||
|
|
||
| // EIP-8161: Try SSZ-REST first for getBlobs, fall back to JSON-RPC on network errors | ||
| if (this.sszRestClient) { | ||
| try { | ||
| const version = isForkPostFulu(fork) ? 2 : 1; | ||
| const path = `/engine/v${version}/blobs`; | ||
| const body = encodeGetBlobsRequest(versionedHashes); | ||
| const resp = await this.sszRestClient.doRequest(path, body); | ||
| const decoded = decodeGetBlobsResponse(resp); | ||
| return decoded.map((item) => ({ | ||
| blob: item.blob, | ||
| proof: item.kzgProof, | ||
|
Comment on lines
+670
to
+672
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎. |
||
| })); | ||
| } catch (e) { | ||
| if (isSszRestNetworkError(e)) { | ||
| this.logger.debug("SSZ-REST getBlobs failed, falling back to JSON-RPC", {error: (e as Error).message}); | ||
| } else { | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| const versionedHashesHex = versionedHashes.map(bytesToData); | ||
| if (isForkPostFulu(fork)) { | ||
| return await this.getBlobsV2(versionedHashesHex); | ||
|
|
@@ -629,6 +809,32 @@ export class ExecutionEngineHttp implements IExecutionEngine { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Deserialize an ExecutionPayload from SSZ bytes using the appropriate | ||
| * @lodestar/types codec for the given fork. Used by the SSZ-REST getPayload path. | ||
| */ | ||
| function deserializeExecutionPayloadSsz(fork: ForkName, data: Uint8Array): ExecutionPayload { | ||
| const forkSeq = ForkSeq[fork]; | ||
| if (forkSeq >= ForkSeq.electra) { | ||
| return sszCodecs.electra.ExecutionPayload.deserialize(data); | ||
| } | ||
| if (forkSeq >= ForkSeq.deneb) { | ||
| return sszCodecs.deneb.ExecutionPayload.deserialize(data); | ||
| } | ||
| if (forkSeq >= ForkSeq.capella) { | ||
| return sszCodecs.capella.ExecutionPayload.deserialize(data); | ||
| } | ||
| return sszCodecs.bellatrix.ExecutionPayload.deserialize(data); | ||
| } | ||
|
|
||
| function deserializeBlobsBundleSsz(data: Uint8Array): BlobsBundle { | ||
| return sszCodecs.deneb.BlobsBundle.deserialize(data); | ||
| } | ||
|
|
||
| function deserializeExecutionRequestsSsz(data: Uint8Array): ExecutionRequests { | ||
| return sszCodecs.electra.ExecutionRequests.deserialize(data); | ||
| } | ||
|
|
||
| type EngineRequestKey = keyof EngineApiRpcParamTypes; | ||
| type EngineRequestByKey = { | ||
| [K in EngineRequestKey]: {method: K; params: EngineApiRpcParamTypes[K]; methodOpts: ReqOpts}; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This version selection logic is duplicated in the
getPayloadmethod (lines 554-555). To improve maintainability and adhere to the DRY (Don't Repeat Yourself) principle, consider extracting this logic into a dedicated helper function. For example:You can then call this function here and in
getPayload.