
Introduce a process-wide singleton engine for .collect(engine="gpu") #22410

Open
madsbk wants to merge 9 commits into rapidsai:main from madsbk:default_gpu_engine

Conversation

@madsbk
Member

@madsbk madsbk commented May 7, 2026

lf.collect(engine="gpu") and pl.GPUEngine(executor="streaming") using the default cluster now route through a new process-wide DefaultSingletonEngine instead of constructing a fresh rapidsmpf Context, RMM adaptor, and Python executor for every query. Bootstrap now happens once per process rather than once per query.

DefaultSingletonEngine is a process-wide single-GPU singleton specialization of SPMDEngine: at most one live instance exists per process, it always uses a single-rank communicator plus default environment-derived settings, and repeated calls reuse the same engine instance until explicit shutdown.
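As a rough illustration of this lifecycle, a minimal sketch of a process-wide singleton with lazy bootstrap and idempotent shutdown might look like this (the `create_or_get` and `shutdown` names follow the PR; the body is illustrative only, since the real engine also manages a communicator, RMM resources, and a worker thread):

```python
import atexit
import threading


class DefaultSingletonEngine:
    """Illustrative process-wide singleton: at most one live instance."""

    _instance = None
    _lock = threading.Lock()

    def __init__(self):
        # Stand-in for the expensive one-time bootstrap (communicator,
        # memory resources, executor, ...).
        self.alive = True

    @classmethod
    def create_or_get(cls):
        # Reuse the live instance; bootstrap only on the first call
        # (or on the first call after an explicit shutdown).
        with cls._lock:
            if cls._instance is None:
                cls._instance = cls()
                atexit.register(cls._instance.shutdown)
            return cls._instance

    def shutdown(self):
        # Idempotent: safe to call both explicitly and again via atexit.
        with type(self)._lock:
            if self.alive:
                self.alive = False
                type(self)._instance = None
```

Repeated `create_or_get()` calls return the same object until `shutdown()` clears the slot, after which the next call bootstraps a fresh instance.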

The default cluster enum value is renamed from Cluster.SINGLE to Cluster.DEFAULT_SINGLETON so the dispatch token better reflects the actual behavior.
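Hypothetically, the rename might look like the following (a sketch assuming a simple string-valued enum; the real definition lives in cudf_polars/utils/config.py and may carry more members):

```python
from enum import Enum


class Cluster(str, Enum):
    # Renamed from SINGLE = "single"; tests in this PR now expect the
    # "default_singleton" cluster string.
    DEFAULT_SINGLETON = "default_singleton"
    SPMD = "spmd"
```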

This PR also removes the dead inline-context fallback in evaluate_pipeline, which was the original "single" execution path.

@madsbk madsbk self-assigned this May 7, 2026
@madsbk madsbk added the improvement (Improvement / enhancement to an existing function) and non-breaking (Non-breaking change) labels May 7, 2026
@github-actions github-actions Bot added the Python (Affects Python cuDF API) and cudf-polars (Issues specific to cudf-polars) labels May 7, 2026
@GPUtester GPUtester moved this to In Progress in cuDF Python May 7, 2026
@madsbk madsbk force-pushed the default_gpu_engine branch 9 times, most recently from 7e6beeb to 0fb9fe8 Compare May 9, 2026 08:07
Because each call forks a new child, process-wide side-effects
(the ``_bind_done`` flag, CPU affinity, environment variables) never
leak between tests or back into the pytest process.
def _run_in_subprocess(target: Callable[[], None]) -> None:
Member Author

This PR exposed some issues with the "fork" approach, so we now use "spawn" instead. Otherwise, the tests remain the same.
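The spawn-based harness might look roughly like this (a sketch under the PR's stated design, not the actual implementation; the `_wrapper` signature is assumed). With "spawn", the target must be a picklable module-level callable, which matches the PR's move to module-level `_body_*` functions:

```python
import multiprocessing as mp
import traceback
from typing import Callable


def _wrapper(target: Callable[[], None], conn) -> None:
    # Runs in the spawned child: serialize any exception back through
    # the pipe so the parent can surface it with a full traceback.
    try:
        target()
        conn.send(None)
    except BaseException:
        conn.send(traceback.format_exc())
    finally:
        conn.close()


def _run_in_subprocess(target: Callable[[], None]) -> None:
    # "spawn" starts a fresh interpreter, so process-wide side effects
    # (environment variables, CPU affinity, module state) cannot leak
    # between tests or back into the parent pytest process.
    ctx = mp.get_context("spawn")
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=_wrapper, args=(target, child_conn))
    proc.start()
    child_conn.close()  # parent keeps only the read end
    err = parent_conn.recv()
    proc.join()
    if err is not None:
        raise AssertionError(f"subprocess failed:\n{err}")
```

Closing the parent's copy of the write end means the parent's `recv()` raises `EOFError` instead of hanging if the child dies before sending anything.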

Comment on lines +115 to +122
chunk: TableChunk
if chunks:
chunk = await evaluate_batch(chunks, context, ir, ir_context=ir_context)
else:
# This rank received no input partitions. Produce an empty chunk
# with the IR's output schema so the AllGather below still has
# something to insert (and other ranks don't deadlock waiting).
chunk = empty_table_chunk(ir, context, ir_context.get_cuda_stream())
Member Author

This fixes a bug exposed by this PR: a multi-rank top-k crashes when a rank receives zero input partitions. An empty chunks list flows through evaluate_batch([], …) → concat_batch([], …) → _concat() → pylibcudf.concatenate.concatenate([]), which raises ValueError: input list may not be empty.

Why now: this branch widens the explain_engine fixture in test_explain.py from a hand-rolled pl.GPUEngine(executor="streaming") (which used cluster="single") to streaming_engine_factory, which now parametrizes over [spmd, spmd-small, dask, ray].

The fixture switch was needed so the test would not conflict with the session-scoped streaming engines introduced by the new active-engine guard. The expanded matrix is what first exercises a multi-worker top-k where one rank legitimately receives no input partitions, exposing the latent bug.

Fix: when chunks is empty, construct an empty TableChunk for the rank using the existing empty_table_chunk(ir, context, stream) helper and feed that into the AllGather. The non-empty ranks still dominate the merged top-k result, and no rank deadlocks waiting for a message from the empty rank.
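A plain-Python analog of that guard, for illustration (lists of rows stand in for TableChunk; empty_table_chunk and the pylibcudf concatenation are only mimicked here, not called):

```python
def concat_chunks(chunks):
    # Stand-in for the concat path that bottoms out in
    # pylibcudf.concatenate.concatenate, which rejects empty input.
    if not chunks:
        raise ValueError("input list may not be empty")
    return [row for chunk in chunks for row in chunk]


def rank_contribution(chunks):
    # The fix: a rank with zero input partitions contributes an explicit
    # empty chunk instead of calling concat on [] and crashing.
    if not chunks:
        return []  # empty chunk with the right (here trivial) schema
    return concat_chunks(chunks)


def all_gather(per_rank_chunks):
    # Every rank inserts something, so no rank deadlocks waiting for
    # a message from the empty rank.
    return [row for chunks in per_rank_chunks for row in rank_contribution(chunks)]
```

The non-empty ranks still determine the merged result; the empty rank's contribution is simply a no-op.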

executor="streaming",
executor_options={"max_rows_per_partition": 1_000},
def test_join_in_memory_lazy_stable_id_pickle(streaming_engine_factory):
engine = streaming_engine_factory(
Member Author

@madsbk madsbk May 9, 2026

We can no longer use the default streaming engine in the tests, so I refactored some of them, including this one, to use either the streaming engines or the in-memory engine instead. Otherwise, the tests remain the same.

@madsbk madsbk force-pushed the default_gpu_engine branch from 0fb9fe8 to e7fe81a Compare May 9, 2026 08:17
@madsbk madsbk force-pushed the default_gpu_engine branch from e7fe81a to 34498a9 Compare May 9, 2026 08:17
@madsbk madsbk added the breaking (Breaking change) label and removed the non-breaking (Non-breaking change) label May 9, 2026
@madsbk madsbk force-pushed the default_gpu_engine branch from 770331a to f2fa352 Compare May 9, 2026 12:28
@madsbk madsbk marked this pull request as ready for review May 9, 2026 13:59
@madsbk madsbk requested a review from a team as a code owner May 9, 2026 13:59
@madsbk madsbk requested a review from mroeschke May 9, 2026 13:59

@madsbk madsbk force-pushed the default_gpu_engine branch from eb2e155 to 792f8f8 Compare May 10, 2026 07:50
@coderabbitai

coderabbitai Bot commented May 10, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

📝 Walkthrough

This PR introduces DefaultSingletonEngine, a process-wide single-GPU executor with dedicated worker-thread lifecycle. It refactors the RapidsMPF pipeline to accept explicit contexts, replaces Cluster.SINGLE with Cluster.DEFAULT_SINGLETON, adds coexistence guards preventing engine conflicts, fixes streaming sort/sink edge cases, propagates query IDs through execution paths, and migrates tests to use streaming_engine_factory and in-memory executors.

Changes

Default Singleton GPU Engine

  • Configuration & Enum Updates (python/cudf_polars/cudf_polars/utils/config.py):
    Cluster.SINGLE replaced with Cluster.DEFAULT_SINGLETON; StreamingExecutor.__post_init__ defaults cluster to DEFAULT_SINGLETON; docstrings updated for the cluster and sink_to_directory parameters.
  • DefaultSingletonEngine Implementation (python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/default_singleton_engine.py):
    New DefaultSingletonEngine class with a daemon worker-thread lifecycle; _DaemonWorker coordinates startup/shutdown; _set_future propagates results and exceptions; _build_engine creates the communicator on the worker thread; _teardown_engine performs cleanup; create_or_get() lazily instantiates the singleton; idempotent shutdown() with a timeout fallback; atexit registration.
  • Engine Registry & Coexistence Guards (python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py, frontend/spmd.py, frontend/dask.py, frontend/ray.py):
    StreamingEngine tracks live instances via a WeakSet guarded by a threading.Lock; adds an _active_engine_count() class method; a check_no_live_default_singleton() guard prevents non-singleton construction while the singleton is active and is called at the start of the SPMDEngine, DaskEngine, and RayEngine constructors; shutdown() unregisters instances.
  • Query ID Propagation (python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py, frontend/dask.py, frontend/ray.py):
    execute_ir_on_rank() and evaluate_on_rank() accept a query_id: uuid.UUID parameter; IRExecutionContext includes query_id; Ray's RankActor.evaluate_polars_ir() and Dask's _worker_evaluate() propagate query_id through worker execution.
  • Execution Pipeline Refactoring (python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py):
    evaluate_logical_plan adds a default_singleton cluster mode routing to DefaultSingletonEngine.create_or_get(); evaluate_pipeline() now requires an explicit rmpf_context: Context parameter; removes internal RapidsMPF/RMM lifecycle branching; always concatenates chunks or returns an empty DataFrame with the correct schema when no partitions arrive.
  • Sort & Sink Edge Case Fixes (python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/sort.py, experimental/io.py):
    _simple_top_or_bottom_k conditionally produces an empty TableChunk when no input partitions arrive; Sink lowering treats a directory as existing only when the cluster is DEFAULT_SINGLETON (previously SINGLE).
  • SPMDEngine Python Executor Property (python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/spmd.py):
    Adds a public py_executor property that returns the active executor or raises RuntimeError after shutdown; makes _py_executor nullable; registers shutdown with wait=True; _reset and shutdown() use the property and clear state correctly.
  • Test Infrastructure Cleanup (python/cudf_polars/cudf_polars/testing/asserts.py, tests/conftest.py):
    Removes the DEFAULT_CLUSTER constant; simplifies get_default_engine() to construct GPUEngine without executor_options; removes the --cluster pytest option and DEFAULT_CLUSTER wiring from conftest.
  • Subprocess Test Harness Refactoring (python/cudf_polars/tests/experimental/test_bind_to_gpu.py):
    Refactors from a fork to a spawn multiprocessing context; adds a _wrapper() function for exception serialization via a pipe; converts test bodies to picklable module-level _body_* callables; updates _run_in_subprocess() to use spawn and pipe communication.
  • DefaultSingletonEngine Test Suite (python/cudf_polars/tests/experimental/test_default_singleton_engine.py):
    New subprocess-isolated test module covering singleton lifecycle, reuse, context-manager shutdown, default-path routing from pl.GPUEngine("streaming"), concurrent create_or_get() thread safety, blocking/coexistence rules with explicit engines, worker-thread guarantees, failure/retry, and timeout handling with a warning and slot clearing.
  • Streaming Engine Factory Test Updates (python/cudf_polars/tests/experimental/test_*.py):
    Tests updated to construct streaming engines via the streaming_engine_factory fixture and StreamingOptions instead of direct pl.GPUEngine(..., executor="streaming"); partition/fallback mode configurations adjusted; module-level fixture scoping removed where appropriate.
  • In-Memory Executor Test Adjustments (python/cudf_polars/tests/test_*.py, experimental/test_*.py):
    Numerous tests explicitly construct pl.GPUEngine(executor="in-memory") for GPU collection to avoid conflicts with active streaming sessions; test assertions updated to expect the "default_singleton" cluster string instead of "single".
  • Executor-Specific Test Refactoring (python/cudf_polars/tests/test_executors.py):
    test_executor_basics and test_dask_experimental_map_function_get_hashable refactored to use streaming_engine_factory instead of executor parametrization; removes dask skip logic; SPDX header updated to 2024–2026.
  • RapidsMPF Dispatch Table Initialization (python/cudf_polars/cudf_polars/experimental/rapidsmpf/__init__.py):
    Adds side-effect imports of RapidsMPF submodules (shuffle, sort, groupby, io, join, repartition, union) at package import time to populate the @generate_ir_sub_network.register(...) dispatch table before query evaluation.
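The engine-registry guard summarized under "Engine Registry & Coexistence Guards" can be sketched roughly as follows (illustrative only; the real guard distinguishes the default singleton from explicit engines rather than checking a bare count):

```python
import threading
import weakref


class StreamingEngine:
    # Live instances are tracked weakly, so garbage-collected engines
    # drop out of the registry automatically.
    _live = weakref.WeakSet()
    _lock = threading.Lock()

    def __init__(self):
        with type(self)._lock:
            type(self)._live.add(self)

    @classmethod
    def _active_engine_count(cls):
        with cls._lock:
            return len(cls._live)

    def shutdown(self):
        # Unregister on shutdown so new engines may be constructed.
        with type(self)._lock:
            type(self)._live.discard(self)


def check_no_live_default_singleton():
    # In the PR this guard is called at the start of the SPMDEngine,
    # DaskEngine, and RayEngine constructors; here it is simplified to
    # a plain count check.
    if StreamingEngine._active_engine_count() > 0:
        raise RuntimeError("a live engine already exists; shut it down first")
```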

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested reviewers

  • mroeschke
  • bdice
  • pentschev
  • TomAugspurger
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 47.79%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Title check (✅ Passed): the pull request title accurately and concisely describes the main change: introducing a process-wide singleton engine for the default GPU collection path.
  • Description check (✅ Passed): the pull request description is directly related to the changeset, explaining the motivation, implementation approach, and specific changes (DefaultSingletonEngine, cluster enum rename, pipeline refactor).
  • Linked Issues check (✅ Passed): check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check (✅ Passed): check skipped because no linked issues were found for this pull request.




Labels

breaking (Breaking change), cudf-polars (Issues specific to cudf-polars), improvement (Improvement / enhancement to an existing function), Python (Affects Python cuDF API)

Projects

Status: In Progress

Development

2 participants