diff --git a/backend/routers/sync.py b/backend/routers/sync.py index 89428643e3..b33bfcb9f5 100644 --- a/backend/routers/sync.py +++ b/backend/routers/sync.py @@ -9,6 +9,7 @@ import time import uuid as _uuid import wave +from collections import deque from datetime import datetime, timezone from typing import Dict, List, Optional, Tuple @@ -949,6 +950,41 @@ def identify_speakers_for_segments( ) +ORDERED_ASSIGNMENT_WAIT_SECONDS = 600 + + +class _OrderedTurnstile: + """Serializes conversation assignment across parallel segment threads in timestamp order. + + Segments are transcribed concurrently, but each must wait its (chronological) turn + before looking up / creating a conversation. Without this, timestamp-adjacent chunks + race get_closest_conversation_to_timestamps() before any of them has persisted a + conversation, so every chunk becomes its own conversation (#6551, #5747). + """ + + def __init__(self, ordered_keys: List[str]): + self._pending = deque(ordered_keys) + self._done = set() + self._cond = threading.Condition() + + def _advance(self): + while self._pending and self._pending[0] in self._done: + self._pending.popleft() + + def wait_turn(self, key: str, timeout: float = ORDERED_ASSIGNMENT_WAIT_SECONDS) -> bool: + """Block until every earlier key has completed. Returns False on timeout (fail-open).""" + with self._cond: + return self._cond.wait_for( + lambda: self._advance() or not self._pending or self._pending[0] == key, timeout=timeout + ) + + def complete(self, key: str): + with self._cond: + self._done.add(key) + self._advance() + self._cond.notify_all() + + def process_segment( path: str, uid: str, @@ -960,6 +996,7 @@ def process_segment( transcription_prefs: dict = None, person_embeddings_cache: dict = None, target_conversation_id: str = None, + turnstile: Optional[_OrderedTurnstile] = None, ): try: url = get_syncing_file_temporal_signed_url(path) @@ -1011,6 +1048,13 @@ def delete_file(): if audio_bytes: del audio_bytes + # Conversation assignment must happen chronologically across the batch: wait until + # every earlier-timestamped segment has created/merged its conversation, otherwise + # the closest-conversation lookup races and adjacent chunks split into separate + # conversations. + if turnstile and not turnstile.wait_turn(path): + logger.warning(f'sync: ordered assignment wait timed out for {path}, proceeding out of order') + timestamp = get_timestamp_from_path(path) segment_end_timestamp = timestamp + transcript_segments[-1].end @@ -1104,11 +1148,34 @@ def delete_file(): reason = 'discarded' if closest_memory.get('discarded', False) else 'auto-sync' logger.info(f'Conversation {closest_memory["id"]} reprocessing ({reason}) after segment merge') _reprocess_conversation_after_update(uid, closest_memory['id'], language) + else: + # Summary/structured data is now stale (it predates the merged segments). + # Record it so the caller reprocesses once per conversation at batch end, + # instead of once per merged segment. + with lock: + response.setdefault('_merged', {})[closest_memory['id']] = language except Exception as e: error_msg = f'Failed to process segment {path}: {e}' logger.error(error_msg) with lock: errors.append(error_msg) + finally: + if turnstile: + turnstile.complete(path) + + +def _reprocess_merged_conversations(uid: str, response: dict): + """Regenerate summary/structured data for conversations that gained segments this batch. + + The merge path in process_segment only appends transcript segments; without this the + conversation keeps the summary generated from its first chunk only. + """ + merged = response.pop('_merged', {}) + for conversation_id, language in merged.items(): + try: + _reprocess_conversation_after_update(uid, conversation_id, language) + except Exception as e: + logger.error(f'sync: failed to reprocess merged conversation {conversation_id}: {e}') def _cleanup_files(file_paths): @@ -1247,6 +1314,11 @@ def _run_vad(path): logger.warning(f'sync: failed to load person embeddings, skipping speaker ID uid={uid}: {e}') person_embeddings_cache = {} + # Chronological order + turnstile: STT runs in parallel, but conversation + # assignment is serialized oldest-first so adjacent chunks merge instead of + # racing into separate conversations (#6551, #5747). + ordered_paths = sorted(segmented_paths, key=get_timestamp_from_path) + assignment_turnstile = _OrderedTurnstile(ordered_paths) await asyncio.gather( *[ run_blocking( @@ -1262,11 +1334,14 @@ def _run_vad(path): transcription_prefs, person_embeddings_cache, conversation_id, + assignment_turnstile, ) - for path in segmented_paths + for path in ordered_paths ] ) + await run_blocking(sync_executor, _reprocess_merged_conversations, uid, response) + # Record DG usage after successful processing (not before, to avoid charging on retries) if fair_use_restrict_dg: try: @@ -1534,6 +1609,12 @@ def _run_vad_bg(path): segment_errors = [] segment_lock = threading.Lock() + # Chronological order + turnstile: STT runs in parallel (per chunk), but + # conversation assignment is serialized oldest-first so adjacent chunks merge + # instead of racing into separate conversations (#6551, #5747). + segment_list = sorted(segmented_paths, key=get_timestamp_from_path) + assignment_turnstile = _OrderedTurnstile(segment_list) + def _process_one_segment(path): process_segment( path, @@ -1546,15 +1627,17 @@ def _process_one_segment(path): transcription_prefs, person_embeddings_cache, target_conversation_id, + assignment_turnstile, ) chunk_size = 5 - segment_list = list(segmented_paths) for i in range(0, len(segment_list), chunk_size): chunk = segment_list[i : i + chunk_size] + # Later segments in a chunk also wait their assignment turn, so widen + # their timeout by position to avoid spurious timeouts. seg_tasks = [ - asyncio.wait_for(run_blocking(sync_executor, _process_one_segment, path), timeout=300) - for path in chunk + asyncio.wait_for(run_blocking(sync_executor, _process_one_segment, path), timeout=300 + 60 * j) + for j, path in enumerate(chunk) ] seg_results = await asyncio.gather(*seg_tasks, return_exceptions=True) for r in seg_results: @@ -1573,6 +1656,8 @@ def _process_one_segment(path): except Exception: pass + await run_blocking(sync_executor, _reprocess_merged_conversations, uid, response) + stage_timings['stt_llm_ms'] = int((time.monotonic() - t0) * 1000) # Record DG usage after processing diff --git a/backend/test.sh b/backend/test.sh index efadf7c508..cad9917eed 100755 --- a/backend/test.sh +++ b/backend/test.sh @@ -90,6 +90,7 @@ pytest tests/unit/test_sync_fair_use_gate.py -v pytest tests/unit/test_sync_pcm_decode.py -v pytest tests/unit/test_sync_opus_decode.py -v pytest tests/unit/test_sync_silent_failure.py -v +pytest tests/unit/test_sync_ordered_assignment.py -v pytest tests/unit/test_fair_use_free_tier.py -v pytest tests/unit/test_fair_use_upgrade.py -v pytest tests/unit/test_skip_classifier_restrict.py -v diff --git a/backend/tests/unit/test_sync_ordered_assignment.py b/backend/tests/unit/test_sync_ordered_assignment.py new file mode 100644 index 0000000000..daba7bad83 --- /dev/null +++ b/backend/tests/unit/test_sync_ordered_assignment.py @@ -0,0 +1,166 @@ +""" +Tests for ordered conversation assignment in offline sync (#6551, #5747). + +Bug: sync_local_files (v1) and the v2 background pipeline processed VAD segments +fully in parallel. Each process_segment() call independently ran +get_closest_conversation_to_timestamps() and, finding nothing (none of its +timestamp-adjacent siblings had persisted a conversation yet), created its own +conversation — so a pendant backlog of chunks separated by seconds became many +separate conversations instead of merging. + +Fix: segments are sorted chronologically and an _OrderedTurnstile serializes the +conversation lookup/create/merge step in timestamp order (STT stays parallel). +Conversations that gained segments are reprocessed once per batch so their +summary covers the merged content. +""" + +import os +import threading +import time +from collections import deque +from typing import List + +SYNC_PATH = os.path.join(os.path.dirname(__file__), '..', '..', 'routers', 'sync.py') + + +def _read_sync_source(): + with open(SYNC_PATH) as f: + return f.read() + + +def _function_body(source: str, name: str) -> str: + start = source.index(f'def {name}(') + next_def = source.index('\ndef ', start + 1) + return source[start:next_def] + + +def _async_function_body(source: str, name: str) -> str: + start = source.index(f'async def {name}(') + end = source.find('\nasync def ', start + 1) + end2 = source.find('\ndef ', start + 1) + candidates = [e for e in (end, end2) if e != -1] + return source[start : min(candidates)] if candidates else source[start:] + + +def _load_turnstile_class(): + """Extract and exec the _OrderedTurnstile class without importing routers.sync + (which pulls in firestore/opuslib/etc.).""" + source = _read_sync_source() + start = source.index('class _OrderedTurnstile') + end = source.index('\ndef ', start) + class_src = source[start:end] + namespace = {'deque': deque, 'threading': threading, 'List': List} + exec('ORDERED_ASSIGNMENT_WAIT_SECONDS = 600\n' + class_src, namespace) + return namespace['_OrderedTurnstile'] + + +# --------------------------------------------------------------------------- +# 1. _OrderedTurnstile behavior +# --------------------------------------------------------------------------- + + +class TestOrderedTurnstile: + def test_serializes_in_given_order_despite_reverse_readiness(self): + """Threads become ready newest-first; assignment order must still be oldest-first.""" + Turnstile = _load_turnstile_class() + keys = ['t1.wav', 't2.wav', 't3.wav', 't4.wav'] + turnstile = Turnstile(keys) + order = [] + order_lock = threading.Lock() + + def worker(key, stt_delay): + time.sleep(stt_delay) # simulated parallel STT, newest finishes first + try: + assert turnstile.wait_turn(key, timeout=10) + with order_lock: + order.append(key) + finally: + turnstile.complete(key) + + threads = [ + threading.Thread(target=worker, args=(key, delay)) for key, delay in zip(keys, [0.2, 0.15, 0.1, 0.05]) + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=15) + assert order == keys, f'assignment must be chronological, got {order}' + + def test_early_complete_without_wait_unblocks_followers(self): + """A segment that short-circuits (silence) completes without waiting and must + not block later segments.""" + Turnstile = _load_turnstile_class() + turnstile = Turnstile(['a', 'b']) + # 'a' never calls wait_turn (early return path) — only complete() + turnstile.complete('a') + assert turnstile.wait_turn('b', timeout=1), "'b' must proceed after 'a' completed early" + + def test_wait_times_out_fail_open(self): + """If an earlier segment hangs, wait_turn returns False instead of deadlocking.""" + Turnstile = _load_turnstile_class() + turnstile = Turnstile(['a', 'b']) + t0 = time.monotonic() + assert turnstile.wait_turn('b', timeout=0.2) is False + assert time.monotonic() - t0 < 5 + + def test_first_key_proceeds_immediately(self): + Turnstile = _load_turnstile_class() + turnstile = Turnstile(['a', 'b', 'c']) + assert turnstile.wait_turn('a', timeout=0.1) is True + + def test_out_of_order_completion_converges(self): + """Completions arriving in arbitrary order still release the queue head correctly.""" + Turnstile = _load_turnstile_class() + turnstile = Turnstile(['a', 'b', 'c']) + turnstile.complete('b') # later key done first (early return) + turnstile.complete('a') + assert turnstile.wait_turn('c', timeout=1) is True + + +# --------------------------------------------------------------------------- +# 2. Structural guards — callers actually use the turnstile +# --------------------------------------------------------------------------- + + +class TestCallersUseOrderedAssignment: + def test_process_segment_accepts_turnstile_and_releases_it(self): + body = _function_body(_read_sync_source(), 'process_segment') + assert 'turnstile' in body.split('):')[0], 'process_segment must accept a turnstile param' + assert 'wait_turn' in body, 'process_segment must wait its chronological turn' + assert 'finally:' in body and 'turnstile.complete(path)' in body, ( + 'process_segment must always release its turn (finally), ' 'or followers deadlock on early returns/errors' + ) + + def test_wait_turn_precedes_conversation_lookup(self): + body = _function_body(_read_sync_source(), 'process_segment') + assert body.index('wait_turn') < body.index( + 'get_closest_conversation_to_timestamps' + ), 'turn must be acquired before the closest-conversation lookup' + + def test_v1_sorts_segments_and_passes_turnstile(self): + body = _async_function_body(_read_sync_source(), 'sync_local_files') + assert 'sorted(segmented_paths, key=get_timestamp_from_path)' in body + assert '_OrderedTurnstile(' in body + assert 'assignment_turnstile,' in body, 'v1 must pass the turnstile to process_segment' + + def test_v2_sorts_segments_and_passes_turnstile(self): + body = _async_function_body(_read_sync_source(), '_run_full_pipeline_background_async') + assert 'sorted(segmented_paths, key=get_timestamp_from_path)' in body + assert '_OrderedTurnstile(' in body + assert 'assignment_turnstile,' in body, 'v2 must pass the turnstile to process_segment' + + def test_both_pipelines_reprocess_merged_conversations(self): + source = _read_sync_source() + v1 = _async_function_body(source, 'sync_local_files') + v2 = _async_function_body(source, '_run_full_pipeline_background_async') + assert '_reprocess_merged_conversations' in v1 + assert '_reprocess_merged_conversations' in v2 + + def test_merge_path_records_merged_conversation(self): + body = _function_body(_read_sync_source(), 'process_segment') + assert "_merged" in body, 'merge path must record conversations that gained segments' + + def test_reprocess_helper_is_fail_safe(self): + body = _function_body(_read_sync_source(), '_reprocess_merged_conversations') + assert "pop('_merged'" in body, 'must pop the internal key so it never leaks into responses' + assert 'except Exception' in body, 'one failed reprocess must not fail the batch'