From 304ffe6d3acc4b61de957248493730d4165984eb Mon Sep 17 00:00:00 2001 From: "rosetta-livekit-bot[bot]" <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Date: Mon, 25 May 2026 15:33:17 +0000 Subject: [PATCH 1/3] feat(plugins-soniox): surface per-run language segments --- .changeset/soniox-language-segments.md | 6 + agents/src/stt/stt.ts | 18 + plugins/soniox/README.md | 3 + plugins/soniox/api-extractor.json | 8 + .../soniox/etc/agents-plugin-soniox.api.md | 102 +++ plugins/soniox/package.json | 51 ++ plugins/soniox/src/index.ts | 18 + plugins/soniox/src/stt.ts | 587 ++++++++++++++++++ plugins/soniox/tsconfig.json | 15 + plugins/soniox/tsup.config.ts | 7 + pnpm-lock.yaml | 25 + turbo.json | 1 + 12 files changed, 841 insertions(+) create mode 100644 .changeset/soniox-language-segments.md create mode 100644 plugins/soniox/README.md create mode 100644 plugins/soniox/api-extractor.json create mode 100644 plugins/soniox/etc/agents-plugin-soniox.api.md create mode 100644 plugins/soniox/package.json create mode 100644 plugins/soniox/src/index.ts create mode 100644 plugins/soniox/src/stt.ts create mode 100644 plugins/soniox/tsconfig.json create mode 100644 plugins/soniox/tsup.config.ts diff --git a/.changeset/soniox-language-segments.md b/.changeset/soniox-language-segments.md new file mode 100644 index 000000000..782c59f10 --- /dev/null +++ b/.changeset/soniox-language-segments.md @@ -0,0 +1,6 @@ +--- +'@livekit/agents': patch +'@livekit/agents-plugin-soniox': patch +--- + +Add Soniox STT support and surface per-run source and target language segments on STT speech data. diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index 474a53101..5bd922e32 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -75,6 +75,24 @@ export interface SpeechData { * May contain multiple entries when a single utterance spans multiple source languages. */ sourceLanguages?: LanguageCode[]; + /** + * The original transcription segments in the source language(s), when translation is active. + * Each entry corresponds to the same-indexed entry in `sourceLanguages`. + */ + sourceTexts?: string[]; + /** + * The target language(s) produced by a translation-capable STT service, one entry per + * consecutive same-language run, parallel to `targetTexts`. + * + * `language` holds the dominant or first target language and `targetLanguages` carries the + * fine-grained per-run breakdown. Populated when translation is active. + */ + targetLanguages?: LanguageCode[]; + /** + * The translated transcription segments in the target language(s). + * Each entry corresponds to the same-indexed entry in `targetLanguages`. + */ + targetTexts?: string[]; /** * Optional plugin-specific metadata (e.g. voice profile, provider diagnostics). * diff --git a/plugins/soniox/README.md b/plugins/soniox/README.md new file mode 100644 index 000000000..1b968f822 --- /dev/null +++ b/plugins/soniox/README.md @@ -0,0 +1,3 @@ +# Soniox plugin for LiveKit Agents + +Support for Soniox Speech-to-Text streaming. diff --git a/plugins/soniox/api-extractor.json b/plugins/soniox/api-extractor.json new file mode 100644 index 000000000..baa041649 --- /dev/null +++ b/plugins/soniox/api-extractor.json @@ -0,0 +1,8 @@ +/** + * Config file for API Extractor. For more info, please visit: https://api-extractor.com + */ +{ + "$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json", + "extends": "../../api-extractor-shared.json", + "mainEntryPointFilePath": "./dist/index.d.ts" +} diff --git a/plugins/soniox/etc/agents-plugin-soniox.api.md b/plugins/soniox/etc/agents-plugin-soniox.api.md new file mode 100644 index 000000000..92bc97f76 --- /dev/null +++ b/plugins/soniox/etc/agents-plugin-soniox.api.md @@ -0,0 +1,102 @@ +## API Report File for "@livekit/agents-plugin-soniox" + +> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/). + +```ts + +import { APIConnectOptions } from '@livekit/agents'; +import { AudioBuffer as AudioBuffer_2 } from '@livekit/agents'; +import { stt } from '@livekit/agents'; + +// @public (undocumented) +export interface ContextGeneralItem { + // (undocumented) + key: string; + // (undocumented) + value: string; +} + +// @public (undocumented) +export interface ContextObject { + general?: ContextGeneralItem[]; + terms?: string[]; + text?: string; + translationTerms?: ContextTranslationTerm[]; +} + +// @public (undocumented) +export interface ContextTranslationTerm { + // (undocumented) + source: string; + // (undocumented) + target: string; +} + +// @public (undocumented) +export class SpeechStream extends stt.SpeechStream { + constructor(stt: STT, opts: STTOptions, connOptions?: APIConnectOptions); + // (undocumented) + label: string; + // (undocumented) + protected run(): Promise; +} + +// @public (undocumented) +export class STT extends stt.STT { + constructor(opts?: Partial); + // (undocumented) + label: string; + // (undocumented) + get model(): string; + // (undocumented) + get provider(): string; + // (undocumented) + _recognize(_: AudioBuffer_2): Promise; + // (undocumented) + stream(options?: { + connOptions?: APIConnectOptions; + }): SpeechStream; +} + +// @public (undocumented) +export interface STTOptions { + // (undocumented) + apiKey?: string; + // (undocumented) + baseUrl: string; + // (undocumented) + clientReferenceId?: string; + // (undocumented) + context?: ContextObject | string; + // (undocumented) + enableLanguageIdentification: boolean; + // (undocumented) + enableSpeakerDiarization: boolean; + // (undocumented) + languageHints?: string[]; + // (undocumented) + languageHintsStrict: boolean; + maxEndpointDelayMs: number; + // (undocumented) + model: string; + // (undocumented) + numChannels: number; + // (undocumented) + sampleRate: number; + // (undocumented) + translation?: TranslationConfig; +} + +// @public (undocumented) +export type TranslationConfig = { + type: 'one_way'; + targetLanguage: string; +} | { + type: 'two_way'; + languageA: string; + languageB: string; +}; + +// (No @packageDocumentation comment for this package) + +``` diff --git a/plugins/soniox/package.json b/plugins/soniox/package.json new file mode 100644 index 000000000..fd1495f0d --- /dev/null +++ b/plugins/soniox/package.json @@ -0,0 +1,51 @@ +{ + "name": "@livekit/agents-plugin-soniox", + "version": "1.4.4", + "description": "Soniox plugin for LiveKit Agents for Node.js", + "main": "dist/index.js", + "require": "dist/index.cjs", + "types": "dist/index.d.ts", + "exports": { + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "require": { + "types": "./dist/index.d.cts", + "default": "./dist/index.cjs" + } + }, + "author": "LiveKit", + "type": "module", + "repository": "git@github.com:livekit/agents-js.git", + "license": "Apache-2.0", + "files": [ + "dist", + "src", + "README.md" + ], + "scripts": { + "build": "tsup --onSuccess \"pnpm build:types\"", + "build:types": "tsc --declaration --emitDeclarationOnly && node ../../scripts/copyDeclarationOutput.js", + "clean": "rm -rf dist", + "clean:build": "pnpm clean && pnpm build", + "lint": "eslint -f unix \"src/**/*.{ts,js}\"", + "api:check": "api-extractor run --typescript-compiler-folder ../../node_modules/typescript", + "api:update": "api-extractor run --local --typescript-compiler-folder ../../node_modules/typescript --verbose" + }, + "devDependencies": { + "@livekit/agents": "workspace:*", + "@livekit/rtc-node": "catalog:", + "@microsoft/api-extractor": "^7.35.0", + "@types/ws": "catalog:", + "tsup": "^8.3.5", + "typescript": "^5.0.0" + }, + "dependencies": { + "ws": "catalog:" + }, + "peerDependencies": { + "@livekit/agents": "workspace:*", + "@livekit/rtc-node": "catalog:" + } +} diff --git a/plugins/soniox/src/index.ts b/plugins/soniox/src/index.ts new file mode 100644 index 000000000..be17b5166 --- /dev/null +++ b/plugins/soniox/src/index.ts @@ -0,0 +1,18 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { Plugin } from '@livekit/agents'; + +export * from './stt.js'; + +class SonioxPlugin extends Plugin { + constructor() { + super({ + title: 'soniox', + version: __PACKAGE_VERSION__, + package: __PACKAGE_NAME__, + }); + } +} + +Plugin.registerPlugin(new SonioxPlugin()); diff --git a/plugins/soniox/src/stt.ts b/plugins/soniox/src/stt.ts new file mode 100644 index 000000000..7567ea900 --- /dev/null +++ b/plugins/soniox/src/stt.ts @@ -0,0 +1,587 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type APIConnectOptions, + APIConnectionError, + APITimeoutError, + type AudioBuffer, + log, + stt, + waitForAbort, +} from '@livekit/agents'; +import { WebSocket } from 'ws'; + +const BASE_URL = 'wss://stt-rt.soniox.com/transcribe-websocket'; +const KEEPALIVE_MESSAGE = '{"type":"keepalive"}'; +const END_TOKEN = ''; +const FINALIZED_TOKEN = ''; + +/** @public */ +export interface ContextGeneralItem { + key: string; + value: string; +} + +/** @public */ +export interface ContextTranslationTerm { + source: string; + target: string; +} + +/** @public */ +export interface ContextObject { + /** Context key-value pairs. */ + general?: ContextGeneralItem[]; + /** Free-form text context. */ + text?: string; + /** Terms to bias recognition toward. */ + terms?: string[]; + /** Translation-specific source/target term pairs. */ + translationTerms?: ContextTranslationTerm[]; +} + +/** @public */ +export type TranslationConfig = + | { + type: 'one_way'; + /** Target language for one-way translation. */ + targetLanguage: string; + } + | { + type: 'two_way'; + /** First language for two-way translation. */ + languageA: string; + /** Second language for two-way translation. */ + languageB: string; + }; + +/** @public */ +export interface STTOptions { + apiKey?: string; + baseUrl: string; + model: string; + languageHints?: string[]; + languageHintsStrict: boolean; + context?: ContextObject | string; + numChannels: number; + sampleRate: number; + enableSpeakerDiarization: boolean; + enableLanguageIdentification: boolean; + /** Maximum delay in milliseconds between speech cessation and endpoint detection. */ + maxEndpointDelayMs: number; + clientReferenceId?: string; + translation?: TranslationConfig; +} + +const defaultSTTOptions: STTOptions = { + apiKey: process.env.SONIOX_API_KEY, + baseUrl: BASE_URL, + model: 'stt-rt-v4', + languageHintsStrict: false, + numChannels: 1, + sampleRate: 16000, + enableSpeakerDiarization: false, + enableLanguageIdentification: true, + maxEndpointDelayMs: 500, +}; + +/** @public */ +export class STT extends stt.STT { + #opts: STTOptions; + label = 'soniox.STT'; + + constructor(opts: Partial = {}) { + const merged = { ...defaultSTTOptions, ...opts }; + if (!merged.apiKey) { + throw new Error('Soniox API key is required. Set SONIOX_API_KEY or pass apiKey'); + } + if (merged.maxEndpointDelayMs < 500 || merged.maxEndpointDelayMs > 3000) { + throw new Error('maxEndpointDelayMs must be between 500 and 3000'); + } + + super({ + streaming: true, + interimResults: true, + alignedTranscript: 'chunk', + diarization: merged.enableSpeakerDiarization, + }); + this.#opts = merged; + } + + get model(): string { + return this.#opts.model; + } + + get provider(): string { + return 'Soniox'; + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async _recognize(_: AudioBuffer): Promise { + throw new Error('Soniox Speech-to-Text API does not support single frame recognition'); + } + + stream(options?: { connOptions?: APIConnectOptions }): SpeechStream { + return new SpeechStream(this, this.#opts, options?.connOptions); + } +} + +interface SonioxToken { + text: string; + is_final: boolean; + translation_status?: string; + language?: string; + speaker?: string | number; + start_ms?: number; + end_ms?: number; + confidence?: number; +} + +interface SonioxMessage { + tokens?: SonioxToken[]; + total_audio_proc_ms?: number; + finished?: boolean; + error_code?: string; + error_message?: string; +} + +/** @public */ +export class SpeechStream extends stt.SpeechStream { + #opts: STTOptions; + #logger = log(); + #reportedDurationMs = 0; + label = 'soniox.SpeechStream'; + + constructor(stt: STT, opts: STTOptions, connOptions?: APIConnectOptions) { + super(stt, opts.sampleRate, connOptions); + this.#opts = opts; + } + + protected async run(): Promise { + let ws: WebSocket | undefined; + try { + ws = await this.#connectWS(); + await this.#runWS(ws); + } catch (error) { + if (error instanceof APITimeoutError || error instanceof APIConnectionError) { + throw error; + } + throw new APIConnectionError({ + message: `Soniox Speech-to-Text API connection error: ${error}`, + }); + } finally { + ws?.close(); + } + } + + async #connectWS(): Promise { + const ws = new WebSocket(this.#opts.baseUrl); + const timeout = setTimeout(() => { + ws.terminate(); + }, 10000); + + try { + await new Promise((resolve, reject) => { + ws.once('open', () => resolve()); + ws.once('error', (error) => reject(error)); + ws.once('close', (code) => reject(new Error(`WebSocket returned ${code}`))); + }); + } catch (error) { + throw new APITimeoutError({ + message: `Timeout connecting to or initializing Soniox Speech-to-Text API session: ${error}`, + }); + } finally { + clearTimeout(timeout); + } + + ws.send(JSON.stringify(this.#config())); + this.#reportedDurationMs = 0; + return ws; + } + + #config(): Record { + const config: Record = { + api_key: this.#opts.apiKey, + model: this.#opts.model, + audio_format: 'pcm_s16le', + num_channels: this.#opts.numChannels, + enable_endpoint_detection: true, + sample_rate: this.#opts.sampleRate, + language_hints: this.#opts.languageHints, + language_hints_strict: this.#opts.languageHintsStrict, + context: serializeContext(this.#opts.context), + enable_speaker_diarization: this.#opts.enableSpeakerDiarization, + enable_language_identification: this.#opts.enableLanguageIdentification, + client_reference_id: this.#opts.clientReferenceId, + max_endpoint_delay_ms: this.#opts.maxEndpointDelayMs, + }; + + if (this.#opts.translation) { + config.translation = serializeTranslation(this.#opts.translation); + } + + return Object.fromEntries(Object.entries(config).filter(([, value]) => value !== undefined)); + } + + async #runWS(ws: WebSocket): Promise { + let closing = false; + const isTranslationMode = this.#opts.translation !== undefined; + const final = new TokenAccumulator(); + const finalOriginal = new TokenAccumulator(); + let isSpeaking = false; + + const sendEndpointTranscript = () => { + if (final.text) { + const [srcSegs, tgtSegs] = isTranslationMode + ? [finalOriginal.langSegments, final.langSegments] + : [final.langSegments, []]; + const [sourceLanguages, sourceTexts] = langSegmentsToFields(srcSegs); + const [targetLanguages, targetTexts] = langSegmentsToFields(tgtSegs); + + this.#put({ + type: stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [ + final.toSpeechData(this.startTimeOffset, { + sourceLanguages, + sourceTexts, + targetLanguages, + targetTexts, + }), + ], + }); + this.#put({ type: stt.SpeechEventType.END_OF_SPEECH }); + final.reset(); + finalOriginal.reset(); + isSpeaking = false; + } else { + finalOriginal.reset(); + } + }; + + const keepalive = setInterval(() => { + if (ws.readyState === WebSocket.OPEN) { + ws.send(KEEPALIVE_MESSAGE); + } + }, 5000); + + const sendTask = this.#sendAudio(ws, () => { + closing = true; + }); + + const listenTask = new Promise((resolve, reject) => { + ws.on('message', (msg) => { + try { + const content = JSON.parse(msg.toString()) as SonioxMessage; + const tokens = content.tokens ?? []; + const nonFinal = new TokenAccumulator(); + const nonFinalOriginal = new TokenAccumulator(); + const totalAudioProcMs = content.total_audio_proc_ms ?? 0; + + for (const token of tokens) { + const isTranslated = token.translation_status === 'translation'; + if (isTranslationMode && !isEndToken(token) && !isTranslated) { + if (token.is_final) { + finalOriginal.update(token); + } else { + nonFinalOriginal.update(token); + } + continue; + } + if (token.is_final) { + if (isEndToken(token)) { + sendEndpointTranscript(); + this.#reportProcessedAudioDuration(totalAudioProcMs); + } else { + final.update(token); + } + } else { + nonFinal.update(token); + } + } + + if (final.text || nonFinal.text) { + if (!isSpeaking) { + isSpeaking = true; + this.#put({ type: stt.SpeechEventType.START_OF_SPEECH }); + } + + const mergedOriginals = mergeLangSegments( + finalOriginal.langSegments, + nonFinalOriginal.langSegments, + ); + const mergedPrimary = mergeLangSegments(final.langSegments, nonFinal.langSegments); + const [srcSegs, tgtSegs] = isTranslationMode + ? [mergedOriginals, mergedPrimary] + : [mergedPrimary, []]; + const [sourceLanguages, sourceTexts] = langSegmentsToFields(srcSegs); + const [targetLanguages, targetTexts] = langSegmentsToFields(tgtSegs); + const eventType = + final.text && !nonFinal.text + ? stt.SpeechEventType.PREFLIGHT_TRANSCRIPT + : stt.SpeechEventType.INTERIM_TRANSCRIPT; + + this.#put({ + type: eventType, + alternatives: [ + final.mergedSpeechData(nonFinal, this.startTimeOffset, { + sourceLanguages, + sourceTexts, + targetLanguages, + targetTexts, + }), + ], + }); + } + + if (content.finished || content.error_code || content.error_message) { + sendEndpointTranscript(); + this.#reportProcessedAudioDuration(totalAudioProcMs); + } + + if (content.error_code || content.error_message) { + this.#logger.error( + `WebSocket error: ${content.error_code ?? ''} - ${content.error_message ?? ''}`, + ); + } + + if (content.finished) { + resolve(); + } + } catch (error) { + reject(error); + } + }); + ws.once('error', (error) => reject(error)); + ws.once('close', (code) => { + if (!closing) { + reject(new Error(`Soniox STT WebSocket closed with code ${code}`)); + } else { + resolve(); + } + }); + }); + + try { + await Promise.race([sendTask, listenTask, waitForAbort(this.abortSignal)]); + } finally { + closing = true; + clearInterval(keepalive); + ws.close(); + } + } + + async #sendAudio(ws: WebSocket, onClosing: () => void): Promise { + const abortPromise = waitForAbort(this.abortSignal); + while (!this.closed) { + const result = await Promise.race([this.input.next(), abortPromise]); + if (result === undefined || result.done) { + break; + } + + const data = result.value; + if (data === SpeechStream.FLUSH_SENTINEL) { + continue; + } + ws.send(data.data.buffer); + } + onClosing(); + } + + #reportProcessedAudioDuration(totalAudioProcMs: number): void { + const toReportMs = totalAudioProcMs - this.#reportedDurationMs; + if (toReportMs <= 0) return; + this.#put({ + type: stt.SpeechEventType.RECOGNITION_USAGE, + recognitionUsage: { + audioDuration: toReportMs / 1000, + }, + }); + this.#reportedDurationMs = Math.trunc(totalAudioProcMs); + } + + #put(event: stt.SpeechEvent): void { + if (!this.queue.closed) { + this.queue.put(event); + } + } +} + +const isEndToken = (token: SonioxToken): boolean => + token.text === END_TOKEN || token.text === FINALIZED_TOKEN; + +const serializeContext = (context: ContextObject | string | undefined): unknown => { + if (context === undefined || typeof context === 'string') return context; + return { + general: context.general, + text: context.text, + terms: context.terms, + translation_terms: context.translationTerms, + }; +}; + +const serializeTranslation = (translation: TranslationConfig): Record => { + if (translation.type === 'one_way') { + return { type: 'one_way', target_language: translation.targetLanguage }; + } + return { + type: 'two_way', + language_a: translation.languageA, + language_b: translation.languageB, + }; +}; + +type LangSegment = [stt.SpeechData['language'], string]; + +const mergeLangSegments = (a: LangSegment[], b: LangSegment[]): LangSegment[] => { + const result = [...a]; + for (const [lang, text] of b) { + const last = result[result.length - 1]; + if (last && last[0] === lang) { + last[1] += text; + } else { + result.push([lang, text]); + } + } + return result; +}; + +const langSegmentsToFields = ( + segments: LangSegment[], +): [stt.SpeechData['sourceLanguages'] | undefined, string[] | undefined] => { + if (segments.length === 0) return [undefined, undefined]; + return [segments.map(([lang]) => lang), segments.map(([, text]) => text)]; +}; + +interface SpeechDataFields { + sourceLanguages?: stt.SpeechData['sourceLanguages']; + sourceTexts?: string[]; + targetLanguages?: stt.SpeechData['targetLanguages']; + targetTexts?: string[]; +} + +interface LangStats { + numChars: number; + updatedAt: number; +} + +class TokenAccumulator { + text = ''; + language: stt.SpeechData['language'] = '' as stt.SpeechData['language']; + speakerId?: string; + startTime = 0; + endTime = 0; + #confidenceSum = 0; + #confidenceCount = 0; + #hasStartTime = false; + #langSegments: LangSegment[] = []; + #langStats = new Map(); + + get langSegments(): LangSegment[] { + return this.#langSegments; + } + + get confidence(): number { + return this.#confidenceCount === 0 ? 0 : this.#confidenceSum / this.#confidenceCount; + } + + update(token: SonioxToken): void { + const text = token.text; + const lang = (token.language ?? '') as stt.SpeechData['language']; + this.text += text; + if (lang && text) { + const stats = this.#langStats.get(lang) ?? { numChars: 0, updatedAt: 0 }; + this.#langStats.set(lang, { numChars: stats.numChars + text.length, updatedAt: Date.now() }); + this.language = this.#getLanguage(); + } + if (token.speaker !== undefined && this.speakerId === undefined) { + this.speakerId = String(token.speaker); + } + if (token.start_ms !== undefined && !this.#hasStartTime) { + this.#hasStartTime = true; + this.startTime = token.start_ms; + } + if (token.end_ms !== undefined) { + this.endTime = token.end_ms; + } + if (token.confidence !== undefined) { + this.#confidenceSum += token.confidence; + this.#confidenceCount += 1; + } + if (text) { + const last = this.#langSegments[this.#langSegments.length - 1]; + if (last && last[0] === lang) { + last[1] += text; + } else { + this.#langSegments.push([lang, text]); + } + } + } + + reset(): void { + this.text = ''; + this.language = '' as stt.SpeechData['language']; + this.speakerId = undefined; + this.startTime = 0; + this.endTime = 0; + this.#confidenceSum = 0; + this.#confidenceCount = 0; + this.#hasStartTime = false; + this.#langSegments = []; + this.#langStats.clear(); + } + + toSpeechData(startTimeOffset = 0, fields: SpeechDataFields = {}): stt.SpeechData { + return { + text: this.text, + language: this.language, + sourceLanguages: fields.sourceLanguages, + sourceTexts: fields.sourceTexts, + targetLanguages: fields.targetLanguages, + targetTexts: fields.targetTexts, + speakerId: this.speakerId, + startTime: this.startTime / 1000 + startTimeOffset, + endTime: this.endTime / 1000 + startTimeOffset, + confidence: this.confidence, + }; + } + + mergedSpeechData( + other: TokenAccumulator, + startTimeOffset = 0, + fields: SpeechDataFields = {}, + ): stt.SpeechData { + const starts = [this, other].filter((acc) => acc.#hasStartTime).map((acc) => acc.startTime); + const start = starts.length ? Math.min(...starts) : 0; + const totalCount = this.#confidenceCount + other.#confidenceCount; + const totalSum = this.#confidenceSum + other.#confidenceSum; + return { + text: this.text + other.text, + language: this.language || other.language, + sourceLanguages: fields.sourceLanguages, + sourceTexts: fields.sourceTexts, + targetLanguages: fields.targetLanguages, + targetTexts: fields.targetTexts, + speakerId: this.speakerId ?? other.speakerId, + startTime: start / 1000 + startTimeOffset, + endTime: Math.max(this.endTime, other.endTime) / 1000 + startTimeOffset, + confidence: totalCount > 0 ? totalSum / totalCount : 0, + }; + } + + #getLanguage(): stt.SpeechData['language'] { + let selected = ''; + let selectedStats: LangStats | undefined; + for (const [lang, stats] of this.#langStats) { + if ( + !selectedStats || + stats.numChars > selectedStats.numChars || + (stats.numChars === selectedStats.numChars && stats.updatedAt < selectedStats.updatedAt) + ) { + selected = lang; + selectedStats = stats; + } + } + return selected as stt.SpeechData['language']; + } +} diff --git a/plugins/soniox/tsconfig.json b/plugins/soniox/tsconfig.json new file mode 100644 index 000000000..d72dc3a34 --- /dev/null +++ b/plugins/soniox/tsconfig.json @@ -0,0 +1,15 @@ +{ + "extends": "../../tsconfig.json", + "include": ["./src"], + "compilerOptions": { + "rootDir": "./src", + "declarationDir": "./dist", + "outDir": "./dist" + }, + "typedocOptions": { + "name": "plugins/agents-plugin-soniox", + "entryPointStrategy": "resolve", + "readme": "none", + "entryPoints": ["src/index.ts"] + } +} diff --git a/plugins/soniox/tsup.config.ts b/plugins/soniox/tsup.config.ts new file mode 100644 index 000000000..8ca20961f --- /dev/null +++ b/plugins/soniox/tsup.config.ts @@ -0,0 +1,7 @@ +import { defineConfig } from 'tsup'; + +import defaults from '../../tsup.config'; + +export default defineConfig({ + ...defaults, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 88e0a5b64..8065c62e2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1234,6 +1234,31 @@ importers: specifier: ^5.0.0 version: 5.9.3 + plugins/soniox: + dependencies: + ws: + specifier: 'catalog:' + version: 8.20.1 + devDependencies: + '@livekit/agents': + specifier: workspace:* + version: link:../../agents + '@livekit/rtc-node': + specifier: 'catalog:' + version: 0.13.27 + '@microsoft/api-extractor': + specifier: ^7.35.0 + version: 7.43.7(@types/node@25.6.0) + '@types/ws': + specifier: 'catalog:' + version: 8.18.1 + tsup: + specifier: ^8.3.5 + version: 8.4.0(@microsoft/api-extractor@7.43.7(@types/node@25.6.0))(postcss@8.5.9)(tsx@4.21.0)(typescript@5.9.3) + typescript: + specifier: ^5.0.0 + version: 5.9.3 + plugins/tavus: dependencies: livekit-server-sdk: diff --git a/turbo.json b/turbo.json index 7fd515bc2..06a1aaa4c 100644 --- a/turbo.json +++ b/turbo.json @@ -74,6 +74,7 @@ "RUNWAY_AVATAR_ID", "RUNWAY_AVATAR_PRESET_ID", "SARVAM_API_KEY", + "SONIOX_API_KEY", "SIP_PARTICIPANT_IDENTITY", "SIP_PHONE_NUMBER", "LK_OPENAI_DEBUG", From 8084edb22fa983572ed7b3349d9ae13fd2039e2e Mon Sep 17 00:00:00 2001 From: Chenghao Mou Date: Mon, 25 May 2026 20:12:19 +0100 Subject: [PATCH 2/3] refactoring --- plugins/soniox/src/_internal.ts | 323 ++++++++++++++++++++++++++++++++ plugins/soniox/src/stt.test.ts | 303 ++++++++++++++++++++++++++++++ plugins/soniox/src/stt.ts | 297 +---------------------------- 3 files changed, 633 insertions(+), 290 deletions(-) create mode 100644 plugins/soniox/src/_internal.ts create mode 100644 plugins/soniox/src/stt.test.ts diff --git a/plugins/soniox/src/_internal.ts b/plugins/soniox/src/_internal.ts new file mode 100644 index 000000000..5826e5a70 --- /dev/null +++ b/plugins/soniox/src/_internal.ts @@ -0,0 +1,323 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Module-private helpers for the Soniox plugin. Imported by `stt.ts` and +// exercised by `stt.test.ts`. Not re-exported from `index.ts`, and the +// package `exports` map blocks consumers from importing this file directly. +import { stt } from '@livekit/agents'; + +const END_TOKEN = ''; +const FINALIZED_TOKEN = ''; + +export interface SonioxToken { + text: string; + is_final: boolean; + translation_status?: string; + language?: string; + speaker?: string | number; + start_ms?: number; + end_ms?: number; + confidence?: number; +} + +export interface SonioxMessage { + tokens?: SonioxToken[]; + total_audio_proc_ms?: number; + finished?: boolean; + error_code?: string; + error_message?: string; +} + +export type LangSegment = [stt.SpeechData['language'], string]; + +export const isEndToken = (token: SonioxToken): boolean => + token.text === END_TOKEN || token.text === FINALIZED_TOKEN; + +export const mergeLangSegments = (a: LangSegment[], b: LangSegment[]): LangSegment[] => { + const result = [...a]; + for (const [lang, text] of b) { + const last = result[result.length - 1]; + if (last && last[0] === lang) { + last[1] += text; + } else { + result.push([lang, text]); + } + } + return result; +}; + +type LangFields = Pick< + stt.SpeechData, + 'sourceLanguages' | 'sourceTexts' | 'targetLanguages' | 'targetTexts' +>; + +/** + * Route language segments into `SpeechData`'s source/target field pairs. + * In translation mode `primary` becomes the target side and `original` the + * source side; otherwise `primary` is the source and there is no target. + * Empty inputs are omitted from the result. + */ +export const langFields = ( + isTranslationMode: boolean, + primary: LangSegment[], + original: LangSegment[], +): LangFields => { + const source = isTranslationMode ? original : primary; + const target = isTranslationMode ? primary : []; + const fields: LangFields = {}; + if (source.length) { + fields.sourceLanguages = source.map(([lang]) => lang); + fields.sourceTexts = source.map(([, text]) => text); + } + if (target.length) { + fields.targetLanguages = target.map(([lang]) => lang); + fields.targetTexts = target.map(([, text]) => text); + } + return fields; +}; + +export class TokenAccumulator { + text = ''; + language: stt.SpeechData['language'] = '' as stt.SpeechData['language']; + speakerId?: string; + startTime = 0; + endTime = 0; + #confidenceSum = 0; + #confidenceCount = 0; + #hasStartTime = false; + #langSegments: LangSegment[] = []; + // Map iteration is insertion-ordered; the strict `>` in #getLanguage means + // the first-inserted language wins on ties. Python uses min(last-updated) + // as its tiebreaker; this insertion-order semantic is close enough for an + // opinionated lossy summary and avoids tracking timestamps. + #langStats = new Map(); + + get langSegments(): LangSegment[] { + return this.#langSegments; + } + + get confidence(): number { + return this.#confidenceCount === 0 ? 0 : this.#confidenceSum / this.#confidenceCount; + } + + update(token: SonioxToken): void { + const text = token.text; + const lang = (token.language ?? '') as stt.SpeechData['language']; + this.text += text; + if (lang && text) { + this.#langStats.set(lang, (this.#langStats.get(lang) ?? 0) + text.length); + this.language = this.#getLanguage(); + } + if (token.speaker !== undefined && this.speakerId === undefined) { + this.speakerId = String(token.speaker); + } + if (token.start_ms !== undefined && !this.#hasStartTime) { + this.#hasStartTime = true; + this.startTime = token.start_ms; + } + if (token.end_ms !== undefined) { + this.endTime = token.end_ms; + } + if (token.confidence !== undefined) { + this.#confidenceSum += token.confidence; + this.#confidenceCount += 1; + } + if (text) { + const last = this.#langSegments[this.#langSegments.length - 1]; + if (last && last[0] === lang) { + last[1] += text; + } else { + this.#langSegments.push([lang, text]); + } + } + } + + reset(): void { + this.text = ''; + this.language = '' as stt.SpeechData['language']; + this.speakerId = undefined; + this.startTime = 0; + this.endTime = 0; + this.#confidenceSum = 0; + this.#confidenceCount = 0; + this.#hasStartTime = false; + this.#langSegments = []; + this.#langStats.clear(); + } + + toSpeechData(startTimeOffset = 0): stt.SpeechData { + return { + text: this.text, + language: this.language, + speakerId: this.speakerId, + startTime: this.startTime / 1000 + startTimeOffset, + endTime: this.endTime / 1000 + startTimeOffset, + confidence: this.confidence, + }; + } + + mergedSpeechData(other: TokenAccumulator, startTimeOffset = 0): stt.SpeechData { + const starts = [this, other].filter((acc) => acc.#hasStartTime).map((acc) => acc.startTime); + const start = starts.length ? Math.min(...starts) : 0; + const totalCount = this.#confidenceCount + other.#confidenceCount; + const totalSum = this.#confidenceSum + other.#confidenceSum; + return { + text: this.text + other.text, + language: this.language || other.language, + speakerId: this.speakerId ?? other.speakerId, + startTime: start / 1000 + startTimeOffset, + endTime: Math.max(this.endTime, other.endTime) / 1000 + startTimeOffset, + confidence: totalCount > 0 ? totalSum / totalCount : 0, + }; + } + + #getLanguage(): stt.SpeechData['language'] { + let selected = ''; + let maxChars = -1; + for (const [lang, numChars] of this.#langStats) { + if (numChars > maxChars) { + maxChars = numChars; + selected = lang; + } + } + return selected as stt.SpeechData['language']; + } +} + +/** + * Per-session state mutated across calls to {@link processMessage}. + * `final` and `finalOriginal` accumulate finalized tokens until an endpoint + * is reached; `isSpeaking` gates the START_OF_SPEECH event; `reportedDurationMs` + * tracks audio-duration usage already emitted. + */ +export interface ProcessMessageState { + final: TokenAccumulator; + finalOriginal: TokenAccumulator; + isSpeaking: boolean; + reportedDurationMs: number; +} + +export const newProcessMessageState = (): ProcessMessageState => ({ + final: new TokenAccumulator(), + finalOriginal: new TokenAccumulator(), + isSpeaking: false, + reportedDurationMs: 0, +}); + +export interface ProcessMessageOptions { + isTranslationMode: boolean; + startTimeOffset: number; +} + +function* sendEndpointTranscript( + state: ProcessMessageState, + options: ProcessMessageOptions, +): Generator { + if (state.final.text) { + yield { + type: stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [ + { + ...state.final.toSpeechData(options.startTimeOffset), + ...langFields( + options.isTranslationMode, + state.final.langSegments, + state.finalOriginal.langSegments, + ), + }, + ], + }; + yield { type: stt.SpeechEventType.END_OF_SPEECH }; + state.final.reset(); + state.finalOriginal.reset(); + state.isSpeaking = false; + } else { + state.finalOriginal.reset(); + } +} + +function* reportProcessedAudioDuration( + totalAudioProcMs: number, + state: ProcessMessageState, +): Generator { + const toReportMs = totalAudioProcMs - state.reportedDurationMs; + if (toReportMs <= 0) return; + yield { + type: stt.SpeechEventType.RECOGNITION_USAGE, + recognitionUsage: { + audioDuration: toReportMs / 1000, + }, + }; + state.reportedDurationMs = Math.trunc(totalAudioProcMs); +} + +/** + * Process a single parsed Soniox WebSocket message, mutating `state` and + * yielding the resulting {@link stt.SpeechEvent}s in order. The function is + * pure with respect to its inputs — all session state lives in `state`. + */ +export function* processMessage( + state: ProcessMessageState, + content: SonioxMessage, + options: ProcessMessageOptions, +): Generator { + const tokens = content.tokens ?? []; + const nonFinal = new TokenAccumulator(); + const nonFinalOriginal = new TokenAccumulator(); + const totalAudioProcMs = content.total_audio_proc_ms ?? 0; + + for (const token of tokens) { + const isTranslated = token.translation_status === 'translation'; + if (options.isTranslationMode && !isEndToken(token) && !isTranslated) { + if (token.is_final) { + state.finalOriginal.update(token); + } else { + nonFinalOriginal.update(token); + } + continue; + } + if (token.is_final) { + if (isEndToken(token)) { + yield* sendEndpointTranscript(state, options); + yield* reportProcessedAudioDuration(totalAudioProcMs, state); + } else { + state.final.update(token); + } + } else { + nonFinal.update(token); + } + } + + if (state.final.text || nonFinal.text) { + if (!state.isSpeaking) { + state.isSpeaking = true; + yield { type: stt.SpeechEventType.START_OF_SPEECH }; + } + + const mergedPrimary = mergeLangSegments(state.final.langSegments, nonFinal.langSegments); + const mergedOriginals = mergeLangSegments( + state.finalOriginal.langSegments, + nonFinalOriginal.langSegments, + ); + const eventType = + state.final.text && !nonFinal.text + ? stt.SpeechEventType.PREFLIGHT_TRANSCRIPT + : stt.SpeechEventType.INTERIM_TRANSCRIPT; + + yield { + type: eventType, + alternatives: [ + { + ...state.final.mergedSpeechData(nonFinal, options.startTimeOffset), + ...langFields(options.isTranslationMode, mergedPrimary, mergedOriginals), + }, + ], + }; + } + + if (content.finished || content.error_code || content.error_message) { + yield* sendEndpointTranscript(state, options); + yield* reportProcessedAudioDuration(totalAudioProcMs, state); + } +} diff --git a/plugins/soniox/src/stt.test.ts b/plugins/soniox/src/stt.test.ts new file mode 100644 index 000000000..caf3e298a --- /dev/null +++ b/plugins/soniox/src/stt.test.ts @@ -0,0 +1,303 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { stt } from '@livekit/agents'; +import { describe, expect, it } from 'vitest'; +import { + type SonioxMessage, + type SonioxToken, + TokenAccumulator, + mergeLangSegments, + newProcessMessageState, + processMessage, +} from './_internal.js'; + +// --------------------------------------------------------------------------- +// TokenAccumulator: language-segment coalescing +// --------------------------------------------------------------------------- + +describe('TokenAccumulator', () => { + it('coalesces consecutive same-language token runs into segments', () => { + const accumulator = new TokenAccumulator(); + for (const [lang, text] of [ + ['en', 'Hello'], + ['en', ' world'], + ['es', ' hola'], + ['es', ' mundo'], + ['en', ' again'], + ] as const) { + accumulator.update({ text, language: lang, is_final: true }); + } + + expect(accumulator.langSegments).toEqual([ + ['en', 'Hello world'], + ['es', ' hola mundo'], + ['en', ' again'], + ]); + expect(accumulator.text).toBe('Hello world hola mundo again'); + expect(accumulator.langSegments.map(([, t]: [string, string]) => t).join('')).toBe( + accumulator.text, + ); + }); + + it('starts with no lang segments', () => { + const accumulator = new TokenAccumulator(); + expect(accumulator.langSegments).toEqual([]); + }); + + it('resets all state via reset()', () => { + const accumulator = new TokenAccumulator(); + accumulator.update({ text: 'hi', language: 'en', is_final: true, start_ms: 100, end_ms: 200 }); + accumulator.reset(); + expect(accumulator.text).toBe(''); + expect(accumulator.startTime).toBe(0); + expect(accumulator.endTime).toBe(0); + expect(accumulator.langSegments).toEqual([]); + }); +}); + +// --------------------------------------------------------------------------- +// mergeLangSegments helper +// --------------------------------------------------------------------------- + +describe('mergeLangSegments', () => { + it('appends adjacent same-language segments instead of duplicating them', () => { + const merged = mergeLangSegments( + [['en' as stt.SpeechData['language'], 'Hello']], + [['en' as stt.SpeechData['language'], ' world']], + ); + expect(merged).toEqual([['en', 'Hello world']]); + }); + + it('keeps distinct-language boundaries as separate runs', () => { + const merged = mergeLangSegments( + [['en' as stt.SpeechData['language'], 'Hello']], + [['es' as stt.SpeechData['language'], ' hola']], + ); + expect(merged).toEqual([ + ['en', 'Hello'], + ['es', ' hola'], + ]); + }); +}); + +// --------------------------------------------------------------------------- +// processMessage end-to-end via direct invocation +// --------------------------------------------------------------------------- + +function finalToken(text: string, language: string, translationStatus?: string): SonioxToken { + const token: SonioxToken = { text, language, is_final: true }; + if (translationStatus !== undefined) token.translation_status = translationStatus; + return token; +} + +function nonfinalToken(text: string, language: string, translationStatus?: string): SonioxToken { + const token: SonioxToken = { text, language, is_final: false }; + if (translationStatus !== undefined) token.translation_status = translationStatus; + return token; +} + +const END_TOKEN_FINAL: SonioxToken = { text: '', is_final: true } as SonioxToken; + +function runProcess( + messages: SonioxMessage[], + options: { isTranslationMode: boolean; startTimeOffset?: number } = { isTranslationMode: false }, +): stt.SpeechEvent[] { + const state = newProcessMessageState(); + const events: stt.SpeechEvent[] = []; + for (const msg of messages) { + events.push( + ...processMessage(state, msg, { + isTranslationMode: options.isTranslationMode, + startTimeOffset: options.startTimeOffset ?? 0, + }), + ); + } + return events; +} + +describe('processMessage', () => { + it('two-way translation, code-switched input produces per-run source and target lists', () => { + const events = runProcess( + [ + { + tokens: [ + finalToken('No hablo español, ', 'es', 'original'), + finalToken('but I speak English.', 'en', 'original'), + finalToken("I don't speak Spanish, ", 'en', 'translation'), + finalToken('pero hablo inglés.', 'es', 'translation'), + END_TOKEN_FINAL, + ], + total_audio_proc_ms: 1000, + }, + ], + { isTranslationMode: true }, + ); + + const types = events.map((e) => e.type); + expect(types).toContain(stt.SpeechEventType.FINAL_TRANSCRIPT); + expect(types).toContain(stt.SpeechEventType.END_OF_SPEECH); + + const final = events.find((e) => e.type === stt.SpeechEventType.FINAL_TRANSCRIPT)!; + const sd = final.alternatives![0]!; + + expect(sd.text).toBe("I don't speak Spanish, pero hablo inglés."); + expect(sd.language).toBe('en'); + expect(sd.sourceLanguages).toEqual(['es', 'en']); + expect(sd.sourceTexts).toEqual(['No hablo español, ', 'but I speak English.']); + expect(sd.targetLanguages).toEqual(['en', 'es']); + expect(sd.targetTexts).toEqual(["I don't speak Spanish, ", 'pero hablo inglés.']); + expect(sd.targetTexts!.join('')).toBe(sd.text); + }); + + it('one-way translation produces a single-entry target language list', () => { + const events = runProcess( + [ + { + tokens: [ + finalToken('Hello world.', 'en', 'original'), + finalToken('Hola mundo.', 'es', 'translation'), + END_TOKEN_FINAL, + ], + total_audio_proc_ms: 500, + }, + ], + { isTranslationMode: true }, + ); + + const final = events.find((e) => e.type === stt.SpeechEventType.FINAL_TRANSCRIPT)!; + const sd = final.alternatives![0]!; + + expect(sd.text).toBe('Hola mundo.'); + expect(sd.language).toBe('es'); + expect(sd.sourceLanguages).toEqual(['en']); + expect(sd.sourceTexts).toEqual(['Hello world.']); + expect(sd.targetLanguages).toEqual(['es']); + expect(sd.targetTexts).toEqual(['Hola mundo.']); + }); + + it('untranslated "none" chunk yields asymmetric source and target lists', () => { + const events = runProcess( + [ + { + tokens: [ + finalToken('Good morning. ', 'en', 'original'), + finalToken('Bonjour à tous. ', 'fr', 'none'), + finalToken('How are you?', 'en', 'original'), + finalToken('Guten Morgen. ', 'de', 'translation'), + finalToken("Wie geht's?", 'de', 'translation'), + END_TOKEN_FINAL, + ], + total_audio_proc_ms: 1200, + }, + ], + { isTranslationMode: true }, + ); + + const final = events.find((e) => e.type === stt.SpeechEventType.FINAL_TRANSCRIPT)!; + const sd = final.alternatives![0]!; + + // fr chunk sits between two en chunks → three source runs. + expect(sd.sourceLanguages).toEqual(['en', 'fr', 'en']); + expect(sd.sourceTexts).toEqual(['Good morning. ', 'Bonjour à tous. ', 'How are you?']); + // Both translation tokens are de → single coalesced target run. + expect(sd.targetLanguages).toEqual(['de']); + expect(sd.targetTexts).toEqual(["Guten Morgen. Wie geht's?"]); + // Independent per-run lists may legitimately have different lengths. + expect(sd.sourceLanguages!.length).not.toBe(sd.targetLanguages!.length); + }); + + it('interim transcript merges final-so-far with non-final tokens per run', () => { + const events = runProcess( + [ + { + tokens: [ + finalToken('Hola, ', 'es', 'original'), + finalToken('Hello, ', 'en', 'translation'), + nonfinalToken('¿cómo estás?', 'es', 'original'), + nonfinalToken('how are you?', 'en', 'translation'), + ], + total_audio_proc_ms: 800, + }, + ], + { isTranslationMode: true }, + ); + + const interim = events.find( + (e) => + e.type === stt.SpeechEventType.INTERIM_TRANSCRIPT || + e.type === stt.SpeechEventType.PREFLIGHT_TRANSCRIPT, + )!; + const sd = interim.alternatives![0]!; + + expect(sd.sourceLanguages).toEqual(['es']); + expect(sd.sourceTexts).toEqual(['Hola, ¿cómo estás?']); + expect(sd.targetLanguages).toEqual(['en']); + expect(sd.targetTexts).toEqual(['Hello, how are you?']); + }); + + it('interim transcript surfaces per-run source breakdown in non-translation mode', () => { + const events = runProcess([ + { + tokens: [finalToken('こんにちは、', 'ja'), nonfinalToken('My name is Sam.', 'en')], + total_audio_proc_ms: 600, + }, + ]); + + const interim = events.find( + (e) => + e.type === stt.SpeechEventType.INTERIM_TRANSCRIPT || + e.type === stt.SpeechEventType.PREFLIGHT_TRANSCRIPT, + )!; + const sd = interim.alternatives![0]!; + + expect(sd.sourceLanguages).toEqual(['ja', 'en']); + expect(sd.sourceTexts).toEqual(['こんにちは、', 'My name is Sam.']); + expect(sd.targetLanguages).toBeUndefined(); + expect(sd.targetTexts).toBeUndefined(); + }); + + it('non-translation mode populates source from the per-run breakdown (single language)', () => { + const events = runProcess([ + { + tokens: [finalToken('Hello world.', 'en'), END_TOKEN_FINAL], + total_audio_proc_ms: 500, + }, + ]); + + const final = events.find((e) => e.type === stt.SpeechEventType.FINAL_TRANSCRIPT)!; + const sd = final.alternatives![0]!; + + expect(sd.text).toBe('Hello world.'); + expect(sd.language).toBe('en'); + expect(sd.sourceLanguages).toEqual(['en']); + expect(sd.sourceTexts).toEqual(['Hello world.']); + expect(sd.targetLanguages).toBeUndefined(); + expect(sd.targetTexts).toBeUndefined(); + }); + + it('non-translation, code-switched input carries the per-run source breakdown', () => { + const events = runProcess([ + { + tokens: [ + finalToken('こんにちは、君の名前は何だ。', 'ja'), + finalToken(' My name is Sam.', 'en'), + END_TOKEN_FINAL, + ], + total_audio_proc_ms: 1500, + }, + ]); + + const final = events.find((e) => e.type === stt.SpeechEventType.FINAL_TRANSCRIPT)!; + const sd = final.alternatives![0]!; + + expect(sd.text).toBe('こんにちは、君の名前は何だ。 My name is Sam.'); + // sd.language is the plugin's opinionated lossy summary (most-chars-wins); + // the per-run sourceLanguages / sourceTexts are what this test exercises. + expect(sd.sourceLanguages).toEqual(['ja', 'en']); + expect(sd.sourceTexts).toEqual(['こんにちは、君の名前は何だ。', ' My name is Sam.']); + expect(sd.targetLanguages).toBeUndefined(); + expect(sd.targetTexts).toBeUndefined(); + expect(sd.sourceTexts!.join('')).toBe(sd.text); + }); +}); diff --git a/plugins/soniox/src/stt.ts b/plugins/soniox/src/stt.ts index 7567ea900..057b8d264 100644 --- a/plugins/soniox/src/stt.ts +++ b/plugins/soniox/src/stt.ts @@ -11,11 +11,10 @@ import { waitForAbort, } from '@livekit/agents'; import { WebSocket } from 'ws'; +import { type SonioxMessage, newProcessMessageState, processMessage } from './_internal.js'; const BASE_URL = 'wss://stt-rt.soniox.com/transcribe-websocket'; const KEEPALIVE_MESSAGE = '{"type":"keepalive"}'; -const END_TOKEN = ''; -const FINALIZED_TOKEN = ''; /** @public */ export interface ContextGeneralItem { @@ -127,30 +126,10 @@ export class STT extends stt.STT { } } -interface SonioxToken { - text: string; - is_final: boolean; - translation_status?: string; - language?: string; - speaker?: string | number; - start_ms?: number; - end_ms?: number; - confidence?: number; -} - -interface SonioxMessage { - tokens?: SonioxToken[]; - total_audio_proc_ms?: number; - finished?: boolean; - error_code?: string; - error_message?: string; -} - /** @public */ export class SpeechStream extends stt.SpeechStream { #opts: STTOptions; #logger = log(); - #reportedDurationMs = 0; label = 'soniox.SpeechStream'; constructor(stt: STT, opts: STTOptions, connOptions?: APIConnectOptions) { @@ -196,7 +175,6 @@ export class SpeechStream extends stt.SpeechStream { } ws.send(JSON.stringify(this.#config())); - this.#reportedDurationMs = 0; return ws; } @@ -226,37 +204,10 @@ export class SpeechStream extends stt.SpeechStream { async #runWS(ws: WebSocket): Promise { let closing = false; - const isTranslationMode = this.#opts.translation !== undefined; - const final = new TokenAccumulator(); - const finalOriginal = new TokenAccumulator(); - let isSpeaking = false; - - const sendEndpointTranscript = () => { - if (final.text) { - const [srcSegs, tgtSegs] = isTranslationMode - ? [finalOriginal.langSegments, final.langSegments] - : [final.langSegments, []]; - const [sourceLanguages, sourceTexts] = langSegmentsToFields(srcSegs); - const [targetLanguages, targetTexts] = langSegmentsToFields(tgtSegs); - - this.#put({ - type: stt.SpeechEventType.FINAL_TRANSCRIPT, - alternatives: [ - final.toSpeechData(this.startTimeOffset, { - sourceLanguages, - sourceTexts, - targetLanguages, - targetTexts, - }), - ], - }); - this.#put({ type: stt.SpeechEventType.END_OF_SPEECH }); - final.reset(); - finalOriginal.reset(); - isSpeaking = false; - } else { - finalOriginal.reset(); - } + const state = newProcessMessageState(); + const options = { + isTranslationMode: this.#opts.translation !== undefined, + startTimeOffset: this.startTimeOffset, }; const keepalive = setInterval(() => { @@ -273,78 +224,14 @@ export class SpeechStream extends stt.SpeechStream { ws.on('message', (msg) => { try { const content = JSON.parse(msg.toString()) as SonioxMessage; - const tokens = content.tokens ?? []; - const nonFinal = new TokenAccumulator(); - const nonFinalOriginal = new TokenAccumulator(); - const totalAudioProcMs = content.total_audio_proc_ms ?? 0; - - for (const token of tokens) { - const isTranslated = token.translation_status === 'translation'; - if (isTranslationMode && !isEndToken(token) && !isTranslated) { - if (token.is_final) { - finalOriginal.update(token); - } else { - nonFinalOriginal.update(token); - } - continue; - } - if (token.is_final) { - if (isEndToken(token)) { - sendEndpointTranscript(); - this.#reportProcessedAudioDuration(totalAudioProcMs); - } else { - final.update(token); - } - } else { - nonFinal.update(token); - } - } - - if (final.text || nonFinal.text) { - if (!isSpeaking) { - isSpeaking = true; - this.#put({ type: stt.SpeechEventType.START_OF_SPEECH }); - } - - const mergedOriginals = mergeLangSegments( - finalOriginal.langSegments, - nonFinalOriginal.langSegments, - ); - const mergedPrimary = mergeLangSegments(final.langSegments, nonFinal.langSegments); - const [srcSegs, tgtSegs] = isTranslationMode - ? [mergedOriginals, mergedPrimary] - : [mergedPrimary, []]; - const [sourceLanguages, sourceTexts] = langSegmentsToFields(srcSegs); - const [targetLanguages, targetTexts] = langSegmentsToFields(tgtSegs); - const eventType = - final.text && !nonFinal.text - ? stt.SpeechEventType.PREFLIGHT_TRANSCRIPT - : stt.SpeechEventType.INTERIM_TRANSCRIPT; - - this.#put({ - type: eventType, - alternatives: [ - final.mergedSpeechData(nonFinal, this.startTimeOffset, { - sourceLanguages, - sourceTexts, - targetLanguages, - targetTexts, - }), - ], - }); - } - - if (content.finished || content.error_code || content.error_message) { - sendEndpointTranscript(); - this.#reportProcessedAudioDuration(totalAudioProcMs); + for (const event of processMessage(state, content, options)) { + this.#put(event); } - if (content.error_code || content.error_message) { this.#logger.error( `WebSocket error: ${content.error_code ?? ''} - ${content.error_message ?? ''}`, ); } - if (content.finished) { resolve(); } @@ -388,18 +275,6 @@ export class SpeechStream extends stt.SpeechStream { onClosing(); } - #reportProcessedAudioDuration(totalAudioProcMs: number): void { - const toReportMs = totalAudioProcMs - this.#reportedDurationMs; - if (toReportMs <= 0) return; - this.#put({ - type: stt.SpeechEventType.RECOGNITION_USAGE, - recognitionUsage: { - audioDuration: toReportMs / 1000, - }, - }); - this.#reportedDurationMs = Math.trunc(totalAudioProcMs); - } - #put(event: stt.SpeechEvent): void { if (!this.queue.closed) { this.queue.put(event); @@ -407,9 +282,6 @@ export class SpeechStream extends stt.SpeechStream { } } -const isEndToken = (token: SonioxToken): boolean => - token.text === END_TOKEN || token.text === FINALIZED_TOKEN; - const serializeContext = (context: ContextObject | string | undefined): unknown => { if (context === undefined || typeof context === 'string') return context; return { @@ -430,158 +302,3 @@ const serializeTranslation = (translation: TranslationConfig): Record { - const result = [...a]; - for (const [lang, text] of b) { - const last = result[result.length - 1]; - if (last && last[0] === lang) { - last[1] += text; - } else { - result.push([lang, text]); - } - } - return result; -}; - -const langSegmentsToFields = ( - segments: LangSegment[], -): [stt.SpeechData['sourceLanguages'] | undefined, string[] | undefined] => { - if (segments.length === 0) return [undefined, undefined]; - return [segments.map(([lang]) => lang), segments.map(([, text]) => text)]; -}; - -interface SpeechDataFields { - sourceLanguages?: stt.SpeechData['sourceLanguages']; - sourceTexts?: string[]; - targetLanguages?: stt.SpeechData['targetLanguages']; - targetTexts?: string[]; -} - -interface LangStats { - numChars: number; - updatedAt: number; -} - -class TokenAccumulator { - text = ''; - language: stt.SpeechData['language'] = '' as stt.SpeechData['language']; - speakerId?: string; - startTime = 0; - endTime = 0; - #confidenceSum = 0; - #confidenceCount = 0; - #hasStartTime = false; - #langSegments: LangSegment[] = []; - #langStats = new Map(); - - get langSegments(): LangSegment[] { - return this.#langSegments; - } - - get confidence(): number { - return this.#confidenceCount === 0 ? 0 : this.#confidenceSum / this.#confidenceCount; - } - - update(token: SonioxToken): void { - const text = token.text; - const lang = (token.language ?? '') as stt.SpeechData['language']; - this.text += text; - if (lang && text) { - const stats = this.#langStats.get(lang) ?? { numChars: 0, updatedAt: 0 }; - this.#langStats.set(lang, { numChars: stats.numChars + text.length, updatedAt: Date.now() }); - this.language = this.#getLanguage(); - } - if (token.speaker !== undefined && this.speakerId === undefined) { - this.speakerId = String(token.speaker); - } - if (token.start_ms !== undefined && !this.#hasStartTime) { - this.#hasStartTime = true; - this.startTime = token.start_ms; - } - if (token.end_ms !== undefined) { - this.endTime = token.end_ms; - } - if (token.confidence !== undefined) { - this.#confidenceSum += token.confidence; - this.#confidenceCount += 1; - } - if (text) { - const last = this.#langSegments[this.#langSegments.length - 1]; - if (last && last[0] === lang) { - last[1] += text; - } else { - this.#langSegments.push([lang, text]); - } - } - } - - reset(): void { - this.text = ''; - this.language = '' as stt.SpeechData['language']; - this.speakerId = undefined; - this.startTime = 0; - this.endTime = 0; - this.#confidenceSum = 0; - this.#confidenceCount = 0; - this.#hasStartTime = false; - this.#langSegments = []; - this.#langStats.clear(); - } - - toSpeechData(startTimeOffset = 0, fields: SpeechDataFields = {}): stt.SpeechData { - return { - text: this.text, - language: this.language, - sourceLanguages: fields.sourceLanguages, - sourceTexts: fields.sourceTexts, - targetLanguages: fields.targetLanguages, - targetTexts: fields.targetTexts, - speakerId: this.speakerId, - startTime: this.startTime / 1000 + startTimeOffset, - endTime: this.endTime / 1000 + startTimeOffset, - confidence: this.confidence, - }; - } - - mergedSpeechData( - other: TokenAccumulator, - startTimeOffset = 0, - fields: SpeechDataFields = {}, - ): stt.SpeechData { - const starts = [this, other].filter((acc) => acc.#hasStartTime).map((acc) => acc.startTime); - const start = starts.length ? Math.min(...starts) : 0; - const totalCount = this.#confidenceCount + other.#confidenceCount; - const totalSum = this.#confidenceSum + other.#confidenceSum; - return { - text: this.text + other.text, - language: this.language || other.language, - sourceLanguages: fields.sourceLanguages, - sourceTexts: fields.sourceTexts, - targetLanguages: fields.targetLanguages, - targetTexts: fields.targetTexts, - speakerId: this.speakerId ?? other.speakerId, - startTime: start / 1000 + startTimeOffset, - endTime: Math.max(this.endTime, other.endTime) / 1000 + startTimeOffset, - confidence: totalCount > 0 ? totalSum / totalCount : 0, - }; - } - - #getLanguage(): stt.SpeechData['language'] { - let selected = ''; - let selectedStats: LangStats | undefined; - for (const [lang, stats] of this.#langStats) { - if ( - !selectedStats || - stats.numChars > selectedStats.numChars || - (stats.numChars === selectedStats.numChars && stats.updatedAt < selectedStats.updatedAt) - ) { - selected = lang; - selectedStats = stats; - } - } - return selected as stt.SpeechData['language']; - } -} From 96ea001b4b2905f96bb250940681566afe045f5b Mon Sep 17 00:00:00 2001 From: Chenghao Mou Date: Mon, 25 May 2026 20:19:32 +0100 Subject: [PATCH 3/3] address comment --- plugins/soniox/src/_internal.ts | 2 +- plugins/soniox/src/stt.test.ts | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/plugins/soniox/src/_internal.ts b/plugins/soniox/src/_internal.ts index 5826e5a70..d735bf563 100644 --- a/plugins/soniox/src/_internal.ts +++ b/plugins/soniox/src/_internal.ts @@ -35,7 +35,7 @@ export const isEndToken = (token: SonioxToken): boolean => token.text === END_TOKEN || token.text === FINALIZED_TOKEN; export const mergeLangSegments = (a: LangSegment[], b: LangSegment[]): LangSegment[] => { - const result = [...a]; + const result: LangSegment[] = a.map(([lang, text]) => [lang, text]); for (const [lang, text] of b) { const last = result[result.length - 1]; if (last && last[0] === lang) { diff --git a/plugins/soniox/src/stt.test.ts b/plugins/soniox/src/stt.test.ts index caf3e298a..18f623b90 100644 --- a/plugins/soniox/src/stt.test.ts +++ b/plugins/soniox/src/stt.test.ts @@ -4,6 +4,7 @@ import { stt } from '@livekit/agents'; import { describe, expect, it } from 'vitest'; import { + type LangSegment, type SonioxMessage, type SonioxToken, TokenAccumulator, @@ -79,6 +80,14 @@ describe('mergeLangSegments', () => { ['es', ' hola'], ]); }); + + it('does not mutate the input segments when extending a trailing run', () => { + const a: LangSegment[] = [['en' as stt.SpeechData['language'], 'Hello']]; + const b: LangSegment[] = [['en' as stt.SpeechData['language'], ' world']]; + mergeLangSegments(a, b); + expect(a).toEqual([['en', 'Hello']]); + expect(b).toEqual([['en', ' world']]); + }); }); // ---------------------------------------------------------------------------