Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 89 additions & 4 deletions backend/routers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import uuid as _uuid
import wave
from collections import deque
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

Expand Down Expand Up @@ -949,6 +950,41 @@ def identify_speakers_for_segments(
)


# Default upper bound (seconds) a segment waits for its chronological turn
# before giving up and proceeding out of order.
ORDERED_ASSIGNMENT_WAIT_SECONDS = 600


class _OrderedTurnstile:
    """Serializes conversation assignment across parallel segment threads in timestamp order.

    Segments are transcribed concurrently, but each must wait its (chronological) turn
    before looking up / creating a conversation. Without this, timestamp-adjacent chunks
    race get_closest_conversation_to_timestamps() before any of them has persisted a
    conversation, so every chunk becomes its own conversation (#6551, #5747).
    """

    def __init__(self, ordered_keys: List[str]):
        """Build a turnstile that releases ``ordered_keys`` front-to-back.

        Args:
            ordered_keys: Keys in the order they must be admitted (oldest first).
        """
        # Keys still waiting for their turn, head = next key allowed through.
        self._pending = deque(ordered_keys)
        # Keys that have finished; they may finish before reaching the head.
        self._done = set()
        # Guards _pending/_done and lets waiters sleep until a completion arrives.
        self._cond = threading.Condition()

    def _advance(self):
        """Pop every already-completed key off the head of the queue.

        Mutates ``_pending`` only and returns nothing. Called with ``_cond`` held.
        """
        while self._pending and self._pending[0] in self._done:
            self._pending.popleft()

    def wait_turn(self, key: str, timeout: float = ORDERED_ASSIGNMENT_WAIT_SECONDS) -> bool:
        """Block until every earlier key has completed. Returns False on timeout (fail-open).

        Args:
            key: The key whose turn is awaited.
            timeout: Maximum number of seconds to wait.

        Returns:
            True when the queue is empty or ``key`` is at its head; False if the
            timeout elapsed first, in which case the caller decides how to proceed.
        """

        def _is_turn() -> bool:
            # Two explicit steps: first discard finished keys from the head
            # (side effect only), then test whether it is this key's turn.
            self._advance()
            return not self._pending or self._pending[0] == key

        with self._cond:
            return self._cond.wait_for(_is_turn, timeout=timeout)
Comment on lines +976 to +979

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: `_advance()` side effect inside the `or` predicate is easy to misread

`_advance()` always returns `None` (implicitly), so the `or` chain is effectively `None or not self._pending or self._pending[0] == key`. The call exists purely for its side effect of mutating `_pending` before the subsequent checks. A future reader could reasonably assume the branch short-circuits on a truthy result from `_advance()` and add an early return there, breaking the condition entirely. A brief inline comment (e.g. `# _advance() mutates _pending; its return value is intentionally None`) would make the intent explicit without changing the logic.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


def complete(self, key: str):
    """Record ``key`` as finished and wake all threads blocked on the condition.

    The key is added to the done set, completed keys are dropped from the head
    of the queue, and every waiter is notified so it can re-check its turn.
    """
    cond = self._cond
    with cond:
        self._done.add(key)
        self._advance()
        cond.notify_all()


def process_segment(
path: str,
uid: str,
Expand All @@ -960,6 +996,7 @@ def process_segment(
transcription_prefs: dict = None,
person_embeddings_cache: dict = None,
target_conversation_id: str = None,
turnstile: Optional[_OrderedTurnstile] = None,
):
try:
url = get_syncing_file_temporal_signed_url(path)
Expand Down Expand Up @@ -1011,6 +1048,13 @@ def delete_file():
if audio_bytes:
del audio_bytes

# Conversation assignment must happen chronologically across the batch: wait until
# every earlier-timestamped segment has created/merged its conversation, otherwise
# the closest-conversation lookup races and adjacent chunks split into separate
# conversations.
if turnstile and not turnstile.wait_turn(path):
logger.warning(f'sync: ordered assignment wait timed out for {path}, proceeding out of order')

timestamp = get_timestamp_from_path(path)
segment_end_timestamp = timestamp + transcript_segments[-1].end

Expand Down Expand Up @@ -1104,11 +1148,34 @@ def delete_file():
reason = 'discarded' if closest_memory.get('discarded', False) else 'auto-sync'
logger.info(f'Conversation {closest_memory["id"]} reprocessing ({reason}) after segment merge')
_reprocess_conversation_after_update(uid, closest_memory['id'], language)
else:
# Summary/structured data is now stale (it predates the merged segments).
# Record it so the caller reprocesses once per conversation at batch end,
# instead of once per merged segment.
with lock:
response.setdefault('_merged', {})[closest_memory['id']] = language
except Exception as e:
error_msg = f'Failed to process segment {path}: {e}'
logger.error(error_msg)
with lock:
errors.append(error_msg)
finally:
if turnstile:
turnstile.complete(path)


def _reprocess_merged_conversations(uid: str, response: dict):
    """Reprocess each conversation recorded under ``response['_merged']``.

    The internal ``'_merged'`` entry (conversation id -> language) is removed
    from ``response`` and every conversation in it is passed to
    _reprocess_conversation_after_update(). The merge path in process_segment only
    appends transcript segments, so without this pass a conversation keeps the
    summary generated from its first chunk only.

    A failure for one conversation is logged and does not stop the others.
    """
    pending = response.pop('_merged', {})
    for conversation_id in pending:
        language = pending[conversation_id]
        try:
            _reprocess_conversation_after_update(uid, conversation_id, language)
        except Exception as e:
            logger.error(f'sync: failed to reprocess merged conversation {conversation_id}: {e}')


def _cleanup_files(file_paths):
Expand Down Expand Up @@ -1247,6 +1314,11 @@ def _run_vad(path):
logger.warning(f'sync: failed to load person embeddings, skipping speaker ID uid={uid}: {e}')
person_embeddings_cache = {}

# Chronological order + turnstile: STT runs in parallel, but conversation
# assignment is serialized oldest-first so adjacent chunks merge instead of
# racing into separate conversations (#6551, #5747).
ordered_paths = sorted(segmented_paths, key=get_timestamp_from_path)
assignment_turnstile = _OrderedTurnstile(ordered_paths)
await asyncio.gather(
*[
run_blocking(
Expand All @@ -1262,11 +1334,14 @@ def _run_vad(path):
transcription_prefs,
person_embeddings_cache,
conversation_id,
assignment_turnstile,
)
for path in segmented_paths
for path in ordered_paths
]
)

await run_blocking(sync_executor, _reprocess_merged_conversations, uid, response)

# Record DG usage after successful processing (not before, to avoid charging on retries)
if fair_use_restrict_dg:
try:
Expand Down Expand Up @@ -1534,6 +1609,12 @@ def _run_vad_bg(path):
segment_errors = []
segment_lock = threading.Lock()

# Chronological order + turnstile: STT runs in parallel (per chunk), but
# conversation assignment is serialized oldest-first so adjacent chunks merge
# instead of racing into separate conversations (#6551, #5747).
segment_list = sorted(segmented_paths, key=get_timestamp_from_path)
assignment_turnstile = _OrderedTurnstile(segment_list)

def _process_one_segment(path):
process_segment(
path,
Expand All @@ -1546,15 +1627,17 @@ def _process_one_segment(path):
transcription_prefs,
person_embeddings_cache,
target_conversation_id,
assignment_turnstile,
)

chunk_size = 5
segment_list = list(segmented_paths)
for i in range(0, len(segment_list), chunk_size):
chunk = segment_list[i : i + chunk_size]
# Later segments in a chunk also wait their assignment turn, so widen
# their timeout by position to avoid spurious timeouts.
seg_tasks = [
asyncio.wait_for(run_blocking(sync_executor, _process_one_segment, path), timeout=300)
for path in chunk
asyncio.wait_for(run_blocking(sync_executor, _process_one_segment, path), timeout=300 + 60 * j)
for j, path in enumerate(chunk)
]
seg_results = await asyncio.gather(*seg_tasks, return_exceptions=True)
for r in seg_results:
Expand All @@ -1573,6 +1656,8 @@ def _process_one_segment(path):
except Exception:
pass

await run_blocking(sync_executor, _reprocess_merged_conversations, uid, response)

stage_timings['stt_llm_ms'] = int((time.monotonic() - t0) * 1000)

# Record DG usage after processing
Expand Down
1 change: 1 addition & 0 deletions backend/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pytest tests/unit/test_sync_fair_use_gate.py -v
pytest tests/unit/test_sync_pcm_decode.py -v
pytest tests/unit/test_sync_opus_decode.py -v
pytest tests/unit/test_sync_silent_failure.py -v
pytest tests/unit/test_sync_ordered_assignment.py -v
pytest tests/unit/test_fair_use_free_tier.py -v
pytest tests/unit/test_fair_use_upgrade.py -v
pytest tests/unit/test_skip_classifier_restrict.py -v
Expand Down
166 changes: 166 additions & 0 deletions backend/tests/unit/test_sync_ordered_assignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""
Tests for ordered conversation assignment in offline sync (#6551, #5747).

Bug: sync_local_files (v1) and the v2 background pipeline processed VAD segments
fully in parallel. Each process_segment() call independently ran
get_closest_conversation_to_timestamps() and, finding nothing (none of its
timestamp-adjacent siblings had persisted a conversation yet), created its own
conversation — so a pendant backlog of chunks separated by seconds became many
separate conversations instead of merging.

Fix: segments are sorted chronologically and an _OrderedTurnstile serializes the
conversation lookup/create/merge step in timestamp order (STT stays parallel).
Conversations that gained segments are reprocessed once per batch so their
summary covers the merged content.
"""

import os
import threading
import time
from collections import deque
from typing import List

SYNC_PATH = os.path.join(os.path.dirname(__file__), '..', '..', 'routers', 'sync.py')


def _read_sync_source():
    """Return the full text of the file at SYNC_PATH.

    The file is decoded explicitly as UTF-8 (the default encoding for Python
    source) so the result does not depend on the platform's locale encoding.
    """
    with open(SYNC_PATH, encoding='utf-8') as f:
        return f.read()


def _function_body(source: str, name: str) -> str:
    """Return the source text of top-level function ``name``.

    The slice starts at ``def <name>(`` and ends just before the next top-level
    ``def`` or ``async def``, whichever comes first. If no later top-level
    definition exists, the slice runs to the end of ``source``.

    Raises:
        ValueError: if ``def <name>(`` does not occur in ``source``.
    """
    start = source.index(f'def {name}(')
    # find() rather than index(): the target may be the last definition in the file.
    boundaries = [
        pos
        for pos in (source.find('\ndef ', start + 1), source.find('\nasync def ', start + 1))
        if pos != -1
    ]
    return source[start : min(boundaries)] if boundaries else source[start:]


def _async_function_body(source: str, name: str) -> str:
    """Return the source text of top-level coroutine ``name``.

    The slice starts at ``async def <name>(`` and stops before the nearest
    following top-level ``async def`` or ``def``; with neither present it runs
    to the end of ``source``. Raises ValueError if the coroutine is not found.
    """
    begin = source.index(f'async def {name}(')
    cut = len(source)
    for marker in ('\nasync def ', '\ndef '):
        hit = source.find(marker, begin + 1)
        if hit != -1 and hit < cut:
            cut = hit
    return source[begin:cut]


def _load_turnstile_class():
    """Build the _OrderedTurnstile class from the sync.py source text.

    The class definition is sliced out of the file (from its ``class`` line up
    to the next top-level ``def``) and exec'd in a minimal namespace, so
    routers.sync itself is never imported (which pulls in firestore/opuslib/etc.).
    """
    text = _read_sync_source()
    begin = text.index('class _OrderedTurnstile')
    class_src = text[begin : text.index('\ndef ', begin)]
    # Only the names the class body needs; the wait constant is supplied inline.
    env = {'deque': deque, 'threading': threading, 'List': List}
    exec('ORDERED_ASSIGNMENT_WAIT_SECONDS = 600\n' + class_src, env)
    return env['_OrderedTurnstile']


# ---------------------------------------------------------------------------
# 1. _OrderedTurnstile behavior
# ---------------------------------------------------------------------------


class TestOrderedTurnstile:
    """Behavioral checks for the _OrderedTurnstile class loaded from the sync.py source."""

    def test_serializes_in_given_order_despite_reverse_readiness(self):
        """Threads become ready newest-first; assignment order must still be oldest-first."""
        gate_cls = _load_turnstile_class()
        keys = ['t1.wav', 't2.wav', 't3.wav', 't4.wav']
        stt_delays = [0.2, 0.15, 0.1, 0.05]
        gate = gate_cls(keys)
        observed = []
        observed_guard = threading.Lock()

        def worker(key, stt_delay):
            # Simulated parallel STT: the newest segment finishes first.
            time.sleep(stt_delay)
            try:
                assert gate.wait_turn(key, timeout=10)
                with observed_guard:
                    observed.append(key)
            finally:
                gate.complete(key)

        workers = []
        for key, delay in zip(keys, stt_delays):
            workers.append(threading.Thread(target=worker, args=(key, delay)))
        for w in workers:
            w.start()
        for w in workers:
            w.join(timeout=15)
        assert observed == keys, f'assignment must be chronological, got {observed}'

    def test_early_complete_without_wait_unblocks_followers(self):
        """A segment that short-circuits (silence) completes without waiting and must
        not block later segments."""
        gate = _load_turnstile_class()(['a', 'b'])
        # 'a' takes the early-return path: it only ever calls complete().
        gate.complete('a')
        b_admitted = gate.wait_turn('b', timeout=1)
        assert b_admitted, "'b' must proceed after 'a' completed early"

    def test_wait_times_out_fail_open(self):
        """If an earlier segment hangs, wait_turn returns False instead of deadlocking."""
        gate = _load_turnstile_class()(['a', 'b'])
        started = time.monotonic()
        outcome = gate.wait_turn('b', timeout=0.2)
        assert outcome is False
        assert time.monotonic() - started < 5

    def test_first_key_proceeds_immediately(self):
        """The head of the queue is admitted without any completion having happened."""
        gate = _load_turnstile_class()(['a', 'b', 'c'])
        assert gate.wait_turn('a', timeout=0.1) is True

    def test_out_of_order_completion_converges(self):
        """Completions arriving in arbitrary order still release the queue head correctly."""
        gate = _load_turnstile_class()(['a', 'b', 'c'])
        # The later key finishes first (early return), then the head.
        for finished in ('b', 'a'):
            gate.complete(finished)
        assert gate.wait_turn('c', timeout=1) is True


# ---------------------------------------------------------------------------
# 2. Structural guards — callers actually use the turnstile
# ---------------------------------------------------------------------------


class TestCallersUseOrderedAssignment:
    """Source-text guards: the sync pipelines must actually wire in the turnstile."""

    def test_process_segment_accepts_turnstile_and_releases_it(self):
        """process_segment takes a turnstile, waits on it, and releases it in a finally."""
        body = _function_body(_read_sync_source(), 'process_segment')
        signature = body.split('):')[0]
        assert 'turnstile' in signature, 'process_segment must accept a turnstile param'
        assert 'wait_turn' in body, 'process_segment must wait its chronological turn'
        has_finally = 'finally:' in body
        assert has_finally and 'turnstile.complete(path)' in body, (
            'process_segment must always release its turn (finally), ' 'or followers deadlock on early returns/errors'
        )

    def test_wait_turn_precedes_conversation_lookup(self):
        """The wait must appear in the source before the closest-conversation lookup."""
        body = _function_body(_read_sync_source(), 'process_segment')
        wait_at = body.index('wait_turn')
        lookup_at = body.index('get_closest_conversation_to_timestamps')
        assert wait_at < lookup_at, 'turn must be acquired before the closest-conversation lookup'

    def test_v1_sorts_segments_and_passes_turnstile(self):
        """sync_local_files sorts by timestamp, builds a turnstile and passes it on."""
        v1_src = _async_function_body(_read_sync_source(), 'sync_local_files')
        assert 'sorted(segmented_paths, key=get_timestamp_from_path)' in v1_src
        assert '_OrderedTurnstile(' in v1_src
        assert 'assignment_turnstile,' in v1_src, 'v1 must pass the turnstile to process_segment'

    def test_v2_sorts_segments_and_passes_turnstile(self):
        """The background pipeline sorts by timestamp, builds a turnstile and passes it on."""
        v2_src = _async_function_body(_read_sync_source(), '_run_full_pipeline_background_async')
        assert 'sorted(segmented_paths, key=get_timestamp_from_path)' in v2_src
        assert '_OrderedTurnstile(' in v2_src
        assert 'assignment_turnstile,' in v2_src, 'v2 must pass the turnstile to process_segment'

    def test_both_pipelines_reprocess_merged_conversations(self):
        """Both pipelines reference the batch-end reprocess helper."""
        source = _read_sync_source()
        v1 = _async_function_body(source, 'sync_local_files')
        v2 = _async_function_body(source, '_run_full_pipeline_background_async')
        assert '_reprocess_merged_conversations' in v1
        assert '_reprocess_merged_conversations' in v2

    def test_merge_path_records_merged_conversation(self):
        """process_segment mentions the internal '_merged' bookkeeping key."""
        segment_src = _function_body(_read_sync_source(), 'process_segment')
        assert "_merged" in segment_src, 'merge path must record conversations that gained segments'

    def test_reprocess_helper_is_fail_safe(self):
        """The reprocess helper pops the internal key and guards each reprocess call."""
        helper_src = _function_body(_read_sync_source(), '_reprocess_merged_conversations')
        assert "pop('_merged'" in helper_src, 'must pop the internal key so it never leaks into responses'
        assert 'except Exception' in helper_src, 'one failed reprocess must not fail the batch'
Loading