Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions backend/routers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import uuid as _uuid
import wave
from concurrent.futures import TimeoutError as FuturesTimeoutError
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

Expand Down Expand Up @@ -90,6 +91,12 @@
# Audio constants
AUDIO_SAMPLE_RATE = 16000

# Max time the /urls endpoint blocks on caching the first uncached file. Large
# merges (hundreds of chunks, or GCS pushback) can take 60-115s+, exceeding the
# 120s middleware timeout and client timeouts (#7325); past this budget the file
# is reported as pending while the merge finishes in the background.
FIRST_FILE_CACHE_WAIT_SECONDS = 15.0

_V1_DEPRECATION_HEADERS = {'Deprecation': 'true', 'Link': '</v2/sync-local-files>; rel="successor-version"'}

router = APIRouter()
Expand Down Expand Up @@ -276,10 +283,24 @@ def get_audio_signed_urls_endpoint(
}
)
else:
# First uncached file: cache synchronously for immediate playback
# First uncached file: cache synchronously for immediate playback,
# but bounded — a slow merge must not block the endpoint past
# client/middleware timeouts (#7325). On timeout the merge keeps
# running on sync_executor and lands in the cache for later calls;
# the file is reported as pending and clients fall back to the
# stream endpoint or re-fetch.
if not first_uncached_handled:
first_uncached_handled = True
_precache_audio_file(uid, conversation_id, af, caller='sync_urls_first')
first_file_future = submit_with_context(
sync_executor, _precache_audio_file, uid, conversation_id, af, caller='sync_urls_first'
)
try:
first_file_future.result(timeout=FIRST_FILE_CACHE_WAIT_SECONDS)
except FuturesTimeoutError:
logger.warning(
f"sync_urls first-file cache exceeded {FIRST_FILE_CACHE_WAIT_SECONDS}s, returning pending "
f"uid={uid} convo={conversation_id} file={audio_file_id}"
)
Comment on lines +297 to +303

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 CancelledError not caught alongside FuturesTimeoutError

future.result() can also raise concurrent.futures.CancelledError — a separate exception class from FuturesTimeoutError — when the future is cancelled before it starts executing. This happens when sync_executor is at capacity and shutdown_executors(cancel_futures=True) fires during a graceful shutdown: the queued task is cancelled and result() raises rather than blocking. The exception propagates unhandled through the endpoint and becomes a 500 instead of the graceful pending response that would be correct here. Catching (FuturesTimeoutError, CancelledError) in the same handler would close the gap.

# Get signed URL after caching
signed_url = get_merged_audio_signed_url(uid, conversation_id, audio_file_id)
if signed_url:
Expand Down
1 change: 1 addition & 0 deletions backend/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pytest tests/unit/test_rate_limiting.py -v
pytest tests/unit/test_memories_batch.py -v
pytest tests/unit/test_memories_create.py -v
pytest tests/unit/test_sync_v2.py -v
pytest tests/unit/test_sync_urls_bounded_wait.py -v
pytest tests/unit/test_sync_transcription_prefs.py -v
pytest tests/unit/test_sync_record_usage.py -v
pytest tests/unit/test_vision_stream_async.py -v
Expand Down
98 changes: 98 additions & 0 deletions backend/tests/unit/test_sync_urls_bounded_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Tests for the bounded first-file cache wait in GET /v1/sync/audio/{id}/urls (#7325).

The endpoint used to merge the first uncached audio file synchronously in the
request thread. Large merges (hundreds of chunks, or GCS pushback) took 60-115s+
in prod, exceeding the 120s timeout middleware and client timeouts, so the
request appeared to hang with zero bytes. The merge is now submitted to
sync_executor and waited on for at most FIRST_FILE_CACHE_WAIT_SECONDS; on
timeout the file is reported as pending while the merge finishes in the
background (and lands in the GCS cache for subsequent calls).

routers/sync.py has a heavy import chain (opuslib, pydub, STT/LLM stacks), so
following the precedent of test_sync_v2.py these are structural tests over the
source, plus logic tests of the exact wait/timeout primitive used.
"""

import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError

import pytest

SYNC_PATH = os.path.join(os.path.dirname(__file__), '..', '..', 'routers', 'sync.py')


def _read_sync_source():
with open(SYNC_PATH, encoding='utf-8') as f:
return f.read()


def _urls_endpoint_body(source):
start = source.index('def get_audio_signed_urls_endpoint')
next_section = source.find('\n@router.', start + 1)
if next_section == -1:
next_section = len(source)
return source[start:next_section]


class TestUrlsEndpointStructure:
def test_first_file_merge_is_bounded(self):
"""The first-file cache must be waited on with a timeout, not called inline."""
body = _urls_endpoint_body(_read_sync_source())
assert 'submit_with_context(' in body, "first-file cache must run on an executor"
assert re.search(
r'\.result\(timeout=FIRST_FILE_CACHE_WAIT_SECONDS\)', body
), "first-file cache wait must be bounded by FIRST_FILE_CACHE_WAIT_SECONDS"
assert 'FuturesTimeoutError' in body, "timeout must be handled (file reported pending)"

def test_no_inline_first_file_merge(self):
"""The unbounded inline call that caused #7325 must not come back."""
body = _urls_endpoint_body(_read_sync_source())
assert not re.search(
r'^\s*_precache_audio_file\(', body, re.MULTILINE
), "_precache_audio_file must not be called inline in the request thread"

def test_coordinator_not_on_storage_executor(self):
"""_precache_audio_file fans out to storage_executor, so the bounded wait
must submit it to a different pool (deadlock rule 3 in AGENTS.md)."""
body = _urls_endpoint_body(_read_sync_source())
m = re.search(r'submit_with_context\(\s*(\w+),\s*_precache_audio_file', body)
assert m, "first-file cache must be submitted via submit_with_context"
assert (
m.group(1) != 'storage_executor'
), "coordinator must not share storage_executor with its chunk-download children"
Comment on lines +64 to +66

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The assertion only guards the negative case. If a future refactor changes the pool to postprocess_executor, db_executor, or any other executor, this test still passes — including pools whose workers could deadlock or whose purpose clashes with audio-merge work. Asserting positively on sync_executor catches any such drift.

Suggested change
assert (
m.group(1) != 'storage_executor'
), "coordinator must not share storage_executor with its chunk-download children"
assert (
m.group(1) == 'sync_executor'
), "coordinator must run on sync_executor (not storage_executor, to avoid deadlock with its children)"

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


def test_wait_budget_is_under_middleware_timeout(self):
"""The wait budget must stay well under the 120s HTTP timeout middleware."""
source = _read_sync_source()
m = re.search(r'^FIRST_FILE_CACHE_WAIT_SECONDS\s*=\s*([0-9.]+)', source, re.MULTILINE)
assert m, "FIRST_FILE_CACHE_WAIT_SECONDS constant must exist at module level"
assert 0 < float(m.group(1)) <= 60, "wait budget must be positive and well under 120s"


class TestBoundedWaitPrimitive:
"""The endpoint relies on Future.result(timeout=) leaving the task running
after a timeout — verify that contract so the background-completion claim holds."""

def test_timeout_leaves_merge_running_to_completion(self):
completed = []

def slow_merge():
time.sleep(0.3)
completed.append(True)

with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(slow_merge)
with pytest.raises(FuturesTimeoutError):
future.result(timeout=0.05)
assert not completed, "merge should still be running after the bounded wait"
future.result(timeout=2)
assert completed, "merge must finish in the background after the wait times out"

def test_fast_merge_returns_within_budget(self):
with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(lambda: 'cached')
assert future.result(timeout=1) == 'cached'
Loading