Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions src/handlers/streamHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,29 +321,39 @@ export function handleStreamingMode(
const isSleepTimeRequired = proxyProvider === AZURE_OPEN_AI ? true : false;
const encoder = new TextEncoder();

// Hold a strong reference to the upstream response, reader, and writer
// for the entire lifetime of the streamed body. Without this, only
// `reader` is captured by the async closure below; V8 can then
// garbage-collect the upstream `Response` mid-stream under concurrent
// load, causing `reader.read()` to throw
// "Response object has been garbage collected" and the consumer to see
// a truncated stream / TLS-close error.
const streamCtx = { response, reader, writer };
let streamTask: Promise<void>;

if (proxyProvider === BEDROCK) {
(async () => {
streamTask = (async () => {
try {
if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) {
const hookResultChunk = constructHookResultChunk(hooksResult, fn);
if (hookResultChunk) {
await writer.write(encoder.encode(hookResultChunk));
await streamCtx.writer.write(encoder.encode(hookResultChunk));
}
}
for await (const chunk of readAWSStream(
reader,
streamCtx.reader,
responseTransformer,
fallbackChunkId,
strictOpenAiCompliance,
gatewayRequest
)) {
await writer.write(encoder.encode(chunk));
await streamCtx.writer.write(encoder.encode(chunk));
}
} catch (error) {
console.error('Error during stream processing:', proxyProvider, error);
} finally {
try {
await writer.close();
await streamCtx.writer.close();
} catch (closeError) {
console.error(
'Failed to close the writer:',
Expand All @@ -354,30 +364,30 @@ export function handleStreamingMode(
}
})();
} else {
(async () => {
streamTask = (async () => {
try {
if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) {
const hookResultChunk = constructHookResultChunk(hooksResult, fn);
if (hookResultChunk) {
await writer.write(encoder.encode(hookResultChunk));
await streamCtx.writer.write(encoder.encode(hookResultChunk));
}
}
for await (const chunk of readStream(
reader,
streamCtx.reader,
splitPattern,
responseTransformer,
isSleepTimeRequired,
fallbackChunkId,
strictOpenAiCompliance,
gatewayRequest
)) {
await writer.write(encoder.encode(chunk));
await streamCtx.writer.write(encoder.encode(chunk));
}
} catch (error) {
console.error('Error during stream processing:', proxyProvider, error);
} finally {
try {
await writer.close();
await streamCtx.writer.close();
} catch (closeError) {
console.error(
'Failed to close the writer:',
Expand All @@ -389,6 +399,13 @@ export function handleStreamingMode(
})();
}

// Anchor the streaming task and context on the returned `readable` so the
// caller's response (which wraps `readable`) keeps them alive. This is
// the strong-reference chain that prevents V8 from collecting `response`
// or dropping the unawaited promise before the stream completes.
(readable as any).__pkg_streamTask = streamTask;
(readable as any).__pkg_streamCtx = streamCtx;

// Convert GEMINI/COHERE json stream to text/event-stream for non-proxy calls
const isGoogleCohereOrBedrock = [GOOGLE, COHERE, BEDROCK].includes(
proxyProvider
Expand Down