From 2b190d42334b12a32002ed0249f24abedd1576ea Mon Sep 17 00:00:00 2001 From: meital Date: Tue, 19 May 2026 13:45:58 +0000 Subject: [PATCH] fix(streamHandler): prevent V8 GC of upstream Response mid-stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under concurrent streaming load, `handleStreamingMode` returns a new `Response` wrapping `readable`. The upstream `response` is then no longer referenced from any user-visible variable. The unawaited async IIFE that pipes upstream → writer captures only `reader` and `writer` in its closure, not `response` itself. In Node's undici-backed fetch, the `ReadableStreamDefaultReader` does not keep its parent `Response` alive — the `Response` owns the underlying network connection. When V8 GC runs (driven by allocation churn from concurrent streams, not absolute memory pressure), the upstream `response` can be collected before the body finishes streaming. The next `reader.read()` then throws "Response object has been garbage collected", the IIFE catches it and closes the writer, and the consumer sees a truncated stream / TLS close. The async IIFE itself is also a GC hazard — its promise is not anchored anywhere. In practice the microtask queue keeps it alive, but that is not guaranteed under aggressive GC. Fix: pack `response`, `reader`, and `writer` into a `streamCtx` object that the closure references explicitly, capture the IIFE promise as `streamTask`, and anchor both on the returned `readable` (which the caller's `Response` keeps alive for the duration of the stream). Reference chain after the fix: caller's Response → readable → __pkg_streamCtx → upstream response → __pkg_streamTask → IIFE closure This keeps the upstream response, the reader, the writer, and the piping task strongly referenced for the entire stream lifetime, with zero behavior change for the happy path. --- src/handlers/streamHandler.ts | 37 +++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/handlers/streamHandler.ts b/src/handlers/streamHandler.ts index c406e64a6..fcd084f66 100644 --- a/src/handlers/streamHandler.ts +++ b/src/handlers/streamHandler.ts @@ -321,29 +321,39 @@ export function handleStreamingMode( const isSleepTimeRequired = proxyProvider === AZURE_OPEN_AI ? true : false; const encoder = new TextEncoder(); + // Hold a strong reference to the upstream response, reader, and writer + // for the entire lifetime of the streamed body. Without this, only + // `reader` is captured by the async closure below; V8 can then + // garbage-collect the upstream `Response` mid-stream under concurrent + // load, causing `reader.read()` to throw + // "Response object has been garbage collected" and the consumer to see + // a truncated stream / TLS-close error. + const streamCtx = { response, reader, writer }; + let streamTask: Promise; + if (proxyProvider === BEDROCK) { - (async () => { + streamTask = (async () => { try { if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) { const hookResultChunk = constructHookResultChunk(hooksResult, fn); if (hookResultChunk) { - await writer.write(encoder.encode(hookResultChunk)); + await streamCtx.writer.write(encoder.encode(hookResultChunk)); } } for await (const chunk of readAWSStream( - reader, + streamCtx.reader, responseTransformer, fallbackChunkId, strictOpenAiCompliance, gatewayRequest )) { - await writer.write(encoder.encode(chunk)); + await streamCtx.writer.write(encoder.encode(chunk)); } } catch (error) { console.error('Error during stream processing:', proxyProvider, error); } finally { try { - await writer.close(); + await streamCtx.writer.close(); } catch (closeError) { console.error( 'Failed to close the writer:', @@ -354,16 +364,16 @@ export function handleStreamingMode( } })(); } else { - (async () => { + streamTask = (async () => { try { if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) { const hookResultChunk = constructHookResultChunk(hooksResult, fn); if (hookResultChunk) { - await writer.write(encoder.encode(hookResultChunk)); + await streamCtx.writer.write(encoder.encode(hookResultChunk)); } } for await (const chunk of readStream( - reader, + streamCtx.reader, splitPattern, responseTransformer, isSleepTimeRequired, @@ -371,13 +381,13 @@ export function handleStreamingMode( strictOpenAiCompliance, gatewayRequest )) { - await writer.write(encoder.encode(chunk)); + await streamCtx.writer.write(encoder.encode(chunk)); } } catch (error) { console.error('Error during stream processing:', proxyProvider, error); } finally { try { - await writer.close(); + await streamCtx.writer.close(); } catch (closeError) { console.error( 'Failed to close the writer:', @@ -389,6 +399,13 @@ export function handleStreamingMode( })(); } + // Anchor the streaming task and context on the returned `readable` so the + // caller's response (which wraps `readable`) keeps them alive. This is + // the strong-reference chain that prevents V8 from collecting `response` + // or dropping the unawaited promise before the stream completes. + (readable as any).__pkg_streamTask = streamTask; + (readable as any).__pkg_streamCtx = streamCtx; + // Convert GEMINI/COHERE json stream to text/event-stream for non-proxy calls const isGoogleCohereOrBedrock = [GOOGLE, COHERE, BEDROCK].includes( proxyProvider