diff --git a/src/handlers/streamHandler.ts b/src/handlers/streamHandler.ts index c406e64a6..fcd084f66 100644 --- a/src/handlers/streamHandler.ts +++ b/src/handlers/streamHandler.ts @@ -321,29 +321,39 @@ export function handleStreamingMode( const isSleepTimeRequired = proxyProvider === AZURE_OPEN_AI ? true : false; const encoder = new TextEncoder(); + // Hold a strong reference to the upstream response, reader, and writer + // for the entire lifetime of the streamed body. Without this, only + // `reader` is captured by the async closure below; V8 can then + // garbage-collect the upstream `Response` mid-stream under concurrent + // load, causing `reader.read()` to throw + // "Response object has been garbage collected" and the consumer to see + // a truncated stream / TLS-close error. + const streamCtx = { response, reader, writer }; + let streamTask: Promise; + if (proxyProvider === BEDROCK) { - (async () => { + streamTask = (async () => { try { if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) { const hookResultChunk = constructHookResultChunk(hooksResult, fn); if (hookResultChunk) { - await writer.write(encoder.encode(hookResultChunk)); + await streamCtx.writer.write(encoder.encode(hookResultChunk)); } } for await (const chunk of readAWSStream( - reader, + streamCtx.reader, responseTransformer, fallbackChunkId, strictOpenAiCompliance, gatewayRequest )) { - await writer.write(encoder.encode(chunk)); + await streamCtx.writer.write(encoder.encode(chunk)); } } catch (error) { console.error('Error during stream processing:', proxyProvider, error); } finally { try { - await writer.close(); + await streamCtx.writer.close(); } catch (closeError) { console.error( 'Failed to close the writer:', @@ -354,16 +364,16 @@ export function handleStreamingMode( } })(); } else { - (async () => { + streamTask = (async () => { try { if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) { const hookResultChunk = constructHookResultChunk(hooksResult, fn); if (hookResultChunk) { - await writer.write(encoder.encode(hookResultChunk)); + await streamCtx.writer.write(encoder.encode(hookResultChunk)); } } for await (const chunk of readStream( - reader, + streamCtx.reader, splitPattern, responseTransformer, isSleepTimeRequired, @@ -371,13 +381,13 @@ export function handleStreamingMode( strictOpenAiCompliance, gatewayRequest )) { - await writer.write(encoder.encode(chunk)); + await streamCtx.writer.write(encoder.encode(chunk)); } } catch (error) { console.error('Error during stream processing:', proxyProvider, error); } finally { try { - await writer.close(); + await streamCtx.writer.close(); } catch (closeError) { console.error( 'Failed to close the writer:', @@ -389,6 +399,13 @@ export function handleStreamingMode( })(); } + // Anchor the streaming task and context on the returned `readable` so the + // caller's response (which wraps `readable`) keeps them alive. This is + // the strong-reference chain that prevents V8 from collecting `response` + // or dropping the unawaited promise before the stream completes. + (readable as any).__pkg_streamTask = streamTask; + (readable as any).__pkg_streamCtx = streamCtx; + // Convert GEMINI/COHERE json stream to text/event-stream for non-proxy calls const isGoogleCohereOrBedrock = [GOOGLE, COHERE, BEDROCK].includes( proxyProvider