Offload session tokenization to worker threads.

chat_completions now awaits asyncio.to_thread() for session.prepare_pretokenized
and session.update_pretokenized_state, and a new integration test checks that
/health still answers while a (patched, slow) prepare_pretokenized is running.

NOTE(review): this patch was restored from a whitespace-collapsed copy. The
indentation depth and blank-line placement of the context lines are
reconstructed, not original -- verify against the tree before applying.
NOTE(review): both session methods now run outside the event-loop thread --
confirm session state is safe when requests for the same session overlap.

diff --git a/miles/rollout/session/sessions.py b/miles/rollout/session/sessions.py
index 0c285bf8bb..f3de329816 100644
--- a/miles/rollout/session/sessions.py
+++ b/miles/rollout/session/sessions.py
@@ -1,3 +1,4 @@
+import asyncio
 import json
 import logging
 import time
@@ -155,7 +156,8 @@ async def chat_completions(request: Request, session_id: str):
     request_body["no_stop_trim"] = False
 
     request_messages = request_body.get("messages", [])
-    pretokenized = session.prepare_pretokenized(
+    pretokenized = await asyncio.to_thread(
+        session.prepare_pretokenized,
         request_messages,
         tools=request_body.get("tools"),
         tito_tokenizer=registry.tito_tokenizer,
@@ -246,7 +248,8 @@ async def chat_completions(request: Request, session_id: str):
         )
         return backend.build_proxy_response(result)
 
-    session.update_pretokenized_state(
+    await asyncio.to_thread(
+        session.update_pretokenized_state,
         request_messages,
         assistant_message,
         prompt_token_ids=prompt_token_ids,
diff --git a/tests/fast/router/test_sessions.py b/tests/fast/router/test_sessions.py
index 8dd58189e1..66efa14649 100644
--- a/tests/fast/router/test_sessions.py
+++ b/tests/fast/router/test_sessions.py
@@ -1,13 +1,16 @@
 """Integration tests for session HTTP routes (create / get / delete / proxy)."""
 
 import re
+import time
 import uuid
+from concurrent.futures import ThreadPoolExecutor
 from types import SimpleNamespace
 from unittest.mock import patch
 
 import pytest
 import requests
 
+from miles.rollout.session.linear_trajectory import LinearTrajectory
 from miles.rollout.session.session_server import SessionServer
 from miles.utils.http_utils import find_available_port
 from miles.utils.test_utils.mock_sglang_server import MockSGLangServer, ProcessResult, with_mock_server
@@ -135,3 +138,48 @@ def test_proxy_chat_appends_record(self, router_env):
         record = records[0]
         assert record["path"] == "/v1/chat/completions"
         assert record["status_code"] == 200
+
+
+class TestTokenizationOffload:
+    """TITO tokenization is CPU work. It must run off the event loop so one slow
+    tokenization cannot stall every other request on the single uvicorn worker."""
+
+    def test_slow_tokenization_does_not_block_event_loop(self, router_env):
+        session_id = requests.post(f"{router_env.url}/sessions", timeout=5.0).json()["session_id"]
+
+        sleep_seconds = 1.0
+
+        def slow_prepare(self, *args, **kwargs):
+            time.sleep(sleep_seconds)
+            return None
+
+        payload = {
+            "messages": [{"role": "user", "content": "hello"}],
+            "return_logprob": True,
+        }
+
+        with patch.object(LinearTrajectory, "prepare_pretokenized", slow_prepare):
+            with ThreadPoolExecutor(max_workers=1) as executor:
+                chat_future = executor.submit(
+                    requests.post,
+                    f"{router_env.url}/sessions/{session_id}/v1/chat/completions",
+                    json=payload,
+                    timeout=10.0,
+                )
+                # Let the chat request reach the (now slow) tokenization step.
+                time.sleep(0.2)
+
+                start = time.monotonic()
+                health = requests.get(f"{router_env.url}/health", timeout=5.0)
+                elapsed = time.monotonic() - start
+
+                chat_future.result()
+
+        assert health.status_code == 200
+        # If tokenization ran on the event loop, /health would be blocked for
+        # ~sleep_seconds. Offloaded to a thread, the loop stays responsive and
+        # /health returns almost immediately.
+        assert elapsed < sleep_seconds / 2, (
+            f"/health blocked for {elapsed:.2f}s during tokenization; "
+            "event loop was not free (tokenization not offloaded)"
+        )