diff --git a/packages/appkit/src/plugins/analytics/analytics.ts b/packages/appkit/src/plugins/analytics/analytics.ts index b3a268eb..61be56c6 100644 --- a/packages/appkit/src/plugins/analytics/analytics.ts +++ b/packages/appkit/src/plugins/analytics/analytics.ts @@ -60,7 +60,15 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { * the bytes through the existing `/arrow-result/:jobId` endpoint with * a real binary content-type. */ - protected inlineArrowStash: InlineArrowStash = new InlineArrowStash(); + // Short put-wait so that a momentarily full stash backpressures rather + // than immediately falling back to EXTERNAL_LINKS — important on + // warehouses (e.g. Reyden) that refuse EXTERNAL_LINKS outright. The + // stash is drain-on-read, so an in-flight `/arrow-result` GET from any + // concurrent query usually frees a slot well within this window. True + // sustained overload still falls back via the existing path below. + protected inlineArrowStash: InlineArrowStash = new InlineArrowStash({ + putWaitMs: 500, + }); constructor(config: IAnalyticsConfig) { super(config); @@ -327,7 +335,8 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { * `JSON_ARRAY` contract is preserved. * - **ARROW_STREAM** first tries `INLINE + ARROW_STREAM`. If the * warehouse refuses (most classic + some serverless variants), or the - * inline stash is full, falls back to `EXTERNAL_LINKS + ARROW_STREAM`. + * inline stash is full after a brief backpressure wait, falls back to + * `EXTERNAL_LINKS + ARROW_STREAM`. * * INLINE Arrow attachments under the ARROW_STREAM path are decoded once * and put on the plugin's `inlineArrowStash`; the SSE message carries the @@ -406,21 +415,24 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { throw ExecutionError.canceled(); } const decoded = Buffer.from(result.attachment, "base64"); - const inlineId = this.inlineArrowStash.put( + const inlineId = await this.inlineArrowStash.putBlocking( stashUserKey, new Uint8Array( decoded.buffer, decoded.byteOffset, decoded.byteLength, ), + signal, ); if (inlineId === null) { - // Stash is full — every id we have already handed out must - // stay valid, so the stash refuses new entries rather than - // evicting in-flight ones. Fall back to EXTERNAL_LINKS for - // this request so the client still gets its result. + // Stash is full even after the put-wait elapsed — every id we + // have already handed out must stay valid, so the stash refuses + // new entries rather than evicting in-flight ones. Fall back to + // EXTERNAL_LINKS for this request so the client still gets its + // result. On warehouses that refuse EXTERNAL_LINKS (e.g. Reyden) + // the executor will surface NOT_IMPLEMENTED to the caller. logger.warn( - "Inline Arrow stash full, falling back to EXTERNAL_LINKS for the current query", + "Inline Arrow stash full after put-wait, falling back to EXTERNAL_LINKS for the current query", ); } else { return makeArrowMessage(inlineId, { status: result.status }); diff --git a/packages/appkit/src/plugins/analytics/inline-arrow-stash.ts b/packages/appkit/src/plugins/analytics/inline-arrow-stash.ts index 3ae98330..10c8c78f 100644 --- a/packages/appkit/src/plugins/analytics/inline-arrow-stash.ts +++ b/packages/appkit/src/plugins/analytics/inline-arrow-stash.ts @@ -28,6 +28,14 @@ import { randomUUID } from "node:crypto"; * stays valid until it is drained, expires, or the process exits. * Callers are expected to fall back to a different delivery path (e.g. * EXTERNAL_LINKS) when `put()` rejects. + * - **Optional backpressure** (`putBlocking()`): rather than rejecting + * immediately when full, wait FIFO for up to `putWaitMs` for an existing + * entry to drain before retrying. The stash is drain-on-read with a + * short TTL, so on warehouses where `INLINE + ARROW_STREAM` is the + * accepted path (e.g. Reyden, which refuses `EXTERNAL_LINKS`), a brief + * wait almost always frees a slot from the in-flight `/arrow-result` + * GET that any concurrent query is about to issue. Callers can keep the + * EXTERNAL_LINKS fallback for true sustained overload. * * Caveat (multi-replica deployments): this stash is process-local. A * subsequent `GET /arrow-result/inline-*` that lands on a different @@ -45,6 +53,12 @@ interface InlineArrowStashOptions { * evicted to fit new ones. */ maxBytes?: number; + /** + * Max time `putBlocking()` waits for an existing entry to drain when + * the stash is full. Defaults to 0 — i.e. `putBlocking()` behaves like + * the synchronous `put()`. Synchronous `put()` itself never waits. + */ + putWaitMs?: number; /** Test seam: override the synthetic-id generator. */ idGenerator?: () => string; /** Test seam: override the clock. */ @@ -58,17 +72,25 @@ interface StashEntry { insertedAt: number; } +interface Waiter { + needed: number; + wake: () => void; +} + export class InlineArrowStash { private entries = new Map(); private totalBytes = 0; + private waiters: Waiter[] = []; private readonly ttlMs: number; private readonly maxBytes: number; + private readonly putWaitMs: number; private readonly idGenerator: () => string; private readonly now: () => number; constructor(opts: InlineArrowStashOptions = {}) { this.ttlMs = opts.ttlMs ?? 10 * 60 * 1000; this.maxBytes = opts.maxBytes ?? 256 * 1024 * 1024; + this.putWaitMs = opts.putWaitMs ?? 0; this.idGenerator = opts.idGenerator ?? randomUUID; this.now = opts.now ?? Date.now; } @@ -107,6 +129,53 @@ export class InlineArrowStash { return id; } + /** + * Like `put()` but, when the stash is full, waits up to `putWaitMs` + * for an existing entry to drain (via `take()` or TTL eviction) before + * giving up. Returns the synthetic id on success, or `null` when the + * wait elapses without a slot freeing — at which point the caller + * should fall back to its out-of-band delivery path. + * + * Wakes happen FIFO: the head waiter is satisfied first. If `signal` + * aborts before a slot frees, the wait resolves with `null` and the + * waiter drops out of the queue without consuming the next free slot. + */ + async putBlocking( + userId: string, + bytes: Uint8Array, + signal?: AbortSignal, + ): Promise { + const immediate = this.put(userId, bytes); + if (immediate !== null) return immediate; + if (this.putWaitMs <= 0 || signal?.aborted) return null; + + return new Promise((resolve) => { + let settled = false; + const settle = (value: string | null) => { + if (settled) return; + settled = true; + const idx = this.waiters.indexOf(waiter); + if (idx >= 0) this.waiters.splice(idx, 1); + clearTimeout(timer); + if (signal) signal.removeEventListener("abort", onAbort); + resolve(value); + }; + const wake = () => { + if (settled) return; + // wakeWaiters() has confirmed totalBytes + needed <= maxBytes; + // put() should succeed. The retry guards against an unlikely + // concurrent gc()/take() shuffle. + const id = this.put(userId, bytes); + settle(id); + }; + const onAbort = () => settle(null); + const waiter: Waiter = { needed: bytes.length, wake }; + this.waiters.push(waiter); + const timer = setTimeout(() => settle(null), this.putWaitMs); + if (signal) signal.addEventListener("abort", onAbort, { once: true }); + }); + } + /** * Drain a payload from the stash. Returns `undefined` if the id is * unknown, expired, or belongs to a different user. @@ -118,6 +187,7 @@ export class InlineArrowStash { if (entry.userId !== userId) return undefined; this.entries.delete(id); this.totalBytes -= entry.bytes.length; + this.wakeWaiters(); return entry.bytes; } @@ -133,15 +203,34 @@ export class InlineArrowStash { clear(): void { this.entries.clear(); this.totalBytes = 0; + this.wakeWaiters(); } private gc(): void { const now = this.now(); + let freed = false; for (const [id, entry] of this.entries) { if (entry.expiresAt <= now) { this.entries.delete(id); this.totalBytes -= entry.bytes.length; + freed = true; } } + if (freed) this.wakeWaiters(); + } + + /** + * FIFO drain of the wait queue. Walks from head until either the queue + * is empty or the head waiter does not fit. The head is shifted off + * BEFORE wake() runs so that any re-entry from gc()/put() inside the + * waiter's put attempt cannot pick up the same waiter again. + */ + private wakeWaiters(): void { + while (this.waiters.length > 0) { + const head = this.waiters[0]; + if (this.totalBytes + head.needed > this.maxBytes) return; + this.waiters.shift(); + head.wake(); + } } } diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts index 0683c44c..59dbffb2 100644 --- a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts @@ -1057,8 +1057,13 @@ describe("Analytics Plugin", () => { }); (plugin as any).SQLClient.executeStatement = executeMock; - // Force the stash to reject the put — simulates capacity exhaustion. - vi.spyOn((plugin as any).inlineArrowStash, "put").mockReturnValue(null); + // Force the stash to reject the put — simulates capacity exhaustion + // after the put-wait elapses. Stubbing putBlocking() directly skips + // the real timer; the production call site goes through putBlocking. + vi.spyOn( + (plugin as any).inlineArrowStash, + "putBlocking", + ).mockResolvedValue(null); plugin.injectRoutes(router); diff --git a/packages/appkit/src/plugins/analytics/tests/inline-arrow-stash.test.ts b/packages/appkit/src/plugins/analytics/tests/inline-arrow-stash.test.ts index 9bd59813..9ab21c6b 100644 --- a/packages/appkit/src/plugins/analytics/tests/inline-arrow-stash.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/inline-arrow-stash.test.ts @@ -113,4 +113,124 @@ describe("InlineArrowStash", () => { expect(stash.count()).toBe(0); expect(stash.size()).toBe(0); }); + + describe("putBlocking backpressure", () => { + test("succeeds immediately when capacity is available", async () => { + const stash = new InlineArrowStash({ + putWaitMs: 50, + idGenerator: () => "x", + }); + const id = await stash.putBlocking("user-1", bytes(10)); + expect(id).toBe("inline-x"); + }); + + test("waits for a take() to free a slot, then succeeds", async () => { + let seq = 0; + const stash = new InlineArrowStash({ + maxBytes: 100, + putWaitMs: 500, + idGenerator: () => String(seq++), + }); + const a = mustPut(stash, "user-1", bytes(80)); + mustPut(stash, "user-1", bytes(20)); + expect(stash.size()).toBe(100); + + // 50 bytes won't fit until something drains. + const pending = stash.putBlocking("user-1", bytes(50)); + // Drain the 80-byte entry → frees room for 50. + stash.take(a, "user-1"); + + const id = await pending; + expect(id).not.toBeNull(); + expect(stash.size()).toBe(70); // 20 left over + 50 just inserted + }); + + test("returns null when the wait elapses without a slot freeing", async () => { + const stash = new InlineArrowStash({ + maxBytes: 100, + putWaitMs: 20, + }); + mustPut(stash, "user-1", bytes(100)); + const t0 = Date.now(); + const id = await stash.putBlocking("user-1", bytes(50)); + const elapsed = Date.now() - t0; + expect(id).toBeNull(); + expect(elapsed).toBeGreaterThanOrEqual(15); + }); + + test("preserves FIFO order across waiters", async () => { + let seq = 0; + const stash = new InlineArrowStash({ + maxBytes: 100, + putWaitMs: 500, + idGenerator: () => String(seq++), + }); + const a = mustPut(stash, "user-1", bytes(100)); + + // A1 needs 60, A2 needs 30, both wait. + const a1 = stash.putBlocking("user-1", bytes(60)); + const a2 = stash.putBlocking("user-1", bytes(30)); + + stash.take(a, "user-1"); // frees 100 → both fit + const [id1, id2] = await Promise.all([a1, a2]); + expect(id1).not.toBeNull(); + expect(id2).not.toBeNull(); + // Order of issued ids matches submission order. + expect(Number(id1!.replace("inline-", ""))).toBeLessThan( + Number(id2!.replace("inline-", "")), + ); + }); + + test("rejects later waiters when head consumes the freed capacity", async () => { + let seq = 0; + const stash = new InlineArrowStash({ + maxBytes: 100, + putWaitMs: 30, + idGenerator: () => String(seq++), + }); + const a = mustPut(stash, "user-1", bytes(100)); + + const a1 = stash.putBlocking("user-1", bytes(80)); + const a2 = stash.putBlocking("user-1", bytes(80)); + + stash.take(a, "user-1"); // 100 free → only a1 (80) fits + const [id1, id2] = await Promise.all([a1, a2]); + expect(id1).not.toBeNull(); + // a2 was still queued, gets evicted on timeout + expect(id2).toBeNull(); + }); + + test("settles with null when signal aborts mid-wait", async () => { + const stash = new InlineArrowStash({ + maxBytes: 100, + putWaitMs: 5000, + }); + mustPut(stash, "user-1", bytes(100)); + const ac = new AbortController(); + const pending = stash.putBlocking("user-1", bytes(50), ac.signal); + ac.abort(); + expect(await pending).toBeNull(); + }); + + test("pre-aborted signal short-circuits", async () => { + const stash = new InlineArrowStash({ + maxBytes: 100, + putWaitMs: 5000, + }); + mustPut(stash, "user-1", bytes(100)); + const id = await stash.putBlocking( + "user-1", + bytes(50), + AbortSignal.abort(), + ); + expect(id).toBeNull(); + }); + + test("putWaitMs=0 (default) behaves like sync put", async () => { + const stash = new InlineArrowStash({ maxBytes: 100 }); + mustPut(stash, "user-1", bytes(100)); + const id = await stash.putBlocking("user-1", bytes(50)); + expect(id).toBeNull(); + }); + }); });