
fix(backend): offline-sync chunks split into separate conversations — serialize assignment chronologically (#6551) #7819

Merged: kodjima33 merged 3 commits into main from watchdog/issue-6551-ordered-sync-assignment on Jun 11, 2026

Conversation

@kodjima33 (Collaborator)

Fixes #6551, addresses the main symptom of #5747.

Bug

When a pendant/device reconnects and batch-syncs backlog audio, chunks separated by only seconds are saved as separate conversations, ignoring any merge window. Users get a cluttered conversation list and must merge manually (which re-runs server compute).

Root cause

Both sync pipelines process VAD segments in parallel:

  • v1 sync_local_files: one asyncio.gather over ALL segments
  • v2 _run_full_pipeline_background_async: chunks of 5 in parallel, from an unordered set

Each process_segment() independently runs get_closest_conversation_to_timestamps() (±2 min window) and then creates/merges. Timestamp-adjacent segments race: none of them has persisted a conversation when the others look, so every chunk creates its own conversation. The ±2 min merge window never gets a chance to work.
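
The race described above can be reproduced in miniature. This is only an illustrative sketch, not the backend code: `find_closest`, `assign`, and the in-memory `store` are hypothetical stand-ins for the Firestore lookup and create/merge step, and a `threading.Barrier` is used to force the worst-case interleaving where every lookup runs before any write.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MERGE_WINDOW_S = 120  # the ±2 min window described above


def find_closest(store, ts):
    # Hypothetical stand-in for get_closest_conversation_to_timestamps().
    for conv in store:
        if conv["start"] - MERGE_WINDOW_S <= ts <= conv["end"] + MERGE_WINDOW_S:
            return conv
    return None


def assign(store, lock, ts, found):
    # Create a new conversation, or extend the one the lookup returned.
    with lock:
        if found is None:
            store.append({"start": ts, "end": ts})
        else:
            found["start"] = min(found["start"], ts)
            found["end"] = max(found["end"], ts)


def parallel_run(timestamps):
    store, lock = [], threading.Lock()
    barrier = threading.Barrier(len(timestamps))

    def worker(ts):
        with lock:
            found = find_closest(store, ts)
        barrier.wait()  # force every lookup to finish before any write
        assign(store, lock, ts, found)

    with ThreadPoolExecutor(max_workers=len(timestamps)) as pool:
        list(pool.map(worker, timestamps))
    return len(store)


def ordered_run(timestamps):
    # Lookup and write happen one segment at a time, oldest first.
    store, lock = [], threading.Lock()
    for ts in sorted(timestamps):
        assign(store, lock, ts, find_closest(store, ts))
    return len(store)
```

With chunks at T, T+30s and T+60s, `parallel_run([0, 30, 60])` ends with three conversations and `ordered_run([0, 30, 60])` with one, which matches the before/after behavior reported under Verification.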

Fix

  • Segments are sorted chronologically (get_timestamp_from_path) in both pipelines.
  • A small _OrderedTurnstile (condition variable) serializes only the conversation lookup/create/merge step in timestamp order — STT stays fully parallel. Fail-open: a 600s wait timeout proceeds out of order rather than deadlocking; early-return paths (silence) release their turn via finally.
  • Deadlock-safe: tasks are submitted to the FIFO sync_executor in chronological order, so a waiter only ever waits on segments that are already running or done; process_conversation uses only llm/db/postprocess executors (no nested sync_executor use).
  • v2 per-segment wait_for timeout widened by in-chunk position (300 + 60·j) so later segments' turnstile wait can't cause spurious timeouts.
  • Conversations that gained merged segments are reprocessed once per batch (_reprocess_merged_conversations), mirroring manual-merge behavior — otherwise the merged conversation keeps the summary generated from its first chunk only.
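
A minimal sketch of how such a turnstile could work, assuming a plain `threading.Condition` and hashable timestamp keys. The class name `OrderedTurnstileSketch`, its internals, and the `run_demo` helper are illustrative assumptions, not the code merged in sync.py; only the method names `wait_turn`/`complete` and the fail-open timeout behavior are taken from the description above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class OrderedTurnstileSketch:
    """Lets callers through one at a time, in the order the keys were given."""

    def __init__(self, keys):
        self._pending = list(keys)  # keys whose turn has not finished, in order
        self._done = set()          # keys that called complete()
        self._cond = threading.Condition()

    def _advance(self):
        # Drop finished keys from the front so the next key becomes the head.
        while self._pending and self._pending[0] in self._done:
            self._pending.pop(0)

    def wait_turn(self, key, timeout=600.0):
        # True when it is this key's turn; False on timeout (caller proceeds anyway).
        def is_my_turn():
            self._advance()
            return not self._pending or self._pending[0] == key

        with self._cond:
            return self._cond.wait_for(is_my_turn, timeout=timeout)

    def complete(self, key):
        # Safe to call from a finally block, including on early-return paths.
        with self._cond:
            self._done.add(key)
            self._advance()
            self._cond.notify_all()


def run_demo():
    timestamps = [0, 30, 60]
    turnstile = OrderedTurnstileSketch(timestamps)
    order = []

    def process_segment(ts):
        try:
            time.sleep((60 - ts) / 1000)  # simulated STT: newest finishes first
            turnstile.wait_turn(ts, timeout=5)
            order.append(ts)  # stands in for lookup/create/merge
        finally:
            turnstile.complete(ts)

    with ThreadPoolExecutor(max_workers=3) as pool:
        list(pool.map(process_segment, timestamps))
    return order
```

In this sketch the simulated STT work still overlaps across threads; only the `order.append` step is serialized, and `run_demo()` returns the timestamps oldest first even though the newest segment finishes its STT first.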

Verification

  • Behavioral simulation against the real process_segment (stubbed Firestore/DG/LLM, 3 chunks at T, T+30s, T+60s, STT finishing newest-first):
    • before: 3 separate conversations (reproduces the issue)
    • after: 1 conversation, segments merged in order, started_at/finished_at span correct, exactly 1 batch-end reprocess
  • 12 new unit tests in tests/unit/test_sync_ordered_assignment.py (turnstile serialization/fail-open/early-complete + structural guards on both pipelines), registered in test.sh; all pass locally via stdlib harness (pytest unavailable locally — CI runs the suite).
  • All existing structural assertions in test_sync_silent_failure.py re-verified to still pass against the modified source.
  • black clean, scan_async_blockers/lint_async_blockers clean, py_compile OK.
  • UNVERIFIED at runtime against a live pendant backlog sync.

Notes / residual

  • The merge window stays the existing hardcoded ±2 min; honoring the user's Conversation Timeout setting (e.g. 10 min) server-side would need the app to pass it to /v2/sync-local-files (it's currently only a /v4/listen query param) — left out of scope.
  • Cross-request races (two concurrent sync batches) are not covered; in-batch is the reported case.

🤖 automated by hourly watchdog; opened for review, not merged.

greptile-apps (Bot, Contributor) commented Jun 11, 2026

Greptile Summary

This PR fixes a race condition in offline-sync pipelines where parallel processing of backlog audio chunks caused timestamp-adjacent segments to each independently create a new conversation rather than merging. The fix sorts segments chronologically and introduces _OrderedTurnstile, a condition-variable-based primitive that serializes only the conversation lookup/create/merge step while leaving STT fully parallel.

  • _OrderedTurnstile is added to sync.py and wired into both the v1 sync_local_files endpoint and the v2 _run_full_pipeline_background_async pipeline; segments are now sorted before submission and each waits its chronological turn before touching Firestore.
  • A deferred _reprocess_merged_conversations call is appended after each batch completes so merged conversations get their summaries regenerated once rather than once per merged segment; 12 new unit tests cover the turnstile semantics and structural guards on both pipelines.
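
The "once per batch" behavior can be pictured as deduplicating the IDs of conversations that received merges and reprocessing each one a single time after the batch finishes. The helper below is a hypothetical illustration of that idea; its name and signature are assumptions and it is not the `_reprocess_merged_conversations` implementation.

```python
def reprocess_each_once(merge_events, reprocess_fn):
    """merge_events holds one conversation ID per merged segment."""
    # dict.fromkeys removes duplicates while keeping first-seen order.
    merged_ids = list(dict.fromkeys(merge_events))
    for conversation_id in merged_ids:
        reprocess_fn(conversation_id)
    return merged_ids
```

For a batch where two segments merged into the same conversation, `reprocess_each_once(["c1", "c1"], fn)` calls `fn` once, which matches the "exactly 1 batch-end reprocess" observed in the simulation.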

Confidence Score: 4/5

Safe to merge; the turnstile logic is correct and both pipelines are wired symmetrically.

The _OrderedTurnstile design is sound: _advance() is always called under the condition lock, silent-segment early returns release the turnstile via finally without deadlocking followers, and the chunk-based v2 pipeline avoids cross-chunk races. The only issues found are a hardcoded timeout string in an error message and a subtle side-effect idiom in the predicate lambda.

backend/routers/sync.py — specifically the per-chunk timeout error message and the _advance() predicate comment.

Important Files Changed

| Filename | Overview |
|---|---|
| backend/routers/sync.py | Core change: adds _OrderedTurnstile class, wires it into process_segment (wait_turn/complete), sorts segments chronologically in both pipelines, and adds _reprocess_merged_conversations. Logic is sound; one misleading error message and a subtle _advance() side-effect idiom noted. |
| backend/tests/unit/test_sync_ordered_assignment.py | New test file: 12 tests covering turnstile serialization, fail-open timeout, early-complete unblocking, and structural guards confirming both pipelines sort, pass, and call the turnstile correctly. |
| backend/test.sh | Adds test_sync_ordered_assignment.py to the test suite; one-line change, correct placement. |

Sequence Diagram

sequenceDiagram
    participant C as Caller
    participant T as _OrderedTurnstile
    participant PS1 as process_segment(T1)
    participant PS2 as process_segment(T2)
    participant PS3 as process_segment(T3)
    participant FS as Firestore

    C->>T: _OrderedTurnstile([T1, T2, T3])
    par STT runs in parallel
        C->>PS1: run_blocking(sync_executor, T1)
        C->>PS2: run_blocking(sync_executor, T2)
        C->>PS3: run_blocking(sync_executor, T3)
    end
    PS3-->>T: wait_turn(T3) blocks
    PS2-->>T: wait_turn(T2) blocks
    PS1-->>T: wait_turn(T1) proceeds immediately
    PS1->>FS: get_closest_conversation / create C1
    PS1-->>T: complete(T1)
    T-->>PS2: unblocked
    PS2->>FS: get_closest_conversation finds C1 merges
    PS2-->>T: complete(T2)
    T-->>PS3: unblocked
    PS3->>FS: get_closest_conversation finds C1 merges
    PS3-->>T: complete(T3)
    C->>FS: _reprocess_merged_conversations reprocess C1 once

Comments Outside Diff (1)

  1. backend/routers/sync.py, lines 1643-1646

    P2 The error message is hardcoded to '300s' but the actual asyncio.wait_for timeout for chunk position j is 300 + 60 * j seconds — up to 540 s for the last segment in a 5-element chunk. A misleading timeout in segment_errors makes post-mortem debugging harder, especially since this error surfaces in the sync-job result returned to clients.
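
One way to keep the message consistent with the real timeout is to compute the value once and reuse it in both the `asyncio.wait_for` call and the error string. This is a sketch under assumptions: `segment_timeout`, `run_with_timeout`, and the message wording are hypothetical names for illustration, while the `300 + 60 * j` formula comes from the PR description.

```python
import asyncio


def segment_timeout(j, base=300, step=60):
    # Timeout for the segment at in-chunk position j (0-based).
    return base + step * j


async def run_with_timeout(coro, j, segment_errors, label, base=300, step=60):
    timeout = segment_timeout(j, base, step)
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # Report the timeout that was actually applied, not a fixed '300s'.
        segment_errors.append(f"{label}: timed out after {timeout}s")
        return None
```

For a 5-element chunk the positions 0 through 4 map to 300, 360, 420, 480 and 540 seconds, so the last segment's error would read "540s" rather than "300s".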

Reviews (1): Last reviewed commit: "test(backend): register test_sync_ordere..."

Comment thread backend/routers/sync.py
Comment on lines +976 to +979
    with self._cond:
        return self._cond.wait_for(
            lambda: self._advance() or not self._pending or self._pending[0] == key, timeout=timeout
        )

P2 _advance() side-effect in or predicate is easy to misread

_advance() always returns None (implicitly), so the or chain is effectively None or not self._pending or self._pending[0] == key. The call exists purely for its side effect of mutating _pending before the subsequent checks. A future reader could reasonably assume the branch short-circuits on a truthy result from _advance() and add an early return there, breaking the conditional entirely. A brief inline comment (e.g. # _advance() mutates _pending; its return value is intentionally None) would make the intent explicit without changing the logic.
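
The idiom can be shown in isolation. In the sketch below, `pending`, `done`, and both predicate functions are hypothetical module-level stand-ins for the turnstile's state, used only to show that the `or` chain and an explicit two-statement version evaluate the same way when the helper returns None.

```python
pending = [1, 2, 3]
done = {1}


def advance():
    # Side effect only: drop finished keys from the front. Returns None.
    while pending and pending[0] in done:
        pending.pop(0)


def my_turn_idiom(key):
    # None is falsy, so evaluation always continues to the real checks.
    return advance() or not pending or pending[0] == key


def my_turn_explicit(key):
    advance()
    return not pending or pending[0] == key
```

Both functions return True for key 2 and False for key 3 once key 1 is done; the explicit form simply makes the mutation visible as its own statement.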

kodjima33 merged commit 52c6ce7 into main on Jun 11, 2026 (3 checks passed)
kodjima33 deleted the watchdog/issue-6551-ordered-sync-assignment branch on June 11, 2026 14:40
mdmohsin7 added a commit that referenced this pull request Jun 11, 2026
Resolves sync.py conflicts with #7819 (chronological assignment turnstile):
- process_segment keeps both the turnstile finally-release and the
  success-bool return used by the processed-segment ledger
- coordinator combines the ledger skip with the ordered turnstile;
  skipped segments release their assignment slot so later segments
  don't stall waiting for a turn that would never complete
mdmohsin7 added a commit that referenced this pull request Jun 12, 2026
…gle janitor thread (#7855)

## Problem

Four call sites (1 in `routers/sync.py`, 3 voice-message flows in
`utils/chat.py`) delete temporal GCS blobs by parking a
`storage_executor` thread in `time.sleep(480)` per file. At current sync
volume (~20 jobs/min post-#7801) that keeps **~90 of the pool's 128
threads asleep as ad-hoc timers** — confirmed in prod
`executor_pool_health` logs (~70–80% "utilization" with queue_depth 0,
i.e. occupancy without work).

This is the remaining root cause behind #7531's storage-pool saturation:
the pool was bumped 32→64→96→128 in ten days and `_PRECACHE_FILE_SEM`
was halved (#7526) largely to feed threads whose only job was waiting.

## Change

- **New `utils/other/deferred_delete.py`**: `DeferredDeleter` — a
due-time min-heap plus one lazily-started daemon thread. `schedule()` is
an O(log n) heap push; the janitor wakes when the next deletion is due.
An earlier-due schedule arriving mid-wait re-notifies and re-peeks, so
ordering holds. Delete failures are logged and skipped (the syncing
bucket's lifecycle rule remains the backstop, exactly as it was for the
sleeping threads).
- **`storage.py`**: `schedule_syncing_temporal_file_deletion(path,
delay=480s)` wraps a module-level janitor bound to
`delete_syncing_temporal_file`. Same 480s semantics (under the 15-min
signed-URL validity).
- **All four call sites** switch to the scheduler; `chat.py` drops its
now-unused `storage_executor`/`time` imports.
- **`_PRECACHE_FILE_SEM` 2 → 4**, reverting #7526's load-shed now that
the sleepers are gone — audio merge/precache get the freed headroom.

Deletion timing, crash semantics (pending deletions die with the process
either way), and the lifecycle backstop are all unchanged. Hundreds of
pending deletions now cost one thread instead of hundreds.
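
The heap-plus-one-thread design can be sketched as follows. This is an assumption-laden illustration, not the contents of `utils/other/deferred_delete.py`: the class name `DeferredDeleterSketch`, its fields, and the `print`-based failure logging are hypothetical, and only the overall shape (due-time min-heap, lazily started daemon thread, failures skipped) follows the description above.

```python
import heapq
import itertools
import threading
import time


class DeferredDeleterSketch:
    def __init__(self, delete_fn):
        self._delete_fn = delete_fn
        self._heap = []                # entries are (due_time, seq, path)
        self._seq = itertools.count()  # tie-breaker for equal due times
        self._cond = threading.Condition()
        self._thread = None

    def schedule(self, path, delay):
        with self._cond:
            due = time.monotonic() + delay
            heapq.heappush(self._heap, (due, next(self._seq), path))
            if self._thread is None:   # start the janitor on first use
                self._thread = threading.Thread(target=self._run, daemon=True)
                self._thread.start()
            self._cond.notify()        # wake the janitor so it re-peeks the heap

    def _run(self):
        while True:
            with self._cond:
                while True:
                    if not self._heap:
                        self._cond.wait()
                        continue
                    remaining = self._heap[0][0] - time.monotonic()
                    if remaining <= 0:
                        break
                    self._cond.wait(timeout=remaining)
                _, _, path = heapq.heappop(self._heap)
            try:
                self._delete_fn(path)  # runs outside the lock
            except Exception as exc:
                # A failed delete is logged and skipped; the janitor keeps going.
                print(f"deferred delete failed for {path}: {exc}")
```

Scheduling a near-term deletion while the janitor is waiting on a later one notifies the condition, so the thread recomputes its wait from the new heap head; any number of pending deletions shares the single thread.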

## Expected impact (measurable immediately after deploy)

`executor_pool_health` storage `active_count` should drop from ~90–100
to real work only (~5–15). That single number is the before/after check.

## Tests

- New `tests/unit/test_deferred_blob_janitor.py` (11 tests): real-module
behavioral coverage (due-order with out-of-order schedules, near-term
schedule interrupting a long wait, failure doesn't kill the janitor, 200
pending = exactly 1 thread) + structural guards (no `time.sleep(480)`
remains, all four sites use the scheduler, sem = 4). Registered in
`test.sh`.
- `test_storage_fanout_limits.py`: semaphore value assertion updated 2 →
4 with rationale.
- `test_sync_silent_failure.py`: removed the executor-swap
setup/teardown machinery that existed solely to neutralize the old
sleeping deleters in tests; all 41 pass.
- `scan_async_blockers.py`: no new findings (the one `chat.py` hit
pre-exists on main).
- `test_sync_v2.py` remains at the main baseline (5 pre-existing
failures from #7819, unrelated).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Development

Successfully merging this pull request may close these issues.

[Limitless Pendant] Backlog-synced conversations split into chunks, ignoring Conversation Timeout setting
