diff --git a/python/cudf_polars/cudf_polars/experimental/io.py b/python/cudf_polars/cudf_polars/experimental/io.py index 2cea0274ee6..c5871ef4efd 100644 --- a/python/cudf_polars/cudf_polars/experimental/io.py +++ b/python/cudf_polars/cudf_polars/experimental/io.py @@ -304,7 +304,7 @@ def _( if ( Path(ir.path).exists() and executor_options.sink_to_directory - and executor_options.cluster == Cluster.SINGLE + and executor_options.cluster == Cluster.DEFAULT_SINGLETON ): # This lowering-time check can't be performed with the spmd / ray / dask # clusters, which lower on each worker independently. There's a race condition diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/__init__.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/__init__.py index 09fd0ed9693..2ea9af50c1b 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/__init__.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/__init__.py @@ -1,8 +1,19 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 """RapidsMPF streaming-engine support.""" from __future__ import annotations +# Side-effect imports: each module registers +# ``@generate_ir_sub_network.register(...)`` handlers at import time so the +# dispatch table is populated before any query is evaluated. 
+import cudf_polars.experimental.rapidsmpf.collectives.shuffle +import cudf_polars.experimental.rapidsmpf.collectives.sort +import cudf_polars.experimental.rapidsmpf.groupby +import cudf_polars.experimental.rapidsmpf.io +import cudf_polars.experimental.rapidsmpf.join +import cudf_polars.experimental.rapidsmpf.repartition +import cudf_polars.experimental.rapidsmpf.union # noqa: F401 + __all__: list[str] = [] diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/sort.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/sort.py index a950df3ce34..20faf0a2b03 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/sort.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/sort.py @@ -112,7 +112,14 @@ async def _simple_top_or_bottom_k( ir_context=ir_context, ) ) - chunk: TableChunk = await evaluate_batch(chunks, context, ir, ir_context=ir_context) + chunk: TableChunk + if chunks: + chunk = await evaluate_batch(chunks, context, ir, ir_context=ir_context) + else: + # This rank received no input partitions. Produce an empty chunk + # with the IR's output schema so the AllGather below still has + # something to insert (and other ranks don't deadlock waiting). 
+ chunk = empty_table_chunk(ir, context, ir_context.get_cuda_stream()) chunks.clear() if comm.nranks > 1 and not metadata_in.duplicated: diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py index 97168f0b02d..539ff10e7b6 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py @@ -4,70 +4,41 @@ from __future__ import annotations -import contextlib +import dataclasses import uuid from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Any -from rapidsmpf.communicator.single import ( - new_communicator as single_process_communicator, -) -from rapidsmpf.config import Options, get_environment_variables -from rapidsmpf.memory.buffer import MemoryType -from rapidsmpf.memory.buffer_resource import BufferResource, LimitAvailableMemory -from rapidsmpf.memory.pinned_memory_resource import PinnedMemoryResource -from rapidsmpf.progress_thread import ProgressThread -from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor -from rapidsmpf.streaming.core.actor import ( - run_actor_network, -) -from rapidsmpf.streaming.core.context import Context from rapidsmpf.streaming.core.leaf_actor import pull_from_channel -from rapidsmpf.streaming.cudf.table_chunk import TableChunk - -import pylibcudf as plc -import rmm - -import cudf_polars.experimental.rapidsmpf.collectives.shuffle -import cudf_polars.experimental.rapidsmpf.collectives.sort -import cudf_polars.experimental.rapidsmpf.groupby -import cudf_polars.experimental.rapidsmpf.io -import cudf_polars.experimental.rapidsmpf.join -import cudf_polars.experimental.rapidsmpf.repartition -import cudf_polars.experimental.rapidsmpf.union -from cudf_polars.containers import DataFrame + +import cudf_polars.dsl.tracing from cudf_polars.dsl.ir import ( DataFrameScan, - IRExecutionContext, Join, Scan, Union, ) 
from cudf_polars.dsl.traversal import CachingVisitor, traversal -from cudf_polars.experimental.parallel import lower_ir_graph -from cudf_polars.experimental.rapidsmpf.collectives import ReserveOpIDs from cudf_polars.experimental.rapidsmpf.dispatch import FanoutInfo from cudf_polars.experimental.rapidsmpf.nodes import ( generate_ir_sub_network_wrapper, metadata_drain_node, ) -from cudf_polars.experimental.rapidsmpf.tracing import log_query_plan -from cudf_polars.experimental.rapidsmpf.utils import empty_table_chunk -from cudf_polars.experimental.statistics import collect_statistics -from cudf_polars.utils.config import CUDAStreamPoolConfig +from cudf_polars.utils.config import SPMDContext if TYPE_CHECKING: from collections.abc import MutableMapping from rapidsmpf.communicator.communicator import Communicator from rapidsmpf.streaming.core.channel import Channel + from rapidsmpf.streaming.core.context import Context from rapidsmpf.streaming.core.leaf_actor import DeferredMessages from rapidsmpf.streaming.cudf.channel_metadata import ChannelMetadata + from rapidsmpf.streaming.cudf.table_chunk import TableChunk import polars as pl - from cudf_polars.dsl.ir import IR + from cudf_polars.dsl.ir import IR, IRExecutionContext from cudf_polars.experimental.base import PartitionInfo, StatsCollector from cudf_polars.experimental.parallel import ConfigOptions from cudf_polars.experimental.rapidsmpf.dispatch import ( @@ -99,13 +70,32 @@ def evaluate_logical_plan( ------- The output DataFrame and metadata collector. """ - query_id = uuid.uuid4() + # For default_singleton, inject the process-wide DefaultSingletonEngine instance + # into config_options before treating it as a regular SPMDEngine. 
+ if config_options.executor.cluster == "default_singleton": + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + + engine = DefaultSingletonEngine.get_or_create() + config_options = dataclasses.replace( + config_options, + executor=dataclasses.replace( + config_options.executor, + spmd_context=SPMDContext( + comm=engine.comm, + context=engine.context, + py_executor=engine.py_executor, + ), + ), + ) + query_id = uuid.uuid4() with cudf_polars.dsl.tracing.bound_contextvars( cudf_polars_query_id=str(query_id), ): match config_options.executor.cluster: - case "spmd": + case "spmd" | "default_singleton": from cudf_polars.experimental.rapidsmpf.frontend.spmd import ( evaluate_pipeline_spmd_mode, ) @@ -138,216 +128,12 @@ def evaluate_logical_plan( collect_metadata=collect_metadata, query_id=query_id, ) - case "single": - # Single-process execution: lower and run locally. - stats = collect_statistics(ir, config_options) - ir, partition_info = lower_ir_graph(ir, config_options, stats) - with ReserveOpIDs(ir, config_options) as collective_id_map: - log_query_plan(ir, config_options) - result, metadata_collector = evaluate_pipeline( - ir, - partition_info, - config_options, - stats, - collective_id_map, - single_process_communicator(Options(), ProgressThread()), - collect_metadata=collect_metadata, - query_id=query_id, - ) case other: raise ValueError(f"Unknown cluster mode: {other}") return result, metadata_collector -def evaluate_pipeline( - ir: IR, - partition_info: MutableMapping[IR, PartitionInfo], - config_options: ConfigOptions[StreamingExecutor], - stats: StatsCollector, - collective_id_map: dict[IR, list[int]], - comm: Communicator, - rmpf_context: Context | None = None, - *, - collect_metadata: bool = False, - query_id: uuid.UUID, -) -> tuple[pl.DataFrame, list[ChannelMetadata] | None]: - """ - Build and evaluate a RapidsMPF streaming pipeline. - - Parameters - ---------- - ir - The IR node. 
- partition_info - The partition information. - config_options - The configuration options. - stats - The statistics collector. - collective_id_map - The mapping of IR nodes to lists of collective IDs. - comm - The communicator describing the participating processes. - rmpf_context - The RapidsMPF context. - collect_metadata - Whether to collect runtime metadata. - query_id - A unique identifier for the query. - - Returns - ------- - The output DataFrame and metadata collector. - """ - _original_mr: Any = None - use_stream_pool = False - if rmpf_context is not None: - # Using "distributed" mode. - # Always use the RapidsMPF stream pool for now. - br = rmpf_context.br() - use_stream_pool = True - rmpf_context_manager = contextlib.nullcontext(rmpf_context) - else: - # Using "single" mode. - # Create a new local RapidsMPF context. - _original_mr = rmm.mr.get_current_device_resource() - mr = RmmResourceAdaptor(_original_mr) - rmm.mr.set_current_device_resource(mr) - memory_available: MutableMapping[MemoryType, LimitAvailableMemory] | None = None - single_spill_device = config_options.executor.client_device_threshold - if single_spill_device > 0.0 and single_spill_device < 1.0: - total_memory = rmm.mr.available_device_memory()[1] - memory_available = { - MemoryType.DEVICE: LimitAvailableMemory( - mr, limit=int(total_memory * single_spill_device) - ) - } - - options = Options( - { - # By default, set the number of streaming threads to the max - # number of IO threads. The user may override this with an - # environment variable (i.e. 
RAPIDSMPF_NUM_STREAMING_THREADS) - "num_streaming_threads": str( - max(config_options.executor.max_io_threads, 1) - ) - } - | get_environment_variables() - ) - pinned_mr = ( - PinnedMemoryResource.make_if_available() - if config_options.executor.spill_to_pinned_memory - else None - ) - stream_pool = ( - config_options.cuda_stream_policy.build() - if isinstance(config_options.cuda_stream_policy, CUDAStreamPoolConfig) - else None - ) - use_stream_pool = stream_pool is not None - br = BufferResource( - mr, - pinned_mr=pinned_mr, - memory_available=memory_available, - stream_pool=stream_pool, - ) - rmpf_context_manager = Context(comm.logger, br, options) - - with rmpf_context_manager as rmpf_context: - # Create the IR execution context - if use_stream_pool: - ir_context = IRExecutionContext( - get_cuda_stream=rmpf_context.get_stream_from_pool, query_id=query_id - ) - else: - ir_context = IRExecutionContext(query_id=query_id) - - # Generate network nodes - assert rmpf_context is not None, "RapidsMPF context must defined." - metadata_collector: list[ChannelMetadata] | None = ( - [] if collect_metadata else None - ) - nodes, output = generate_network( - rmpf_context, - comm, - ir, - partition_info, - config_options, - stats, - ir_context=ir_context, - collective_id_map=collective_id_map, - metadata_collector=metadata_collector, - ) - - try: - # Run the network - with ThreadPoolExecutor( - max_workers=config_options.executor.num_py_executors, - thread_name_prefix="cpse", - ) as executor: - run_actor_network(actors=nodes, py_executor=executor) - - # Extract/return the concatenated result. 
- # Keep chunks alive until after concatenation to prevent - # use-after-free with stream-ordered allocations - messages = output.release() - chunks = [ - TableChunk.from_message(msg, br=br).make_available_and_spill( - br, allow_overbooking=True - ) - for msg in messages - ] - dfs: list[DataFrame] = [] - if chunks: - col_names = list(ir.schema.keys()) - col_dtypes = list(ir.schema.values()) - dfs = [ - DataFrame.from_table( - chunk.table_view(), col_names, col_dtypes, chunk.stream - ) - for chunk in chunks - ] - if len(dfs) == 1: - df = dfs[0] - else: - with ir_context.stream_ordered_after(*dfs) as stream: - df = DataFrame.from_table( - plc.concatenate.concatenate( - [d.table for d in dfs], stream=stream - ), - col_names, - col_dtypes, - stream, - ) - else: - # No chunks received - create an empty DataFrame with correct schema - stream = ir_context.get_cuda_stream() - chunk = empty_table_chunk(ir, rmpf_context, stream) - df = DataFrame.from_table( - chunk.table_view(), - list(ir.schema.keys()), - list(ir.schema.values()), - stream, - ) - - result = df.to_polars() - - # Now we need to drop *all* GPU data. This ensures that no cudaFreeAsync runs - # before the Context, which ultimately contains the rmm MR, goes out of scope. 
- del messages, chunks, dfs, df - finally: - # Ensure these are dropped even if a node raises - # an exception in run_actor_network - del nodes, output - - # Restore the initial RMM memory resource - if _original_mr is not None: - rmm.mr.set_current_device_resource(_original_mr) - - return result, metadata_collector - - def determine_fanout_nodes( ir: IR, partition_info: MutableMapping[IR, PartitionInfo], diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py index 26ad95198f6..1c59e207ba5 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/core.py @@ -9,7 +9,9 @@ import json import os import socket -from typing import TYPE_CHECKING, Any, Self, TypeVar +import threading +import weakref +from typing import TYPE_CHECKING, Any, ClassVar, Self, TypeVar import cuda.core from rapidsmpf.coll import AllGather @@ -34,6 +36,7 @@ from cudf_polars.experimental.utils import _concat if TYPE_CHECKING: + import uuid from collections.abc import Callable, MutableMapping from concurrent.futures import ThreadPoolExecutor @@ -150,6 +153,12 @@ class StreamingEngine(pl.GPUEngine): when :meth:`shutdown` is called. If ``None``, an empty stack is created. """ + # Process-wide registry of every live :class:`StreamingEngine`. Used by + # :class:`DefaultSingletonEngine` to enforce that no other engine is + # alive when the singleton is constructed. + _active_engines: ClassVar[weakref.WeakSet[StreamingEngine]] = weakref.WeakSet() + _active_engines_lock: ClassVar[threading.Lock] = threading.Lock() + def __init__( self, *, @@ -158,6 +167,13 @@ def __init__( engine_options: dict[str, Any], exit_stack: contextlib.ExitStack | None = None, ): + # Refuse to construct if a ``DefaultSingletonEngine`` is alive + # (no-op for the singleton itself). 
+ from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + check_no_live_default_singleton, + ) + + check_no_live_default_singleton(self) self._nranks = nranks self._exit_stack: contextlib.ExitStack | None = ( exit_stack or contextlib.ExitStack() @@ -178,6 +194,24 @@ def __init__( "Multiple ranks share the same GPU (UUID collision detected). " f"UUIDs: {uuids}. Set allow_gpu_sharing=True to allow this." ) + with StreamingEngine._active_engines_lock: + StreamingEngine._active_engines.add(self) + + @classmethod + def _active_engine_count(cls) -> int: + """ + Return the number of currently-live :class:`StreamingEngine` instances. + + "Live" means constructed and not yet shut down (or garbage collected). + The count is process-wide and shared across all subclasses. + + Returns + ------- + Number of live engines, including ``self`` if called on a live + instance. + """ + with StreamingEngine._active_engines_lock: + return len(StreamingEngine._active_engines) @property def nranks(self) -> int: @@ -311,6 +345,8 @@ def shutdown(self) -> None: self.device = None self.memory_resource = None self.config = {} + with StreamingEngine._active_engines_lock: + StreamingEngine._active_engines.discard(self) def __enter__(self) -> Self: """Enter the context manager, returning ``self``.""" @@ -351,6 +387,7 @@ def execute_ir_on_rank( collective_id_map: dict[IR, list[int]], *, collect_metadata: bool = False, + query_id: uuid.UUID, ) -> tuple[pl.DataFrame, list[ChannelMetadata] | None]: """ Execute a Polars IR query on a single rank's GPU. @@ -379,6 +416,8 @@ def execute_ir_on_rank( Mapping from IR nodes to their pre-allocated collective operation IDs. collect_metadata Whether to collect channel metadata during execution. + query_id + Unique identifier for the query, propagated into actor traces. Returns ------- @@ -388,7 +427,9 @@ def execute_ir_on_rank( Collected channel metadata if ``collect_metadata`` is ``True``, otherwise ``None``. 
""" - ir_context = IRExecutionContext(get_cuda_stream=ctx.get_stream_from_pool) + ir_context = IRExecutionContext( + get_cuda_stream=ctx.get_stream_from_pool, query_id=query_id + ) metadata_collector: list[ChannelMetadata] | None = [] if collect_metadata else None nodes, output = generate_network( @@ -566,6 +607,7 @@ def evaluate_on_rank( config_options: ConfigOptions[StreamingExecutor], *, collect_metadata: bool = False, + query_id: uuid.UUID, ) -> tuple[pl.DataFrame, list[ChannelMetadata] | None]: """ Evaluate a polars IR plan on a single rank. @@ -592,6 +634,8 @@ def evaluate_on_rank( Executor configuration forwarded from the client. collect_metadata Whether to collect channel metadata during execution. + query_id + Unique identifier for the query, propagated into actor traces. Returns ------- @@ -620,4 +664,5 @@ def evaluate_on_rank( stats, collective_id_map, collect_metadata=collect_metadata, + query_id=query_id, ) diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py index b4300346132..ff9ab45cd05 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py @@ -4,7 +4,6 @@ from __future__ import annotations -import contextlib import dataclasses import functools import logging @@ -285,10 +284,15 @@ def _teardown_worker( if mp_ctx is not None: if mp_ctx.py_executor is not None: mp_ctx.py_executor.shutdown(wait=True, cancel_futures=True) - mp_ctx.ctx = None - mp_ctx.comm = None - mp_ctx.mr = None - with contextlib.suppress(AttributeError): + # Shut down the Context explicitly on the same thread that + # constructed it. 
+ try: + if mp_ctx.ctx is not None: + mp_ctx.ctx.shutdown() + finally: + mp_ctx.ctx = None + mp_ctx.comm = None + mp_ctx.mr = None delattr(dask_worker, attr) @@ -373,6 +377,7 @@ def _worker_evaluate( *, uid: str, collect_metadata: bool = False, + query_id: uuid.UUID, dask_worker: distributed.Worker | None = None, ) -> tuple[pl.DataFrame, list[ChannelMetadata] | None]: """ @@ -393,6 +398,8 @@ def _worker_evaluate( per-worker context attribute. collect_metadata Whether to collect channel metadata. + query_id + Unique identifier for the query, propagated into actor traces. dask_worker Injected by ``distributed`` when called via :meth:`distributed.Client.run`. @@ -415,6 +422,7 @@ def _worker_evaluate( ir, config_options, collect_metadata=collect_metadata, + query_id=query_id, ) @@ -474,6 +482,7 @@ def evaluate_pipeline_dask_mode( ir, worker_config, collect_metadata=collect_metadata, + query_id=query_id, ) dfs: list[pl.DataFrame] = [] diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/default_singleton_engine.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/default_singleton_engine.py new file mode 100644 index 00000000000..e5433770427 --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/default_singleton_engine.py @@ -0,0 +1,336 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 +"""Single-GPU, single-instance specialization of :class:`SPMDEngine`.""" + +from __future__ import annotations + +import atexit +import dataclasses +import threading +import warnings +from concurrent.futures import Future +from typing import TYPE_CHECKING, Any + +from rapidsmpf.communicator.single import ( + new_communicator as single_communicator, +) +from rapidsmpf.progress_thread import ProgressThread + +from cudf_polars.experimental.rapidsmpf.frontend.core import ( + resolve_rapidsmpf_options, +) +from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine + +if TYPE_CHECKING: + from collections.abc import Callable + +__all__ = ["DefaultSingletonEngine", "check_no_live_default_singleton"] + + +def _set_future(fut: Future[Any], fn: Callable[[], Any]) -> bool: + """ + Run ``fn`` and pipe its result or exception into ``fut``. + + Parameters + ---------- + fut + The future to populate with ``fn``'s outcome. + fn + Zero-argument callable to invoke. + + Returns + ------- + ``True`` if ``fn`` returned normally, ``False`` if it raised or + the future was already cancelled. + """ + if not fut.set_running_or_notify_cancel(): + return False + try: + fut.set_result(fn()) + except BaseException as exc: + fut.set_exception(exc) + return False + return True + + +class _DaemonWorker: + """ + Single-shot daemon thread that owns the live engine lifecycle. + + Builds :class:`DefaultSingletonEngine` once on the worker thread. + After construction completes, blocks until :meth:`shutdown` is + called, then tears down the live engine on the same thread and + exits. Both phases therefore run on the same thread, which rapidsmpf + requires. + + Notes + ----- + We deliberately do not use ``ThreadPoolExecutor`` here, even though + a single-worker pool would otherwise be a natural fit. + + The rapidsmpf ``Context`` must be torn down on the same thread that + constructed it; otherwise rapidsmpf calls ``std::terminate``. 
The + engine lifecycle therefore has to live on a single dedicated worker + thread. + + ``ThreadPoolExecutor`` registers workers in internal shutdown + machinery and installs a shutdown hook that runs before regular + ``atexit`` handlers. By the time our own teardown hook runs, + ``executor.submit`` already raises:: + + RuntimeError: cannot schedule new futures after shutdown + + That prevents us from scheduling teardown work onto the construction + thread. The rapidsmpf ``Context`` is then later finalized on the + wrong thread, which triggers ``std::terminate``. + + A daemon :class:`threading.Thread` avoids this problem because it + is not managed by the executor shutdown machinery. The thread stays + alive until late interpreter finalization, after all ``atexit`` + hooks have run, which gives our teardown hook a chance to enqueue + cleanup work onto the correct thread. + + Parameters + ---------- + name + Thread name passed to :class:`threading.Thread`. + + Attributes + ---------- + startup_future + Resolves with the constructed :class:`DefaultSingletonEngine`, + or with the exception raised during construction. + shutdown_future + Resolves once teardown completes, or with the exception raised + during teardown. + """ + + def __init__(self, name: str) -> None: + self._shutdown_signal = threading.Event() + self.startup_future: Future[DefaultSingletonEngine] = Future() + self.shutdown_future: Future[None] = Future() + self._thread = threading.Thread(target=self._run, name=name, daemon=True) + self._thread.start() + + def shutdown(self) -> Future[None]: + """ + Signal the worker to tear down the live engine and exit. + + Returns + ------- + Resolves once the worker has torn down the engine. 
+ """ + self._shutdown_signal.set() + return self.shutdown_future + + def _run(self) -> None: + """Worker thread entry point: build engine, wait, tear it down.""" + if not _set_future(self.startup_future, _build_engine): + return + self._shutdown_signal.wait() + _set_future(self.shutdown_future, _teardown_engine) + + +def _build_engine() -> DefaultSingletonEngine: + """ + Construct the live default engine on the dedicated worker thread. + + Returns + ------- + The newly constructed instance, also assigned to ``_state.instance``. + """ + with _state.lock: + try: + comm = single_communicator( + progress_thread=ProgressThread(), + options=resolve_rapidsmpf_options(None), + ) + instance = DefaultSingletonEngine(comm=comm) + assert instance.nranks == 1 + except BaseException: + _state.worker = None + raise + _state.instance = instance + return instance + + +def _teardown_engine() -> None: + """Tear down ``_state.instance`` on the dedicated worker thread.""" + with _state.lock: + instance = _state.instance + _state.instance = None + _state.worker = None + if instance is not None: + SPMDEngine.shutdown(instance) + + +@dataclasses.dataclass +class _SingletonState: + """ + Module-level singleton bookkeeping. + + Attributes + ---------- + instance + The live :class:`DefaultSingletonEngine`, if one exists. + worker + Worker thread that owns the engine lifecycle. + lock + Protects mutations to :attr:`instance` and :attr:`worker`. + + NB: Code must not hold this lock across calls to + ``SPMDEngine.shutdown``. + """ + + instance: DefaultSingletonEngine | None = None + worker: _DaemonWorker | None = None + lock: threading.Lock = dataclasses.field(default_factory=threading.Lock) + + +_state = _SingletonState() +SHUTDOWN_TIMEOUT_SECONDS: float = 10.0 + + +def check_no_live_default_singleton(self_engine: Any) -> None: + """ + Raise if the default singleton engine is alive. + + Parameters + ---------- + self_engine + The engine instance being constructed. 
+ + Raises + ------ + RuntimeError + If a :class:`DefaultSingletonEngine` is currently alive and + ``self_engine`` is not itself a :class:`DefaultSingletonEngine`. + """ + if isinstance(self_engine, DefaultSingletonEngine): + return + with _state.lock: + if _state.instance is not None: + raise RuntimeError( + f"Cannot construct {type(self_engine).__name__} while the " + 'default GPU engine (e.g. `.collect(engine="gpu")`) is ' + "active. While the default engine is in use, no explicit " + "streaming engines may exist. Shut down the default engine " + "first by calling `DefaultSingletonEngine.shutdown()`." + ) + + +class DefaultSingletonEngine(SPMDEngine): + """ + Process-wide single-GPU singleton specialization of :class:`SPMDEngine`. + + At most one live instance exists per process. Use :meth:`get_or_create` + to obtain it and :meth:`shutdown` to tear it down. + + Always constructs a single-rank communicator and uses default RapidsMPF, + executor, and engine settings from the environment. + + Users needing custom configuration should construct an engine explicitly. + See :class:`RayEngine`, :class:`DaskEngine`, and :class:`SPMDEngine`. + + Examples + -------- + Constructed automatically when using ``engine="gpu"``: + + >>> result = df.lazy().collect(engine="gpu") # doctest: +SKIP + + Or constructed explicitly: + + >>> engine = DefaultSingletonEngine.get_or_create() # doctest: +SKIP + >>> result = df.lazy().collect(engine=engine) # doctest: +SKIP + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + # super().__init__ registered self in _active_engines. If another engine + # is already alive, tear down this just-built rapidsmpf Context ON THIS + # THREAD before raising. Otherwise, GC of the partially constructed + # instance could destroy the Context off-thread, violating rapidsmpf's + # same-thread teardown invariant. 
+ active_count = SPMDEngine._active_engine_count() + if active_count > 1: + SPMDEngine.shutdown(self) + raise RuntimeError( + f"Cannot start the default GPU engine (e.g. " + f'`.collect(engine="gpu")`) while {active_count - 1} ' + "explicit streaming engine(s) are alive. While " + "explicit engines are in use, the default engine " + "cannot also exist. Shut them down first or exit " + "their `with` blocks." + ) + + @classmethod + def get_or_create(cls) -> DefaultSingletonEngine: + """ + Return the live singleton, constructing one if needed. + + Construction runs on a dedicated worker thread so the rapidsmpf + ``Context`` is born on the same thread that will eventually tear + it down. + + Raises + ------ + RuntimeError + If any other :class:`StreamingEngine` is currently alive. + """ + with _state.lock: + if _state.instance is not None: + return _state.instance + if _state.worker is None: + _state.worker = _DaemonWorker(name="default-singleton-engine") + worker = _state.worker + return worker.startup_future.result() + + @staticmethod + def shutdown() -> None: + """ + Shut down the live default singleton, if any. Idempotent. + + Submits teardown to the dedicated worker thread, the same thread + that constructed the rapidsmpf ``Context``, and waits up to + :data:`SHUTDOWN_TIMEOUT_SECONDS`. + """ + with _state.lock: + instance = _state.instance + worker = _state.worker + if instance is None: + return + assert worker is not None + future = worker.shutdown() + try: + future.result(timeout=SHUTDOWN_TIMEOUT_SECONDS) + except TimeoutError: + pass + else: + # _teardown_engine already cleared _state.instance and _state.worker. + return + + # Timeout fallback: the worker is hung mid-teardown and did not + # clear the singleton slots itself. Clear them here so a fresh + # get_or_create() call can spawn a new worker. + # + # The leaked instance is intentionally left in _active_engines. 
+ # That prevents subsequent get_or_create() calls, or any other + # streaming engine, from starting while the previous rapidsmpf + # Context remains in an indeterminate state. + with _state.lock: + if _state.worker is worker: + _state.instance = None + _state.worker = None + warnings.warn( + f"DefaultSingletonEngine shutdown did not complete within " + f"{SHUTDOWN_TIMEOUT_SECONDS}s; the worker thread is leaked " + "and rapidsmpf resources may not have been released. " + "No new streaming engine can be created in this process " + "until the leaked worker eventually returns.", + stacklevel=2, + ) + + +# Register once at module import. The hook is a no-op when no engine is +# live, so it costs nothing if the user never touches the default engine. +atexit.register(DefaultSingletonEngine.shutdown) diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py index efbb1db9ad4..8b99626ef0a 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py @@ -116,6 +116,7 @@ def evaluate_pipeline_ray_mode( ir_ref, actor_config_options, collect_metadata=collect_metadata, + query_id=query_id, ) for rank in rank_actors ] @@ -279,11 +280,16 @@ def shutdown(self) -> None: """ self._py_executor.shutdown(wait=True, cancel_futures=True) # Release resources in dependency order before exit_actor() terminates - # the process. - self._ctx = None - self._comm = None - self._mr = None - ray.actor.exit_actor() + # the process. Shut down the Context explicitly on the same thread + # that constructed it. 
+ try: + if self._ctx is not None: + self._ctx.shutdown() + finally: + self._ctx = None + self._comm = None + self._mr = None + ray.actor.exit_actor() def get_info(self) -> ClusterInfo: """ @@ -326,6 +332,7 @@ def evaluate_polars_ir( config_options: ConfigOptions[StreamingExecutor], *, collect_metadata: bool, + query_id: uuid.UUID, ) -> tuple[pl.DataFrame, list[ChannelMetadata] | None]: """ Lower and execute a Polars IR query on this actor's GPU. @@ -343,6 +350,8 @@ def evaluate_polars_ir( Executor configuration forwarded from the client. collect_metadata If ``True``, collect channel metadata during execution. + query_id + Unique identifier for the query, propagated into actor traces. Returns ------- @@ -370,6 +379,7 @@ def evaluate_polars_ir( ir, config_options, collect_metadata=collect_metadata, + query_id=query_id, ) def _run(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T: diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/spmd.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/spmd.py index 7e1bde808cd..66dd97774e0 100644 --- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/spmd.py +++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/spmd.py @@ -112,6 +112,7 @@ def evaluate_pipeline_spmd_mode( ir, config_options, collect_metadata=collect_metadata, + query_id=query_id, ) @@ -370,36 +371,44 @@ def __init__( ) # else: caller-provided comm; the caller retains ownership - self._py_executor: ThreadPoolExecutor = ThreadPoolExecutor( - max_workers=cast(int, executor_options.get("num_py_executors", 8)), - thread_name_prefix="spmd-executor", - ) self._mr: RmmResourceAdaptor = mr + self._comm: Communicator | None = comm + self._ctx: Context | None = None + self._py_executor: ThreadPoolExecutor | None = None exit_stack = contextlib.ExitStack() try: - exit_stack.callback(self._py_executor.shutdown, wait=False) exit_stack.enter_context(set_memory_resource(mr)) - # ``Context`` is *not* 
registered as a context manager so that - # :meth:`_reset` can swap it mid-life without leaving the - # exit-stack holding a stale reference. ``_cleanup_ctx`` is - # registered instead — it shuts down whatever ``self._ctx`` is - # at engine-shutdown time (i.e. the latest reset's Context). - ctx = Context.from_options(comm.logger, mr, rapidsmpf_options) + + # Register `_cleanup_ctx`, which shuts down whatever `self._ctx` points + # to at engine shutdown time, i.e. the `Context` from the latest reset. + self._ctx = Context.from_options(comm.logger, mr, rapidsmpf_options) exit_stack.callback(self._cleanup_ctx) - self._comm: Communicator | None = comm - self._ctx: Context | None = ctx + + # Register after `_cleanup_ctx` so on teardown (LIFO) the + # executor shuts down first. `wait=True` is safe because + # rapidsmpf's `run_actor_network` awaits its only submitted + # future so by the time we reach shutdown the executor has no + # in-flight work and wait returns immediately. + self._py_executor = ThreadPoolExecutor( + max_workers=cast(int, executor_options.get("num_py_executors", 8)), + thread_name_prefix="spmd-executor", + ) + exit_stack.callback( + self._py_executor.shutdown, wait=True, cancel_futures=True + ) + super().__init__( nranks=comm.nranks, executor_options={ **executor_options, "cluster": "spmd", "spmd_context": SPMDContext( - comm=comm, context=ctx, py_executor=self._py_executor + comm=comm, context=self._ctx, py_executor=self._py_executor ), }, engine_options={ **engine_options, - "memory_resource": ctx.br().device_mr, + "memory_resource": self._ctx.br().device_mr, }, exit_stack=exit_stack, ) @@ -500,7 +509,7 @@ def _reset( "spmd_context": SPMDContext( comm=self._comm, context=self._ctx, - py_executor=self._py_executor, + py_executor=self.py_executor, ), }, engine_options={ @@ -562,6 +571,24 @@ def context(self) -> Context: raise RuntimeError("context is not available after shutdown") return self._ctx + @property + def py_executor(self) -> 
ThreadPoolExecutor: + """ + The thread-pool executor used to drive the actor network. + + Returns + ------- + Active Python thread-pool executor. + + Raises + ------ + RuntimeError + If called after :meth:`shutdown`. + """ + if self._py_executor is None: + raise RuntimeError("py_executor is not available after shutdown") + return self._py_executor + def gather_cluster_info(self) -> list[ClusterInfo]: """ Collect diagnostic information from every rank. @@ -618,6 +645,7 @@ def shutdown(self) -> None: super().shutdown() self._comm = None self._ctx = None + self._py_executor = None def _run(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> list[T]: data = json.dumps(func(*args, **kwargs)).encode() diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index f37b7f646f7..ed81e87627e 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -6,7 +6,7 @@ from __future__ import annotations from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import polars as pl from polars import GPUEngine @@ -27,9 +27,8 @@ ] # Will be overriden by `conftest.py` with the value from the `--executor` -# and `--cluster` command-line arguments +# command-line argument. 
DEFAULT_EXECUTOR = "in-memory" -DEFAULT_CLUSTER = "single" def assert_gpu_result_equal( @@ -190,15 +189,10 @@ def get_default_engine( assert_gpu_result_equal assert_sink_result_equal """ - executor_options: dict[str, Any] = {} executor = executor or DEFAULT_EXECUTOR - if executor == "streaming": - executor_options["cluster"] = DEFAULT_CLUSTER - return GPUEngine( raise_on_fail=True, executor=executor, - executor_options=executor_options, ) diff --git a/python/cudf_polars/cudf_polars/utils/config.py b/python/cudf_polars/cudf_polars/utils/config.py index 7b5fb5c940c..598a4b3554a 100644 --- a/python/cudf_polars/cudf_polars/utils/config.py +++ b/python/cudf_polars/cudf_polars/utils/config.py @@ -131,17 +131,13 @@ class Cluster(enum.StrEnum): """ The cluster configuration for the streaming executor. - * ``Cluster.SINGLE`` : Single-GPU execution. Uses a zero-dependency, - synchronous, single-threaded task scheduler. - * ``Cluster.SPMD`` : Multi-GPU SPMD execution via the rapidsmpf streaming - runtime. - * ``Cluster.RAY`` : Multi-GPU execution via Ray actors and the rapidsmpf - streaming runtime. - * ``Cluster.DASK`` : Multi-GPU execution via Dask workers and the rapidsmpf - streaming runtime. + * ``Cluster.DEFAULT_SINGLETON`` : Single-GPU execution via the DefaultSingletonEngine. + * ``Cluster.SPMD`` : Multi-GPU SPMD execution via the SPMDEngine. + * ``Cluster.RAY`` : Multi-GPU execution via the RayEngine. + * ``Cluster.DASK`` : Multi-GPU execution via the DaskEngine. """ - SINGLE = "single" + DEFAULT_SINGLETON = "default_singleton" SPMD = "spmd" RAY = "ray" DASK = "dask" @@ -538,9 +534,9 @@ class StreamingExecutor: ---------- cluster The cluster configuration for the streaming executor. - ``Cluster.SINGLE`` by default. + ``Cluster.DEFAULT_SINGLETON`` by default. 
- * ``Cluster.SINGLE``: Single-GPU execution + * ``Cluster.DEFAULT_SINGLETON``: Single-GPU execution * ``Cluster.SPMD``: Multi-GPU SPMD execution * ``Cluster.RAY``: Multi-GPU Ray execution * ``Cluster.DASK``: Multi-GPU Dask execution @@ -586,8 +582,7 @@ class StreamingExecutor: sink_to_directory Whether multi-partition sink operations write to a directory rather than a single file. For the spmd, ray, and dask clusters this is - always True; setting it to False raises a ValueError. Defaults to - False for the single-GPU cluster. + always True; setting it to False raises a ValueError. dynamic_planning Options controlling dynamic shuffle planning. See :class:`~cudf_polars.utils.config.DynamicPlanningOptions` for more. @@ -675,7 +670,7 @@ class StreamingExecutor: def __post_init__(self) -> None: # noqa: D105 if self.cluster is None: - object.__setattr__(self, "cluster", Cluster.SINGLE) + object.__setattr__(self, "cluster", Cluster.DEFAULT_SINGLETON) assert self.cluster is not None, "Expected cluster to be set." 
# frozen dataclass, so use object.__setattr__ diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py index 65445b683ae..65c3dce3e49 100644 --- a/python/cudf_polars/tests/conftest.py +++ b/python/cudf_polars/tests/conftest.py @@ -281,14 +281,6 @@ def pytest_addoption(parser): help="Executor to use for GPUEngine.", ) - parser.addoption( - "--cluster", - action="store", - default="single", - choices=("single",), - help="Cluster to use for 'streaming' executor.", - ) - def pytest_configure(config): import cudf_polars.testing.asserts @@ -311,7 +303,6 @@ def pytest_configure(config): config.addinivalue_line("filterwarnings", "ignore::ResourceWarning") cudf_polars.testing.asserts.DEFAULT_EXECUTOR = config.getoption("--executor") - cudf_polars.testing.asserts.DEFAULT_CLUSTER = config.getoption("--cluster") def pytest_collection_modifyitems(items): diff --git a/python/cudf_polars/tests/experimental/test_bind_to_gpu.py b/python/cudf_polars/tests/experimental/test_bind_to_gpu.py index 59c07d8ff15..aa1c439bc01 100644 --- a/python/cudf_polars/tests/experimental/test_bind_to_gpu.py +++ b/python/cudf_polars/tests/experimental/test_bind_to_gpu.py @@ -14,36 +14,48 @@ if TYPE_CHECKING: from collections.abc import Callable + from multiprocessing.connection import Connection -def _run_in_subprocess(target: Callable[[], None]) -> None: - """Execute ``target()`` in a forked child process. 
+def _wrapper(child_conn: Connection, target: Callable[[], None]) -> None: + """Run ``target`` in the child and report the outcome via ``child_conn``.""" + try: + target() + child_conn.send(None) + except BaseException as exc: + try: + child_conn.send(exc) + except Exception: + child_conn.send( + RuntimeError( + f"{type(exc).__name__}: {exc}\n" + f"{''.join(traceback.format_tb(exc.__traceback__))}" + ) + ) + finally: + child_conn.close() + - Because each call forks a new child, process-wide side-effects - (the ``_bind_done`` flag, CPU affinity, environment variables) never - leak between tests or back into the pytest process. +def _run_in_subprocess(target: Callable[[], None]) -> None: + """Execute ``target()`` in a spawned child process. + + Spawn (rather than fork) is used so the child starts from a clean + interpreter state and never inherits process-wide state from the + pytest worker - notably the active CUDA context, live pylibcudf + objects, and any session-scoped streaming engines. Inheriting a + CUDA context across ``fork()`` is unsafe: the child's GC of + inherited cudf/pylibcudf objects calls into CUDA from a process + that doesn't actually have a context, which surfaces as + ``cudaErrorInitializationError`` and an uncaught + ``std::terminate`` (SIGABRT) at child exit. + + ``target`` must be a module-level callable (spawn pickles the + target by name). 
""" - ctx = multiprocessing.get_context("fork") + ctx = multiprocessing.get_context("spawn") parent_conn, child_conn = ctx.Pipe() - def _wrapper() -> None: - try: - target() - child_conn.send(None) - except BaseException as exc: - try: - child_conn.send(exc) - except Exception: - child_conn.send( - RuntimeError( - f"{type(exc).__name__}: {exc}\n" - f"{''.join(traceback.format_tb(exc.__traceback__))}" - ) - ) - finally: - child_conn.close() - - proc = ctx.Process(target=_wrapper) + proc = ctx.Process(target=_wrapper, args=(child_conn, target)) proc.start() try: proc.join(timeout=30) @@ -71,187 +83,191 @@ def _reset_bind_state() -> None: hardware_binding._bind_done = False -def test_bind_called_once() -> None: - """bind() is called exactly once even when bind_to_gpu() is called twice.""" +# --------------------------------------------------------------------------- +# Subprocess test bodies. These must be module-level so they pickle across +# the spawn boundary. +# --------------------------------------------------------------------------- - def body() -> None: - _reset_bind_state() - with patch( - "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind" - ) as mock_bind: - from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( - HardwareBindingPolicy, - bind_to_gpu, - ) - policy = HardwareBindingPolicy() - bind_to_gpu(policy) - bind_to_gpu(policy) - assert mock_bind.call_count == 1 +def _body_bind_called_once() -> None: + _reset_bind_state() + with patch( + "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind" + ) as mock_bind: + from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( + HardwareBindingPolicy, + bind_to_gpu, + ) - _run_in_subprocess(body) + policy = HardwareBindingPolicy() + bind_to_gpu(policy) + bind_to_gpu(policy) + assert mock_bind.call_count == 1 + + +def test_bind_called_once() -> None: + """bind() is called exactly once even when bind_to_gpu() is called twice.""" + 
_run_in_subprocess(_body_bind_called_once) + + +def _body_bind_falls_back_to_gpu_0() -> None: + _reset_bind_state() + mock_bind = MagicMock(side_effect=[RuntimeError("no CUDA_VISIBLE_DEVICES"), None]) + with patch( + "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind", + mock_bind, + ): + from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( + HardwareBindingPolicy, + bind_to_gpu, + ) + + policy = HardwareBindingPolicy() + bind_to_gpu(policy) + bind_kw = { + "cpu": policy.cpu, + "memory": policy.memory, + "network": policy.network, + } + assert mock_bind.call_count == 2 + assert mock_bind.call_args_list == [ + call(**bind_kw), + call(gpu_id=0, **bind_kw), + ] def test_bind_falls_back_to_gpu_0() -> None: """When bind() raises RuntimeError, falls back to gpu_id=0.""" - - def body() -> None: - _reset_bind_state() - mock_bind = MagicMock( - side_effect=[RuntimeError("no CUDA_VISIBLE_DEVICES"), None] + _run_in_subprocess(_body_bind_falls_back_to_gpu_0) + + +def _body_bind_raise_on_fail_propagates_exception() -> None: + _reset_bind_state() + mock_bind = MagicMock(side_effect=RuntimeError("binding failed")) + with patch( + "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind", + mock_bind, + ): + from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( + HardwareBindingPolicy, + bind_to_gpu, ) - with patch( - "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind", - mock_bind, - ): - from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( - HardwareBindingPolicy, - bind_to_gpu, - ) - - policy = HardwareBindingPolicy() - bind_to_gpu(policy) - bind_kw = { - "cpu": policy.cpu, - "memory": policy.memory, - "network": policy.network, - } - assert mock_bind.call_count == 2 - assert mock_bind.call_args_list == [ - call(**bind_kw), - call(gpu_id=0, **bind_kw), - ] - _run_in_subprocess(body) + with pytest.raises(RuntimeError, match="binding failed"): + 
bind_to_gpu(HardwareBindingPolicy(raise_on_fail=True)) def test_bind_raise_on_fail_propagates_exception() -> None: """raise_on_fail=True lets RuntimeError from bind() propagate.""" + _run_in_subprocess(_body_bind_raise_on_fail_propagates_exception) + + +def _body_bind_raise_on_fail_false_suppresses_exception() -> None: + _reset_bind_state() + mock_bind = MagicMock(side_effect=RuntimeError("binding failed")) + with patch( + "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind", + mock_bind, + ): + from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( + HardwareBindingPolicy, + bind_to_gpu, + ) - def body() -> None: - _reset_bind_state() - mock_bind = MagicMock(side_effect=RuntimeError("binding failed")) - with patch( - "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind", - mock_bind, - ): - from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( - HardwareBindingPolicy, - bind_to_gpu, - ) - - with pytest.raises(RuntimeError, match="binding failed"): - bind_to_gpu(HardwareBindingPolicy(raise_on_fail=True)) - - _run_in_subprocess(body) + bind_to_gpu(HardwareBindingPolicy(raise_on_fail=False)) def test_bind_raise_on_fail_false_suppresses_exception() -> None: """raise_on_fail=False silently ignores RuntimeError from bind().""" + _run_in_subprocess(_body_bind_raise_on_fail_false_suppresses_exception) - def body() -> None: - _reset_bind_state() - mock_bind = MagicMock(side_effect=RuntimeError("binding failed")) - with patch( - "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind", - mock_bind, - ): - from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( - HardwareBindingPolicy, - bind_to_gpu, - ) - bind_to_gpu(HardwareBindingPolicy(raise_on_fail=False)) +def _body_bind_thread_safe() -> None: + import threading - _run_in_subprocess(body) + _reset_bind_state() + with patch( + "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind" + ) as mock_bind: + from 
cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( + HardwareBindingPolicy, + bind_to_gpu, + ) + policy = HardwareBindingPolicy() + barrier = threading.Barrier(8) -def test_bind_thread_safe() -> None: - """Concurrent calls from multiple threads result in exactly one bind() call.""" + def _call_bind() -> None: + barrier.wait() + bind_to_gpu(policy) - def body() -> None: - import threading + threads = [threading.Thread(target=_call_bind) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() - _reset_bind_state() - with patch( - "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind" - ) as mock_bind: - from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( - HardwareBindingPolicy, - bind_to_gpu, - ) + assert mock_bind.call_count == 1 - policy = HardwareBindingPolicy() - barrier = threading.Barrier(8) - def _call_bind() -> None: - barrier.wait() - bind_to_gpu(policy) +def test_bind_thread_safe() -> None: + """Concurrent calls from multiple threads result in exactly one bind() call.""" + _run_in_subprocess(_body_bind_thread_safe) - threads = [threading.Thread(target=_call_bind) for _ in range(8)] - for t in threads: - t.start() - for t in threads: - t.join() - assert mock_bind.call_count == 1 +def _body_bind_done_flag_set() -> None: + from cudf_polars.experimental.rapidsmpf.frontend import hardware_binding - _run_in_subprocess(body) + _reset_bind_state() + assert not hardware_binding._bind_done + with patch("cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind"): + hardware_binding.bind_to_gpu(hardware_binding.HardwareBindingPolicy()) + assert hardware_binding._bind_done def test_bind_done_flag_set() -> None: """_bind_done is True after bind_to_gpu() succeeds.""" + _run_in_subprocess(_body_bind_done_flag_set) - def body() -> None: - from cudf_polars.experimental.rapidsmpf.frontend import hardware_binding - _reset_bind_state() - assert not hardware_binding._bind_done - with 
patch("cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind"): - hardware_binding.bind_to_gpu(hardware_binding.HardwareBindingPolicy()) - assert hardware_binding._bind_done +def _body_bind_disabled() -> None: + _reset_bind_state() + with patch( + "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind" + ) as mock_bind: + from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( + HardwareBindingPolicy, + bind_to_gpu, + ) - _run_in_subprocess(body) + bind_to_gpu(HardwareBindingPolicy(enabled=False)) + mock_bind.assert_not_called() def test_bind_disabled() -> None: """enabled=False skips binding entirely.""" + _run_in_subprocess(_body_bind_disabled) - def body() -> None: - _reset_bind_state() - with patch( - "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind" - ) as mock_bind: - from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( - HardwareBindingPolicy, - bind_to_gpu, - ) - bind_to_gpu(HardwareBindingPolicy(enabled=False)) - mock_bind.assert_not_called() +def _body_bind_enable_once_false() -> None: + _reset_bind_state() + with patch( + "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind" + ) as mock_bind: + from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( + HardwareBindingPolicy, + bind_to_gpu, + ) - _run_in_subprocess(body) + policy = HardwareBindingPolicy(enable_once=False) + bind_to_gpu(policy) + bind_to_gpu(policy) + assert mock_bind.call_count == 2 def test_bind_enable_once_false() -> None: """enable_once=False allows repeated bind() calls.""" - - def body() -> None: - _reset_bind_state() - with patch( - "cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind" - ) as mock_bind: - from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import ( - HardwareBindingPolicy, - bind_to_gpu, - ) - - policy = HardwareBindingPolicy(enable_once=False) - bind_to_gpu(policy) - bind_to_gpu(policy) - assert mock_bind.call_count == 2 - - 
_run_in_subprocess(body) + _run_in_subprocess(_body_bind_enable_once_false) # --------------------------------------------------------------------------- diff --git a/python/cudf_polars/tests/experimental/test_dataframescan.py b/python/cudf_polars/tests/experimental/test_dataframescan.py index fb263e20b94..0c8eb41c625 100644 --- a/python/cudf_polars/tests/experimental/test_dataframescan.py +++ b/python/cudf_polars/tests/experimental/test_dataframescan.py @@ -78,11 +78,9 @@ def test_dataframescan_concat(request, df, streaming_engine_factory): assert_gpu_result_equal(df2, engine=streaming_engine) -def test_join_in_memory_lazy_stable_id_pickle(): - engine = pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={"max_rows_per_partition": 1_000}, +def test_join_in_memory_lazy_stable_id_pickle(streaming_engine_factory): + engine = streaming_engine_factory( + StreamingOptions(max_rows_per_partition=1_000, raise_on_fail=True), ) left = ( pl.LazyFrame({"k": [1, 2, 3], "x": [10, 20, 30]}).collect(engine=engine).lazy() diff --git a/python/cudf_polars/tests/experimental/test_default_singleton_engine.py b/python/cudf_polars/tests/experimental/test_default_singleton_engine.py new file mode 100644 index 00000000000..93c63d2fb4d --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_default_singleton_engine.py @@ -0,0 +1,405 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +""" +Tests for :class:`DefaultSingletonEngine`. + +Every test body runs inside a worker spawned by the module-scoped +``proc_pool`` fixture. This isolates us from the session-scoped +``streaming_engines`` fixture in :file:`conftest.py`, which creates an +``SPMDEngine`` that lives for the entire pytest session and would +otherwise trip the "no other engine alive when default is created" +guardrail in :class:`DefaultSingletonEngine`. 
+""" + +from __future__ import annotations + +import multiprocessing +from concurrent.futures import ProcessPoolExecutor +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from collections.abc import Callable, Generator + + +# --------------------------------------------------------------------------- +# Subprocess infrastructure +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def proc_pool() -> Generator[ProcessPoolExecutor, None, None]: + """ + Module-scoped ``ProcessPoolExecutor`` used to run test bodies in isolation. + + Spawn (rather than fork) is used so each worker starts from a clean + interpreter state — no inherited rapidsmpf Context, no inherited + pytest fixtures, no inherited GPU resources. + """ + ctx = multiprocessing.get_context("spawn") + with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as pool: + yield pool + + +def _reset_singleton_module_state() -> None: + """Tear down any leftover engine and reset every module-level slot.""" + from cudf_polars.experimental.rapidsmpf.frontend import ( + default_singleton_engine as dse, + ) + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + + DefaultSingletonEngine.shutdown() # idempotent; no-op if no live instance + if dse._state.worker is not None: + # Construction failed mid-flight (no live instance, but a worker + # was spawned). Signal it to exit and clear the slot. + dse._state.worker.shutdown() + dse._state.worker = None + + +def _run(body: object) -> None: + """ + Subprocess entry point. + + Resets the singleton module state, runs ``body``, and resets again on + exit so a stuck test doesn't bleed state into the next worker + invocation. ``body`` must be a top-level callable so it is picklable + across the spawn boundary. 
+ """ + _reset_singleton_module_state() + try: + body() # type: ignore[operator] + finally: + _reset_singleton_module_state() + + +# --------------------------------------------------------------------------- +# Test bodies +# --------------------------------------------------------------------------- + + +def _body_lifecycle() -> None: + """ + Construction, type, get-or-create, shutdown, context manager, fresh-after-shutdown. + """ + import pytest + + import polars as pl + + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine + + # Construction succeeds; isinstance check skips its parent's + # ``check_no_live_default_singleton`` so SPMDEngine.__init__ runs cleanly. + with DefaultSingletonEngine.get_or_create() as engine: + assert engine.nranks == 1 + assert engine.rank == 0 + assert isinstance(engine, SPMDEngine) + assert isinstance(engine, pl.GPUEngine) + # Get-or-create: a second call returns the same instance. + assert DefaultSingletonEngine.get_or_create() is engine + + # After context-manager exit, the slot is free. + e2 = DefaultSingletonEngine.get_or_create() + assert e2 is not engine + e2.shutdown() + e2.shutdown() # idempotent + + # Comm/context properties raise after shutdown. + with pytest.raises(RuntimeError, match="shutdown"): + _ = e2.comm + with pytest.raises(RuntimeError, match="shutdown"): + _ = e2.context + + +def _body_default_path_routing() -> None: + """ + Both ``engine="gpu"`` and ``engine=pl.GPUEngine(executor="streaming")`` + route through the singleton, reuse it across queries, and pick up an + explicit user singleton. 
+ """ + import polars as pl + + from cudf_polars.experimental.rapidsmpf.frontend import ( + default_singleton_engine as dse, + ) + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + + # Default path #1: literal ``engine="gpu"`` triggers DefaultSingletonEngine. + assert dse._state.instance is None + result = pl.LazyFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).collect(engine="gpu") + assert result.shape == (3, 2) + assert result["a"].to_list() == [1, 2, 3] + first = dse._state.instance + assert first is not None + first.shutdown() + assert dse._state.instance is None + + # Default path #2: vanilla streaming GPUEngine also triggers DefaultSingletonEngine. + engine = pl.GPUEngine(executor="streaming") + result = pl.LazyFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).collect(engine=engine) + assert result.shape == (3, 2) + assert result["a"].to_list() == [1, 2, 3] + + # Second query reuses the same singleton. + first = dse._state.instance + assert first is not None + pl.LazyFrame({"a": [4, 5, 6]}).collect(engine=engine) + assert dse._state.instance is first + first.shutdown() + + # An already-live user singleton is reused by the default path. 
+ with DefaultSingletonEngine.get_or_create() as user_engine: + pl.LazyFrame({"a": [1]}).collect(engine=engine) + assert dse._state.instance is user_engine + + +def _body_concurrent_warm_path() -> None: + """Concurrent ``get_or_create()`` calls return the same instance.""" + import threading + + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + + main_engine = DefaultSingletonEngine.get_or_create() + try: + barrier = threading.Barrier(8) + results: list[DefaultSingletonEngine] = [] + results_lock = threading.Lock() + + def worker() -> None: + barrier.wait() + with results_lock: + results.append(DefaultSingletonEngine.get_or_create()) + + threads = [threading.Thread(target=worker) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(results) == 8 + assert all(r is main_engine for r in results) + finally: + main_engine.shutdown() + + +def _body_atexit_no_op() -> None: + """ + ``DefaultSingletonEngine.shutdown`` (registered once at import as the + atexit hook) is a no-op when no engine is live. + """ + from cudf_polars.experimental.rapidsmpf.frontend import ( + default_singleton_engine as dse, + ) + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + + assert dse._state.instance is None + DefaultSingletonEngine.shutdown() # no-op + assert dse._state.instance is None + + # Construct, shutdown, then call the atexit hook again — still a no-op. + DefaultSingletonEngine.get_or_create().shutdown() + assert dse._state.instance is None + DefaultSingletonEngine.shutdown() + assert dse._state.instance is None + + +def _body_singleton_blocked_when_explicit_alive() -> None: + """ + The reverse direction: ``get_or_create()`` refuses if any other + ``StreamingEngine`` is alive when ``DefaultSingletonEngine.__init__`` + finishes. Plus the ``_active_engine_count`` accessor. 
+ """ + import pytest + + from cudf_polars.experimental.rapidsmpf.frontend.core import StreamingEngine + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine + + assert StreamingEngine._active_engine_count() == 0 + + # A real explicit SPMDEngine blocks the default. + with ( + SPMDEngine(), + pytest.raises(RuntimeError, match="explicit streaming engine"), + ): + DefaultSingletonEngine.get_or_create() + assert StreamingEngine._active_engine_count() == 0 + + # Active-count tracks DefaultSingletonEngine's own lifecycle too. + with DefaultSingletonEngine.get_or_create(): + assert StreamingEngine._active_engine_count() == 1 + assert StreamingEngine._active_engine_count() == 0 + + +def _body_worker_thread_isolation() -> None: + """ + The dedicated worker thread owns construction and shutdown. + + - Construction runs on the named worker thread, not the caller's. + - ``shutdown`` from a non-creator thread (i.e. the test main thread) + doesn't crash, because the teardown is dispatched back to the + worker. + - If construction raises on the worker, the caller sees the same + exception and the slot is reset for retry. + """ + import threading + from typing import Any + from unittest.mock import patch + + import pytest + + from cudf_polars.experimental.rapidsmpf.frontend import ( + default_singleton_engine as dse, + ) + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine + + # 1) Construction runs on the worker thread. 
+ recorded: dict[str, threading.Thread] = {} + real_init = SPMDEngine.__init__ + + def recording_init(self: SPMDEngine, **kwargs: Any) -> None: + recorded["thread"] = threading.current_thread() + real_init(self, **kwargs) + + with patch.object(SPMDEngine, "__init__", recording_init): + engine = DefaultSingletonEngine.get_or_create() + try: + assert recorded["thread"] is not threading.current_thread() + assert recorded["thread"].name.startswith("default-singleton-engine") + finally: + engine.shutdown() + + # 2) Cross-thread shutdown via the worker is safe. + create_done = threading.Event() + + def creator() -> None: + DefaultSingletonEngine.get_or_create() + create_done.set() + + t = threading.Thread(target=creator) + t.start() + create_done.wait() + t.join() + live = dse._state.instance + assert live is not None + live.shutdown() # different thread than the one that constructed + assert dse._state.instance is None + assert dse._state.worker is None + + # 3) Construction error propagates and resets state for a retry. + def broken_init(self: SPMDEngine, **kwargs: object) -> None: + raise RuntimeError("synthetic boom") + + with ( + patch.object(SPMDEngine, "__init__", broken_init), + pytest.raises(RuntimeError, match="synthetic boom"), + ): + DefaultSingletonEngine.get_or_create() + assert dse._state.instance is None + assert dse._state.worker is None + DefaultSingletonEngine.get_or_create().shutdown() # retry succeeds + + +def _body_shutdown_timeout() -> None: + """ + A hung ``SPMDEngine.shutdown`` causes the timeout branch to fire: + a warning is emitted, the singleton slot is cleared, and any new + construction is refused until the leaked worker eventually returns. 
+ """ + import threading + from unittest.mock import patch + + import pytest + + from cudf_polars.experimental.rapidsmpf.frontend import ( + default_singleton_engine as dse, + ) + from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import ( + DefaultSingletonEngine, + ) + from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine + + release_worker = threading.Event() + real_done = threading.Event() + real_shutdown = SPMDEngine.shutdown + + with patch.object(dse, "SHUTDOWN_TIMEOUT_SECONDS", 0.1): + engine = DefaultSingletonEngine.get_or_create() + + def hanging_super_shutdown(self: SPMDEngine) -> None: + # Only the original ``engine`` should hang; any other + # ``SPMDEngine.shutdown`` call (e.g. the self-cleanup path + # inside ``DefaultSingletonEngine.__init__`` when the + # leaked instance still occupies ``_active_engines``) must + # run normally, otherwise the test deadlocks. + if self is not engine: + real_shutdown(self) + return + release_worker.wait() + try: + # Run the real teardown so the rapidsmpf Context is + # destroyed on the construction (worker) thread, otherwise + # GC of the engine on the wrong thread crashes with + # "Context::shutdown() called from a different thread...". + real_shutdown(self) + finally: + real_done.set() + + with patch.object(SPMDEngine, "shutdown", hanging_super_shutdown): + with pytest.warns(UserWarning, match="did not complete within"): + engine.shutdown() + # Slot cleared, but the leaked engine is still in the + # active-engine registry — no new construction allowed. + assert dse._state.instance is None + assert dse._state.worker is None + with pytest.raises(RuntimeError, match="explicit streaming engine"): + DefaultSingletonEngine.get_or_create() + # Once the leaked worker returns it removes itself from the + # registry; a fresh construction is allowed again. 
+ release_worker.set() + real_done.wait() + DefaultSingletonEngine.get_or_create().shutdown() + + +# --------------------------------------------------------------------------- +# Single parametrized entry point. Each body runs in a fresh subprocess via +# ``proc_pool``; failure messages identify the body by name (``[lifecycle]``, +# ``[shutdown_timeout]``, …), and ``pytest -k <name>`` matches as expected. +# --------------------------------------------------------------------------- + + +_ALL_BODIES = [ + _body_lifecycle, + _body_default_path_routing, + _body_concurrent_warm_path, + _body_atexit_no_op, + _body_singleton_blocked_when_explicit_alive, + _body_worker_thread_isolation, + _body_shutdown_timeout, +] + + +@pytest.mark.parametrize( + "body", _ALL_BODIES, ids=lambda b: b.__name__.removeprefix("_body_") +) +def test_default_singleton_engine( + proc_pool: ProcessPoolExecutor, body: Callable[[], None] +) -> None: + """Run each ``_body_*`` function in an isolated subprocess.""" + proc_pool.submit(_run, body).result() diff --git a/python/cudf_polars/tests/experimental/test_explain.py b/python/cudf_polars/tests/experimental/test_explain.py index 1d2abb72c8c..b21bbb3460c 100644 --- a/python/cudf_polars/tests/experimental/test_explain.py +++ b/python/cudf_polars/tests/experimental/test_explain.py @@ -18,6 +18,7 @@ explain_query, serialize_query, ) +from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.testing.asserts import assert_gpu_result_equal from cudf_polars.testing.io import make_lazy_frame, make_partitioned_source @@ -36,15 +37,14 @@ def df(): ) -@pytest.fixture(scope="module") -def explain_engine(): - return pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "target_partition_size": 10_000, - "max_rows_per_partition": 1_000, - }, +@pytest.fixture +def explain_engine(streaming_engine_factory): + return streaming_engine_factory( + StreamingOptions( + target_partition_size=10_000, + 
max_rows_per_partition=1_000, + raise_on_fail=True, + ) ) diff --git a/python/cudf_polars/tests/experimental/test_hstack.py b/python/cudf_polars/tests/experimental/test_hstack.py index 0c21678f7e2..b6753d31adb 100644 --- a/python/cudf_polars/tests/experimental/test_hstack.py +++ b/python/cudf_polars/tests/experimental/test_hstack.py @@ -16,24 +16,25 @@ from cudf_polars.dsl.ir import Filter, HStack from cudf_polars.dsl.traversal import traversal from cudf_polars.experimental.parallel import lower_ir_graph +from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.experimental.repartition import Repartition from cudf_polars.experimental.statistics import collect_statistics -from cudf_polars.testing.asserts import ( - DEFAULT_CLUSTER, - assert_gpu_result_equal, -) -from cudf_polars.utils.config import ConfigOptions - - -@pytest.fixture(scope="module") -def engine(): - return pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "max_rows_per_partition": 3, - "cluster": DEFAULT_CLUSTER, - }, +from cudf_polars.testing.asserts import assert_gpu_result_equal +from cudf_polars.testing.engine_utils import warns_on_spmd +from cudf_polars.utils.config import ConfigOptions, StreamingFallbackMode + + +@pytest.fixture +def engine(streaming_engine_factory): + # Override ``fallback_mode`` because the ``spmd-small`` baseline sets + # it to ``SILENT``, which would swallow the ``UserWarning`` that + # ``test_hstack_non_scalar_cse_fallback`` asserts on. 
+ return streaming_engine_factory( + StreamingOptions( + max_rows_per_partition=3, + raise_on_fail=True, + fallback_mode=StreamingFallbackMode.WARN, + ), ) @@ -70,7 +71,9 @@ def test_hstack_non_scalar_cse_fallback(df, engine): pl.col("a").head(5).min().alias("min_5"), pl.col("a").head(5).max().alias("max_5"), ) - with pytest.warns(UserWarning, match="not supported for multiple partitions"): + with warns_on_spmd( + engine, UserWarning, match="not supported for multiple partitions" + ): assert_gpu_result_equal(q, engine=engine) diff --git a/python/cudf_polars/tests/experimental/test_parallel.py b/python/cudf_polars/tests/experimental/test_parallel.py index a9a0ff63786..e057b561016 100644 --- a/python/cudf_polars/tests/experimental/test_parallel.py +++ b/python/cudf_polars/tests/experimental/test_parallel.py @@ -78,7 +78,11 @@ def test_evaluate_streaming(streaming_engine): q = df.select(pl.col("a") - (pl.col("b") + pl.col("c") * 2), pl.col("d")).sort("d") expected = q.collect(engine="cpu") - got_gpu = q.collect(engine=pl.GPUEngine(raise_on_fail=True)) + # Use the in-memory executor for the GPU comparison: a vanilla + # ``pl.GPUEngine(raise_on_fail=True)`` defaults to ``executor="streaming"``, + # which routes through ``DefaultSingletonEngine`` and would fail while the + # session ``SPMDEngine`` from conftest is alive. 
+ got_gpu = q.collect(engine=pl.GPUEngine(executor="in-memory", raise_on_fail=True)) got_streaming = q.collect(engine=streaming_engine) assert_frame_equal(expected, got_gpu) assert_frame_equal(expected, got_streaming) diff --git a/python/cudf_polars/tests/experimental/test_sink.py b/python/cudf_polars/tests/experimental/test_sink.py index df68b7c199a..e53bf3f4029 100644 --- a/python/cudf_polars/tests/experimental/test_sink.py +++ b/python/cudf_polars/tests/experimental/test_sink.py @@ -11,7 +11,6 @@ from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.testing.asserts import assert_sink_result_equal -from cudf_polars.utils.config import ConfigOptions @pytest.fixture(scope="module") @@ -92,43 +91,15 @@ def test_sink_parquet_directory( assert len(list(check_path.iterdir())) == expected_file_count -def test_sink_parquet_raises_spmd(spmd_engine): - from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine +def test_sink_parquet_raises(streaming_engines): + """No streaming-engine cluster supports ``sink_to_directory=False``.""" + from cudf_polars.utils.config import Cluster, StreamingExecutor - with ( - pytest.raises( - ValueError, match="The spmd cluster requires sink_to_directory=True" - ), - SPMDEngine( - comm=spmd_engine.comm, executor_options={"sink_to_directory": False} - ) as engine, - ): - ConfigOptions.from_polars_engine(engine) - - -def test_sink_parquet_raises(df, tmp_path): - engine = pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "max_rows_per_partition": 100_000, - "sink_to_directory": False, - }, - ) - path = tmp_path / "test_sink_raises.parquet" - df.sink_parquet(path, engine=engine) - - # Cannot overwrite an existing path with sink_to_directory=True - engine = pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "max_rows_per_partition": 100_000, - "sink_to_directory": True, - }, - ) - with pytest.raises(NotImplementedError, 
match="not supported"): - df.sink_parquet(path, engine=engine) + for name in streaming_engines: + with pytest.raises( + ValueError, match=f"The {name} cluster requires sink_to_directory=True" + ): + StreamingExecutor(cluster=Cluster(name), sink_to_directory=False) @pytest.mark.parametrize("include_header", [True, False]) @@ -137,20 +108,19 @@ def test_sink_parquet_raises(df, tmp_path): @pytest.mark.parametrize("max_rows_per_partition", [1_000, 1_000_000]) def test_sink_csv( df, + streaming_engine_factory, tmp_path, include_header, null_value, separator, max_rows_per_partition, ): - engine = pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "max_rows_per_partition": max_rows_per_partition, - }, + engine = streaming_engine_factory( + StreamingOptions( + max_rows_per_partition=max_rows_per_partition, + raise_on_fail=True, + ), ) - assert_sink_result_equal( df, tmp_path / "out.csv", @@ -167,15 +137,13 @@ def test_sink_csv( @pytest.mark.parametrize("max_rows_per_partition", [1_000, 1_000_000]) -def test_sink_ndjson(df, tmp_path, max_rows_per_partition): - engine = pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "max_rows_per_partition": max_rows_per_partition, - }, +def test_sink_ndjson(df, streaming_engine_factory, tmp_path, max_rows_per_partition): + engine = streaming_engine_factory( + StreamingOptions( + max_rows_per_partition=max_rows_per_partition, + raise_on_fail=True, + ), ) - assert_sink_result_equal( df, tmp_path / "out.ndjson", diff --git a/python/cudf_polars/tests/experimental/test_sort.py b/python/cudf_polars/tests/experimental/test_sort.py index f0abf5caade..b147358dc4b 100644 --- a/python/cudf_polars/tests/experimental/test_sort.py +++ b/python/cudf_polars/tests/experimental/test_sort.py @@ -7,35 +7,29 @@ import polars as pl -from cudf_polars.testing.asserts import ( - DEFAULT_CLUSTER, - assert_gpu_result_equal, -) +from cudf_polars.experimental.rapidsmpf.frontend.options import 
StreamingOptions +from cudf_polars.testing.asserts import assert_gpu_result_equal -@pytest.fixture(scope="module") -def engine(): - return pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "max_rows_per_partition": 3, - "cluster": DEFAULT_CLUSTER, - "fallback_mode": "raise", - }, +@pytest.fixture +def engine(streaming_engine_factory): + return streaming_engine_factory( + StreamingOptions( + max_rows_per_partition=3, + fallback_mode="raise", + raise_on_fail=True, + ), ) -@pytest.fixture(scope="module") -def engine_large(): - return pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "max_rows_per_partition": 2_100, - "cluster": DEFAULT_CLUSTER, - "fallback_mode": "raise", - }, +@pytest.fixture +def engine_large(streaming_engine_factory): + return streaming_engine_factory( + StreamingOptions( + max_rows_per_partition=2_100, + fallback_mode="raise", + raise_on_fail=True, + ), ) @@ -130,16 +124,10 @@ def test_sort_slice(df, engine, offset): assert_gpu_result_equal(q, engine=engine) -def test_sort_after_sparse_join(): - engine = pl.GPUEngine( - raise_on_fail=True, - executor="streaming", - executor_options={ - "cluster": DEFAULT_CLUSTER, - "max_rows_per_partition": 4, - }, +def test_sort_after_sparse_join(streaming_engine_factory): + engine = streaming_engine_factory( + StreamingOptions(max_rows_per_partition=4, raise_on_fail=True), ) - left = pl.LazyFrame({"foo": list(range(5)), "bar": list(range(5))}) right = pl.LazyFrame({"foo": list(range(1))}) q = left.join(right, on="foo", how="inner").sort(by=["foo"]) diff --git a/python/cudf_polars/tests/test_config.py b/python/cudf_polars/tests/test_config.py index 6004c5eef40..c0afb466f99 100644 --- a/python/cudf_polars/tests/test_config.py +++ b/python/cudf_polars/tests/test_config.py @@ -74,7 +74,7 @@ def test_use_device_not_current(monkeypatch): # Fake that the current device is 1. 
monkeypatch.setattr(gpu, "getDevice", lambda: 1) q = pl.LazyFrame({}) - assert_gpu_result_equal(q, engine=pl.GPUEngine(device=0)) + assert_gpu_result_equal(q, engine=pl.GPUEngine(executor="in-memory", device=0)) @pytest.mark.parametrize("device", [-1, "foo"]) @@ -118,7 +118,7 @@ def test_cudf_polars_enable_disable_managed_memory(monkeypatch, enable_managed_m monkeycontext.setenv( "POLARS_GPU_ENABLE_CUDA_MANAGED_MEMORY", enable_managed_memory ) - result = q.collect(engine=pl.GPUEngine()) + result = q.collect(engine=pl.GPUEngine(executor="in-memory")) mr = default_memory_resource( 0, cuda_managed_memory=bool(enable_managed_memory == "1"), @@ -136,7 +136,7 @@ def test_cudf_polars_enable_disable_managed_memory(monkeypatch, enable_managed_m def test_explicit_device_zero(): q = pl.LazyFrame({"a": [1, 2, 3]}) - result = q.collect(engine=pl.GPUEngine(device=0)) + result = q.collect(engine=pl.GPUEngine(executor="in-memory", device=0)) assert_frame_equal(q.collect(), result) @@ -152,7 +152,7 @@ def allocate(bytes, stream): mr = rmm.mr.CallbackMemoryResource(allocate, upstream.deallocate) q = pl.LazyFrame({"a": [1, 2, 3]}) - result = q.collect(engine=pl.GPUEngine(memory_resource=mr)) + result = q.collect(engine=pl.GPUEngine(executor="in-memory", memory_resource=mr)) assert_frame_equal(q.collect(), result) assert n_allocations > 0 @@ -254,7 +254,7 @@ def test_validate_cluster() -> None: ) ) assert config.executor.name == "streaming" - assert config.executor.cluster == "single" + assert config.executor.cluster == "default_singleton" with pytest.raises(ValueError, match="'foo' is not a valid Cluster"): ConfigOptions.from_polars_engine( @@ -332,7 +332,7 @@ def test_parquet_options_from_env(monkeypatch: pytest.MonkeyPatch) -> None: def test_config_option_from_env(monkeypatch: pytest.MonkeyPatch) -> None: with monkeypatch.context() as m: - m.setenv("CUDF_POLARS__EXECUTOR__CLUSTER", "single") + m.setenv("CUDF_POLARS__EXECUTOR__CLUSTER", "default_singleton") 
m.setenv("CUDF_POLARS__EXECUTOR__FALLBACK_MODE", "silent") m.setenv("CUDF_POLARS__EXECUTOR__MAX_ROWS_PER_PARTITION", "42") m.setenv("CUDF_POLARS__EXECUTOR__TARGET_PARTITION_SIZE", "100") @@ -342,7 +342,7 @@ def test_config_option_from_env(monkeypatch: pytest.MonkeyPatch) -> None: engine = pl.GPUEngine() config = ConfigOptions.from_polars_engine(engine) assert config.executor.name == "streaming" - assert config.executor.cluster == "single" + assert config.executor.cluster == "default_singleton" assert config.executor.fallback_mode == "silent" assert config.executor.max_rows_per_partition == 42 assert config.executor.target_partition_size == 100 diff --git a/python/cudf_polars/tests/test_executors.py b/python/cudf_polars/tests/test_executors.py index 9ccbd913869..562c3510d58 100644 --- a/python/cudf_polars/tests/test_executors.py +++ b/python/cudf_polars/tests/test_executors.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. 
# SPDX-License-Identifier: Apache-2.0 from __future__ import annotations @@ -10,11 +10,7 @@ from cudf_polars.testing.asserts import assert_gpu_result_equal -@pytest.mark.parametrize("executor", [None, "in-memory", "streaming"]) -def test_executor_basics(executor): - if executor == "streaming": - pytest.importorskip("dask") - +def test_executor_basics(streaming_engine_factory): df = pl.LazyFrame( { "a": pl.Series([[1, 2], [3]], dtype=pl.List(pl.Int8())), @@ -30,7 +26,7 @@ def test_executor_basics(executor): } ) - assert_gpu_result_equal(df, executor=executor) + assert_gpu_result_equal(df, engine=streaming_engine_factory()) def test_cudf_cache_evaluate(): @@ -45,7 +41,7 @@ def test_cudf_cache_evaluate(): assert_gpu_result_equal(query, executor="in-memory") -def test_dask_experimental_map_function_get_hashable(): +def test_dask_experimental_map_function_get_hashable(streaming_engine_factory): df = pl.LazyFrame( { "a": pl.Series([11, 12, 13], dtype=pl.UInt16), @@ -55,7 +51,7 @@ def test_dask_experimental_map_function_get_hashable(): } ) q = df.unpivot(index="d") - assert_gpu_result_equal(q, executor="streaming") + assert_gpu_result_equal(q, engine=streaming_engine_factory()) def test_unknown_executor(): diff --git a/python/cudf_polars/tests/test_parquet_filters.py b/python/cudf_polars/tests/test_parquet_filters.py index c789a953d1e..2009800ced5 100644 --- a/python/cudf_polars/tests/test_parquet_filters.py +++ b/python/cudf_polars/tests/test_parquet_filters.py @@ -56,7 +56,12 @@ def pq_file(tmp_path_factory, df): def test_scan_by_hand(expr, selection, pq_file, chunked): q = pq_file.filter(expr).select(*selection) assert_gpu_result_equal( - q, engine=pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": chunked}) + q, + engine=pl.GPUEngine( + executor="in-memory", + raise_on_fail=True, + parquet_options={"chunked": chunked}, + ), ) diff --git a/python/cudf_polars/tests/test_profile.py b/python/cudf_polars/tests/test_profile.py index cf48dc933d7..d2b17e07378 100644 
--- a/python/cudf_polars/tests/test_profile.py +++ b/python/cudf_polars/tests/test_profile.py @@ -26,12 +26,11 @@ def test_profile_basic() -> None: assert_frame_equal(result, q.collect(engine="in-memory"), check_row_order=False) -def test_profile_streaming_raises() -> None: +def test_profile_streaming_raises(spmd_engine) -> None: df = pl.LazyFrame({"a": [1, 2, 3, 4]}) q = df.sort("a").group_by("a").len() - engine = pl.GPUEngine(executor="streaming", executor_options={"cluster": "single"}) with pytest.raises( NotImplementedError, match=r"profile\(\) is not supported with the streaming executor.", ): - q.profile(engine=engine) + q.profile(engine=spmd_engine) diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 1875de2f762..ac0e876d28e 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -30,7 +30,9 @@ from werkzeug import Request -NO_CHUNK_ENGINE = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": False}) +NO_CHUNK_ENGINE = pl.GPUEngine( + executor="in-memory", raise_on_fail=True, parquet_options={"chunked": False} +) @pytest.fixture( @@ -136,7 +138,11 @@ def test_scan( row_index_offset=offset, n_rows=n_rows, ) - engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked}) + engine = pl.GPUEngine( + executor="in-memory", + raise_on_fail=True, + parquet_options={"chunked": is_chunked}, + ) if zlice is not None: q = q.slice(*zlice) @@ -372,10 +378,8 @@ def test_scan_include_file_path(request, tmp_path, format, scan_fn, df, n_rows): if format == "ndjson": assert_ir_translation_raises(q, NotImplementedError) - elif format == "parquet": - assert_gpu_result_equal(q, engine=NO_CHUNK_ENGINE) else: - assert_gpu_result_equal(q) + assert_gpu_result_equal(q, engine=NO_CHUNK_ENGINE) @pytest.fixture( @@ -428,6 +432,7 @@ def test_scan_parquet_chunked( assert_gpu_result_equal( q, engine=pl.GPUEngine( + executor="in-memory", raise_on_fail=True, parquet_options={ 
"chunked": True, @@ -568,7 +573,12 @@ def get_handler(req: Request) -> Response: q = pl.scan_parquet(httpserver.url_for(server_path)) assert_gpu_result_equal( - q, engine=pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": chunked}) + q, + engine=pl.GPUEngine( + executor="in-memory", + raise_on_fail=True, + parquet_options={"chunked": chunked}, + ), ) @@ -697,15 +707,14 @@ def test_scan_tiny_file_not_compressed(engine: pl.GPUEngine, tmp_path): def test_scan_parquet_zero_width_with_limit( engine: pl.GPUEngine, tmp_path, custom_engine, request ): + active_engine = custom_engine if custom_engine is not None else engine request.applymarker( pytest.mark.xfail( - is_streaming_engine(engine) or custom_engine is not None, + is_streaming_engine(active_engine), reason="https://github.com/rapidsai/cudf/issues/21644", ) ) path = tmp_path / "zero_width.parquet" pl.LazyFrame(height=20).sink_parquet(path) q = pl.scan_parquet(path).head(5) - assert_gpu_result_equal( - q, engine=custom_engine if custom_engine is not None else engine - ) + assert_gpu_result_equal(q, engine=active_engine) diff --git a/python/cudf_polars/tests/test_select.py b/python/cudf_polars/tests/test_select.py index eeacec4d726..286bd1dd6a5 100644 --- a/python/cudf_polars/tests/test_select.py +++ b/python/cudf_polars/tests/test_select.py @@ -47,7 +47,7 @@ def test_select_decimal_precision_none_result_max_precision(): ) query = ldf.select(pl.col("a")) cpu_result = query.collect() - gpu_result = query.collect(engine="gpu") + gpu_result = query.collect(engine=pl.GPUEngine(executor="in-memory")) # See github.com/pola-rs/polars/issues/19784 # for context on the decimal changes. 
assert cpu_result.schema["a"].precision == 38 diff --git a/python/cudf_polars/tests/test_sink.py b/python/cudf_polars/tests/test_sink.py index d23559d2134..b2ffbc8fa31 100644 --- a/python/cudf_polars/tests/test_sink.py +++ b/python/cudf_polars/tests/test_sink.py @@ -103,6 +103,7 @@ def test_sink_parquet( "row_group_size": row_group_size, }, engine=pl.GPUEngine( + executor="in-memory", raise_on_fail=True, parquet_options={"chunked": is_chunked, "n_output_chunks": n_output_chunks}, ), @@ -125,12 +126,14 @@ def test_sink_parquet_compression_type(df, tmp_path, compression, compression_le "compression": compression, "compression_level": compression_level, }, + executor="in-memory", ) elif compression in {"snappy", "lz4", "uncompressed"}: assert_sink_result_equal( df, tmp_path / "compression.parquet", write_kwargs={"compression": compression}, + executor="in-memory", ) else: assert_sink_ir_translation_raises( diff --git a/python/cudf_polars/tests/test_tracing.py b/python/cudf_polars/tests/test_tracing.py index 283ca361682..97e5b4640f7 100644 --- a/python/cudf_polars/tests/test_tracing.py +++ b/python/cudf_polars/tests/test_tracing.py @@ -94,7 +94,7 @@ def test_log_query_plan() -> None: raise_on_fail=True, executor="streaming", executor_options={ - "cluster": "single", + "cluster": "default_singleton", "max_rows_per_partition": 5, }, memory_resource=rmm.mr.ManagedMemoryResource(),