Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions backend/routers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from utils.other.storage import (
get_syncing_file_temporal_signed_url,
delete_syncing_temporal_file,
schedule_syncing_temporal_file_deletion,
upload_syncing_temporal_file,
download_syncing_temporal_file,
download_audio_chunks_and_merge,
Expand Down Expand Up @@ -1016,12 +1017,7 @@ def process_segment(
):
try:
url = get_syncing_file_temporal_signed_url(path)

def delete_file():
time.sleep(480)
delete_syncing_temporal_file(path)

submit_with_context(storage_executor, delete_file)
schedule_syncing_temporal_file_deletion(path)

# Apply user transcription preferences (vocabulary, language, model)
prefs = transcription_prefs or {}
Expand Down
1 change: 1 addition & 0 deletions backend/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pytest tests/unit/test_storage_upload_audio_chunk_data_protection.py -v
pytest tests/unit/test_storage_opus_encoding.py -v
pytest tests/unit/test_speech_profile_existence.py -v
pytest tests/unit/test_storage_fanout_limits.py -v
pytest tests/unit/test_deferred_blob_janitor.py -v
pytest tests/unit/test_people_conversations_500s.py -v
pytest tests/unit/test_firestore_read_ops_cache.py -v
pytest tests/unit/test_ws_auth_handshake.py -v
Expand Down
118 changes: 118 additions & 0 deletions backend/tests/unit/test_deferred_blob_janitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""
Tests for the deferred-deletion janitor (utils/other/deferred_delete.py).

One janitor thread + a due-time heap replace the previous per-file
time.sleep(480) on storage_executor, which parked ~70% of the pool's 128
threads as idle timers (#7531).

Behavioral tests exercise the real DeferredDeleter (the module has no heavy
imports). Structural tests verify the four former sleep-sites now use the
scheduler.
"""

import os
import threading
import time

from utils.other.deferred_delete import DeferredDeleter


def _read_source(rel_path):
base = os.path.join(os.path.dirname(__file__), '..', '..')
with open(os.path.join(base, rel_path), encoding='utf-8') as f:
return f.read()


def _wait_for(predicate, timeout=2.0):
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if predicate():
return True
time.sleep(0.01)
return predicate()


class TestDeferredDeleterBehavior:
def test_deletes_after_delay(self):
deleted = []
d = DeferredDeleter(deleted.append)
d.schedule('a.wav', 0.05)
assert not deleted, 'must not delete before the delay elapses'
assert _wait_for(lambda: deleted == ['a.wav'])
assert d.pending_count() == 0

def test_out_of_order_schedules_delete_in_due_order(self):
deleted = []
lock = threading.Lock()

def record(path):
with lock:
deleted.append(path)

d = DeferredDeleter(record)
d.schedule('later.wav', 0.3)
d.schedule('sooner.wav', 0.05)
assert _wait_for(lambda: len(deleted) == 2)
assert deleted == ['sooner.wav', 'later.wav']

def test_earlier_schedule_interrupts_long_wait(self):
"""A near-term schedule arriving while the janitor waits on a far-future
item must fire on time, not after the far-future wait."""
deleted = []
d = DeferredDeleter(deleted.append)
d.schedule('far.wav', 30)
d.schedule('near.wav', 0.05)
assert _wait_for(lambda: 'near.wav' in deleted)
assert 'far.wav' not in deleted
assert d.pending_count() == 1

def test_failing_delete_does_not_kill_janitor(self):
deleted = []

def flaky(path):
if path == 'boom.wav':
raise RuntimeError('gcs down')
deleted.append(path)

d = DeferredDeleter(flaky)
d.schedule('boom.wav', 0.02)
d.schedule('ok.wav', 0.05)
assert _wait_for(lambda: deleted == ['ok.wav'])

def test_single_thread_reused_across_schedules(self):
d = DeferredDeleter(lambda path: None)
d.schedule('a.wav', 0.01)
first_thread = d._thread
assert _wait_for(lambda: d.pending_count() == 0)
d.schedule('b.wav', 0.01)
assert d._thread is first_thread
assert _wait_for(lambda: d.pending_count() == 0)

def test_many_pending_use_one_thread(self):
before = threading.active_count()
d = DeferredDeleter(lambda path: None)
for i in range(200):
d.schedule(f'{i}.wav', 60)
assert d.pending_count() == 200
assert threading.active_count() <= before + 1, '200 pending deletions must cost exactly one thread'


class TestSleepPatternRemoved:
def test_no_sleep_480_remains_in_backend(self):
for rel in ('routers/sync.py', 'utils/chat.py'):
assert 'time.sleep(480)' not in _read_source(rel), f'{rel} still parks threads as deletion timers'

def test_sync_uses_scheduler(self):
assert 'schedule_syncing_temporal_file_deletion(path)' in _read_source('routers/sync.py')

def test_chat_uses_scheduler_at_all_three_sites(self):
assert _read_source('utils/chat.py').count('schedule_syncing_temporal_file_deletion(path)') == 3

def test_storage_defines_scheduler_with_480_default(self):
src = _read_source('utils/other/storage.py')
assert 'SYNCING_TEMPORAL_DELETE_DELAY_SECONDS = 480' in src
assert 'def schedule_syncing_temporal_file_deletion' in src

def test_precache_sem_restored(self):
# 4 → 2 was load-shedding while the pool was full of sleepers (#7526)
assert '_PRECACHE_FILE_SEM = threading.BoundedSemaphore(4)' in _read_source('utils/other/storage.py')
10 changes: 7 additions & 3 deletions backend/tests/unit/test_storage_fanout_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,14 @@ def test_precache_file_semaphore_exists(self):
src = _read_source('utils/other/storage.py')
assert '_PRECACHE_FILE_SEM' in src

def test_precache_file_semaphore_is_2(self):
"""Global precache file semaphore must be BoundedSemaphore(2)."""
def test_precache_file_semaphore_is_4(self):
"""Global precache file semaphore must be BoundedSemaphore(4).

Was 2 while storage_executor was saturated by per-file sleep(480)
deletion timers; restored to 4 once the janitor thread took those
over (see test_deferred_blob_janitor.py)."""
src = _read_source('utils/other/storage.py')
assert 'BoundedSemaphore(2)' in src
assert '_PRECACHE_FILE_SEM = threading.BoundedSemaphore(4)' in src

def test_precache_conversation_audio_uses_semaphore(self):
"""precache_conversation_audio must gate submissions with _PRECACHE_FILE_SEM."""
Expand Down
25 changes: 5 additions & 20 deletions backend/tests/unit/test_sync_silent_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ def setup_class(cls):
sys.modules['pydub'].AudioSegment = MagicMock()
sys.modules['utils.other.endpoints'].get_current_user_uid = MagicMock()
sys.modules['utils.other.storage'].get_syncing_file_temporal_signed_url = MagicMock(return_value='https://fake')
sys.modules['utils.other.storage'].schedule_syncing_temporal_file_deletion = MagicMock()
sys.modules['utils.other.storage'].delete_syncing_temporal_file = MagicMock()
sys.modules['utils.other.storage'].download_audio_chunks_and_merge = MagicMock()
sys.modules['utils.other.storage'].get_or_create_merged_audio = MagicMock()
Expand Down Expand Up @@ -1050,9 +1051,6 @@ class TestVoiceMessageRuntimeErrorHandling:
"""Tests that voice message functions in utils/chat.py handle RuntimeError from prerecorded."""

_saved_modules = {}
_storage_executor = None
_storage_executor_had_submit = False
_storage_executor_submit = None
_transcribe_fn = None
_process_fn = None
_process_stream_fn = None
Expand All @@ -1070,6 +1068,7 @@ def setup_class(cls):
sys.modules['fal_client'].submit = MagicMock()
sys.modules['utils.other.endpoints'].timeit = lambda f: f
sys.modules['utils.other.storage'].get_syncing_file_temporal_signed_url = MagicMock(return_value='https://fake')
sys.modules['utils.other.storage'].schedule_syncing_temporal_file_deletion = MagicMock()
sys.modules['utils.other.storage'].delete_syncing_temporal_file = MagicMock()
sys.modules['utils.notifications'].send_notification = MagicMock()
sys.modules['utils.retrieval.graph'].execute_graph_chat = MagicMock()
Expand Down Expand Up @@ -1116,26 +1115,12 @@ def setup_class(cls):
)
import utils.chat as chat_mod

cls._storage_executor = chat_mod.storage_executor
cls._storage_executor_had_submit = 'submit' in getattr(chat_mod.storage_executor, '__dict__', {})
cls._storage_executor_submit = chat_mod.storage_executor.submit
chat_mod.storage_executor.submit = MagicMock()

cls._transcribe_fn = staticmethod(transcribe_voice_message_segment)
cls._process_fn = staticmethod(process_voice_message_segment)
cls._process_stream_fn = staticmethod(process_voice_message_segment_stream)

@classmethod
def teardown_class(cls):
if cls._storage_executor is not None:
if cls._storage_executor_had_submit:
cls._storage_executor.submit = cls._storage_executor_submit
elif 'submit' in getattr(cls._storage_executor, '__dict__', {}):
delattr(cls._storage_executor, 'submit')
cls._storage_executor = None
cls._storage_executor_submit = None
cls._storage_executor_had_submit = False

sys.modules.pop('utils.chat', None)
for name, orig in cls._saved_modules.items():
if orig is None:
Expand All @@ -1149,7 +1134,7 @@ def test_transcribe_voice_message_handles_runtime_error(self):
with patch(
'utils.chat.prerecorded',
side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'),
), patch('utils.chat.time.sleep'):
):
result = self._transcribe_fn('/tmp/test.wav', 'uid', language='en')

assert result == (None, 'en'), f"Expected (None, 'en'), got {result}"
Expand All @@ -1159,7 +1144,7 @@ def test_process_voice_message_handles_runtime_error(self):
with patch(
'utils.chat.prerecorded',
side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'),
), patch('utils.chat.time.sleep'):
):
result = self._process_fn('/tmp/test.wav', 'uid', language='en')

assert result == [], f"Expected [], got {result}"
Expand All @@ -1173,7 +1158,7 @@ async def run():
with patch(
'utils.chat.prerecorded',
side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'),
), patch('utils.chat.time.sleep'):
):
async for chunk in self._process_stream_fn('/tmp/test.wav', 'uid', language='en'):
chunks.append(chunk)
return chunks
Expand Down
26 changes: 4 additions & 22 deletions backend/utils/chat.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import time
import base64
import uuid
from datetime import datetime, timezone
from typing import AsyncGenerator, List, Optional, Tuple

from utils.executors import storage_executor

import database.chat as chat_db
import database.notifications as notification_db
import database.users as user_db
Expand All @@ -17,7 +14,7 @@
from models.transcript_segment import TranscriptSegment
from utils.conversation_helpers import extract_memory_ids
from utils.notifications import send_notification
from utils.other.storage import get_syncing_file_temporal_signed_url, delete_syncing_temporal_file
from utils.other.storage import get_syncing_file_temporal_signed_url, schedule_syncing_temporal_file_deletion
from utils.retrieval.graph import execute_graph_chat, execute_graph_chat_stream
from utils.stt.pre_recorded import (
get_deepgram_model_for_language,
Expand Down Expand Up @@ -63,12 +60,7 @@ def transcribe_voice_message_segment(
language: str = 'multi',
) -> Tuple[Optional[str], Optional[str]]:
url = get_syncing_file_temporal_signed_url(path)

def delete_file():
time.sleep(480)
delete_syncing_temporal_file(path)

storage_executor.submit(delete_file)
schedule_syncing_temporal_file_deletion(path)

if not language:
language = resolve_voice_message_language(uid, None)
Expand Down Expand Up @@ -178,12 +170,7 @@ def process_voice_message_segment(
language: str = 'multi',
):
url = get_syncing_file_temporal_signed_url(path)

def delete_file():
time.sleep(480)
delete_syncing_temporal_file(path)

storage_executor.submit(delete_file)
schedule_syncing_temporal_file_deletion(path)

if not language:
language = resolve_voice_message_language(uid, None)
Expand Down Expand Up @@ -252,12 +239,7 @@ async def process_voice_message_segment_stream(
language: str = 'multi',
) -> AsyncGenerator[str, None]:
url = get_syncing_file_temporal_signed_url(path)

def delete_file():
time.sleep(480)
delete_syncing_temporal_file(path)

storage_executor.submit(delete_file)
schedule_syncing_temporal_file_deletion(path)

if not language:
language = resolve_voice_message_language(uid, None)
Expand Down
61 changes: 61 additions & 0 deletions backend/utils/other/deferred_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Single-thread deferred-deletion scheduler.

Replaces the previous pattern of parking an executor thread in
time.sleep(480) per file: at sync volume that kept ~70% of
storage_executor's 128 threads asleep as ad-hoc timers, which is what
drove the pool's repeated saturation (#7531). One daemon thread and a
due-time heap handle any number of pending deletions.

Best-effort by design: pending deletions are lost on process death, same
as the sleeping threads were; the syncing bucket's lifecycle rule is the
backstop.
"""

import heapq
import logging
import threading
import time
from typing import Callable

logger = logging.getLogger(__name__)


class DeferredDeleter:
def __init__(self, delete_fn: Callable[[str], None], name: str = 'deferred-delete-janitor'):
self._delete_fn = delete_fn
self._name = name
self._cond = threading.Condition()
self._heap = [] # (due_monotonic, seq, path)
self._seq = 0
self._thread = None

def schedule(self, path: str, delay_seconds: float) -> None:
"""Schedule path for deletion after delay_seconds. O(log n), never blocks."""
with self._cond:
self._seq += 1
heapq.heappush(self._heap, (time.monotonic() + delay_seconds, self._seq, path))
if self._thread is None:
self._thread = threading.Thread(target=self._run, name=self._name, daemon=True)
self._thread.start()
self._cond.notify()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Dead janitor thread silently stops all future deletions

_thread is set once and never cleared. If _run exits unexpectedly — e.g., via a BaseException subclass like MemoryError or SystemExit that bypasses the except Exception catch — _thread will still point to a dead Thread object. Every subsequent schedule() call skips the if self._thread is None: branch, items pile up in the heap, and no deletion ever fires. The lifecycle rule is the backstop, but this is a silent failure rather than a logged one. Changing the guard to if self._thread is None or not self._thread.is_alive(): would restart the janitor in the rare case it dies.


def pending_count(self) -> int:
with self._cond:
return len(self._heap)

def _run(self):
while True:
with self._cond:
while not self._heap:
self._cond.wait()
due, _, path = self._heap[0]
delay = due - time.monotonic()
if delay > 0:
# A schedule() for an earlier due-time re-notifies and we re-peek
self._cond.wait(timeout=delay)
continue
heapq.heappop(self._heap)
try:
self._delete_fn(path)
except Exception as e:
logger.warning('deferred delete failed for %s: %s', path, e)
Loading
Loading