diff --git a/backend/routers/sync.py b/backend/routers/sync.py index 89428643e3..1e3cf960cd 100644 --- a/backend/routers/sync.py +++ b/backend/routers/sync.py @@ -9,6 +9,7 @@ import time import uuid as _uuid import wave +from concurrent.futures import TimeoutError as FuturesTimeoutError from datetime import datetime, timezone from typing import Dict, List, Optional, Tuple @@ -90,6 +91,12 @@ # Audio constants AUDIO_SAMPLE_RATE = 16000 +# Max time the /urls endpoint blocks on caching the first uncached file. Large +# merges (hundreds of chunks, or GCS pushback) can take 60-115s+, exceeding the +# 120s middleware timeout and client timeouts (#7325); past this budget the file +# is reported as pending while the merge finishes in the background. +FIRST_FILE_CACHE_WAIT_SECONDS = 15.0 + _V1_DEPRECATION_HEADERS = {'Deprecation': 'true', 'Link': '; rel="successor-version"'} router = APIRouter() @@ -276,10 +283,24 @@ def get_audio_signed_urls_endpoint( } ) else: - # First uncached file: cache synchronously for immediate playback + # First uncached file: cache synchronously for immediate playback, + # but bounded — a slow merge must not block the endpoint past + # client/middleware timeouts (#7325). On timeout the merge keeps + # running on sync_executor and lands in the cache for later calls; + # the file is reported as pending and clients fall back to the + # stream endpoint or re-fetch. if not first_uncached_handled: first_uncached_handled = True - _precache_audio_file(uid, conversation_id, af, caller='sync_urls_first') + first_file_future = submit_with_context( + sync_executor, _precache_audio_file, uid, conversation_id, af, caller='sync_urls_first' + ) + try: + first_file_future.result(timeout=FIRST_FILE_CACHE_WAIT_SECONDS) + except FuturesTimeoutError: + logger.warning( + f"sync_urls first-file cache exceeded {FIRST_FILE_CACHE_WAIT_SECONDS}s, returning pending " + f"uid={uid} convo={conversation_id} file={audio_file_id}" + ) # Get signed URL after caching signed_url = get_merged_audio_signed_url(uid, conversation_id, audio_file_id) if signed_url: diff --git a/backend/test.sh b/backend/test.sh index 409b628aa0..006ec56ecc 100755 --- a/backend/test.sh +++ b/backend/test.sh @@ -103,6 +103,7 @@ pytest tests/unit/test_rate_limiting.py -v pytest tests/unit/test_memories_batch.py -v pytest tests/unit/test_memories_create.py -v pytest tests/unit/test_sync_v2.py -v +pytest tests/unit/test_sync_urls_bounded_wait.py -v pytest tests/unit/test_sync_transcription_prefs.py -v pytest tests/unit/test_sync_record_usage.py -v pytest tests/unit/test_vision_stream_async.py -v diff --git a/backend/tests/unit/test_sync_urls_bounded_wait.py b/backend/tests/unit/test_sync_urls_bounded_wait.py new file mode 100644 index 0000000000..5cf5522699 --- /dev/null +++ b/backend/tests/unit/test_sync_urls_bounded_wait.py @@ -0,0 +1,98 @@ +""" +Tests for the bounded first-file cache wait in GET /v1/sync/audio/{id}/urls (#7325). + +The endpoint used to merge the first uncached audio file synchronously in the +request thread. Large merges (hundreds of chunks, or GCS pushback) took 60-115s+ +in prod, exceeding the 120s timeout middleware and client timeouts, so the +request appeared to hang with zero bytes. The merge is now submitted to +sync_executor and waited on for at most FIRST_FILE_CACHE_WAIT_SECONDS; on +timeout the file is reported as pending while the merge finishes in the +background (and lands in the GCS cache for subsequent calls). + +routers/sync.py has a heavy import chain (opuslib, pydub, STT/LLM stacks), so +following the precedent of test_sync_v2.py these are structural tests over the +source, plus logic tests of the exact wait/timeout primitive used. +""" + +import os +import re +import time +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import TimeoutError as FuturesTimeoutError + +import pytest + +SYNC_PATH = os.path.join(os.path.dirname(__file__), '..', '..', 'routers', 'sync.py') + + +def _read_sync_source(): + with open(SYNC_PATH, encoding='utf-8') as f: + return f.read() + + +def _urls_endpoint_body(source): + start = source.index('def get_audio_signed_urls_endpoint') + next_section = source.find('\n@router.', start + 1) + if next_section == -1: + next_section = len(source) + return source[start:next_section] + + +class TestUrlsEndpointStructure: + def test_first_file_merge_is_bounded(self): + """The first-file cache must be waited on with a timeout, not called inline.""" + body = _urls_endpoint_body(_read_sync_source()) + assert 'submit_with_context(' in body, "first-file cache must run on an executor" + assert re.search( + r'\.result\(timeout=FIRST_FILE_CACHE_WAIT_SECONDS\)', body + ), "first-file cache wait must be bounded by FIRST_FILE_CACHE_WAIT_SECONDS" + assert 'FuturesTimeoutError' in body, "timeout must be handled (file reported pending)" + + def test_no_inline_first_file_merge(self): + """The unbounded inline call that caused #7325 must not come back.""" + body = _urls_endpoint_body(_read_sync_source()) + assert not re.search( + r'^\s*_precache_audio_file\(', body, re.MULTILINE + ), "_precache_audio_file must not be called inline in the request thread" + + def test_coordinator_not_on_storage_executor(self): + """_precache_audio_file fans out to storage_executor, so the bounded wait + must submit it to a different pool (deadlock rule 3 in AGENTS.md).""" + body = _urls_endpoint_body(_read_sync_source()) + m = re.search(r'submit_with_context\(\s*(\w+),\s*_precache_audio_file', body) + assert m, "first-file cache must be submitted via submit_with_context" + assert ( + m.group(1) != 'storage_executor' + ), "coordinator must not share storage_executor with its chunk-download children" + + def test_wait_budget_is_under_middleware_timeout(self): + """The wait budget must stay well under the 120s HTTP timeout middleware.""" + source = _read_sync_source() + m = re.search(r'^FIRST_FILE_CACHE_WAIT_SECONDS\s*=\s*([0-9.]+)', source, re.MULTILINE) + assert m, "FIRST_FILE_CACHE_WAIT_SECONDS constant must exist at module level" + assert 0 < float(m.group(1)) <= 60, "wait budget must be positive and well under 120s" + + +class TestBoundedWaitPrimitive: + """The endpoint relies on Future.result(timeout=) leaving the task running + after a timeout — verify that contract so the background-completion claim holds.""" + + def test_timeout_leaves_merge_running_to_completion(self): + completed = [] + + def slow_merge(): + time.sleep(0.3) + completed.append(True) + + with ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(slow_merge) + with pytest.raises(FuturesTimeoutError): + future.result(timeout=0.05) + assert not completed, "merge should still be running after the bounded wait" + future.result(timeout=2) + assert completed, "merge must finish in the background after the wait times out" + + def test_fast_merge_returns_within_budget(self): + with ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(lambda: 'cached') + assert future.result(timeout=1) == 'cached'