From b5acc75ceef654d962ba3b093019fdbbe4d1604f Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 15:40:59 +0530 Subject: [PATCH 01/10] feat(backend): single-thread deferred-deletion scheduler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit One daemon thread + a due-time heap replace the per-file time.sleep(480) pattern that parked a storage_executor thread per blob — ~70% of the pool idle as ad-hoc timers at sync volume (#7531). --- backend/utils/other/deferred_delete.py | 61 ++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 backend/utils/other/deferred_delete.py diff --git a/backend/utils/other/deferred_delete.py b/backend/utils/other/deferred_delete.py new file mode 100644 index 0000000000..3cdcec2364 --- /dev/null +++ b/backend/utils/other/deferred_delete.py @@ -0,0 +1,61 @@ +"""Single-thread deferred-deletion scheduler. + +Replaces the previous pattern of parking an executor thread in +time.sleep(480) per file: at sync volume that kept ~70% of +storage_executor's 128 threads asleep as ad-hoc timers, which is what +drove the pool's repeated saturation (#7531). One daemon thread and a +due-time heap handle any number of pending deletions. + +Best-effort by design: pending deletions are lost on process death, same +as the sleeping threads were; the syncing bucket's lifecycle rule is the +backstop. +""" + +import heapq +import logging +import threading +import time +from typing import Callable + +logger = logging.getLogger(__name__) + + +class DeferredDeleter: + def __init__(self, delete_fn: Callable[[str], None], name: str = 'deferred-delete-janitor'): + self._delete_fn = delete_fn + self._name = name + self._cond = threading.Condition() + self._heap = [] # (due_monotonic, seq, path) + self._seq = 0 + self._thread = None + + def schedule(self, path: str, delay_seconds: float) -> None: + """Schedule path for deletion after delay_seconds. O(log n), never blocks.""" + with self._cond: + self._seq += 1 + heapq.heappush(self._heap, (time.monotonic() + delay_seconds, self._seq, path)) + if self._thread is None: + self._thread = threading.Thread(target=self._run, name=self._name, daemon=True) + self._thread.start() + self._cond.notify() + + def pending_count(self) -> int: + with self._cond: + return len(self._heap) + + def _run(self): + while True: + with self._cond: + while not self._heap: + self._cond.wait() + due, _, path = self._heap[0] + delay = due - time.monotonic() + if delay > 0: + # A schedule() for an earlier due-time re-notifies and we re-peek + self._cond.wait(timeout=delay) + continue + heapq.heappop(self._heap) + try: + self._delete_fn(path) + except Exception as e: + logger.warning('deferred delete failed for %s: %s', path, e) From 9b8180ce3b98c4484fa37af2a4398f0da6f2de0a Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 15:40:59 +0530 Subject: [PATCH 02/10] feat(storage): schedule_syncing_temporal_file_deletion via janitor; restore precache sem to 4 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 4→2 cut (#7526) was load-shedding while the pool was full of sleeping deletion timers; with the janitor holding those, precache gets its concurrency back. --- backend/utils/other/storage.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/backend/utils/other/storage.py b/backend/utils/other/storage.py index df95cdc00f..ecfbbafa9c 100644 --- a/backend/utils/other/storage.py +++ b/backend/utils/other/storage.py @@ -19,6 +19,7 @@ from database.redis_db import cache_signed_url, get_cached_signed_url from utils import encryption +from utils.other.deferred_delete import DeferredDeleter from database import users as users_db import logging @@ -26,7 +27,10 @@ # Per-request fan-out limits for storage_executor (#7387) _STORAGE_CHUNK_SEM = threading.BoundedSemaphore(32) -_PRECACHE_FILE_SEM = threading.BoundedSemaphore(2) +# 4 → 2 in #7526 was load-shedding while the pool was full of sleeping +# per-file deletion timers; restored to 4 now that the janitor thread +# (deferred_delete.py) holds those instead of pool threads. +_PRECACHE_FILE_SEM = threading.BoundedSemaphore(4) _CHUNK_WINDOW_SIZE = 8 _merge_tracker_lock = threading.Lock() @@ -342,6 +346,24 @@ def delete_syncing_temporal_file(file_path: str): pass +# Long enough for every signed-URL consumer (Deepgram fetch, speaker-ID +# download) to finish; the URLs themselves expire at 15 minutes. +SYNCING_TEMPORAL_DELETE_DELAY_SECONDS = 480 + +_syncing_temporal_deleter = DeferredDeleter(delete_syncing_temporal_file, name='syncing-blob-janitor') + + +def schedule_syncing_temporal_file_deletion( + file_path: str, delay_seconds: float = SYNCING_TEMPORAL_DELETE_DELAY_SECONDS +): + """Delete a temporal syncing blob once its signed-URL consumers are done. + + One janitor thread + a due-time heap, instead of the previous per-file + time.sleep(480) that parked a storage_executor thread per blob (#7531). + """ + _syncing_temporal_deleter.schedule(file_path, delay_seconds) + + def upload_syncing_temporal_file(file_path: str): """Stage a local file in the syncing bucket (blob name = local relative path).""" bucket = storage_client.bucket(syncing_local_bucket) From f27ffd7d47f44df843c89ec525ff8b9d51cf602e Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 15:40:59 +0530 Subject: [PATCH 03/10] refactor(sync): use deferred-deletion janitor for segment wav cleanup --- backend/routers/sync.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/backend/routers/sync.py b/backend/routers/sync.py index d92fbabf9d..d4d586eb15 100644 --- a/backend/routers/sync.py +++ b/backend/routers/sync.py @@ -61,6 +61,7 @@ from utils.other.storage import ( get_syncing_file_temporal_signed_url, delete_syncing_temporal_file, + schedule_syncing_temporal_file_deletion, upload_syncing_temporal_file, download_syncing_temporal_file, download_audio_chunks_and_merge, @@ -1016,12 +1017,7 @@ def process_segment( ): try: url = get_syncing_file_temporal_signed_url(path) - - def delete_file(): - time.sleep(480) - delete_syncing_temporal_file(path) - - submit_with_context(storage_executor, delete_file) + schedule_syncing_temporal_file_deletion(path) # Apply user transcription preferences (vocabulary, language, model) prefs = transcription_prefs or {} From dd0a6ab0ff4aa6d6e95dc183607dc453812eb03a Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 15:41:00 +0530 Subject: [PATCH 04/10] refactor(chat): use deferred-deletion janitor at all three voice-message sites Drops the now-unused storage_executor and time imports. --- backend/utils/chat.py | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/backend/utils/chat.py b/backend/utils/chat.py index 08d5e1098b..2f09a27d71 100644 --- a/backend/utils/chat.py +++ b/backend/utils/chat.py @@ -1,11 +1,8 @@ -import time import base64 import uuid from datetime import datetime, timezone from typing import AsyncGenerator, List, Optional, Tuple -from utils.executors import storage_executor - import database.chat as chat_db import database.notifications as notification_db import database.users as user_db @@ -17,7 +14,7 @@ from models.transcript_segment import TranscriptSegment from utils.conversation_helpers import extract_memory_ids from utils.notifications import send_notification -from utils.other.storage import get_syncing_file_temporal_signed_url, delete_syncing_temporal_file +from utils.other.storage import get_syncing_file_temporal_signed_url, schedule_syncing_temporal_file_deletion from utils.retrieval.graph import execute_graph_chat, execute_graph_chat_stream from utils.stt.pre_recorded import ( get_deepgram_model_for_language, @@ -63,12 +60,7 @@ def transcribe_voice_message_segment( language: str = 'multi', ) -> Tuple[Optional[str], Optional[str]]: url = get_syncing_file_temporal_signed_url(path) - - def delete_file(): - time.sleep(480) - delete_syncing_temporal_file(path) - - storage_executor.submit(delete_file) + schedule_syncing_temporal_file_deletion(path) if not language: language = resolve_voice_message_language(uid, None) @@ -178,12 +170,7 @@ def process_voice_message_segment( language: str = 'multi', ): url = get_syncing_file_temporal_signed_url(path) - - def delete_file(): - time.sleep(480) - delete_syncing_temporal_file(path) - - storage_executor.submit(delete_file) + schedule_syncing_temporal_file_deletion(path) if not language: language = resolve_voice_message_language(uid, None) @@ -252,12 +239,7 @@ async def process_voice_message_segment_stream( language: str = 'multi', ) -> AsyncGenerator[str, None]: url = get_syncing_file_temporal_signed_url(path) - - def delete_file(): - time.sleep(480) - delete_syncing_temporal_file(path) - - storage_executor.submit(delete_file) + schedule_syncing_temporal_file_deletion(path) if not language: language = resolve_voice_message_language(uid, None) From 85cf7e1948a2e4c3a6d34f0e93e44ac15e55306f Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 15:41:55 +0530 Subject: [PATCH 05/10] test(backend): behavioral + structural coverage for the deletion janitor --- .../tests/unit/test_deferred_blob_janitor.py | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 backend/tests/unit/test_deferred_blob_janitor.py diff --git a/backend/tests/unit/test_deferred_blob_janitor.py b/backend/tests/unit/test_deferred_blob_janitor.py new file mode 100644 index 0000000000..3cb2727378 --- /dev/null +++ b/backend/tests/unit/test_deferred_blob_janitor.py @@ -0,0 +1,118 @@ +""" +Tests for the deferred-deletion janitor (utils/other/deferred_delete.py). + +One janitor thread + a due-time heap replace the previous per-file +time.sleep(480) on storage_executor, which parked ~70% of the pool's 128 +threads as idle timers (#7531). + +Behavioral tests exercise the real DeferredDeleter (the module has no heavy +imports). Structural tests verify the four former sleep-sites now use the +scheduler. +""" + +import os +import threading +import time + +from utils.other.deferred_delete import DeferredDeleter + + +def _read_source(rel_path): + base = os.path.join(os.path.dirname(__file__), '..', '..') + with open(os.path.join(base, rel_path), encoding='utf-8') as f: + return f.read() + + +def _wait_for(predicate, timeout=2.0): + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if predicate(): + return True + time.sleep(0.01) + return predicate() + + +class TestDeferredDeleterBehavior: + def test_deletes_after_delay(self): + deleted = [] + d = DeferredDeleter(deleted.append) + d.schedule('a.wav', 0.05) + assert not deleted, 'must not delete before the delay elapses' + assert _wait_for(lambda: deleted == ['a.wav']) + assert d.pending_count() == 0 + + def test_out_of_order_schedules_delete_in_due_order(self): + deleted = [] + lock = threading.Lock() + + def record(path): + with lock: + deleted.append(path) + + d = DeferredDeleter(record) + d.schedule('later.wav', 0.3) + d.schedule('sooner.wav', 0.05) + assert _wait_for(lambda: len(deleted) == 2) + assert deleted == ['sooner.wav', 'later.wav'] + + def test_earlier_schedule_interrupts_long_wait(self): + """A near-term schedule arriving while the janitor waits on a far-future + item must fire on time, not after the far-future wait.""" + deleted = [] + d = DeferredDeleter(deleted.append) + d.schedule('far.wav', 30) + d.schedule('near.wav', 0.05) + assert _wait_for(lambda: 'near.wav' in deleted) + assert 'far.wav' not in deleted + assert d.pending_count() == 1 + + def test_failing_delete_does_not_kill_janitor(self): + deleted = [] + + def flaky(path): + if path == 'boom.wav': + raise RuntimeError('gcs down') + deleted.append(path) + + d = DeferredDeleter(flaky) + d.schedule('boom.wav', 0.02) + d.schedule('ok.wav', 0.05) + assert _wait_for(lambda: deleted == ['ok.wav']) + + def test_single_thread_reused_across_schedules(self): + d = DeferredDeleter(lambda path: None) + d.schedule('a.wav', 0.01) + first_thread = d._thread + assert _wait_for(lambda: d.pending_count() == 0) + d.schedule('b.wav', 0.01) + assert d._thread is first_thread + assert _wait_for(lambda: d.pending_count() == 0) + + def test_many_pending_use_one_thread(self): + before = threading.active_count() + d = DeferredDeleter(lambda path: None) + for i in range(200): + d.schedule(f'{i}.wav', 60) + assert d.pending_count() == 200 + assert threading.active_count() <= before + 1, '200 pending deletions must cost exactly one thread' + + +class TestSleepPatternRemoved: + def test_no_sleep_480_remains_in_backend(self): + for rel in ('routers/sync.py', 'utils/chat.py'): + assert 'time.sleep(480)' not in _read_source(rel), f'{rel} still parks threads as deletion timers' + + def test_sync_uses_scheduler(self): + assert 'schedule_syncing_temporal_file_deletion(path)' in _read_source('routers/sync.py') + + def test_chat_uses_scheduler_at_all_three_sites(self): + assert _read_source('utils/chat.py').count('schedule_syncing_temporal_file_deletion(path)') == 3 + + def test_storage_defines_scheduler_with_480_default(self): + src = _read_source('utils/other/storage.py') + assert 'SYNCING_TEMPORAL_DELETE_DELAY_SECONDS = 480' in src + assert 'def schedule_syncing_temporal_file_deletion' in src + + def test_precache_sem_restored(self): + # 4 → 2 was load-shedding while the pool was full of sleepers (#7526) + assert '_PRECACHE_FILE_SEM = threading.BoundedSemaphore(4)' in _read_source('utils/other/storage.py') From 3b5846980f7354f5c5574036302b4a7162cb4a47 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 15:41:55 +0530 Subject: [PATCH 06/10] test(storage): precache semaphore assertion 2 -> 4 --- backend/tests/unit/test_storage_fanout_limits.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/backend/tests/unit/test_storage_fanout_limits.py b/backend/tests/unit/test_storage_fanout_limits.py index 6d39f11d52..536553289f 100644 --- a/backend/tests/unit/test_storage_fanout_limits.py +++ b/backend/tests/unit/test_storage_fanout_limits.py @@ -90,10 +90,14 @@ def test_precache_file_semaphore_exists(self): src = _read_source('utils/other/storage.py') assert '_PRECACHE_FILE_SEM' in src - def test_precache_file_semaphore_is_2(self): - """Global precache file semaphore must be BoundedSemaphore(2).""" + def test_precache_file_semaphore_is_4(self): + """Global precache file semaphore must be BoundedSemaphore(4). + + Was 2 while storage_executor was saturated by per-file sleep(480) + deletion timers; restored to 4 once the janitor thread took those + over (see test_deferred_blob_janitor.py).""" src = _read_source('utils/other/storage.py') - assert 'BoundedSemaphore(2)' in src + assert '_PRECACHE_FILE_SEM = threading.BoundedSemaphore(4)' in src def test_precache_conversation_audio_uses_semaphore(self): """precache_conversation_audio must gate submissions with _PRECACHE_FILE_SEM.""" From 91c70e669c8ad8ecdb464e32df88e5934bd8b693 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 15:41:55 +0530 Subject: [PATCH 07/10] test(sync): drop executor-swap machinery obsoleted by the deletion janitor --- .../tests/unit/test_sync_silent_failure.py | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/backend/tests/unit/test_sync_silent_failure.py b/backend/tests/unit/test_sync_silent_failure.py index 3fe9af73b4..06f117cf4b 100644 --- a/backend/tests/unit/test_sync_silent_failure.py +++ b/backend/tests/unit/test_sync_silent_failure.py @@ -655,6 +655,7 @@ def setup_class(cls): sys.modules['pydub'].AudioSegment = MagicMock() sys.modules['utils.other.endpoints'].get_current_user_uid = MagicMock() sys.modules['utils.other.storage'].get_syncing_file_temporal_signed_url = MagicMock(return_value='https://fake') + sys.modules['utils.other.storage'].schedule_syncing_temporal_file_deletion = MagicMock() sys.modules['utils.other.storage'].delete_syncing_temporal_file = MagicMock() sys.modules['utils.other.storage'].download_audio_chunks_and_merge = MagicMock() sys.modules['utils.other.storage'].get_or_create_merged_audio = MagicMock() @@ -1050,9 +1051,6 @@ class TestVoiceMessageRuntimeErrorHandling: """Tests that voice message functions in utils/chat.py handle RuntimeError from prerecorded.""" _saved_modules = {} - _storage_executor = None - _storage_executor_had_submit = False - _storage_executor_submit = None _transcribe_fn = None _process_fn = None _process_stream_fn = None @@ -1070,6 +1068,7 @@ def setup_class(cls): sys.modules['fal_client'].submit = MagicMock() sys.modules['utils.other.endpoints'].timeit = lambda f: f sys.modules['utils.other.storage'].get_syncing_file_temporal_signed_url = MagicMock(return_value='https://fake') + sys.modules['utils.other.storage'].schedule_syncing_temporal_file_deletion = MagicMock() sys.modules['utils.other.storage'].delete_syncing_temporal_file = MagicMock() sys.modules['utils.notifications'].send_notification = MagicMock() sys.modules['utils.retrieval.graph'].execute_graph_chat = MagicMock() @@ -1116,26 +1115,12 @@ def setup_class(cls): ) import utils.chat as chat_mod - cls._storage_executor = chat_mod.storage_executor - cls._storage_executor_had_submit = 'submit' in getattr(chat_mod.storage_executor, '__dict__', {}) - cls._storage_executor_submit = chat_mod.storage_executor.submit - chat_mod.storage_executor.submit = MagicMock() - cls._transcribe_fn = staticmethod(transcribe_voice_message_segment) cls._process_fn = staticmethod(process_voice_message_segment) cls._process_stream_fn = staticmethod(process_voice_message_segment_stream) @classmethod def teardown_class(cls): - if cls._storage_executor is not None: - if cls._storage_executor_had_submit: - cls._storage_executor.submit = cls._storage_executor_submit - elif 'submit' in getattr(cls._storage_executor, '__dict__', {}): - delattr(cls._storage_executor, 'submit') - cls._storage_executor = None - cls._storage_executor_submit = None - cls._storage_executor_had_submit = False - sys.modules.pop('utils.chat', None) for name, orig in cls._saved_modules.items(): if orig is None: @@ -1149,7 +1134,7 @@ def test_transcribe_voice_message_handles_runtime_error(self): with patch( 'utils.chat.prerecorded', side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'), - ), patch('utils.chat.time.sleep'): + ): result = self._transcribe_fn('/tmp/test.wav', 'uid', language='en') assert result == (None, 'en'), f"Expected (None, 'en'), got {result}" @@ -1159,7 +1144,7 @@ def test_process_voice_message_handles_runtime_error(self): with patch( 'utils.chat.prerecorded', side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'), - ), patch('utils.chat.time.sleep'): + ): result = self._process_fn('/tmp/test.wav', 'uid', language='en') assert result == [], f"Expected [], got {result}" @@ -1173,7 +1158,7 @@ async def run(): with patch( 'utils.chat.prerecorded', side_effect=RuntimeError('Deepgram transcription failed after 2 attempts: timeout'), - ), patch('utils.chat.time.sleep'): + ): async for chunk in self._process_stream_fn('/tmp/test.wav', 'uid', language='en'): chunks.append(chunk) return chunks From 7121d754f7a3a089397be99c30dd9b35cc73f6d7 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 15:41:56 +0530 Subject: [PATCH 08/10] test(backend): register test_deferred_blob_janitor in test.sh --- backend/test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/test.sh b/backend/test.sh index cf12aea3d9..d79e8e834f 100755 --- a/backend/test.sh +++ b/backend/test.sh @@ -65,6 +65,7 @@ pytest tests/unit/test_storage_upload_audio_chunk_data_protection.py -v pytest tests/unit/test_storage_opus_encoding.py -v pytest tests/unit/test_speech_profile_existence.py -v pytest tests/unit/test_storage_fanout_limits.py -v +pytest tests/unit/test_deferred_blob_janitor.py -v pytest tests/unit/test_people_conversations_500s.py -v pytest tests/unit/test_firestore_read_ops_cache.py -v pytest tests/unit/test_ws_auth_handshake.py -v From d0de8a3751755f4393406b321fb7afaa672b0c41 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 20:11:24 +0530 Subject: [PATCH 09/10] fix(backend): restart janitor thread if killed by a BaseException Greptile review: a MemoryError/SystemExit escaping the except-Exception catch would leave _thread pointing at a dead thread, silently piling up schedules for the process lifetime. is_alive() guard self-heals. --- backend/utils/other/deferred_delete.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/utils/other/deferred_delete.py b/backend/utils/other/deferred_delete.py index 3cdcec2364..cb0240d0f8 100644 --- a/backend/utils/other/deferred_delete.py +++ b/backend/utils/other/deferred_delete.py @@ -34,7 +34,10 @@ def schedule(self, path: str, delay_seconds: float) -> None: with self._cond: self._seq += 1 heapq.heappush(self._heap, (time.monotonic() + delay_seconds, self._seq, path)) - if self._thread is None: + # is_alive() guard: restart the janitor if a BaseException + # (MemoryError, SystemExit) ever killed it — otherwise schedules + # would pile up silently for the rest of the process lifetime + if self._thread is None or not self._thread.is_alive(): self._thread = threading.Thread(target=self._run, name=self._name, daemon=True) self._thread.start() self._cond.notify() From 814428f5a505e23e253961baedd55c0098427a36 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin Date: Fri, 12 Jun 2026 20:11:24 +0530 Subject: [PATCH 10/10] test(backend): janitor restarts after BaseException kills the thread --- .../tests/unit/test_deferred_blob_janitor.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/backend/tests/unit/test_deferred_blob_janitor.py b/backend/tests/unit/test_deferred_blob_janitor.py index 3cb2727378..b95a4aa2c2 100644 --- a/backend/tests/unit/test_deferred_blob_janitor.py +++ b/backend/tests/unit/test_deferred_blob_janitor.py @@ -88,6 +88,24 @@ def test_single_thread_reused_across_schedules(self): assert d._thread is first_thread assert _wait_for(lambda: d.pending_count() == 0) + def test_janitor_restarts_if_killed_by_base_exception(self): + """SystemExit/MemoryError bypass the except-Exception catch and kill the + thread; the next schedule() must notice and start a fresh janitor.""" + deleted = [] + + def lethal_then_ok(path): + if path == 'lethal.wav': + raise SystemExit # BaseException: terminates the janitor thread + deleted.append(path) + + d = DeferredDeleter(lethal_then_ok) + d.schedule('lethal.wav', 0.02) + dead_thread = d._thread + assert _wait_for(lambda: not dead_thread.is_alive()) + d.schedule('after-death.wav', 0.02) + assert d._thread is not dead_thread, 'schedule() must replace a dead janitor' + assert _wait_for(lambda: deleted == ['after-death.wav']) + def test_many_pending_use_one_thread(self): before = threading.active_count() d = DeferredDeleter(lambda path: None)