feat(cartesia): add ink-2 stt#5827

Open
charlotte-zhuang wants to merge 23 commits into
livekit:mainfrom
cartesia-ai:main

Conversation

@charlotte-zhuang
Contributor

@charlotte-zhuang charlotte-zhuang commented May 24, 2026

Adds the Cartesia Ink 2 STT model.

This model has a different API from Ink Whisper and supports different features, most importantly turn detection.

The same cartesia.STT() class is used for both the new and old APIs so that users don't have to work out which class to use.

transcribe_on_flush

I also added a way to use both ink-whisper and ink-2 in conjunction with stream.flush().

I'm not entirely sure how this works in LiveKit, but some of our users want to tell the model when a turn ends rather than have the model tell them. Without this "transcribe_on_flush" behavior, those users will see increased latency and unexpected transcripts.

I couldn't use LegacyRecognizeStream because that implementation streams out FINAL_TRANSCRIPT events in chunks as soon as they become available. LiveKit then concatenates all FINAL_TRANSCRIPT speech events with spaces:

self._audio_transcript += f" {transcript}"

This space concatenation can cause issues with the transcript by inserting spaces in unexpected places. My solution was to create a new opt-in behavior where FINAL_TRANSCRIPT is only emitted after stream.flush() is called and all audio up until that point is flushed. This way, the entire human turn is put in a single transcript and the space concatenation issue is avoided.
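A minimal sketch of the difference, in plain Python rather than the actual plugin code. `join_finals` mimics the space concatenation quoted above, and `FlushBuffer` is a hypothetical stand-in for the opt-in behavior; neither name exists in LiveKit or in the Cartesia plugin, and the mid-word chunk boundaries are illustrative data, not captured output.

```python
from dataclasses import dataclass, field


def join_finals(chunks: list[str]) -> str:
    """Mimic the framework-side concatenation: every final transcript is appended after a space."""
    transcript = ""
    for chunk in chunks:
        transcript += f" {chunk}"
    return transcript


@dataclass
class FlushBuffer:
    """Hypothetical helper: hold transcript chunks until flush() is called."""

    _pending: list[str] = field(default_factory=list)

    def push(self, chunk: str) -> None:
        self._pending.append(chunk)

    def flush(self) -> str:
        # Join with no separator: the chunks are assumed to carry their own spacing.
        final = "".join(self._pending)
        self._pending.clear()
        return final


# Illustrative chunks, split at arbitrary points in the text.
chunks = ["Hel", "lo, wor", "ld."]

# Three FINAL_TRANSCRIPT events, each joined with a leading space.
streamed = join_finals(chunks)

# One FINAL_TRANSCRIPT event covering the whole turn, emitted only on flush.
buffer = FlushBuffer()
for chunk in chunks:
    buffer.push(chunk)
flushed = join_finals([buffer.flush()])
```

With these inputs, `streamed` picks up spaces inside words, while `flushed` contains the turn as a single intact string.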

An alternative solution would be to add a flag to SpeechEvent like transcript_event_sep: str = " " so that plugins can specify how their transcripts should be concatenated. Then, LegacyRecognizeStream can just emit transcripts with transcript_event_sep="".
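To make the alternative concrete, here is a rough sketch under stated assumptions: `SpeechEventSketch` is a hypothetical, stripped-down stand-in for SpeechEvent, and `transcript_event_sep` is the proposed field, which does not exist today. The consumer appends each event using the separator the event carries instead of a hard-coded space.

```python
from dataclasses import dataclass


@dataclass
class SpeechEventSketch:
    """Hypothetical stand-in for SpeechEvent, reduced to the fields needed here."""

    text: str
    transcript_event_sep: str = " "  # proposed field; not part of the real SpeechEvent


def append_transcripts(events: list[SpeechEventSketch]) -> str:
    """Concatenate event text, letting each event choose its own separator."""
    transcript = ""
    for event in events:
        transcript += f"{event.transcript_event_sep}{event.text}"
    return transcript


# Default separator: same result as the current space concatenation.
default_joined = append_transcripts(
    [SpeechEventSketch("Hel"), SpeechEventSketch("lo")]
)

# A plugin that streams raw chunks opts out of the inserted space.
verbatim_joined = append_transcripts(
    [SpeechEventSketch("Hel", ""), SpeechEventSketch("lo", "")]
)
```

Keeping `" "` as the default would leave existing plugins unaffected, which is the main appeal of this option.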

Changes

These changes could be considered breaking in a way, but they should not affect expected usage patterns:

  1. Default model for English is now ink-2
  2. START_OF_SPEECH, INTERIM_TRANSCRIPT, PREFLIGHT_TRANSCRIPT, and END_OF_SPEECH events will be emitted
  3. Aligned transcripts are no longer supported
  4. update_options(model="...") is now a no-op. There is no reason to update your model mid-session so I'm not sure why this was added in the first place. This should not negatively affect any users since before Ink 2, there was only a single Ink Whisper snapshot, making the method even more bewildering.
  5. SpeechStream is now an ABC to allow for different behavior based on the model. It is highly unexpected for users to be constructing their own cartesia.stt.SpeechStream instances.
  6. Ink Whisper checks self._FlushSentinel and sends "finalize" commands to the server

These changes are purely additive:

  1. TurnDetectingRecognizeStream, which is the default for ink-2
  2. TranscribeOnFlushRecognizeStream, which makes stream.flush() control when the final transcript is emitted
  3. cartesia.py example agent
  4. AUDIO_ENCODING constant
  5. doc strings
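As a rough illustration of how the three stream implementations relate, the sketch below maps a configuration to a class name. It is not the plugin's code: `pick_stream_class` is a hypothetical helper, the `transcribe_on_flush` boolean is an assumed way of expressing the opt-in, and the use of LegacyRecognizeStream as the ink-whisper default is inferred from the description above.

```python
def pick_stream_class(model: str, transcribe_on_flush: bool = False) -> str:
    """Return the name of the stream implementation a configuration might map to."""
    if transcribe_on_flush:
        # Opt-in: stream.flush() decides when the final transcript is emitted,
        # for either model.
        return "TranscribeOnFlushRecognizeStream"
    if model == "ink-2":
        # Default for ink-2: the model itself detects turns.
        return "TurnDetectingRecognizeStream"
    # Assumed default for ink-whisper: final transcripts streamed in chunks.
    return "LegacyRecognizeStream"


ink2_default = pick_stream_class("ink-2")
whisper_default = pick_stream_class("ink-whisper")
whisper_on_flush = pick_stream_class("ink-whisper", transcribe_on_flush=True)
```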

Testing

Tested with cartesia.py against both ink-2 and ink-whisper.

Also verified that make check passes and existing tests pass.

devin-ai-integration[bot]

This comment was marked as resolved.

@charlotte-zhuang
Contributor Author

It looks like Test STT / test-stt is failing because:

  1. livekit.plugins.aws and livekit.plugins.fireworksai are raising errors
  2. OPENAI_API_KEY is not set
  3. The GitHub action was unable to post a comment to this PR

FAILED tests/test_stt.py::test_recognize[livekit.plugins.openai] - openai.OpenAIError: Missing credentials. Please pass an `api_key`, `workload_identity`, `admin_api_key`, or set the `OPENAI_API_KEY` or `OPENAI_ADMIN_KEY` environment variable.
FAILED tests/test_stt.py::test_stream[livekit.plugins.fireworksai] - RuntimeError: livekit.plugins.fireworksai.stt.SpeechStream is closed
FAILED tests/test_stt.py::test_stream[livekit.plugins.openai] - openai.OpenAIError: Missing credentials. Please pass an `api_key`, `workload_identity`, `admin_api_key`, or set the `OPENAI_API_KEY` or `OPENAI_ADMIN_KEY` environment variable.
FAILED tests/test_stt.py::test_stream[livekit.plugins.aws] - smithy_core.exceptions.CallError: Unknown error for operation com.amazonaws.transcribestreaming#StartStreamTranscription - status: 403 - id: com.amazonaws.transcribestreaming#UnrecognizedClientException
POST /repos/livekit/agents/issues/5827/comments - 403 with id 8009:24BDD6:12551F54:4647D1C7:6A1310E4 in 157ms
RequestError [HttpError]: Resource not accessible by integration - https://docs.github.com/rest/issues/comments#create-an-issue-comment

Comment thread livekit-plugins/livekit-plugins-cartesia/livekit/plugins/cartesia/stt.py Outdated
samples_per_channel=samples_50ms,
)
@dataclass
class STTOptions:
Member

Do you foresee more parameters being introduced in the future that would apply to both models? I'm a bit hesitant about deprecating this at the moment.

Contributor Author

@charlotte-zhuang charlotte-zhuang May 26, 2026

I actually deprecated STTOptions since this class appears to be for internal use only.

The one public use was in SpeechStream.__init__(), but realistically users get a SpeechStream instance by calling STT.stream(), not by constructing their own instances. That was why I made SpeechStream an ABC, which removed the need for STTOptions.

Comment thread livekit-plugins/livekit-plugins-cartesia/livekit/plugins/cartesia/models.py Outdated
Comment thread .github/next-release/changeset-cartesia-ink-2-stt.md Outdated
from .models import STTLanguages


class CartesiaRecognizeStream(stt.RecognizeStream):
Member

i wonder if it would be possible to condense this to something like:

CartesiaRecognizeStream: TypeAlias = TurnsRecognizeStream | LegacyRecognizeStream

what do you think of this?

Contributor Author

Do you know if users call methods on the return of STT.stream()? i.e. is it important that all my RecognizeStream implementations have the same interface?

I just wanted to avoid a situation where you would need to do something like this:

from typing import Any, cast

from livekit.plugins import cartesia

stt = cartesia.STT(model="ink-whisper")
stream = stt.stream()

cast(Any, stream).update_options(language="es")

So the CartesiaRecognizeStream ABC helps to make sure all our RecognizeStream implementations follow the same interface.

If I were to implement this for the first time, I would probably make our SpeechStream.update_options method private.
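A small sketch of what the ABC buys over a union alias, using hypothetical stand-in classes rather than the real plugin types. Every concrete stream must implement `update_options`, so callers can use the base type directly without a cast, and a subclass that forgets the method fails at instantiation.

```python
from abc import ABC, abstractmethod


class RecognizeStreamBase(ABC):
    """Hypothetical stand-in for the CartesiaRecognizeStream ABC."""

    @abstractmethod
    def update_options(self, *, language: str) -> None:
        """Every concrete stream must accept option updates."""


class LegacySketch(RecognizeStreamBase):
    """Stand-in for one concrete stream implementation."""

    def __init__(self) -> None:
        self.language = "en"

    def update_options(self, *, language: str) -> None:
        self.language = language


class IncompleteSketch(RecognizeStreamBase):
    """A subclass that forgets to implement update_options."""


# Callers only need the base type; no cast is required to call update_options.
stream: RecognizeStreamBase = LegacySketch()
stream.update_options(language="es")

# Instantiating a subclass with a missing abstract method raises TypeError.
try:
    IncompleteSketch()
    incomplete_error = None
except TypeError as exc:
    incomplete_error = exc
```

A `TypeAlias` union would also type-check as long as every member happens to define the method, but nothing would force a newly added member to do so.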

@tinalenguyen
Member

/test-stt

@github-actions
Contributor

STT Test Results

Status: ✗ Some tests failed

Metric       Count
✓ Passed     22
✗ Failed     2
× Errors     1
→ Skipped    16
▣ Total      41
⏱ Duration   207.9s

Failed Tests
  • tests.test_stt::test_stream[livekit.plugins.nvidia]
    stt_factory = <function parameter_factory.<locals>.<lambda> at 0x7fe267f21bc0>
    request = <FixtureRequest for <Coroutine test_stream[livekit.plugins.nvidia]>>
    
        @pytest.mark.usefixtures("job_process")
        @pytest.mark.parametrize("stt_factory", STTs)
        async def test_stream(stt_factory: Callable[[], STT], request):
            sample_rate = SAMPLE_RATE
            plugin_id = request.node.callspec.id.split("-")[0]
            frames, transcript, _ = await make_test_speech(chunk_duration_ms=10, sample_rate=sample_rate)
      
            # TODO: differentiate missing key vs other errors
            try:
                stt_instance: STT = stt_factory()
            except ValueError as e:
                pytest.skip(f"{plugin_id}: {e}")
      
            async with stt_instance as stt:
                label = f"{stt.model}@{stt.provider}"
                if not stt.capabilities.streaming:
                    pytest.skip(f"{label} does not support streaming")
      
                for attempt in range(MAX_RETRIES):
                    try:
                        state = {"closing": False}
      
                        async def _stream_input(
                            frames: list[rtc.AudioFrame], stream: RecognizeStream, state: dict = state
                        ):
                            for frame in frames:
                                stream.push_frame(frame)
                                await asyncio.sleep(0.005)
      
                            stream.end_input()
                            state["closing"] = True
      
                        async def _stream_output(stream: RecognizeStream, state: dict = state):
                            text = ""
                            # make sure the events are sent in the right order
                            recv_start, recv_end = False, True
                            start_time = time.time()
                            got_final_transcript = False
      
                            async for event in stream:
                                if event.type == agents.stt.SpeechEventType.START_OF_SPEECH:
    
  • tests.test_stt::test_stream[livekit.plugins.speechmatics]
    def finalizer() -> None:
            """Yield again, to finalize."""
      
            async def async_finalizer() -> None:
                try:
                    await gen_obj.__anext__()
                except StopAsyncIteration:
                    pass
                else:
                    msg = "Async generator fixture didn't stop."
                    msg += "Yield only once."
                    raise ValueError(msg)
      
    >       runner.run(async_finalizer(), context=context)
    
    .venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py:330: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <asyncio.runners.Runner object at 0x7f5f0ac02750>
    coro = <coroutine object _wrap_asyncgen_fixture.<locals>._asyncgen_fixture_wrapper.<locals>.finalizer.<locals>.async_finalizer at 0x7f5f09f4c380>
    
        def run(self, coro, *, context=None):
            """Run a coroutine inside the embedded event loop."""
            if not coroutines.iscoroutine(coro):
                raise ValueError("a coroutine was expected, got {!r}".format(coro))
      
            if events._get_running_loop() is not None:
                # fail fast with short traceback
                raise RuntimeError(
                    "Runner.run() cannot be called from a running event loop")
      
            self._lazy_init()
      
            if context is None:
                context = self._context
            task = self._loop.create_task(coro, context=context)
      
            if (threading.current_thread() is threading.main_thread()
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
            ):
                sigint_handler = functools.partial(self._on_sigint, main_task=task)
                try:
                    signal.signal(signal.SIGINT, sigint_handler)
                except ValueError:
                    # `signal.signal` may throw if `threading.main_thread` does
                    # not support signals (e.g. embedded interpreter with signals
                    # not registered - see gh-91880)
                    sigint_handler = None
    
  • tests.test_stt::test_stream[livekit.agents.inference]
    stt_factory = <function parameter_factory.<locals>.<lambda> at 0x7f5f0a3218a0>
    request = <FixtureRequest for <Coroutine test_stream[livekit.agents.inference]>>
    
        @pytest.mark.usefixtures("job_process")
        @pytest.mark.parametrize("stt_factory", STTs)
        async def test_stream(stt_factory: Callable[[], STT], request):
            sample_rate = SAMPLE_RATE
            plugin_id = request.node.callspec.id.split("-")[0]
            frames, transcript, _ = await make_test_speech(chunk_duration_ms=10, sample_rate=sample_rate)
      
            # TODO: differentiate missing key vs other errors
            try:
                stt_instance: STT = stt_factory()
            except ValueError as e:
                pytest.skip(f"{plugin_id}: {e}")
      
            async with stt_instance as stt:
                label = f"{stt.model}@{stt.provider}"
                if not stt.capabilities.streaming:
                    pytest.skip(f"{label} does not support streaming")
      
                for attempt in range(MAX_RETRIES):
                    try:
                        state = {"closing": False}
      
                        async def _stream_input(
                            frames: list[rtc.AudioFrame], stream: RecognizeStream, state: dict = state
                        ):
                            for frame in frames:
                                stream.push_frame(frame)
                                await asyncio.sleep(0.005)
      
                            stream.end_input()
                            state["closing"] = True
      
                        async def _stream_output(stream: RecognizeStream, state: dict = state):
                            text = ""
                            # make sure the events are sent in the right order
                            recv_start, recv_end = False, True
                            start_time = time.time()
                            got_final_transcript = False
      
                            async for event in stream:
                                if event.type == agents.stt.SpeechEventType.START_OF_SPEECH:
    
Skipped Tests
Test Reason
tests.test_stt::test_recognize[livekit.plugins.assemblyai] universal-streaming-english@AssemblyAI does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.speechmatics] enhanced@Speechmatics does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.fireworksai] unknown@FireworksAI does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.nvidia] unknown@unknown does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.cartesia] ink-2@Cartesia does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.soniox] stt-rt-v4@Soniox does not support batch recognition
tests.test_stt::test_recognize[livekit.agents.inference] unknown@livekit does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.azure] unknown@Azure STT does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.aws] unknown@Amazon Transcribe does not support batch recognition
tests.test_stt::test_recognize[ink-whisper@livekit.plugins.cartesia] ink-whisper@Cartesia does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.deepgram.STTv2] flux-general-en@Deepgram does not support batch recognition
tests.test_stt::test_stream[livekit.plugins.elevenlabs] scribe_v1@ElevenLabs does not support streaming
tests.test_stt::test_recognize[livekit.plugins.gradium.STT] unknown@Gradium does not support batch recognition
tests.test_stt::test_stream[livekit.plugins.fal] Wizper@Fal does not support streaming
tests.test_stt::test_stream[livekit.plugins.mistralai] voxtral-mini-latest@MistralAI does not support streaming
tests.test_stt::test_stream[livekit.plugins.openai] gpt-4o-mini-transcribe@api.openai.com does not support streaming

Triggered by workflow run #2165

devin-ai-integration[bot]

This comment was marked as resolved.

@charlotte-zhuang
Contributor Author

@tinalenguyen I addressed all your comments!

I also ended up adding a third implementation of CartesiaRecognizeStream, which is why I ended up opting to keep it.

Copied from my PR description:

I also added a way to use both ink-whisper and ink-2 in conjunction with stream.flush().

I'm not entirely sure how this works in LiveKit, but some of our users want to tell the model when a turn ends rather than have the model tell them. Without this "transcribe_on_flush" behavior, those users will see increased latency and unexpected transcripts.

I couldn't use LegacyRecognizeStream because that implementation streams out FINAL_TRANSCRIPT events in chunks as soon as they become available. LiveKit then concatenates all FINAL_TRANSCRIPT speech events with spaces:

self._audio_transcript += f" {transcript}"

This space concatenation can cause issues with the transcript by inserting spaces in unexpected places. My solution was to create a new opt-in behavior where FINAL_TRANSCRIPT is only emitted after stream.flush() is called and all audio up until that point is flushed. This way, the entire human turn is put in a single transcript and the space concatenation issue is avoided.

An alternative solution would be to add a flag to SpeechEvent like transcript_event_sep: str = " " so that plugins can specify how their transcripts should be concatenated. Then, LegacyRecognizeStream can just emit transcripts with transcript_event_sep="".

devin-ai-integration[bot]

This comment was marked as resolved.

@CLAassistant

CLAassistant commented May 27, 2026

CLA assistant check
All committers have signed the CLA.
