Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ export class EnsDbWriterWorker {
*/
private indexingStatusInterval: ReturnType<typeof setInterval> | null = null;

/**
* Tracks the most recently launched snapshot upsert so that {@link stop}
* can wait for any in-flight work to settle before returning.
*/
private inFlightSnapshot: Promise<unknown> | undefined;

Comment thread
shrugs marked this conversation as resolved.
/**
* ENSDb Client instance used by the worker to interact with ENSDb.
*/
Expand Down Expand Up @@ -121,10 +127,9 @@ export class EnsDbWriterWorker {
});

// Task 3: recurring upsert of Indexing Status Snapshot into ENSDb.
this.indexingStatusInterval = setInterval(
() => this.upsertIndexingStatusSnapshot(),
secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL),
);
this.indexingStatusInterval = setInterval(() => {
this.inFlightSnapshot = this.upsertIndexingStatusSnapshot();
}, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL));
Comment thread
shrugs marked this conversation as resolved.
Comment thread
shrugs marked this conversation as resolved.
}

/**
Expand All @@ -137,13 +142,19 @@ export class EnsDbWriterWorker {
/**
* Stop the ENSDb Writer Worker
*
* Stops all recurring tasks in the worker.
* Stops all recurring tasks in the worker and waits for any in-flight
* snapshot upsert to settle. Safe to call when not running.
*/
public stop(): void {
public async stop(): Promise<void> {
if (this.indexingStatusInterval) {
clearInterval(this.indexingStatusInterval);
this.indexingStatusInterval = null;
}
if (this.inFlightSnapshot) {
// Errors are already logged inside upsertIndexingStatusSnapshot; swallow here.
await this.inFlightSnapshot.catch(() => {});
this.inFlightSnapshot = undefined;
Comment thread
shrugs marked this conversation as resolved.
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
Expand Down
74 changes: 60 additions & 14 deletions apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,86 @@
import { ensDbClient } from "@/lib/ensdb/singleton";
import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton";
import { localPonderClient } from "@/lib/local-ponder-client";
import { localPonderContext } from "@/lib/local-ponder-context";
import { logger } from "@/lib/logger";
import { publicConfigBuilder } from "@/lib/public-config-builder/singleton";

import { EnsDbWriterWorker } from "./ensdb-writer-worker";

let ensDbWriterWorker: EnsDbWriterWorker;
let ensDbWriterWorker: EnsDbWriterWorker | undefined;

function isAbortError(error: unknown): boolean {
return error instanceof Error && error.name === "AbortError";
}

/**
* Starts the EnsDbWriterWorker in a new asynchronous context.
* Start (or restart) the EnsDbWriterWorker.
*
* The worker will run indefinitely until it is stopped via {@link EnsDbWriterWorker.stop},
* for example in response to a process termination signal or an internal error, at
* which point it will attempt to gracefully shut down.
* Called from `apps/ensindexer/ponder/src/api/index.ts` on every Ponder
* API exec. Ponder re-executes the API entry file on hot reload, but this
* module is cached by vite-node, so module-level state survives across
* reloads. This function therefore must:
*
* @throws Error if the worker is already running when this function is called.
* 1. Be idempotent — treat a re-call as "the previous instance is dead,
* replace it" rather than throwing.
* 2. Re-bind reload-scoped resources (e.g. `apiShutdown`) fresh from
* `localPonderContext` on every call. Never hoist them to module
* scope. See `local-ponder-context.ts` for the staleness contract.
*/
export function startEnsDbWriterWorker() {
if (typeof ensDbWriterWorker !== "undefined") {
throw new Error("EnsDbWriterWorker has already been initialized");
export async function startEnsDbWriterWorker(): Promise<void> {
// Defensively reset any prior instance. The apiShutdown.add() callback
// from the previous API exec is the primary cleanup path on hot reload;
// this is a safety net for cases where the callback didn't run (e.g.
// unexpected shutdown ordering).
if (ensDbWriterWorker) {
await ensDbWriterWorker.stop();
ensDbWriterWorker = undefined;
}

ensDbWriterWorker = new EnsDbWriterWorker(
const worker = new EnsDbWriterWorker(
ensDbClient,
publicConfigBuilder,
indexingStatusBuilder,
localPonderClient,
);
ensDbWriterWorker = worker;

// Read apiShutdown FRESH from the reactive context. Ponder kills and
// replaces this on every dev-mode hot reload, so this read MUST happen
// inside the function call (not at module scope).
const apiShutdown = localPonderContext.apiShutdown;

ensDbWriterWorker
apiShutdown.add(async () => {
logger.info({
msg: "Stopping EnsDbWriterWorker due to API shutdown",
module: "EnsDbWriterWorker",
});
await worker.stop();
if (ensDbWriterWorker === worker) {
ensDbWriterWorker = undefined;
}
});

worker
.run()
// Handle any uncaught errors from the worker
.catch((error) => {
// Abort the worker on error to trigger cleanup
ensDbWriterWorker.stop();
.catch(async (error) => {
// If Ponder has begun shutting down our API instance (hot reload or
// graceful shutdown), the abort propagates through in-flight fetches
// as an AbortError. Treat that as a clean stop, not a worker failure.
if (apiShutdown.abortController.signal.aborted || isAbortError(error)) {
logger.info({
msg: "EnsDbWriterWorker stopped due to API shutdown",
module: "EnsDbWriterWorker",
});
return;
}

// Real worker error — clean up and trigger non-zero exit.
await worker.stop();
if (ensDbWriterWorker === worker) {
ensDbWriterWorker = undefined;
}

logger.error({
msg: "EnsDbWriterWorker encountered an error",
Expand Down
3 changes: 3 additions & 0 deletions apps/ensindexer/src/lib/local-ponder-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ export const localPonderClient = new LocalPonderClient(
indexedBlockranges,
publicClients,
localPonderContext,
// Reload-scoped: read fresh on every fetch via the reactive proxy. See
// local-ponder-context.ts for the staleness contract.
() => localPonderContext.apiShutdown.abortController.signal,
);
97 changes: 92 additions & 5 deletions apps/ensindexer/src/lib/local-ponder-context.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,101 @@
import { deserializePonderAppContext, type PonderAppContext } from "@ensnode/ponder-sdk";

/**
* Local Ponder Context — reactive wrapper over Ponder's runtime globals.
*
* Why this is a Proxy and not an eagerly-deserialized object:
*
* Ponder's dev mode hot-reloads the API entry file by re-executing it via
* vite-node. On every indexing-file change, Ponder ALSO kills and replaces
* `common.shutdown` and `common.apiShutdown` on `globalThis.PONDER_COMMON`
* (see `ponder/src/bin/commands/dev.ts:95-101`). Modules in our API-side
* dependency graph (this file included) are NOT re-evaluated when only an
* indexing file changes — vite-node only invalidates the changed file's
* dep tree. So any value cached in a module-level closure during the
* original boot becomes stale on the very next reload.
*
* Stable fields (`command`, `localPonderAppUrl`, `logger`) are validated
* once and memoized — Ponder does not mutate `options` or `logger` on
* reload. Reload-scoped fields (`apiShutdown`, `shutdown`) MUST be re-read
* from `globalThis.PONDER_COMMON` on every access.
*
* Contract for callers: NEVER cache reload-scoped fields in a module-level
* closure or capture them in a constructor argument. Always reach for them
* via `localPonderContext.<field>` from code that runs per-reload (e.g. the
* API entry file or per-request handlers). If you need an `AbortSignal`
* across calls, store a getter (`() => localPonderContext.apiShutdown
* .abortController.signal`), not the signal itself.
*/

if (!globalThis.PONDER_COMMON) {
throw new Error("PONDER_COMMON must be defined by Ponder at runtime as a global variable.");
}

/**
* Local Ponder app context
* Ponder shutdown manager runtime shape.
*
* Represents the {@link PonderAppContext} object provided by Ponder runtime to
* the local Ponder app. Useful for accessing internal Ponder app configuration
* and utilities such as the logger.
* Mirrors `ponder/src/internal/shutdown.ts` — the object Ponder publishes
* on `globalThis.PONDER_COMMON.{shutdown,apiShutdown}`.
*/
export const localPonderContext = deserializePonderAppContext(globalThis.PONDER_COMMON);
export interface PonderAppShutdownManager {
add: (callback: () => undefined | Promise<unknown>) => void;
isKilled: boolean;
abortController: AbortController;
}

function isPonderAppShutdownManager(value: unknown): value is PonderAppShutdownManager {
if (typeof value !== "object" || value === null) return false;
const obj = value as Record<string, unknown>;
return (
typeof obj.add === "function" &&
typeof obj.isKilled === "boolean" &&
obj.abortController instanceof AbortController
);
}

function readShutdownManager(field: "apiShutdown" | "shutdown"): PonderAppShutdownManager {
const raw = (globalThis.PONDER_COMMON as Record<string, unknown> | undefined)?.[field];
if (!isPonderAppShutdownManager(raw)) {
throw new Error(`globalThis.PONDER_COMMON.${field} is not a valid Ponder shutdown manager.`);
}
return raw;
}

let cachedStableContext: PonderAppContext | undefined;
function getStableContext(): PonderAppContext {
if (!cachedStableContext) {
if (!globalThis.PONDER_COMMON) {
throw new Error("PONDER_COMMON must be defined by Ponder at runtime as a global variable.");
}
cachedStableContext = deserializePonderAppContext(globalThis.PONDER_COMMON);
}
return cachedStableContext;
}

/**
* Local Ponder Context.
*
* Combines stable {@link PonderAppContext} fields with reload-scoped
* shutdown managers. See module-level comment for the staleness contract.
*/
export interface LocalPonderContext extends PonderAppContext {
/**
* The current `apiShutdown` manager. RELOAD-SCOPED — identity changes
* every API hot-reload. Always read fresh; never cache.
*/
readonly apiShutdown: PonderAppShutdownManager;

/**
* The current `shutdown` manager. RELOAD-SCOPED — identity changes
* every indexing hot-reload. Always read fresh; never cache.
*/
readonly shutdown: PonderAppShutdownManager;
}

export const localPonderContext: LocalPonderContext = new Proxy({} as LocalPonderContext, {
get(_target, prop) {
if (prop === "apiShutdown") return readShutdownManager("apiShutdown");
if (prop === "shutdown") return readShutdownManager("shutdown");
return getStableContext()[prop as keyof PonderAppContext];
},
});
18 changes: 14 additions & 4 deletions packages/ponder-sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,19 @@ import type { PonderIndexingStatus } from "./indexing-status";

/**
* PonderClient for fetching data from Ponder apps.
*
* The optional `getAbortSignal` is invoked at fetch time so each request
* uses the current `AbortSignal`. Passing a getter (instead of a captured
* `AbortSignal`) is required for consumers that need to track signals
* which change identity over the client's lifetime — e.g. signals derived
* from Ponder's `apiShutdown` manager, which Ponder kills and replaces on
* every dev-mode hot reload.
*/
export class PonderClient {
constructor(private readonly baseUrl: URL) {}
constructor(
private readonly baseUrl: URL,
private readonly getAbortSignal?: () => AbortSignal | undefined,
) {}
Comment thread
shrugs marked this conversation as resolved.
Comment thread
vercel[bot] marked this conversation as resolved.

/**
* Check Ponder Health
Expand All @@ -18,7 +28,7 @@ export class PonderClient {
*/
async health(): Promise<void> {
const requestUrl = new URL("/health", this.baseUrl);
const response = await fetch(requestUrl);
const response = await fetch(requestUrl, { signal: this.getAbortSignal?.() });

if (!response.ok) {
throw new Error(
Expand All @@ -35,7 +45,7 @@ export class PonderClient {
*/
async metrics(): Promise<PonderIndexingMetrics> {
const requestUrl = new URL("/metrics", this.baseUrl);
const response = await fetch(requestUrl);
const response = await fetch(requestUrl, { signal: this.getAbortSignal?.() });

if (!response.ok) {
throw new Error(
Expand All @@ -56,7 +66,7 @@ export class PonderClient {
*/
async status(): Promise<PonderIndexingStatus> {
const requestUrl = new URL("/status", this.baseUrl);
const response = await fetch(requestUrl);
const response = await fetch(requestUrl, { signal: this.getAbortSignal?.() });

if (!response.ok) {
throw new Error(
Expand Down
4 changes: 3 additions & 1 deletion packages/ponder-sdk/src/local-ponder-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ export class LocalPonderClient extends PonderClient {
* @param ponderPublicClients All cached public clients provided by the local Ponder app
* (may include non-indexed chains).
* @param ponderAppContext The internal context of the local Ponder app.
* @param getAbortSignal Optional getter invoked at fetch time to attach an `AbortSignal` to outgoing HTTP requests. Use a getter (not a captured signal) when the underlying signal can change identity over time — e.g. across Ponder dev-mode hot reloads.
*/
constructor(
indexedChainIds: Set<ChainId>,
indexedBlockranges: Map<ChainId, BlockNumberRangeWithStartBlock>,
ponderPublicClients: Record<ChainIdString, CachedPublicClient>,
ponderAppContext: PonderAppContext,
getAbortSignal?: () => AbortSignal | undefined,
) {
super(ponderAppContext.localPonderAppUrl);
super(ponderAppContext.localPonderAppUrl, getAbortSignal);

this.indexedChainIds = indexedChainIds;

Expand Down
Loading