Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions packages/appkit/src/plugins/analytics/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider {
* the bytes through the existing `/arrow-result/:jobId` endpoint with
* a real binary content-type.
*/
protected inlineArrowStash: InlineArrowStash = new InlineArrowStash();
// Short put-wait so that a momentarily full stash backpressures rather
// than immediately falling back to EXTERNAL_LINKS — important on
// warehouses (e.g. Reyden) that refuse EXTERNAL_LINKS outright. The
// stash is drain-on-read, so an in-flight `/arrow-result` GET from any
// concurrent query usually frees a slot well within this window. True
// sustained overload still falls back via the existing path below.
protected inlineArrowStash: InlineArrowStash = new InlineArrowStash({
putWaitMs: 500,
});

constructor(config: IAnalyticsConfig) {
super(config);
Expand Down Expand Up @@ -327,7 +335,8 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider {
* `JSON_ARRAY` contract is preserved.
* - **ARROW_STREAM** first tries `INLINE + ARROW_STREAM`. If the
* warehouse refuses (most classic + some serverless variants), or the
* inline stash is full, falls back to `EXTERNAL_LINKS + ARROW_STREAM`.
* inline stash is full after a brief backpressure wait, falls back to
* `EXTERNAL_LINKS + ARROW_STREAM`.
*
* INLINE Arrow attachments under the ARROW_STREAM path are decoded once
* and put on the plugin's `inlineArrowStash`; the SSE message carries the
Expand Down Expand Up @@ -406,21 +415,24 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider {
throw ExecutionError.canceled();
}
const decoded = Buffer.from(result.attachment, "base64");
const inlineId = this.inlineArrowStash.put(
const inlineId = await this.inlineArrowStash.putBlocking(
stashUserKey,
new Uint8Array(
decoded.buffer,
decoded.byteOffset,
decoded.byteLength,
),
signal,
);
if (inlineId === null) {
// Stash is full — every id we have already handed out must
// stay valid, so the stash refuses new entries rather than
// evicting in-flight ones. Fall back to EXTERNAL_LINKS for
// this request so the client still gets its result.
// Stash is full even after the put-wait elapsed — every id we
// have already handed out must stay valid, so the stash refuses
// new entries rather than evicting in-flight ones. Fall back to
// EXTERNAL_LINKS for this request so the client still gets its
// result. On warehouses that refuse EXTERNAL_LINKS (e.g. Reyden)
// the executor will surface NOT_IMPLEMENTED to the caller.
logger.warn(
"Inline Arrow stash full, falling back to EXTERNAL_LINKS for the current query",
"Inline Arrow stash full after put-wait, falling back to EXTERNAL_LINKS for the current query",
);
} else {
return makeArrowMessage(inlineId, { status: result.status });
Expand Down
89 changes: 89 additions & 0 deletions packages/appkit/src/plugins/analytics/inline-arrow-stash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ import { randomUUID } from "node:crypto";
* stays valid until it is drained, expires, or the process exits.
* Callers are expected to fall back to a different delivery path (e.g.
* EXTERNAL_LINKS) when `put()` rejects.
* - **Optional backpressure** (`putBlocking()`): rather than rejecting
* immediately when full, wait FIFO for up to `putWaitMs` for an existing
* entry to drain before retrying. The stash is drain-on-read with a
* short TTL, so on warehouses where `INLINE + ARROW_STREAM` is the
* accepted path (e.g. Reyden, which refuses `EXTERNAL_LINKS`), a brief
* wait almost always frees a slot from the in-flight `/arrow-result`
* GET that any concurrent query is about to issue. Callers can keep the
* EXTERNAL_LINKS fallback for true sustained overload.
*
* Caveat (multi-replica deployments): this stash is process-local. A
* subsequent `GET /arrow-result/inline-*` that lands on a different
Expand All @@ -45,6 +53,12 @@ interface InlineArrowStashOptions {
* evicted to fit new ones.
*/
maxBytes?: number;
/**
* Max time `putBlocking()` waits for an existing entry to drain when
* the stash is full. Defaults to 0 — i.e. `putBlocking()` behaves like
* the synchronous `put()`. Synchronous `put()` itself never waits.
*/
putWaitMs?: number;
/** Test seam: override the synthetic-id generator. */
idGenerator?: () => string;
/** Test seam: override the clock. */
Expand All @@ -58,17 +72,25 @@ interface StashEntry {
insertedAt: number;
}

/** A `putBlocking()` caller parked in the stash's wait queue. */
interface Waiter {
/** Payload size in bytes; `wakeWaiters()` compares it against the remaining byte budget. */
needed: number;
/** Retries the insert for this caller and settles its pending promise. */
wake: () => void;
}

export class InlineArrowStash {
private entries = new Map<string, StashEntry>();
private totalBytes = 0;
private waiters: Waiter[] = [];
private readonly ttlMs: number;
private readonly maxBytes: number;
private readonly putWaitMs: number;
private readonly idGenerator: () => string;
private readonly now: () => number;

constructor(opts: InlineArrowStashOptions = {}) {
  // Pull every knob out once; each falls back to its default only when the
  // caller left it null/undefined (so an explicit 0 is honoured).
  const { ttlMs, maxBytes, putWaitMs, idGenerator, now } = opts;

  this.ttlMs = ttlMs ?? 10 * 60 * 1000; // 10 minutes
  this.maxBytes = maxBytes ?? 256 * 1024 * 1024; // 256 MiB
  this.putWaitMs = putWaitMs ?? 0; // 0 => putBlocking() never waits
  this.idGenerator = idGenerator ?? randomUUID;
  this.now = now ?? Date.now;
}
Expand Down Expand Up @@ -107,6 +129,53 @@ export class InlineArrowStash {
return id;
}

/**
 * Like `put()` but, when the stash is full, waits up to `putWaitMs`
 * for an existing entry to drain (via `take()` or TTL eviction) before
 * giving up. Returns the synthetic id on success, or `null` when the
 * wait elapses without a slot freeing — at which point the caller
 * should fall back to its out-of-band delivery path.
 *
 * Wakes happen FIFO: the head waiter is satisfied first. If `signal`
 * aborts before a slot frees, the wait resolves with `null` and the
 * waiter drops out of the queue without consuming the next free slot.
 * When the departing waiter was the head of the queue, the waiters
 * behind it are re-checked immediately so that one that already fits
 * is not left stranded until the next drain.
 *
 * A payload larger than the stash's entire byte budget can never fit,
 * so it resolves with `null` right away instead of waiting (and
 * stalling everyone queued behind it) for the full `putWaitMs`.
 *
 * @param userId - Owner key the payload is stored under.
 * @param bytes - Payload to stash.
 * @param signal - Optional abort signal; aborting ends the wait with `null`.
 * @returns The synthetic id, or `null` if no slot could be obtained.
 */
async putBlocking(
  userId: string,
  bytes: Uint8Array,
  signal?: AbortSignal,
): Promise<string | null> {
  const immediate = this.put(userId, bytes);
  if (immediate !== null) return immediate;
  if (this.putWaitMs <= 0 || signal?.aborted) return null;
  // No amount of draining makes room for a payload bigger than the whole
  // budget; queueing it would only block the head of the line.
  if (bytes.length > this.maxBytes) return null;

  return new Promise<string | null>((resolve) => {
    let settled = false;
    const settle = (value: string | null) => {
      if (settled) return;
      settled = true;
      // idx is -1 when wakeWaiters() already shifted this waiter off.
      const idx = this.waiters.indexOf(waiter);
      if (idx >= 0) this.waiters.splice(idx, 1);
      clearTimeout(timer);
      if (signal) signal.removeEventListener("abort", onAbort);
      resolve(value);
      // This waiter left the head of the queue without being served
      // (timeout/abort). It may have been the only thing holding back
      // smaller waiters that already fit, so give them a chance now.
      if (idx === 0) this.wakeWaiters();
    };
    const wake = () => {
      if (settled) return;
      // wakeWaiters() has confirmed totalBytes + needed <= maxBytes;
      // put() should succeed. The retry guards against an unlikely
      // concurrent gc()/take() shuffle.
      const id = this.put(userId, bytes);
      settle(id);
    };
    const onAbort = () => settle(null);
    const waiter: Waiter = { needed: bytes.length, wake };
    this.waiters.push(waiter);
    const timer = setTimeout(() => settle(null), this.putWaitMs);
    if (signal) signal.addEventListener("abort", onAbort, { once: true });
  });
}

/**
* Drain a payload from the stash. Returns `undefined` if the id is
* unknown, expired, or belongs to a different user.
Expand All @@ -118,6 +187,7 @@ export class InlineArrowStash {
if (entry.userId !== userId) return undefined;
this.entries.delete(id);
this.totalBytes -= entry.bytes.length;
this.wakeWaiters();
return entry.bytes;
}

Expand All @@ -133,15 +203,34 @@ export class InlineArrowStash {
clear(): void {
// Drop every stored payload and reset the byte accounting to match.
this.entries.clear();
this.totalBytes = 0;
// The whole budget is free again, so let queued putBlocking() callers retry.
this.wakeWaiters();
}

/**
 * Evicts every entry whose `expiresAt` is at or before the current clock
 * reading and returns its bytes to the budget. Queued `putBlocking()`
 * callers are re-checked only when at least one entry was removed.
 */
private gc(): void {
  const cutoff = this.now();
  const expiredIds: string[] = [];

  this.entries.forEach((entry, id) => {
    if (entry.expiresAt <= cutoff) {
      expiredIds.push(id);
      this.totalBytes -= entry.bytes.length;
    }
  });

  // Nothing expired: capacity is unchanged, so there is nobody to wake.
  if (expiredIds.length === 0) return;

  for (const id of expiredIds) this.entries.delete(id);
  this.wakeWaiters();
}

/**
 * Serves queued `putBlocking()` callers strictly from the front of the
 * queue, stopping at the first one whose payload does not fit in the
 * remaining byte budget (or when the queue runs dry). Each waiter is
 * removed from the queue before its `wake()` runs, so a re-entrant call
 * triggered from inside that waiter's insert attempt can never see — and
 * wake — the same waiter a second time.
 */
private wakeWaiters(): void {
  for (
    let next = this.waiters[0];
    next !== undefined && this.totalBytes + next.needed <= this.maxBytes;
    next = this.waiters[0]
  ) {
    this.waiters.shift();
    next.wake();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1057,8 +1057,13 @@ describe("Analytics Plugin", () => {
});
(plugin as any).SQLClient.executeStatement = executeMock;

// Force the stash to reject the put — simulates capacity exhaustion.
vi.spyOn((plugin as any).inlineArrowStash, "put").mockReturnValue(null);
// Force the stash to reject the put — simulates capacity exhaustion
// after the put-wait elapses. Stubbing putBlocking() directly skips
// the real timer; the production call site goes through putBlocking.
vi.spyOn(
(plugin as any).inlineArrowStash,
"putBlocking",
).mockResolvedValue(null);

plugin.injectRoutes(router);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,124 @@ describe("InlineArrowStash", () => {
expect(stash.count()).toBe(0);
expect(stash.size()).toBe(0);
});

// Exercises InlineArrowStash.putBlocking(): the immediate-success path, waiting
// for a drain, timeout, FIFO ordering between waiters, abort handling, and the
// putWaitMs=0 default. `mustPut` and `bytes` are helpers defined elsewhere in
// this test file.
describe("putBlocking backpressure", () => {
test("succeeds immediately when capacity is available", async () => {
const stash = new InlineArrowStash({
putWaitMs: 50,
idGenerator: () => "x",
});
const id = await stash.putBlocking("user-1", bytes(10));
expect(id).toBe("inline-x");
});

test("waits for a take() to free a slot, then succeeds", async () => {
let seq = 0;
const stash = new InlineArrowStash({
maxBytes: 100,
putWaitMs: 500,
idGenerator: () => String(seq++),
});
const a = mustPut(stash, "user-1", bytes(80));
mustPut(stash, "user-1", bytes(20));
expect(stash.size()).toBe(100);

// 50 bytes won't fit until something drains.
// The promise is deliberately not awaited yet: the waiter must be queued
// before take() runs so the drain is what wakes it.
const pending = stash.putBlocking("user-1", bytes(50));
// Drain the 80-byte entry → frees room for 50.
stash.take(a, "user-1");

const id = await pending;
expect(id).not.toBeNull();
expect(stash.size()).toBe(70); // 20 left over + 50 just inserted
});

test("returns null when the wait elapses without a slot freeing", async () => {
const stash = new InlineArrowStash({
maxBytes: 100,
putWaitMs: 20,
});
mustPut(stash, "user-1", bytes(100));
// Uses the real clock and a real timer; the lower bound below is looser
// than putWaitMs, presumably to tolerate timer/clock granularity.
const t0 = Date.now();
const id = await stash.putBlocking("user-1", bytes(50));
const elapsed = Date.now() - t0;
expect(id).toBeNull();
expect(elapsed).toBeGreaterThanOrEqual(15);
});

test("preserves FIFO order across waiters", async () => {
let seq = 0;
const stash = new InlineArrowStash({
maxBytes: 100,
putWaitMs: 500,
idGenerator: () => String(seq++),
});
const a = mustPut(stash, "user-1", bytes(100));

// A1 needs 60, A2 needs 30, both wait.
const a1 = stash.putBlocking("user-1", bytes(60));
const a2 = stash.putBlocking("user-1", bytes(30));

stash.take(a, "user-1"); // frees 100 → both fit
const [id1, id2] = await Promise.all([a1, a2]);
expect(id1).not.toBeNull();
expect(id2).not.toBeNull();
// Order of issued ids matches submission order.
// (ids come from the sequential idGenerator above, so a smaller number
// means the insert happened earlier.)
expect(Number(id1!.replace("inline-", ""))).toBeLessThan(
Number(id2!.replace("inline-", "")),
);
});

test("rejects later waiters when head consumes the freed capacity", async () => {
let seq = 0;
const stash = new InlineArrowStash({
maxBytes: 100,
putWaitMs: 30,
idGenerator: () => String(seq++),
});
const a = mustPut(stash, "user-1", bytes(100));

const a1 = stash.putBlocking("user-1", bytes(80));
const a2 = stash.putBlocking("user-1", bytes(80));

stash.take(a, "user-1"); // 100 free → only a1 (80) fits
const [id1, id2] = await Promise.all([a1, a2]);
expect(id1).not.toBeNull();
// a2 was still queued, gets evicted on timeout
expect(id2).toBeNull();
});

test("settles with null when signal aborts mid-wait", async () => {
const stash = new InlineArrowStash({
maxBytes: 100,
putWaitMs: 5000,
});
mustPut(stash, "user-1", bytes(100));
const ac = new AbortController();
const pending = stash.putBlocking("user-1", bytes(50), ac.signal);
// Abort while the waiter is queued; the long putWaitMs ensures the abort,
// not the timeout, is what settles the promise.
ac.abort();
expect(await pending).toBeNull();
});

test("pre-aborted signal short-circuits", async () => {
const stash = new InlineArrowStash({
maxBytes: 100,
putWaitMs: 5000,
});
mustPut(stash, "user-1", bytes(100));
const id = await stash.putBlocking(
"user-1",
bytes(50),
AbortSignal.abort(),
);
expect(id).toBeNull();
});

test("putWaitMs=0 (default) behaves like sync put", async () => {
// No putWaitMs supplied, so the constructor default of 0 applies.
const stash = new InlineArrowStash({ maxBytes: 100 });
mustPut(stash, "user-1", bytes(100));
const id = await stash.putBlocking("user-1", bytes(50));
expect(id).toBeNull();
});
});
});