Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/streaming-input-audio-transcription-delta.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents-plugin-openai": patch
---

feat(plugin-openai): stream `input_audio_transcription.delta` events on the OpenAI Realtime API as `UserInputTranscribed` partials (`isFinal: false`). Enables word-by-word user transcripts with `gpt-realtime-whisper` and any future delta-emitting transcription model. Accumulators are cleared on `.completed`, `.failed`, `conversation.item.deleted`, session close, and reconnect; `.failed` now emits a closing `isFinal: true` event when partials had streamed so consumers don't hang.
54 changes: 54 additions & 0 deletions examples/src/realtime_streaming_transcript.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Requires a .env at the repo root with LIVEKIT_URL, LIVEKIT_API_KEY,
// LIVEKIT_API_SECRET, and OPENAI_API_KEY.
//
// Run: pnpm build && node --env-file=.env ./examples/src/realtime_streaming_transcript.ts dev
import { type JobContext, ServerOptions, cli, defineAgent, log, voice } from '@livekit/agents';
import * as openai from '@livekit/agents-plugin-openai';
import { fileURLToPath } from 'node:url';

/**
 * Example agent that logs the user's speech as it is transcribed.
 *
 * Partial (`isFinal: false`) transcripts are logged only after the transcript
 * has grown by a minimum number of characters since the last logged partial,
 * so the log is not flooded with one line per delta. Final transcripts are
 * always logged.
 */
export default defineAgent({
  entry: async (ctx: JobContext) => {
    const logger = log();
    await ctx.connect();

    const session = new voice.AgentSession({
      llm: new openai.realtime.RealtimeModel({
        inputAudioTranscription: { model: 'gpt-realtime-whisper' },
      }),
    });

    // Minimum number of newly transcribed characters before another partial is logged.
    const minPartialGrowth = 6;
    // Length of the most recently logged partial for the utterance in progress.
    let lastPartialLength = 0;

    session.on(voice.AgentSessionEventTypes.UserInputTranscribed, (ev) => {
      if (ev.isFinal) {
        logger.info({ transcript: ev.transcript }, '[user transcript FINAL]');
        lastPartialLength = 0;
        return;
      }

      // A partial shorter than the last logged one cannot be a continuation of
      // the same utterance: the previous one ended without a final event
      // (NOTE(review): presumably item deletion or reconnect — confirm).
      // Reset the high-water mark so the new utterance's partials are not
      // suppressed until they outgrow the stale length.
      if (ev.transcript.length < lastPartialLength) {
        lastPartialLength = 0;
      }

      if (ev.transcript.length - lastPartialLength >= minPartialGrowth) {
        logger.info({ transcript: ev.transcript }, '[user transcript partial]');
        lastPartialLength = ev.transcript.length;
      }
    });

    await session.start({
      agent: new voice.Agent({
        instructions:
          'You are a helpful assistant for a streaming-transcript demo. ' +
          'Keep every reply to one short sentence so the user stays the focus. ' +
          'Ask the user to read a long sentence or two aloud so they can see their own words streaming live.',
      }),
      room: ctx.room,
    });

    // Have the agent speak first so the user knows what to do.
    session.generateReply({
      instructions:
        'Greet the user briefly and ask them to say a long sentence so they can watch their own words appear live on screen as they speak.',
    });
  },
});

// Hand this module's own file path to the CLI runner as the agent to load.
const agentModulePath = fileURLToPath(import.meta.url);
cli.runApp(new ServerOptions({ agent: agentModulePath }));
9 changes: 9 additions & 0 deletions plugins/openai/src/realtime/api_proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export type ServerEventType =
| 'input_audio_buffer.speech_stopped'
| 'conversation.item.added' // GA: renamed from conversation.item.created
| 'conversation.item.created' // Beta: kept for backward compatibility
| 'conversation.item.input_audio_transcription.delta'
| 'conversation.item.input_audio_transcription.completed'
| 'conversation.item.input_audio_transcription.failed'
| 'conversation.item.truncated'
Expand Down Expand Up @@ -509,6 +510,13 @@ export interface ConversationItemAddedEvent extends BaseServerEvent {
item: ItemResource;
}

/**
 * Server event of type `conversation.item.input_audio_transcription.delta`.
 *
 * NOTE(review): presumably carries one incremental fragment of the user's
 * input-audio transcript — confirm field semantics against the OpenAI
 * Realtime API reference.
 */
export interface ConversationItemInputAudioTranscriptionDeltaEvent extends BaseServerEvent {
type: 'conversation.item.input_audio_transcription.delta';
// Identifier of the conversation item this event refers to.
item_id: string;
// Optional; presumably the index of the content part within the item — TODO confirm.
content_index?: number;
// Optional; presumably the newly transcribed text fragment. May be absent.
delta?: string;
}

export interface ConversationItemInputAudioTranscriptionCompletedEvent extends BaseServerEvent {
type: 'conversation.item.input_audio_transcription.completed';
item_id: string;
Expand Down Expand Up @@ -670,6 +678,7 @@ export type ServerEvent =
| InputAudioBufferSpeechStoppedEvent
| ConversationItemCreatedEvent
| ConversationItemAddedEvent // GA: renamed from conversation.item.created
| ConversationItemInputAudioTranscriptionDeltaEvent
| ConversationItemInputAudioTranscriptionCompletedEvent
| ConversationItemInputAudioTranscriptionFailedEvent
| ConversationItemTruncatedEvent
Expand Down
242 changes: 242 additions & 0 deletions plugins/openai/src/realtime/realtime_model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,248 @@ describe('RealtimeSession.generateReply', () => {
});
});

describe('RealtimeSession input_audio_transcription delta handling', () => {
// Structural view of the RealtimeSession members these tests call or inspect.
// `createTranscriptSession` casts a bare `Object.create(RealtimeSession.prototype)`
// instance to this shape so the tests can reach them directly.
type TranscriptionInternals = {
// Handler under test for `.delta` events.
handleConversationItemInputAudioTranscriptionDelta: (
ev: api_proto.ConversationItemInputAudioTranscriptionDeltaEvent,
) => void;
// Handler under test for `.completed` events.
handleConversationItemInputAudioTranscriptionCompleted: (
ev: api_proto.ConversationItemInputAudioTranscriptionCompletedEvent,
) => void;
// Called directly by the failure tests with an item id and content index.
finalizePartialOnTranscriptionFailure: (itemId: string, contentIndex: number) => void;
// Handler under test for `conversation.item.deleted` events.
handleConversationItemDeleted: (ev: api_proto.ConversationItemDeletedEvent) => void;
// Partial transcript text keyed by item id, then by content index.
inputTranscriptAccumulators: Map<string, Map<number, string>>;
audioCapableItemIds: Set<string>;
// Always an empty object in these tests.
itemDeleteFutures: Record<string, never>;
// Minimal stand-in for the remote chat context: lookup and removal by item id.
remoteChatCtx: {
get: (id: string) => { item: llm.ChatMessage } | undefined;
delete: (id: string) => void;
};
// Event subscription used by the tests to capture emitted transcription payloads.
on: (event: string, listener: (payload: llm.InputTranscriptionCompleted) => void) => void;
};

/**
 * Creates a RealtimeSession-shaped object without running the constructor and
 * fills in only the fields the transcription handlers are exercised with.
 *
 * @param opts.chatItems - Chat messages, keyed by item id, that the stubbed
 *   `remoteChatCtx` should be able to look up.
 * @returns The bare session, typed as the internals these tests use.
 */
function createTranscriptSession(opts?: {
  chatItems?: Record<string, llm.ChatMessage>;
}): TranscriptionInternals {
  const seededItems = opts?.chatItems ?? {};
  const itemsById = new Map<string, llm.ChatMessage>(Object.entries(seededItems));

  const bare = Object.create(RealtimeSession.prototype) as TranscriptionInternals;
  bare.inputTranscriptAccumulators = new Map<string, Map<number, string>>();
  bare.audioCapableItemIds = new Set<string>();
  bare.itemDeleteFutures = {};
  // In-memory replacement for the remote chat context, backed by `itemsById`.
  bare.remoteChatCtx = {
    get(id) {
      const found = itemsById.get(id);
      if (!found) {
        return undefined;
      }
      return { item: found };
    },
    delete(id) {
      itemsById.delete(id);
    },
  };
  return bare;
}

/**
 * Builds a `.delta` transcription event for tests.
 *
 * @param item_id - Conversation item id the event refers to.
 * @param delta - Text fragment carried by the event.
 * @param content_index - Content index, defaulting to 0.
 * @returns The event, with an `event_id` derived from all three arguments.
 */
function delta(
  item_id: string,
  delta: string,
  content_index = 0,
): api_proto.ConversationItemInputAudioTranscriptionDeltaEvent {
  const event_id = `evt_${item_id}_${delta}_${content_index}`;
  const event: api_proto.ConversationItemInputAudioTranscriptionDeltaEvent = {
    type: 'conversation.item.input_audio_transcription.delta',
    event_id,
    item_id,
    content_index,
    delta,
  };
  return event;
}

/**
 * Builds a `.completed` transcription event for tests.
 *
 * @param item_id - Conversation item id the event refers to.
 * @param transcript - Final transcript text carried by the event.
 * @param content_index - Content index, defaulting to 0.
 * @returns The event, with an `event_id` derived from the item id.
 */
function completed(
  item_id: string,
  transcript: string,
  content_index = 0,
): api_proto.ConversationItemInputAudioTranscriptionCompletedEvent {
  const event_id = `evt_${item_id}_done`;
  const event: api_proto.ConversationItemInputAudioTranscriptionCompletedEvent = {
    type: 'conversation.item.input_audio_transcription.completed',
    event_id,
    item_id,
    content_index,
    transcript,
  };
  return event;
}

it('accumulates partial transcripts across delta events for the same item_id', () => {
  const session = createTranscriptSession();
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  // Feed one item's transcript in three fragments; each emission should carry
  // the text accumulated so far.
  for (const fragment of ['Hello', ', world', '!']) {
    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', fragment));
  }

  expect(received).toEqual([
    { itemId: 'item_a', transcript: 'Hello', isFinal: false },
    { itemId: 'item_a', transcript: 'Hello, world', isFinal: false },
    { itemId: 'item_a', transcript: 'Hello, world!', isFinal: false },
  ]);
});

it('keeps accumulators isolated across concurrent item_ids', () => {
  const session = createTranscriptSession();
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  // Interleave deltas for two items; item_b must not affect item_a's text.
  const interleaved = [
    ['item_a', 'Aaa'],
    ['item_b', 'Bbb'],
    ['item_a', 'Aaa'],
  ] as const;
  for (const [itemId, fragment] of interleaved) {
    session.handleConversationItemInputAudioTranscriptionDelta(delta(itemId, fragment));
  }

  expect(received).toEqual([
    { itemId: 'item_a', transcript: 'Aaa', isFinal: false },
    { itemId: 'item_b', transcript: 'Bbb', isFinal: false },
    { itemId: 'item_a', transcript: 'AaaAaa', isFinal: false },
  ]);
});

it('keeps accumulators isolated across content_index within the same item_id', () => {
  const session = createTranscriptSession();
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  // Same item, two content indices: index 1 must not leak into index 0.
  const fragmentsByIndex = [
    ['idx0-', 0],
    ['idx1-', 1],
    ['more0', 0],
  ] as const;
  for (const [fragment, contentIndex] of fragmentsByIndex) {
    session.handleConversationItemInputAudioTranscriptionDelta(
      delta('item_a', fragment, contentIndex),
    );
  }

  expect(received).toEqual([
    { itemId: 'item_a', transcript: 'idx0-', isFinal: false },
    { itemId: 'item_a', transcript: 'idx1-', isFinal: false },
    { itemId: 'item_a', transcript: 'idx0-more0', isFinal: false },
  ]);
});

it('clears the accumulator on .completed so a subsequent delta does not inherit prior state', () => {
  const userMessage = new llm.ChatMessage({ role: 'user', content: '', id: 'item_a' });
  const session = createTranscriptSession({ chatItems: { item_a: userMessage } });
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  // Partial, then completion, then a fresh partial for the same item id.
  session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'first turn '));
  const completion = completed('item_a', 'first turn complete.');
  session.handleConversationItemInputAudioTranscriptionCompleted(completion);
  session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'second'));

  // The accumulator holds only the post-completion fragment.
  const accumulated = session.inputTranscriptAccumulators.get('item_a')?.get(0);
  expect(accumulated).toBe('second');
  expect(received).toEqual([
    { itemId: 'item_a', transcript: 'first turn ', isFinal: false },
    { itemId: 'item_a', transcript: 'first turn complete.', isFinal: true },
    { itemId: 'item_a', transcript: 'second', isFinal: false },
  ]);
});

it('pushes the final transcript onto the matching ChatMessage exactly once on .completed', () => {
  const userMessage = new llm.ChatMessage({ role: 'user', content: '', id: 'item_a' });
  const session = createTranscriptSession({ chatItems: { item_a: userMessage } });

  // A partial alone must leave the message content as it was.
  const partialEvent = delta('item_a', 'partial-only');
  session.handleConversationItemInputAudioTranscriptionDelta(partialEvent);
  expect(userMessage.content).toEqual(['']);

  // Completion appends the final transcript to the message content.
  const completion = completed('item_a', 'final.');
  session.handleConversationItemInputAudioTranscriptionCompleted(completion);
  expect(userMessage.content).toEqual(['', 'final.']);
});

it('emits isFinal:true on .completed even when remoteChatCtx has no matching item', () => {
  // No chat items are seeded, so the lookup for item_a returns undefined.
  const session = createTranscriptSession();
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  const partialEvent = delta('item_a', 'partial');
  const completion = completed('item_a', 'final.');
  session.handleConversationItemInputAudioTranscriptionDelta(partialEvent);
  session.handleConversationItemInputAudioTranscriptionCompleted(completion);

  expect(received).toEqual([
    { itemId: 'item_a', transcript: 'partial', isFinal: false },
    { itemId: 'item_a', transcript: 'final.', isFinal: true },
  ]);
  expect(session.inputTranscriptAccumulators.size).toBe(0);
});

it('treats .completed as a no-op cleanup when no deltas preceded it (non-streaming STT model)', () => {
  const userMessage = new llm.ChatMessage({ role: 'user', content: '', id: 'item_a' });
  const session = createTranscriptSession({ chatItems: { item_a: userMessage } });
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  // Completion arrives with no preceding delta events.
  const completion = completed('item_a', 'one-shot whisper-1 transcript');
  session.handleConversationItemInputAudioTranscriptionCompleted(completion);

  expect(session.inputTranscriptAccumulators.size).toBe(0);
  expect(received).toEqual([
    { itemId: 'item_a', transcript: 'one-shot whisper-1 transcript', isFinal: true },
  ]);
});

it('skips emission for empty or missing deltas and does not create an accumulator', () => {
  const session = createTranscriptSession();
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  // An event with an empty string, then one with no `delta` field at all.
  const emptyDelta = delta('item_a', '');
  const missingDelta: api_proto.ConversationItemInputAudioTranscriptionDeltaEvent = {
    type: 'conversation.item.input_audio_transcription.delta',
    event_id: 'evt_no_delta',
    item_id: 'item_a',
  };
  session.handleConversationItemInputAudioTranscriptionDelta(emptyDelta);
  session.handleConversationItemInputAudioTranscriptionDelta(missingDelta);
  // Only this non-empty fragment should be accumulated and emitted.
  session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'real'));

  const accumulated = session.inputTranscriptAccumulators.get('item_a')?.get(0);
  expect(accumulated).toBe('real');
  expect(received).toEqual([{ itemId: 'item_a', transcript: 'real', isFinal: false }]);
});

it('clears the accumulator and emits a closing isFinal:true on transcription failure when partials had streamed', () => {
  const session = createTranscriptSession();
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  // Stream two partial fragments, then signal failure for the same item/index.
  for (const fragment of ['partial ', 'text']) {
    session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', fragment));
  }
  session.finalizePartialOnTranscriptionFailure('item_a', 0);

  expect(session.inputTranscriptAccumulators.size).toBe(0);
  expect(received).toEqual([
    { itemId: 'item_a', transcript: 'partial ', isFinal: false },
    { itemId: 'item_a', transcript: 'partial text', isFinal: false },
    { itemId: 'item_a', transcript: 'partial text', isFinal: true },
  ]);
});

it('emits nothing on transcription failure when no partials had streamed for that item', () => {
  const session = createTranscriptSession();
  const received: llm.InputTranscriptionCompleted[] = [];
  session.on('input_audio_transcription_completed', (payload) => {
    received.push(payload);
  });

  // Failure is signalled with no accumulated text for item_a.
  session.finalizePartialOnTranscriptionFailure('item_a', 0);

  expect(received).toEqual([]);
  expect(session.inputTranscriptAccumulators.size).toBe(0);
});

it('clears the accumulator when the conversation item is deleted', () => {
  const session = createTranscriptSession();

  // Populate two content indices for item_a.
  session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'partial '));
  session.handleConversationItemInputAudioTranscriptionDelta(delta('item_a', 'more', 1));
  const perIndexBeforeDelete = session.inputTranscriptAccumulators.get('item_a');
  expect(perIndexBeforeDelete?.size).toBe(2);

  const deletion: api_proto.ConversationItemDeletedEvent = {
    type: 'conversation.item.deleted',
    event_id: 'evt_del',
    item_id: 'item_a',
  };
  session.handleConversationItemDeleted(deletion);

  // Deleting the item removes every accumulator stored under its id.
  expect(session.inputTranscriptAccumulators.has('item_a')).toBe(false);
});
});

describe('livekitItemToOpenAIItem', () => {
describe('message items', () => {
it('should use output_text type for assistant messages', async () => {
Expand Down
Loading
Loading