Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/vad-stream-flush-reset.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@livekit/agents': patch
'@livekit/agents-plugin-silero': patch
---

Reset active VAD streams on flush so STT end-of-speech can recover without recreating streams.
6 changes: 6 additions & 0 deletions agents/src/vad.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ export abstract class VADStream implements AsyncIterableIterator<VADEvent> {
this.inputWriter.write(frame);
}

/**
* Mark the end of the current segment.
*
* Implementations must treat this as a hard segment boundary: drop any accumulated
* speech/silence state so the next pushed frame starts a fresh segment.
*/
flush() {
if (this.inputClosed) {
throw new Error('Input is closed');
Expand Down
39 changes: 33 additions & 6 deletions agents/src/voice/audio_recognition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { type SpeechEvent, SpeechEventType } from '../stt/stt.js';
import { traceTypes, tracer } from '../telemetry/index.js';
import { splitWords } from '../tokenize/basic/word.js';
import { Task, cancelAndWait, delay, readStream, waitForAbort } from '../utils.js';
import { type VAD, type VADEvent, VADEventType } from '../vad.js';
import { type VAD, type VADEvent, VADEventType, type VADStream } from '../vad.js';
import type { TurnDetectionMode } from './agent_session.js';
import { type UserTurnExceededEvent, createUserTurnExceededEvent } from './events.js';
import type { STTNode } from './io.js';
Expand Down Expand Up @@ -218,6 +218,7 @@ export class AudioRecognition {
private userTurnStart: number | undefined;
private userTurnCommitted = false;
private speaking = false;
private vadSpeechStarted = false;
private sampleRate?: number;

private userTurnSpan?: Span;
Expand Down Expand Up @@ -251,6 +252,7 @@ export class AudioRecognition {
private commitUserTurnTask?: Task<void>;
private sttForwardTask?: Task<void>;
private vadTask?: Task<void>;
private vadStream?: VADStream;
private sttConsumerTask?: Task<void>;
private interruptionTask?: Task<void>;

Expand Down Expand Up @@ -1002,9 +1004,20 @@ export class AudioRecognition {
// and user state won't be updated until a new VAD SOS is received.
// Reset VAD so that incorrect end of turn from STT can be corrected by VAD interruption.
// If user is still speaking (an immediate VAD SOS will interrupt the agent).
if (this.vad && this.speaking) {
this.logger.warn('stt end of speech received while user is speaking, resetting vad');
this.resetVad();
if (this.vad && this.vadSpeechStarted) {
if (this.vadStream) {
this.vadStream.flush();
} else {
this.resetVad();
}

this.logger.warn(
{
vadSpeechStartTime: this.speechStartTime,
flushed: this.vadStream !== undefined,
},
'stt end of speech received while vad is still in a speech segment, flushing vad',
);
}
this.speaking = false;
this.userTurnCommitted = true;
Expand Down Expand Up @@ -1168,9 +1181,13 @@ export class AudioRecognition {
// clear the transcript if the user turn was committed
this.audioTranscript = '';
this.finalTranscriptConfidence = [];
this.lastSpeakingTime = undefined;
this.lastFinalTranscriptTime = 0;
this.speechStartTime = undefined;
// Concurrent user speech might have changed it; only reset if there is no new speech.
if (this.lastSpeakingTime === lastSpeakingTime) {
this.speechStartTime = undefined;
this.vadSpeechStarted = false;
this.lastSpeakingTime = undefined;
}
}

this.userTurnCommitted = false;
Expand Down Expand Up @@ -1304,6 +1321,7 @@ export class AudioRecognition {
if (!vad) return;

const vadStream = vad.stream();
this.vadStream = vadStream;
vadStream.updateInputStream(this.vadInputStream);

const abortHandler = () => {
Expand All @@ -1322,6 +1340,10 @@ export class AudioRecognition {
this.logger.debug('VAD task: START_OF_SPEECH');
{
const startTime = Date.now() - ev.speechDuration - ev.inferenceDuration;
if (!this.vadSpeechStarted) {
this.speechStartTime = startTime;
this.vadSpeechStarted = true;
}
const span = this.ensureUserTurnSpan(startTime);
const ctx = this.userTurnContext(span);
this.endpointing.onStartOfSpeech(startTime, this.isAgentSpeaking);
Expand Down Expand Up @@ -1366,6 +1388,7 @@ export class AudioRecognition {
}

// when VAD fires END_OF_SPEECH, it already waited for the silence_duration
this.vadSpeechStarted = false;
this.speaking = false;

if (
Expand All @@ -1382,6 +1405,9 @@ export class AudioRecognition {
this.logger.error(e, 'Error in VAD task');
} finally {
this.logger.debug('VAD task closed');
if (this.vadStream === vadStream) {
this.vadStream = undefined;
}
}
}

Expand Down Expand Up @@ -1563,6 +1589,7 @@ export class AudioRecognition {
this.speechStartTime = undefined;
this.userTurnStart = undefined;
this.lastSpeakingTime = undefined;
this.vadSpeechStarted = false;
this.speaking = false;
this.userTurnCommitted = false;

Expand Down
6 changes: 6 additions & 0 deletions plugins/silero/src/onnx_model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ export class OnnxModel {
return this.#contextSize;
}

reset(): void {
this.#context.fill(0);
this.#rnnState.fill(0);
this.#inputBuffer.fill(0);
}

async run(x: Float32Array): Promise<number> {
this.#inputBuffer.set(this.#context, 0);
this.#inputBuffer.set(x, this.#contextSize);
Expand Down
38 changes: 36 additions & 2 deletions plugins/silero/src/vad.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,55 @@ export class VADStream extends baseStream {
let speechThresholdDuration = 0;
let silenceThresholdDuration = 0;

let inputFrames = [];
let inputFrames: AudioFrame[] = [];
let inferenceFrames: AudioFrame[] = [];
let resampler: AudioResampler | null = null;

// used to avoid drift when the sampleRate ratio is not an integer
let inputCopyRemainingFrac = 0.0;

const resetState = () => {
this.#model.reset();
this.#expFilter = new ExpFilter(0.35);

speechBufferIndex = 0;
this.#speechBufferMaxReached = false;
this.#speechBuffer?.fill(0);

pubSpeaking = false;
pubSpeechDuration = 0;
pubSilenceDuration = 0;
pubCurrentSample = 0;
pubTimestamp = 0;
speechThresholdDuration = 0;
silenceThresholdDuration = 0;

inputFrames = [];
inferenceFrames = [];
inputCopyRemainingFrac = 0.0;
this.#extraInferenceTime = 0;

resampler?.close();
resampler =
this.#inputSampleRate && this.#opts.sampleRate !== this.#inputSampleRate
? new AudioResampler(
this.#inputSampleRate,
this.#opts.sampleRate,
1,
AudioResamplerQuality.QUICK,
)
: null;
};

while (!this.closed) {
const { done, value: frame } = await this.inputReader.read();
if (done) {
break;
}

if (typeof frame === 'symbol') {
continue; // ignore flush sentinel for now
resetState();
continue;
}

if (!this.#inputSampleRate || !this.#speechBuffer) {
Expand Down
Loading