diff --git a/.changeset/streaming-input-audio-transcription-delta.md b/.changeset/streaming-input-audio-transcription-delta.md
new file mode 100644
index 000000000..2e975f52a
--- /dev/null
+++ b/.changeset/streaming-input-audio-transcription-delta.md
@@ -0,0 +1,5 @@
+---
+"@livekit/agents-plugin-openai": patch
+---
+
+feat(plugin-openai): stream `input_audio_transcription.delta` events on the OpenAI Realtime API as `UserInputTranscribed` partials (`isFinal: false`). Enables word-by-word user transcripts with `gpt-realtime-whisper` and any future delta-emitting transcription model. Accumulators are cleared on `.completed`, `.failed`, `conversation.item.deleted`, session close, and reconnect; `.failed` now emits a closing `isFinal: true` event when partials had streamed so consumers don't hang.
diff --git a/examples/src/realtime_streaming_transcript.ts b/examples/src/realtime_streaming_transcript.ts
new file mode 100644
index 000000000..00f525a31
--- /dev/null
+++ b/examples/src/realtime_streaming_transcript.ts
@@ -0,0 +1,63 @@
+// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Requires a .env at the repo root with LIVEKIT_URL, LIVEKIT_API_KEY,
+// LIVEKIT_API_SECRET, and OPENAI_API_KEY.
+//
+// Run: pnpm build && node --env-file=.env ./examples/src/realtime_streaming_transcript.ts dev
+import { type JobContext, ServerOptions, cli, defineAgent, log, voice } from '@livekit/agents';
+import * as openai from '@livekit/agents-plugin-openai';
+import { fileURLToPath } from 'node:url';
+
+// Minimum transcript growth (in characters) between two logged partials.
+const PARTIAL_LOG_STEP = 6;
+
+export default defineAgent({
+  entry: async (ctx: JobContext) => {
+    const logger = log();
+    await ctx.connect();
+
+    const session = new voice.AgentSession({
+      llm: new openai.realtime.RealtimeModel({
+        inputAudioTranscription: { model: 'gpt-realtime-whisper' },
+      }),
+    });
+
+    // Length of the most recently logged partial; 0 when nothing was logged since the last final.
+    let lastPartialLength = 0;
+    session.on(voice.AgentSessionEventTypes.UserInputTranscribed, (ev) => {
+      if (ev.isFinal) {
+        logger.info({ transcript: ev.transcript }, '[user transcript FINAL]');
+        lastPartialLength = 0;
+        return;
+      }
+      // A partial shorter than the last logged one presumably belongs to a new stream that began
+      // before a final arrived; restart the throttle so its first words are not suppressed.
+      if (ev.transcript.length < lastPartialLength) {
+        lastPartialLength = 0;
+      }
+      if (ev.transcript.length - lastPartialLength >= PARTIAL_LOG_STEP) {
+        logger.info({ transcript: ev.transcript }, '[user transcript partial]');
+        lastPartialLength = ev.transcript.length;
+      }
+    });
+
+    await session.start({
+      agent: new voice.Agent({
+        instructions:
+          'You are a helpful assistant for a streaming-transcript demo. ' +
+          'Keep every reply to one short sentence so the user stays the focus. ' +
+          'Ask the user to read a long sentence or two aloud so they can see their own words streaming live.',
+      }),
+      room: ctx.room,
+    });
+
+    session.generateReply({
+      instructions:
+        'Greet the user briefly and ask them to say a long sentence so they can watch their own words appear live on screen as they speak.',
+    });
+  },
+});
+
+cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) }));
diff --git a/plugins/openai/src/realtime/api_proto.ts b/plugins/openai/src/realtime/api_proto.ts
index 1d4c775c8..6fd5f5097 100644
--- a/plugins/openai/src/realtime/api_proto.ts
+++ b/plugins/openai/src/realtime/api_proto.ts
@@ -58,6 +58,7 @@ export type ServerEventType =
   | 'input_audio_buffer.speech_stopped'
   | 'conversation.item.added' // GA: renamed from conversation.item.created
   | 'conversation.item.created' // Beta: kept for backward compatibility
+  | 'conversation.item.input_audio_transcription.delta'
   | 'conversation.item.input_audio_transcription.completed'
   | 'conversation.item.input_audio_transcription.failed'
   | 'conversation.item.truncated'
@@ -509,6 +510,17 @@ export interface ConversationItemAddedEvent extends BaseServerEvent {
   item: ItemResource;
 }
 
+/**
+ * Streaming fragment of a user audio transcription. `delta` carries only the newly
+ * transcribed text; `content_index` is treated as 0 by the session when absent.
+ */
+export interface ConversationItemInputAudioTranscriptionDeltaEvent extends BaseServerEvent {
+  type: 'conversation.item.input_audio_transcription.delta';
+  item_id: string;
+  content_index?: number;
+  delta?: string;
+}
+
 export interface ConversationItemInputAudioTranscriptionCompletedEvent extends BaseServerEvent {
   type: 'conversation.item.input_audio_transcription.completed';
   item_id: string;
@@ -670,6 +682,7 @@ export type ServerEvent =
   | InputAudioBufferSpeechStoppedEvent
   | ConversationItemCreatedEvent
   | ConversationItemAddedEvent // GA: renamed from conversation.item.created
+  | ConversationItemInputAudioTranscriptionDeltaEvent
   | ConversationItemInputAudioTranscriptionCompletedEvent
   | ConversationItemInputAudioTranscriptionFailedEvent
   | ConversationItemTruncatedEvent
diff --git
a/plugins/openai/src/realtime/realtime_model.test.ts b/plugins/openai/src/realtime/realtime_model.test.ts
index 792660ef9..dd6d98d7c 100644
--- a/plugins/openai/src/realtime/realtime_model.test.ts
+++ b/plugins/openai/src/realtime/realtime_model.test.ts
@@ -43,6 +43,253 @@ describe('RealtimeSession.generateReply', () => {
   });
 });
 
+describe('RealtimeSession input_audio_transcription delta handling', () => {
+  // Private members of RealtimeSession that these tests reach into.
+  type TranscriptionInternals = {
+    handleConversationItemInputAudioTranscriptionDelta: (
+      ev: api_proto.ConversationItemInputAudioTranscriptionDeltaEvent,
+    ) => void;
+    handleConversationItemInputAudioTranscriptionCompleted: (
+      ev: api_proto.ConversationItemInputAudioTranscriptionCompletedEvent,
+    ) => void;
+    finalizePartialOnTranscriptionFailure: (itemId: string, contentIndex: number) => void;
+    handleConversationItemDeleted: (ev: api_proto.ConversationItemDeletedEvent) => void;
+    inputTranscriptAccumulators: Map<string, Map<number, string>>;
+    audioCapableItemIds: Set<string>;
+    itemDeleteFutures: Record<string, unknown>;
+    remoteChatCtx: {
+      get: (id: string) => { item: llm.ChatMessage } | undefined;
+      delete: (id: string) => void;
+    };
+    on: (event: string, listener: (payload: llm.InputTranscriptionCompleted) => void) => void;
+  };
+
+  // Builds a session via Object.create so the RealtimeSession constructor never runs;
+  // only the state read by the transcription handlers is stubbed.
+  function createTranscriptSession(opts?: {
+    chatItems?: Record<string, llm.ChatMessage>;
+  }): TranscriptionInternals {
+    const session = Object.create(RealtimeSession.prototype) as TranscriptionInternals;
+    session.inputTranscriptAccumulators = new Map<string, Map<number, string>>();
+    session.audioCapableItemIds = new Set<string>();
+    session.itemDeleteFutures = {};
+    const chatItems = new Map(Object.entries(opts?.chatItems ?? {}));
+    session.remoteChatCtx = {
+      get: (id) => {
+        const item = chatItems.get(id);
+        return item ? { item } : undefined;
+      },
+      delete: (id) => {
+        chatItems.delete(id);
+      },
+    };
+    return session;
+  }
+
+  // Builds a `.delta` server event.
+  function delta(
+    item_id: string,
+    delta: string,
+    content_index = 0,
+  ): api_proto.ConversationItemInputAudioTranscriptionDeltaEvent {
+    return {
+      type: 'conversation.item.input_audio_transcription.delta',
+      event_id: `evt_${item_id}_${delta}_${content_index}`,
+      item_id,
+      content_index,
+      delta,
+    };
+  }
+
+  // Builds a `.completed` server event.
+  function completed(
+    item_id: string,
+    transcript: string,
+    content_index = 0,
+  ): api_proto.ConversationItemInputAudioTranscriptionCompletedEvent {
+    return {
+      type: 'conversation.item.input_audio_transcription.completed',
+      event_id: `evt_${item_id}_done`,
+      item_id,
+      content_index,
+      transcript,
+    };
+  }
+
+  it('accumulates partial transcripts across delta events for the same item_id', () => {
+    const session = createTranscriptSession();
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'Hello'));
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', ', world'));
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', '!'));
+
+    expect(emissions).toEqual([
+      { itemId: 'item_a', transcript: 'Hello', isFinal: false },
+      { itemId: 'item_a', transcript: 'Hello, world', isFinal: false },
+      { itemId: 'item_a', transcript: 'Hello, world!', isFinal: false },
+    ]);
+  });
+
+  it('keeps accumulators isolated across concurrent item_ids', () => {
+    const session = createTranscriptSession();
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'Aaa'));
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_b', 'Bbb'));
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'Aaa'));
+
+    expect(emissions).toEqual([
+      { itemId: 'item_a', transcript: 'Aaa', isFinal: false },
+      { itemId: 'item_b', transcript: 'Bbb', isFinal: false },
+      { itemId: 'item_a', transcript: 'AaaAaa', isFinal: false },
+    ]);
+  });
+
+  it('keeps accumulators isolated across content_index within the same item_id', () => {
+    const session = createTranscriptSession();
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'idx0-', 0));
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'idx1-', 1));
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'more0', 0));
+
+    expect(emissions).toEqual([
+      { itemId: 'item_a', transcript: 'idx0-', isFinal: false },
+      { itemId: 'item_a', transcript: 'idx1-', isFinal: false },
+      { itemId: 'item_a', transcript: 'idx0-more0', isFinal: false },
+    ]);
+  });
+
+  it('clears the accumulator on .completed so a subsequent delta does not inherit prior state', () => {
+    const session = createTranscriptSession({
+      chatItems: { item_a: new llm.ChatMessage({ role: 'user', content: '', id: 'item_a' }) },
+    });
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'first turn '));
+    session.handleConversationItemInputAudioTranscriptionCompleted(
+      completed('item_a', 'first turn complete.'),
+    );
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'second'));
+
+    expect(session.inputTranscriptAccumulators.get('item_a')?.get(0)).toBe('second');
+    expect(emissions).toEqual([
+      { itemId: 'item_a', transcript: 'first turn ', isFinal: false },
+      { itemId: 'item_a', transcript: 'first turn complete.', isFinal: true },
+      { itemId: 'item_a', transcript: 'second', isFinal: false },
+    ]);
+  });
+
+  it('pushes the final transcript onto the matching ChatMessage exactly once on .completed', () => {
+    const chatMessage = new llm.ChatMessage({ role: 'user', content: '', id: 'item_a' });
+    const session = createTranscriptSession({ chatItems: { item_a: chatMessage } });
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'partial-only'));
+    expect(chatMessage.content).toEqual(['']);
+
+    session.handleConversationItemInputAudioTranscriptionCompleted(completed('item_a', 'final.'));
+    expect(chatMessage.content).toEqual(['', 'final.']);
+  });
+
+  it('emits isFinal:true on .completed even when remoteChatCtx has no matching item', () => {
+    const session = createTranscriptSession();
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'partial'));
+    session.handleConversationItemInputAudioTranscriptionCompleted(completed('item_a', 'final.'));
+
+    expect(emissions).toEqual([
+      { itemId: 'item_a', transcript: 'partial', isFinal: false },
+      { itemId: 'item_a', transcript: 'final.', isFinal: true },
+    ]);
+    expect(session.inputTranscriptAccumulators.size).toBe(0);
+  });
+
+  it('treats .completed as a no-op cleanup when no deltas preceded it (non-streaming STT model)', () => {
+    const session = createTranscriptSession({
+      chatItems: { item_a: new llm.ChatMessage({ role: 'user', content: '', id: 'item_a' }) },
+    });
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.handleConversationItemInputAudioTranscriptionCompleted(
+      completed('item_a', 'one-shot whisper-1 transcript'),
+    );
+
+    expect(session.inputTranscriptAccumulators.size).toBe(0);
+    expect(emissions).toEqual([
+      { itemId: 'item_a', transcript: 'one-shot whisper-1 transcript', isFinal: true },
+    ]);
+  });
+
+  it('skips emission for empty or missing deltas and does not create an accumulator', () => {
+    const session = createTranscriptSession();
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', ''));
+    session.handleConversationItemInputAudioTranscriptionDelta({
+      type: 'conversation.item.input_audio_transcription.delta',
+      event_id: 'evt_no_delta',
+      item_id: 'item_a',
+    });
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'real'));
+
+    expect(session.inputTranscriptAccumulators.get('item_a')?.get(0)).toBe('real');
+    expect(emissions).toEqual([{ itemId: 'item_a', transcript: 'real', isFinal: false }]);
+  });
+
+  it('clears the accumulator and emits a closing isFinal:true on transcription failure when partials had streamed', () => {
+    const session = createTranscriptSession();
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'partial '));
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'text'));
+    session.finalizePartialOnTranscriptionFailure('item_a', 0);
+
+    expect(session.inputTranscriptAccumulators.size).toBe(0);
+    expect(emissions).toEqual([
+      { itemId: 'item_a', transcript: 'partial ', isFinal: false },
+      { itemId: 'item_a', transcript: 'partial text', isFinal: false },
+      { itemId: 'item_a', transcript: 'partial text', isFinal: true },
+    ]);
+  });
+
+  it('emits nothing on transcription failure when no partials had streamed for that item', () => {
+    const session = createTranscriptSession();
+    const emissions: llm.InputTranscriptionCompleted[] = [];
+    session.on('input_audio_transcription_completed', (ev) => emissions.push(ev));
+
+    session.finalizePartialOnTranscriptionFailure('item_a', 0);
+
+    expect(emissions).toEqual([]);
+    expect(session.inputTranscriptAccumulators.size).toBe(0);
+  });
+
+  it('clears the accumulator when the conversation item is deleted', () => {
+    const session = createTranscriptSession();
+
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'partial '));
+    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'more', 1));
+    expect(session.inputTranscriptAccumulators.get('item_a')?.size).toBe(2);
+
+    session.handleConversationItemDeleted({
+      type: 'conversation.item.deleted',
+      event_id: 'evt_del',
+      item_id: 'item_a',
+    });
+
+    expect(session.inputTranscriptAccumulators.has('item_a')).toBe(false);
+  });
+});
+
 describe('livekitItemToOpenAIItem', () => {
   describe('message items', () => {
     it('should use output_text type for assistant messages', async () => {
diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts
index 662448112..d6a1ec946 100644
--- a/plugins/openai/src/realtime/realtime_model.ts
+++ b/plugins/openai/src/realtime/realtime_model.ts
@@ -441,6 +441,8 @@ export class RealtimeSession extends llm.RealtimeSession {
   private itemCreateFutures: { [id: string]: Future } = {};
   private itemDeleteFutures: { [id: string]: Future } = {};
 
+  private inputTranscriptAccumulators = new Map<string, Map<number, string>>();
+
   // Track items that have real server-side audio (created in current session, not restored)
   // Items restored after reconnection are text-only and cannot be truncated
   private audioCapableItemIds: Set<string> = new Set();
@@ -1030,6 +1032,7 @@ export class RealtimeSession extends llm.RealtimeSession {
 
     // Clear audio-capable item tracking - restored items are text-only on the server
     this.audioCapableItemIds.clear();
+    this.inputTranscriptAccumulators.clear();
 
     const events: api_proto.ClientEvent[] = [];
 
@@ -1198,6
+1201,9 @@ export class RealtimeSession extends llm.RealtimeSession {
       case 'conversation.item.deleted':
         this.handleConversationItemDeleted(event);
         break;
+      case 'conversation.item.input_audio_transcription.delta':
+        this.handleConversationItemInputAudioTranscriptionDelta(event);
+        break;
       case 'conversation.item.input_audio_transcription.completed':
         this.handleConversationItemInputAudioTranscriptionCompleted(event);
         break;
@@ -1313,6 +1319,8 @@ export class RealtimeSession extends llm.RealtimeSession {
     }
     this.itemDeleteFutures = {};
 
+    this.inputTranscriptAccumulators.clear();
+
     // Clean up current generation if exists
     if (this.currentGeneration) {
       for (const gen of this.currentGeneration.messages.values()) {
@@ -1470,6 +1478,7 @@ export class RealtimeSession extends llm.RealtimeSession {
 
     // Clean up audio-capable tracking for deleted items
     this.audioCapableItemIds.delete(event.item_id);
+    this.inputTranscriptAccumulators.delete(event.item_id);
 
     try {
       this.remoteChatCtx.delete(event.item_id);
@@ -1484,26 +1493,74 @@ export class RealtimeSession extends llm.RealtimeSession {
     }
   }
 
+  /**
+   * Appends a streamed transcription fragment to the accumulator for its
+   * `(item_id, content_index)` pair and emits the text accumulated so far as a
+   * non-final transcript. Events with an empty or missing `delta` are ignored.
+   */
+  private handleConversationItemInputAudioTranscriptionDelta(
+    event: api_proto.ConversationItemInputAudioTranscriptionDeltaEvent,
+  ): void {
+    if (!event.delta) return;
+
+    const contentIndex = event.content_index ?? 0;
+    let byIndex = this.inputTranscriptAccumulators.get(event.item_id);
+    if (!byIndex) {
+      byIndex = new Map<number, string>();
+      this.inputTranscriptAccumulators.set(event.item_id, byIndex);
+    }
+    const accumulated = (byIndex.get(contentIndex) ?? '') + event.delta;
+    byIndex.set(contentIndex, accumulated);
+
+    this.emit('input_audio_transcription_completed', {
+      itemId: event.item_id,
+      transcript: accumulated,
+      isFinal: false,
+    });
+  }
+
+  /**
+   * Removes the accumulator for `(itemId, contentIndex)` and drops the per-item map
+   * once it is empty.
+   *
+   * @returns the partial transcript that was accumulated, or `undefined` if there was none.
+   */
+  private clearAccumulator(itemId: string, contentIndex: number): string | undefined {
+    const byIndex = this.inputTranscriptAccumulators.get(itemId);
+    if (!byIndex) return undefined;
+    const partial = byIndex.get(contentIndex);
+    byIndex.delete(contentIndex);
+    if (byIndex.size === 0) this.inputTranscriptAccumulators.delete(itemId);
+    return partial;
+  }
+
+  /**
+   * Finalizes a user transcript: discards any streamed partial, appends the transcript
+   * to the matching remote `ChatMessage` (if one exists) and emits it with `isFinal: true`.
+   *
+   * @throws Error if the remote item exists but is not a `ChatMessage`; the final event
+   * is still emitted first.
+   */
   private handleConversationItemInputAudioTranscriptionCompleted(
     event: api_proto.ConversationItemInputAudioTranscriptionCompletedEvent,
   ): void {
-    const remoteItem = this.remoteChatCtx.get(event.item_id);
-    if (!remoteItem) {
-      return;
-    }
+    this.clearAccumulator(event.item_id, event.content_index ?? 0);
 
-    const item = remoteItem.item;
-    if (item instanceof llm.ChatMessage) {
-      item.content.push(event.transcript);
-    } else {
-      throw new Error('item is not a chat message');
+    const item = this.remoteChatCtx.get(event.item_id)?.item;
+    if (item instanceof llm.ChatMessage) {
+      item.content.push(event.transcript);
     }
 
     this.emit('input_audio_transcription_completed', {
       itemId: event.item_id,
       transcript: event.transcript,
       isFinal: true,
-    } as llm.InputTranscriptionCompleted);
+    });
+
+    // Raised only after the final event so an open partial stream is always closed.
+    if (item !== undefined && !(item instanceof llm.ChatMessage)) {
+      throw new Error('item is not a chat message');
+    }
   }
 
   private handleConversationItemInputAudioTranscriptionFailed(
@@ -1513,6 +1570,22 @@ export class RealtimeSession extends llm.RealtimeSession {
       { error: event.error },
       'OpenAI Realtime API failed to transcribe input audio',
     );
+    this.finalizePartialOnTranscriptionFailure(event.item_id, event.content_index ?? 0);
+  }
+
+  /**
+   * Closes any open partial stream for `(itemId, contentIndex)` by re-emitting the
+   * accumulated text with `isFinal: true`, so consumers waiting for a final event
+   * don't hang. Emits nothing when no partial had been accumulated.
+   */
+  private finalizePartialOnTranscriptionFailure(itemId: string, contentIndex: number): void {
+    const partial = this.clearAccumulator(itemId, contentIndex);
+    if (partial === undefined) return;
+    this.emit('input_audio_transcription_completed', {
+      itemId,
+      transcript: partial,
+      isFinal: true,
+    });
   }
 
   private handleResponseContentPartAdded(event: api_proto.ResponseContentPartAddedEvent): void {