Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions backend/routers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from utils.other.storage import (
get_syncing_file_temporal_signed_url,
delete_syncing_temporal_file,
schedule_syncing_temporal_file_deletion,
upload_syncing_temporal_file,
download_syncing_temporal_file,
download_audio_chunks_and_merge,
Expand Down Expand Up @@ -1016,12 +1017,7 @@ def process_segment(
):
try:
url = get_syncing_file_temporal_signed_url(path)

def delete_file():
time.sleep(480)
delete_syncing_temporal_file(path)

submit_with_context(storage_executor, delete_file)
schedule_syncing_temporal_file_deletion(path)

# Apply user transcription preferences (vocabulary, language, model)
prefs = transcription_prefs or {}
Expand Down
1 change: 1 addition & 0 deletions backend/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pytest tests/unit/test_storage_upload_audio_chunk_data_protection.py -v
pytest tests/unit/test_storage_opus_encoding.py -v
pytest tests/unit/test_speech_profile_existence.py -v
pytest tests/unit/test_storage_fanout_limits.py -v
pytest tests/unit/test_deferred_blob_janitor.py -v
pytest tests/unit/test_people_conversations_500s.py -v
pytest tests/unit/test_firestore_read_ops_cache.py -v
pytest tests/unit/test_ws_auth_handshake.py -v
Expand Down
136 changes: 136 additions & 0 deletions backend/tests/unit/test_deferred_blob_janitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
Tests for the deferred-deletion janitor (utils/other/deferred_delete.py).

One janitor thread + a due-time heap replace the previous per-file
time.sleep(480) on storage_executor, which parked ~70% of the pool's 128
threads as idle timers (#7531).

Behavioral tests exercise the real DeferredDeleter (the module has no heavy
imports). Structural tests verify the four former sleep-sites now use the
scheduler.
"""

import os
import threading
import time

from utils.other.deferred_delete import DeferredDeleter


def _read_source(rel_path):
base = os.path.join(os.path.dirname(__file__), '..', '..')
with open(os.path.join(base, rel_path), encoding='utf-8') as f:
return f.read()


def _wait_for(predicate, timeout=2.0):
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if predicate():
return True
time.sleep(0.01)
return predicate()


class TestDeferredDeleterBehavior:
def test_deletes_after_delay(self):
deleted = []
d = DeferredDeleter(deleted.append)
d.schedule('a.wav', 0.05)
assert not deleted, 'must not delete before the delay elapses'
assert _wait_for(lambda: deleted == ['a.wav'])
assert d.pending_count() == 0

def test_out_of_order_schedules_delete_in_due_order(self):
deleted = []
lock = threading.Lock()

def record(path):
with lock:
deleted.append(path)

d = DeferredDeleter(record)
d.schedule('later.wav', 0.3)
d.schedule('sooner.wav', 0.05)
assert _wait_for(lambda: len(deleted) == 2)
assert deleted == ['sooner.wav', 'later.wav']

def test_earlier_schedule_interrupts_long_wait(self):
"""A near-term schedule arriving while the janitor waits on a far-future
item must fire on time, not after the far-future wait."""
deleted = []
d = DeferredDeleter(deleted.append)
d.schedule('far.wav', 30)
d.schedule('near.wav', 0.05)
assert _wait_for(lambda: 'near.wav' in deleted)
assert 'far.wav' not in deleted
assert d.pending_count() == 1

def test_failing_delete_does_not_kill_janitor(self):
deleted = []

def flaky(path):
if path == 'boom.wav':
raise RuntimeError('gcs down')
deleted.append(path)

d = DeferredDeleter(flaky)
d.schedule('boom.wav', 0.02)
d.schedule('ok.wav', 0.05)
assert _wait_for(lambda: deleted == ['ok.wav'])

def test_single_thread_reused_across_schedules(self):
d = DeferredDeleter(lambda path: None)
d.schedule('a.wav', 0.01)
first_thread = d._thread
assert _wait_for(lambda: d.pending_count() == 0)
d.schedule('b.wav', 0.01)
assert d._thread is first_thread
assert _wait_for(lambda: d.pending_count() == 0)

def test_janitor_restarts_if_killed_by_base_exception(self):
"""SystemExit/MemoryError bypass the except-Exception catch and kill the
thread; the next schedule() must notice and start a fresh janitor."""
deleted = []

def lethal_then_ok(path):
if path == 'lethal.wav':
raise SystemExit # BaseException: terminates the janitor thread
deleted.append(path)

d = DeferredDeleter(lethal_then_ok)
d.schedule('lethal.wav', 0.02)
dead_thread = d._thread
assert _wait_for(lambda: not dead_thread.is_alive())
d.schedule('after-death.wav', 0.02)
assert d._thread is not dead_thread, 'schedule() must replace a dead janitor'
assert _wait_for(lambda: deleted == ['after-death.wav'])

def test_many_pending_use_one_thread(self):
before = threading.active_count()
d = DeferredDeleter(lambda path: None)
for i in range(200):
d.schedule(f'{i}.wav', 60)
assert d.pending_count() == 200
assert threading.active_count() <= before + 1, '200 pending deletions must cost exactly one thread'


class TestSleepPatternRemoved:
def test_no_sleep_480_remains_in_backend(self):
for rel in ('routers/sync.py', 'utils/chat.py'):
assert 'time.sleep(480)' not in _read_source(rel), f'{rel} still parks threads as deletion timers'

def test_sync_uses_scheduler(self):
assert 'schedule_syncing_temporal_file_deletion(path)' in _read_source('routers/sync.py')

def test_chat_uses_scheduler_at_all_three_sites(self):
assert _read_source('utils/chat.py').count('schedule_syncing_temporal_file_deletion(path)') == 3

def test_storage_defines_scheduler_with_480_default(self):
src = _read_source('utils/other/storage.py')
assert 'SYNCING_TEMPORAL_DELETE_DELAY_SECONDS = 480' in src
assert 'def schedule_syncing_temporal_file_deletion' in src

def test_precache_sem_restored(self):
# 4 → 2 was load-shedding while the pool was full of sleepers (#7526)
assert '_PRECACHE_FILE_SEM = threading.BoundedSemaphore(4)' in _read_source('utils/other/storage.py')
10 changes: 7 additions & 3 deletions backend/tests/unit/test_storage_fanout_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,14 @@ def test_precache_file_semaphore_exists(self):
src = _read_source('utils/other/storage.py')
assert '_PRECACHE_FILE_SEM' in src

def test_precache_file_semaphore_is_2(self):
"""Global precache file semaphore must be BoundedSemaphore(2)."""
def test_precache_file_semaphore_is_4(self):
"""Global precache file semaphore must be BoundedSemaphore(4).

Was 2 while storage_executor was saturated by per-file sleep(480)
deletion timers; restored to 4 once the janitor thread took those
over (see test_deferred_blob_janitor.py)."""
src = _read_source('utils/other/storage.py')
assert 'BoundedSemaphore(2)' in src
assert '_PRECACHE_FILE_SEM = threading.BoundedSemaphore(4)' in src

def test_precache_conversation_audio_uses_semaphore(self):
"""precache_conversation_audio must gate submissions with _PRECACHE_FILE_SEM."""
Expand Down
25 changes: 5 additions & 20 deletions backend/tests/unit/test_sync_silent_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ def setup_class(cls):
sys.modules['pydub'].AudioSegment = MagicMock()
sys.modules['utils.other.endpoints'].get_current_user_uid = MagicMock()
sys.modules['utils.other.storage'].get_syncing_file_temporal_signed_url = MagicMock(return_value='https://fake')
sys.modules['utils.other.storage'].schedule_syncing_temporal_file_deletion = MagicMock()
sys.modules['utils.other.storage'].delete_syncing_temporal_file = MagicMock()
sys.modules['utils.other.storage'].download_audio_chunks_and_merge = MagicMock()
sys.modules['utils.other.storage'].get_or_create_merged_audio = MagicMock()
Expand Down Expand Up @@ -1050,9 +1051,6 @@ class TestVoiceMessageRuntimeErrorHandling:
"""Tests that voice message functions in utils/chat.py handle RuntimeError from prerecorded."""

_saved_modules = {}
_storage_executor = None
_storage_executor_had_submit = False
_storage_executor_submit = None
_transcribe_fn = None
_process_fn = None
_process_stream_fn = None
Expand All @@ -1070,6 +1068,7 @@ def setup_class(cls):
sys.modules['fal_client'].submit = MagicMock()
sys.modules['utils.other.endpoints'].timeit = lambda f: f
sys.modules['utils.other.storage'].get_syncing_file_temporal_signed_url = MagicMock(return_value='https://fake')
sys.modules['utils.other.storage'].schedule_syncing_temporal_file_deletion = MagicMock()
sys.modules['utils.other.storage'].delete_syncing_temporal_file = MagicMock()
sys.modules['utils.notifications'].send_notification = MagicMock()
sys.modules['utils.retrieval.graph'].execute_graph_chat = MagicMock()
Expand Down Expand Up @@ -1116,26 +1115,12 @@ def setup_class(cls):
)
import utils.chat as chat_mod

cls._storage_executor = chat_mod.storage_executor
cls._storage_executor_had_submit = 'submit' in getattr(chat_mod.storage_executor, '__dict__', {})
cls._storage_executor_submit = chat_mod.storage_executor.submit
chat_mod.storage_executor.submit = MagicMock()

cls._transcribe_fn = staticmethod(transcribe_voice_message_segment)
cls._process_fn = staticmethod(process_voice_message_segment)
cls._process_stream_fn = staticmethod(process_voice_message_segment_stream)

@classmethod
def teardown_class(cls):
if cls._storage_executor is not None:
if cls._storage_executor_had_submit:
cls._storage_executor.submit = cls._storage_executor_submit
elif 'submit' in getattr(cls._storage_executor, '__dict__', {}):
delattr(cls._storage_executor, 'submit')
cls._storage_executor = None
cls._storage_executor_submit = None
cls._storage_executor_had_submit = False

sys.modules.pop('utils.chat', None)
for name, orig in cls._saved_modules.items():
if orig is None:
Expand All @@ -1149,7 +1134,7 @@ def test_transcribe_voice_message_handles_runtime_error(self):
with patch(
'utils.chat.prerecorded',
side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'),
), patch('utils.chat.time.sleep'):
):
result = self._transcribe_fn('/tmp/test.wav', 'uid', language='en')

assert result == (None, 'en'), f"Expected (None, 'en'), got {result}"
Expand All @@ -1159,7 +1144,7 @@ def test_process_voice_message_handles_runtime_error(self):
with patch(
'utils.chat.prerecorded',
side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'),
), patch('utils.chat.time.sleep'):
):
result = self._process_fn('/tmp/test.wav', 'uid', language='en')

assert result == [], f"Expected [], got {result}"
Expand All @@ -1173,7 +1158,7 @@ async def run():
with patch(
'utils.chat.prerecorded',
side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'),
), patch('utils.chat.time.sleep'):
):
async for chunk in self._process_stream_fn('/tmp/test.wav', 'uid', language='en'):
chunks.append(chunk)
return chunks
Expand Down
26 changes: 4 additions & 22 deletions backend/utils/chat.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import time
import base64
import uuid
from datetime import datetime, timezone
from typing import AsyncGenerator, List, Optional, Tuple

from utils.executors import storage_executor

import database.chat as chat_db
import database.notifications as notification_db
import database.users as user_db
Expand All @@ -17,7 +14,7 @@
from models.transcript_segment import TranscriptSegment
from utils.conversation_helpers import extract_memory_ids
from utils.notifications import send_notification
from utils.other.storage import get_syncing_file_temporal_signed_url, delete_syncing_temporal_file
from utils.other.storage import get_syncing_file_temporal_signed_url, schedule_syncing_temporal_file_deletion
from utils.retrieval.graph import execute_graph_chat, execute_graph_chat_stream
from utils.stt.pre_recorded import (
get_deepgram_model_for_language,
Expand Down Expand Up @@ -63,12 +60,7 @@ def transcribe_voice_message_segment(
language: str = 'multi',
) -> Tuple[Optional[str], Optional[str]]:
url = get_syncing_file_temporal_signed_url(path)

def delete_file():
time.sleep(480)
delete_syncing_temporal_file(path)

storage_executor.submit(delete_file)
schedule_syncing_temporal_file_deletion(path)

if not language:
language = resolve_voice_message_language(uid, None)
Expand Down Expand Up @@ -178,12 +170,7 @@ def process_voice_message_segment(
language: str = 'multi',
):
url = get_syncing_file_temporal_signed_url(path)

def delete_file():
time.sleep(480)
delete_syncing_temporal_file(path)

storage_executor.submit(delete_file)
schedule_syncing_temporal_file_deletion(path)

if not language:
language = resolve_voice_message_language(uid, None)
Expand Down Expand Up @@ -252,12 +239,7 @@ async def process_voice_message_segment_stream(
language: str = 'multi',
) -> AsyncGenerator[str, None]:
url = get_syncing_file_temporal_signed_url(path)

def delete_file():
time.sleep(480)
delete_syncing_temporal_file(path)

storage_executor.submit(delete_file)
schedule_syncing_temporal_file_deletion(path)

if not language:
language = resolve_voice_message_language(uid, None)
Expand Down
64 changes: 64 additions & 0 deletions backend/utils/other/deferred_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Single-thread deferred-deletion scheduler.

Replaces the previous pattern of parking an executor thread in
time.sleep(480) per file: at sync volume that kept ~70% of
storage_executor's 128 threads asleep as ad-hoc timers, which is what
drove the pool's repeated saturation (#7531). One daemon thread and a
due-time heap handle any number of pending deletions.

Best-effort by design: pending deletions are lost on process death, same
as the sleeping threads were; the syncing bucket's lifecycle rule is the
backstop.
"""

import heapq
import logging
import threading
import time
from typing import Callable

logger = logging.getLogger(__name__)


class DeferredDeleter:
def __init__(self, delete_fn: Callable[[str], None], name: str = 'deferred-delete-janitor'):
self._delete_fn = delete_fn
self._name = name
self._cond = threading.Condition()
self._heap = [] # (due_monotonic, seq, path)
self._seq = 0
self._thread = None

def schedule(self, path: str, delay_seconds: float) -> None:
"""Schedule path for deletion after delay_seconds. O(log n), never blocks."""
with self._cond:
self._seq += 1
heapq.heappush(self._heap, (time.monotonic() + delay_seconds, self._seq, path))
# is_alive() guard: restart the janitor if a BaseException
# (MemoryError, SystemExit) ever killed it — otherwise schedules
# would pile up silently for the rest of the process lifetime
if self._thread is None or not self._thread.is_alive():
self._thread = threading.Thread(target=self._run, name=self._name, daemon=True)
self._thread.start()
self._cond.notify()

def pending_count(self) -> int:
with self._cond:
return len(self._heap)

def _run(self):
while True:
with self._cond:
while not self._heap:
self._cond.wait()
due, _, path = self._heap[0]
delay = due - time.monotonic()
if delay > 0:
# A schedule() for an earlier due-time re-notifies and we re-peek
self._cond.wait(timeout=delay)
continue
heapq.heappop(self._heap)
try:
self._delete_fn(path)
except Exception as e:
logger.warning('deferred delete failed for %s: %s', path, e)
Loading
Loading