Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-bedrock-streaming-cached-tokens.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/ai-amazon-bedrock": patch
---

Fix streaming responses losing all token usage data (including cached input tokens). The finish part was being emitted during the `messageStop` event before the `metadata` event had populated token counts, causing `inputTokens`, `outputTokens`, `totalTokens`, and `cachedInputTokens` to be uninitialized. The finish part is now deferred until both `messageStop` and `metadata` have been received, matching the Bedrock ConverseStream API event ordering.
29 changes: 19 additions & 10 deletions packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -638,12 +638,27 @@ const makeStreamResponse: (

let trace: ConverseTrace | undefined = undefined
let cacheWriteInputTokens: number | undefined = undefined
let finishReason: Response.FinishReason | undefined = undefined
let hasMetadata = false
const usage: Mutable<typeof Response.Usage.Encoded> = {
inputTokens: undefined,
outputTokens: undefined,
totalTokens: undefined
}

// Emit the terminal "finish" stream part only once BOTH prerequisites have
// arrived: `messageStop` (which supplies `finishReason`) and `metadata`
// (which populates the shared mutable `usage` record). Called from both
// event handlers, so whichever event arrives second performs the emission.
// NOTE(review): assumes Bedrock sends at most one `metadata` event after
// `messageStop`; a second one would push a duplicate finish part — confirm
// the ConverseStream event-ordering guarantee or add an "emitted" flag.
const tryEmitFinish = (parts: Array<Response.StreamPartEncoded>) => {
if (finishReason !== undefined && hasMetadata) {
parts.push({
type: "finish",
reason: finishReason,
// `usage` is shared and filled in by the metadata handler, so the
// counts here are complete by the time this guard passes.
usage,
metadata: {
bedrock: { trace, usage: { cacheWriteInputTokens } }
}
})
}
}

return stream.pipe(
Stream.mapEffect(Effect.fnUntraced(function*(event) {
const parts: Array<Response.StreamPartEncoded> = []
Expand All @@ -659,16 +674,8 @@ const makeStreamResponse: (
}

case "messageStop": {
const reason = InternalUtilities.resolveFinishReason(event.messageStop.stopReason)
parts.push({
type: "finish",
reason,
usage,
metadata: {
bedrock: { trace, usage: { cacheWriteInputTokens } }
}
})

finishReason = InternalUtilities.resolveFinishReason(event.messageStop.stopReason)
tryEmitFinish(parts)
break
}

Expand Down Expand Up @@ -890,6 +897,8 @@ const makeStreamResponse: (
if (Predicate.isNotUndefined(event.metadata.trace)) {
trace = event.metadata.trace
}
hasMetadata = true
tryEmitFinish(parts)
break
}

Expand Down
Loading
Loading