Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-ensindexer-hot-reload.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"ensindexer": patch
"@ensnode/ponder-sdk": patch
---

ENSIndexer in dev mode no longer crashes during hot reloading due to EnsDbWriterWorker failure.
Comment thread
shrugs marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ describe("EnsDbWriterWorker", () => {
);

// cleanup
worker.stop();
await worker.stop();
});

it("throws when stored config is incompatible", async () => {
Expand Down Expand Up @@ -129,7 +129,7 @@ describe("EnsDbWriterWorker", () => {
expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(mockPublicConfig);

// cleanup
worker.stop();
await worker.stop();
});

it("throws error when worker is already running", async () => {
Expand All @@ -143,7 +143,7 @@ describe("EnsDbWriterWorker", () => {
await expect(worker.run()).rejects.toThrow("EnsDbWriterWorker is already running");

// cleanup
worker.stop();
await worker.stop();
});

it("throws error when config fetch fails", async () => {
Expand Down Expand Up @@ -193,7 +193,7 @@ describe("EnsDbWriterWorker", () => {
expect(publicConfigBuilder.getPublicConfig).toHaveBeenCalledTimes(1);

// cleanup
worker.stop();
await worker.stop();
});

it("calls pRetry for config fetch with retry logic", async () => {
Expand All @@ -213,7 +213,7 @@ describe("EnsDbWriterWorker", () => {
expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(mockPublicConfig);

// cleanup
worker.stop();
await worker.stop();
});
});

Expand All @@ -230,7 +230,7 @@ describe("EnsDbWriterWorker", () => {

const callCountBeforeStop = upsertIndexingStatusSnapshot.mock.calls.length;

worker.stop();
await worker.stop();

// advance time after stop
await vi.advanceTimersByTimeAsync(2000);
Expand All @@ -255,7 +255,7 @@ describe("EnsDbWriterWorker", () => {
expect(worker.isRunning).toBe(true);

// act - stop worker
worker.stop();
await worker.stop();

// assert - not running after stop
expect(worker.isRunning).toBe(false);
Expand Down Expand Up @@ -303,7 +303,7 @@ describe("EnsDbWriterWorker", () => {
expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(crossChainSnapshot);

// cleanup
worker.stop();
await worker.stop();
});

it("recovers from errors and continues upserting snapshots", async () => {
Expand Down Expand Up @@ -361,7 +361,7 @@ describe("EnsDbWriterWorker", () => {
expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(3);

// cleanup
worker.stop();
await worker.stop();
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ export class EnsDbWriterWorker {
*/
private indexingStatusInterval: ReturnType<typeof setInterval> | null = null;

/**
* Tracks the most recently launched snapshot upsert so that {@link stop}
* can wait for any in-flight work to settle before returning.
*/
private inFlightSnapshot: Promise<unknown> | undefined;

Comment thread
shrugs marked this conversation as resolved.
/**
* ENSDb Client instance used by the worker to interact with ENSDb.
*/
Expand Down Expand Up @@ -87,22 +93,32 @@ export class EnsDbWriterWorker {
* 3) A recurring attempt to upsert serialized representation of
* {@link CrossChainIndexingStatusSnapshot} into ENSDb.
*
* @param signal Optional AbortSignal that, if aborted, causes `run` to bail
* between its async setup steps. Use this to prevent the worker
* from finishing initialization (and starting the recurring
* interval) after the surrounding API instance has begun
* shutting down.
* @throws Error if the worker is already running, or
* if the in-memory ENSIndexer Public Config could not be fetched, or
* if the in-memory ENSIndexer Public Config is incompatible with the stored config in ENSDb.
* if the in-memory ENSIndexer Public Config is incompatible with the stored config in ENSDb, or
* if `signal` is aborted before the recurring interval is scheduled.
Comment thread
shrugs marked this conversation as resolved.
Outdated
*/
public async run(): Promise<void> {
public async run(signal?: AbortSignal): Promise<void> {
// Do not allow multiple concurrent runs of the worker
if (this.isRunning) {
throw new Error("EnsDbWriterWorker is already running");
}

signal?.throwIfAborted();

// Fetch data required for task 1 and task 2.
const inMemoryConfig = await this.getValidatedEnsIndexerPublicConfig();
signal?.throwIfAborted();

// Task 1: upsert ENSDb version into ENSDb.
logger.debug({ msg: "Upserting ENSDb version", module: "EnsDbWriterWorker" });
await this.ensDbClient.upsertEnsDbVersion(inMemoryConfig.versionInfo.ensDb);
signal?.throwIfAborted();
logger.info({
msg: "Upserted ENSDb version",
ensDbVersion: inMemoryConfig.versionInfo.ensDb,
Expand All @@ -115,16 +131,22 @@ export class EnsDbWriterWorker {
module: "EnsDbWriterWorker",
});
await this.ensDbClient.upsertEnsIndexerPublicConfig(inMemoryConfig);
signal?.throwIfAborted();
logger.info({
msg: "Upserted ENSIndexer public config",
module: "EnsDbWriterWorker",
});

// Task 3: recurring upsert of Indexing Status Snapshot into ENSDb.
this.indexingStatusInterval = setInterval(
() => this.upsertIndexingStatusSnapshot(),
secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL),
);
// Skip overlapping ticks so a slow upsert can't pile up concurrent
// ENSDb writes. With skip-overlap there is at most one in-flight
// upsert at a time, which `stop()` then has a single promise to await.
this.indexingStatusInterval = setInterval(() => {
if (this.inFlightSnapshot) return;
this.inFlightSnapshot = this.upsertIndexingStatusSnapshot().finally(() => {
this.inFlightSnapshot = undefined;
});
}, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL));
Comment thread
shrugs marked this conversation as resolved.
Comment thread
shrugs marked this conversation as resolved.
}

/**
Expand All @@ -137,13 +159,19 @@ export class EnsDbWriterWorker {
/**
* Stop the ENSDb Writer Worker
*
* Stops all recurring tasks in the worker.
* Stops all recurring tasks in the worker and waits for any in-flight
* snapshot upsert to settle. Safe to call when not running.
*/
public stop(): void {
public async stop(): Promise<void> {
if (this.indexingStatusInterval) {
clearInterval(this.indexingStatusInterval);
this.indexingStatusInterval = null;
}
if (this.inFlightSnapshot) {
// Errors are already logged inside upsertIndexingStatusSnapshot; swallow here.
await this.inFlightSnapshot.catch(() => {});
this.inFlightSnapshot = undefined;
Comment thread
shrugs marked this conversation as resolved.
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
Expand Down
91 changes: 74 additions & 17 deletions apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,105 @@
import { ensDbClient } from "@/lib/ensdb/singleton";
import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton";
import { localPonderClient } from "@/lib/local-ponder-client";
import { localPonderContext } from "@/lib/local-ponder-context";
import { logger } from "@/lib/logger";
import { publicConfigBuilder } from "@/lib/public-config-builder/singleton";

import { EnsDbWriterWorker } from "./ensdb-writer-worker";

let ensDbWriterWorker: EnsDbWriterWorker;
let ensDbWriterWorker: EnsDbWriterWorker | undefined;

/**
 * Reports whether `error` is an abort rejection, identified purely by its
 * `name` property being `"AbortError"`.
 *
 * The check is name-based on purpose: an aborted `fetch` rejects with a
 * `DOMException` named "AbortError", and that value is not always
 * `instanceof Error` across runtimes.
 *
 * @param error Any caught value; may be a non-object or `null`.
 * @returns `true` only for non-null objects whose `name` is exactly "AbortError".
 */
function isAbortError(error: unknown): boolean {
  // Primitives, `undefined` and `null` cannot carry a `name` worth inspecting.
  if (error === null || typeof error !== "object") {
    return false;
  }

  const { name } = error as { name?: unknown };
  return name === "AbortError";
}

/**
* Starts the EnsDbWriterWorker in a new asynchronous context.
* Stop the given worker (if it is still the active singleton) and clear the
* singleton reference. Safe to call multiple times.
*/
async function gracefulShutdown(worker: EnsDbWriterWorker, reason: string): Promise<void> {
logger.info({
msg: `Stopping EnsDbWriterWorker: ${reason}`,
module: "EnsDbWriterWorker",
});
await worker.stop();
if (ensDbWriterWorker === worker) {
ensDbWriterWorker = undefined;
}
Comment thread
shrugs marked this conversation as resolved.
Outdated
}

/**
* Start (or restart) the EnsDbWriterWorker.
*
* The worker will run indefinitely until it is stopped via {@link EnsDbWriterWorker.stop},
* for example in response to a process termination signal or an internal error, at
* which point it will attempt to gracefully shut down.
* Called from `apps/ensindexer/ponder/src/api/index.ts` on every Ponder
* API exec. Ponder re-executes the API entry file on hot reload, but this
* module is cached by vite-node, so module-level state survives across
* reloads. This function therefore must:
*
* @throws Error if the worker is already running when this function is called.
* 1. Be idempotent — treat a re-call as "the previous instance is dead,
* replace it" rather than throwing.
* 2. Re-bind reload-scoped resources (e.g. `apiShutdown`) fresh from
* `localPonderContext` on every call. Never hoist them to module
* scope. See `local-ponder-context.ts` for the staleness contract.
*/
export function startEnsDbWriterWorker() {
if (typeof ensDbWriterWorker !== "undefined") {
throw new Error("EnsDbWriterWorker has already been initialized");
export async function startEnsDbWriterWorker(): Promise<void> {
// Defensively reset any prior instance. The apiShutdown.add() callback
// from the previous API exec is the primary cleanup path on hot reload;
// this is a safety net for cases where the callback didn't run (e.g.
// unexpected shutdown ordering).
if (ensDbWriterWorker) {
await gracefulShutdown(ensDbWriterWorker, "stale instance from previous API exec");
}

ensDbWriterWorker = new EnsDbWriterWorker(
const worker = new EnsDbWriterWorker(
ensDbClient,
publicConfigBuilder,
indexingStatusBuilder,
localPonderClient,
);
ensDbWriterWorker = worker;

// Read apiShutdown FRESH from the reactive context. Ponder kills and
// replaces this on every dev-mode hot reload, so this read MUST happen
// inside the function call (not at module scope).
const apiShutdown = localPonderContext.apiShutdown;
const abortSignal = apiShutdown.abortController.signal;

ensDbWriterWorker
.run()
apiShutdown.add(() => gracefulShutdown(worker, "API shutdown"));

worker
.run(abortSignal)
// Handle any uncaught errors from the worker
.catch((error) => {
// Abort the worker on error to trigger cleanup
ensDbWriterWorker.stop();
.catch(async (error) => {
// Treat as a clean stop only when BOTH the captured shutdown signal
// is aborted AND the error is an AbortError. Either condition alone
// could mask a real failure: a non-AbortError thrown after Ponder
// killed the signal is still a bug worth surfacing, and an
// AbortError without our signal aborted means it came from
// somewhere else (e.g. a reactive-getter race) and shouldn't be
// silently swallowed.
if (abortSignal.aborted && isAbortError(error)) {
Comment thread
shrugs marked this conversation as resolved.
Outdated
await gracefulShutdown(worker, "API shutdown (run aborted)");
return;
}

// Real worker error — clean up and trigger non-zero exit.
await gracefulShutdown(worker, "uncaught error");

logger.error({
msg: "EnsDbWriterWorker encountered an error",
error,
});

// Re-throw the error to ensure the application shuts down with a non-zero exit code.
// Set a non-zero exit code so the process terminates with failure.
// Don't rethrow — this catch handler is on a fire-and-forget promise,
// so a rethrow becomes an unhandled rejection.
process.exitCode = 1;
Comment thread
shrugs marked this conversation as resolved.
Outdated
throw error;
});
}
2 changes: 2 additions & 0 deletions apps/ensindexer/src/lib/local-ponder-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ export const localPonderClient = new LocalPonderClient(
indexedBlockranges,
publicClients,
localPonderContext,
// See local-ponder-context.ts for the staleness contract.
() => localPonderContext.apiShutdown.abortController.signal,
);
Loading
Loading