Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions miles/rollout/session/sessions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import logging
import time
Expand Down Expand Up @@ -155,7 +156,8 @@ async def chat_completions(request: Request, session_id: str):
request_body["no_stop_trim"] = False

request_messages = request_body.get("messages", [])
pretokenized = session.prepare_pretokenized(
pretokenized = await asyncio.to_thread(
session.prepare_pretokenized,
request_messages,
tools=request_body.get("tools"),
tito_tokenizer=registry.tito_tokenizer,
Expand Down Expand Up @@ -246,7 +248,8 @@ async def chat_completions(request: Request, session_id: str):
)
return backend.build_proxy_response(result)

session.update_pretokenized_state(
await asyncio.to_thread(
session.update_pretokenized_state,
request_messages,
assistant_message,
prompt_token_ids=prompt_token_ids,
Expand Down
48 changes: 48 additions & 0 deletions tests/fast/router/test_sessions.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Integration tests for session HTTP routes (create / get / delete / proxy)."""

import re
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace
from unittest.mock import patch

import pytest
import requests

from miles.rollout.session.linear_trajectory import LinearTrajectory
from miles.rollout.session.session_server import SessionServer
from miles.utils.http_utils import find_available_port
from miles.utils.test_utils.mock_sglang_server import MockSGLangServer, ProcessResult, with_mock_server
Expand Down Expand Up @@ -135,3 +138,48 @@ def test_proxy_chat_appends_record(self, router_env):
record = records[0]
assert record["path"] == "/v1/chat/completions"
assert record["status_code"] == 200


class TestTokenizationOffload:
    """TITO tokenization is CPU work. It must run off the event loop so one slow
    tokenization cannot stall every other request on the single uvicorn worker."""

    def test_slow_tokenization_does_not_block_event_loop(self, router_env):
        """Check that ``/health`` stays responsive while tokenization is slow.

        ``LinearTrajectory.prepare_pretokenized`` is replaced with a stub that
        sleeps for ``sleep_seconds``. A chat request is sent from a background
        thread, and once the stub has actually been entered, ``/health`` is
        timed from the test thread. The request must finish in well under the
        stub's sleep duration.
        """
        # Local import so this test does not depend on the module's import block.
        import threading

        session_id = requests.post(f"{router_env.url}/sessions", timeout=5.0).json()["session_id"]

        sleep_seconds = 1.0
        # Set by the stub on entry, so the health probe is issued only once the
        # slow tokenization is really in progress. A fixed sleep here could let
        # the probe run before tokenization started and pass vacuously.
        tokenization_started = threading.Event()

        def slow_prepare(self, *args, **kwargs):
            tokenization_started.set()
            time.sleep(sleep_seconds)
            return None

        payload = {
            "messages": [{"role": "user", "content": "hello"}],
            "return_logprob": True,
        }

        # NOTE(review): patching the class attribute only affects the server if
        # router_env runs it inside this process -- confirm against the fixture.
        with patch.object(LinearTrajectory, "prepare_pretokenized", slow_prepare):
            with ThreadPoolExecutor(max_workers=1) as executor:
                chat_future = executor.submit(
                    requests.post,
                    f"{router_env.url}/sessions/{session_id}/v1/chat/completions",
                    json=payload,
                    timeout=10.0,
                )
                # Wait until the chat request has reached the (now slow)
                # tokenization step instead of guessing with a fixed delay.
                reached_tokenization = tokenization_started.wait(timeout=5.0)
                assert reached_tokenization, (
                    "chat request never reached prepare_pretokenized; "
                    "the event-loop responsiveness check would be meaningless"
                )

                start = time.monotonic()
                health = requests.get(f"{router_env.url}/health", timeout=5.0)
                elapsed = time.monotonic() - start

                # Surface any exception raised by the background chat request.
                chat_future.result()

        assert health.status_code == 200
        # If tokenization ran on the event loop, /health would be blocked for
        # ~sleep_seconds. Offloaded to a thread, the loop stays responsive and
        # /health returns almost immediately.
        assert elapsed < sleep_seconds / 2, (
            f"/health blocked for {elapsed:.2f}s during tokenization; "
            "event loop was not free (tokenization not offloaded)"
        )
Loading