diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index ed81e87627e..973272f0112 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -26,15 +26,11 @@ "assert_sink_result_equal", ] -# Will be overriden by `conftest.py` with the value from the `--executor` -# command-line argument. -DEFAULT_EXECUTOR = "in-memory" - def assert_gpu_result_equal( lazydf: pl.LazyFrame, *, - engine: GPUEngine | None = None, + engine: GPUEngine, collect_kwargs: CollectKwargs | None = None, polars_collect_kwargs: CollectKwargs | None = None, cudf_collect_kwargs: CollectKwargs | None = None, @@ -45,7 +41,6 @@ def assert_gpu_result_equal( rtol: float = 1e-05, atol: float = 1e-08, categorical_as_str: bool = False, - executor: str | None = None, ) -> None: """ Assert that collection of a lazyframe on GPU produces correct results. @@ -83,9 +78,6 @@ def assert_gpu_result_equal( Absolute tolerance for float comparisons categorical_as_str Decat categoricals to strings before comparing - executor - The executor configuration to pass to `GPUEngine`. If not specified - uses the module level `Executor` attribute. Raises ------ @@ -94,7 +86,6 @@ def assert_gpu_result_equal( NotImplementedError If GPU collection failed in some way. """ - engine = engine or get_default_engine(executor) final_polars_collect_kwargs, final_cudf_collect_kwargs = _process_kwargs( collect_kwargs, polars_collect_kwargs, cudf_collect_kwargs ) @@ -167,35 +158,6 @@ def assert_ir_translation_raises(q: pl.LazyFrame, *exceptions: type[Exception]) raise AssertionError(f"Translation DID NOT RAISE {exceptions}") -def get_default_engine( - executor: str | None = None, -) -> GPUEngine: - """ - Get the default engine used for testing. - - Parameters - ---------- - executor - The executor configuration to pass to `GPUEngine`. If not specified - uses the module level `Executor` attribute. - - Returns - ------- - engine - A polars GPUEngine configured with the default settings for tests. - - See Also - -------- - assert_gpu_result_equal - assert_sink_result_equal - """ - executor = executor or DEFAULT_EXECUTOR - return GPUEngine( - raise_on_fail=True, - executor=executor, - ) - - def _process_kwargs( collect_kwargs: CollectKwargs | None, polars_collect_kwargs: CollectKwargs | None, @@ -311,10 +273,9 @@ def assert_sink_result_equal( lazydf: pl.LazyFrame, path: str | Path, *, - engine: str | GPUEngine | None = None, + engine: GPUEngine, read_kwargs: dict | None = None, write_kwargs: dict | None = None, - executor: str | None = None, ) -> None: """ Assert that writing a LazyFrame via sink produces the same output. @@ -332,9 +293,6 @@ def assert_sink_result_equal( Optional keyword arguments to pass to the corresponding `pl.read_*` function. write_kwargs Optional keyword arguments to pass to the corresponding `sink_*` function. - executor - The executor configuration to pass to `GPUEngine`. If not specified - uses the module level `Executor` attribute. Raises ------ @@ -343,7 +301,6 @@ def assert_sink_result_equal( ValueError If the file extension is not one of the supported formats. """ - engine = engine or get_default_engine(executor) path = Path(path) read_kwargs = read_kwargs or {} write_kwargs = write_kwargs or {} diff --git a/python/cudf_polars/cudf_polars/testing/engine_utils.py b/python/cudf_polars/cudf_polars/testing/engine_utils.py index b0b640615f7..b80afc8ea86 100644 --- a/python/cudf_polars/cudf_polars/testing/engine_utils.py +++ b/python/cudf_polars/cudf_polars/testing/engine_utils.py @@ -7,10 +7,9 @@ import importlib.util from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any, Literal, TypeVar if TYPE_CHECKING: - from collections.abc import Mapping from contextlib import AbstractContextManager import polars as pl @@ -103,7 +102,6 @@ def warns_on_spmd( # pragma: no cover; rapidsmpf-only path def create_streaming_options( blocksize_mode: Literal["medium", "small"], - overrides: StreamingOptions | None = None, ) -> StreamingOptions: """ Create :class:`StreamingOptions` for a block size mode. @@ -114,13 +112,10 @@ def create_streaming_options( Block size configuration. ``"medium"`` uses moderate partition sizes, while ``"small"`` uses very small partitions and sets ``fallback_mode=SILENT`` to avoid excessive warnings from CPU fallback. - overrides - Optional options to merge on top of the selected baseline. Fields in - ``overrides`` take precedence over the baseline. Returns ------- - The merged streaming options. + The streaming options for the given block size. """ from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.utils.config import StreamingFallbackMode @@ -130,7 +125,7 @@ def create_streaming_options( # the UUID-collision guard on every ``_reset(...)``. match blocksize_mode: case "medium": - baseline = StreamingOptions( + return StreamingOptions( max_rows_per_partition=50, dynamic_planning={}, target_partition_size=1_000_000, @@ -138,7 +133,7 @@ def create_streaming_options( allow_gpu_sharing=True, ) case "small": - baseline = StreamingOptions( + return StreamingOptions( max_rows_per_partition=4, dynamic_planning={}, target_partition_size=10, @@ -148,54 +143,52 @@ def create_streaming_options( ) case _: # pragma: no cover raise ValueError(f"Unknown blocksize_mode: {blocksize_mode!r}") - if overrides is None: - return baseline - return StreamingOptions(**{**baseline.to_dict(), **overrides.to_dict()}) -def build_streaming_engine( - param: EngineFixtureParam, - engines: Mapping[str, StreamingEngine], - options: StreamingOptions | None = None, -) -> StreamingEngine: +def merge_streaming_options( + base: StreamingOptions, overrides: StreamingOptions +) -> StreamingOptions: """ - Return ``engines``'s entry for ``param``, ``_reset``-ed. + Merge override options into the base streaming options. - ``engines`` must already contain a slot for ``param.engine_name`` — - seeded by the ``streaming_engines`` session-scoped fixture. The - fixture owns mutation; this function only reads and ``_reset``-s. + Parameters + ---------- + base + The base streaming options. + overrides + Any additional streaming options. + + Returns + ------- + The merged streaming options with overrides overriding any base options. + """ + from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions + + return StreamingOptions(**{**base.to_dict(), **overrides.to_dict()}) + + +EngineT = TypeVar("EngineT", bound="StreamingEngine") + + +def configure_streaming_engine(engine: EngineT, options: StreamingOptions) -> EngineT: + """ + Configure an engine with a set of options. Parameters ---------- - param - Decoded engine fixture parameter describing the backend and block size mode. - engines - Streaming-engine collection keyed by backend name. Provided by - the ``streaming_engines`` test fixture. + engine + Streaming engine to configure. The caller owns its lifecycle. options - Optional streaming options to merge on top of the baseline selected by - ``param.blocksize_mode``. + Configuration options to apply to the engine. Returns ------- - The shared :class:`StreamingEngine`, ``_reset`` to the requested options. - - Raises - ------ - RuntimeError - If ``engines`` has no slot for ``param.engine_name``. + ``engine``, reset to the requested options. """ - streaming_options = create_streaming_options(param.blocksize_mode, options) - engine = engines.get(param.engine_name) - if engine is None: # pragma: no cover - raise RuntimeError( - f"No streaming engine for {param.engine_name!r}. The corresponding " - "session-scoped fixture must populate the collection before tests run." - ) engine._reset( - rapidsmpf_options=streaming_options.to_rapidsmpf_options(), - executor_options=streaming_options.to_executor_options(), - engine_options=streaming_options.to_engine_options(), + rapidsmpf_options=options.to_rapidsmpf_options(), + executor_options=options.to_executor_options(), + engine_options=options.to_engine_options(), ) return engine diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py index 65c3dce3e49..0e4938b775f 100644 --- a/python/cudf_polars/tests/conftest.py +++ b/python/cudf_polars/tests/conftest.py @@ -8,28 +8,22 @@ import polars as pl -import cudf_polars.callback from cudf_polars.testing.engine_utils import ( ALL_ENGINE_FIXTURE_PARAMS, STREAMING_ENGINE_FIXTURE_PARAMS, EngineFixtureParam, - build_streaming_engine, + configure_streaming_engine, + create_streaming_options, + merge_streaming_options, ) if TYPE_CHECKING: - from collections.abc import Callable, Generator, Mapping - from typing import TypeAlias + from collections.abc import Callable, Generator from cudf_polars.experimental.rapidsmpf.frontend.core import StreamingEngine from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine - # Read-only view over the per-backend streaming engines owned by the - # ``streaming_engines`` session fixture. Only that fixture mutates the - # underlying dict; consumers (``spmd_engine``, ``streaming_engine_factory``, - # ``engine``) only look up by backend name. - StreamingEngines: TypeAlias = Mapping[str, StreamingEngine] - # Number of ranks for multi-rank streaming engines that share one GPU # (currently ``RayEngine``). Single-GPU dev hosts and CI runners require @@ -38,24 +32,10 @@ @pytest.fixture(params=[False, True], ids=["no_nulls", "nulls"], scope="session") -def with_nulls(request): +def with_nulls(request: pytest.FixtureRequest): return request.param -@pytest.fixture -def clear_memory_resource_cache(): - """ - Clear the cudf_polars.callback.default_memory_resource cache before and after a test. - - This function caches memory resources for the duration of the process. Any test that - creates a pool (e.g. ``CudaAsyncMemoryResource``) should use this fixture to ensure that - the pool is freed after the test. - """ - cudf_polars.callback.default_memory_resource.cache_clear() - yield - cudf_polars.callback.default_memory_resource.cache_clear() - - @pytest.fixture(autouse=True) def _skip_unless_spmd(request: pytest.FixtureRequest) -> None: """Skip tests in SPMD multi-rank mode unless marked with ``pytest.mark.spmd``.""" @@ -70,128 +50,143 @@ def _skip_unless_spmd(request: pytest.FixtureRequest) -> None: @pytest.fixture(scope="session") -def streaming_engines() -> Generator[StreamingEngines, None, None]: - """Return a session-scoped mapping of engine name to engine instance. - - The returned :class:`StreamingEngines` is a dict that maps each engine - name to a single shared engine instance, which is reused across the entire - test session. +def _engine_param(request: pytest.FixtureRequest) -> EngineFixtureParam: + """Decoded engine variant selected by pytest parametrization. + + :func:`pytest_generate_tests` inspects all tests and then filters those + with ``_engine_param`` in their fixture list according to the public + engine fixture being used. For example, if a given test requests the + :func:`spmd_engine` fixture then its underlying ``_engine_param`` is + rebound to only loop over spmd engines for that test. """ - from rapidsmpf import bootstrap - from rapidsmpf.communicator.single import new_communicator as single_communicator - from rapidsmpf.config import Options, get_environment_variables - from rapidsmpf.progress_thread import ProgressThread - - from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine - - if bootstrap.is_running_with_rrun(): - comm = bootstrap.create_ucxx_comm( - progress_thread=ProgressThread(), - type=bootstrap.BackendType.AUTO, - ) - else: - comm = single_communicator( - Options(get_environment_variables()), ProgressThread() - ) - - engines: dict[str, StreamingEngine] = {"spmd": SPMDEngine(comm=comm)} + return EngineFixtureParam(full_name=request.param) - if "dask" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover - from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine - engines["dask"] = DaskEngine(engine_options={"allow_gpu_sharing": True}) +@pytest.fixture(scope="session") +def _unconfigured_engine( + _engine_param: EngineFixtureParam, +) -> Generator[tuple[pl.GPUEngine, StreamingOptions | None], None, None]: + """ + Fixture generating an engine resource and options to apply before use. - if "ray" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover - from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine + Parameters + ---------- + _engine_param + The parameterisation of the engine - # Always pin ``num_ranks`` so the cached engine has a deterministic - # actor count regardless of how many GPUs the host happens to have; - # otherwise ``RayEngine`` defaults to ``get_num_gpus_in_ray_cluster()`` - # and tests that depend on rank-count behavior (e.g. fast-count - # parquet, concat) become non-portable. Pinning ``num_ranks`` requires - # ``allow_gpu_sharing=True`` (production guard). - engines["ray"] = RayEngine( - num_ranks=NUM_RANKS, - engine_options={"allow_gpu_sharing": True}, - ray_init_options={"include_dashboard": False}, - ) + Returns + ------- + tuple + Of an engine and options to apply to the engine to configure it (or + None if no configuration is needed). - try: - yield engines - finally: - while engines: - _, engine = engines.popitem() - engine.shutdown() + Notes + ----- + This session-scoped fixture keeps the heavy state of an engine alive + for the lifetime of its use, shutting it down once the particular + engine is not required any more. Tests should not use this fixture + directly, but rather one of the parameterised "public" engine fixtures. + Those take care of applying the configuration to the base engine each + time it is used in a test. + """ + if _engine_param.engine_name == "in-memory": + yield pl.GPUEngine(executor="in-memory", raise_on_fail=True), None + else: + engine: StreamingEngine + match _engine_param.engine_name: + case "spmd": + from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine + + engine = SPMDEngine() + case "dask": # pragma: no cover + from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine + + engine = DaskEngine(engine_options={"allow_gpu_sharing": True}) + case "ray": # pragma: no cover + from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine + + # Always specify num_ranks so the engine has a fixed size + # regardless of how many GPUs the host happens to have; + # otherwise ``RayEngine`` defaults to + # ``get_num_gpus_in_ray_cluster()`` + engine = RayEngine( + num_ranks=NUM_RANKS, + engine_options={"allow_gpu_sharing": True}, + ray_init_options={"include_dashboard": False}, + ) + case _: # pragma: no cover + raise ValueError( + f"Unknown streaming engine: {_engine_param.engine_name!r}" + ) + with engine: + yield engine, create_streaming_options(_engine_param.blocksize_mode) @pytest.fixture -def spmd_engine(streaming_engines: StreamingEngines) -> SPMDEngine: - """Return the shared :class:`SPMDEngine` reset to default options.""" - from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine - - engine = streaming_engines["spmd"] - assert isinstance(engine, SPMDEngine) - engine._reset() - return engine +def spmd_engine( + _unconfigured_engine: tuple[SPMDEngine, StreamingOptions], +) -> SPMDEngine: + """Return the shared configured :class:`SPMDEngine`.""" + engine, options = _unconfigured_engine + return configure_streaming_engine(engine, options) @pytest.fixture def spmd_engine_factory( - streaming_engines: StreamingEngines, -) -> Callable[..., SPMDEngine]: + _unconfigured_engine: tuple[SPMDEngine, StreamingOptions], +) -> Callable[[StreamingOptions], SPMDEngine]: """ - Return a factory that yields the shared :class:`SPMDEngine`. + Return a function that, when called, produces a :class:`SPMDEngine`. + Parameters + ---------- + _unconfigured_engine + Session-scoped engine selected by pytest parametrization. + + Returns + ------- + Factory function that returns the shared :class:`SPMDEngine` when + provided with :class:`StreamingOptions`. + + Notes + ----- Use this in place of :func:`streaming_engine_factory` for tests that must run on SPMD only. """ - from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine - - param = EngineFixtureParam(full_name="spmd") + engine, base = _unconfigured_engine - def factory(options: StreamingOptions | None = None) -> SPMDEngine: - engine = build_streaming_engine(param, streaming_engines, options) - assert isinstance(engine, SPMDEngine) - return engine + def factory(options: StreamingOptions) -> SPMDEngine: + return configure_streaming_engine( + engine, + merge_streaming_options(base, options), + ) return factory -@pytest.fixture(params=STREAMING_ENGINE_FIXTURE_PARAMS) -def _streaming_engine_param(request: pytest.FixtureRequest) -> EngineFixtureParam: - """Parametrization helper to run tests for each streaming engine variant.""" - return EngineFixtureParam(full_name=request.param) - - -@pytest.fixture(params=ALL_ENGINE_FIXTURE_PARAMS) -def _all_engine_param(request: pytest.FixtureRequest) -> EngineFixtureParam: - """Parametrization helper to run tests for each engine variant.""" - return EngineFixtureParam(full_name=request.param) - - @pytest.fixture def streaming_engine_factory( - _streaming_engine_param: EngineFixtureParam, - streaming_engines: StreamingEngines, -) -> Callable[..., StreamingEngine]: + _unconfigured_engine: tuple[StreamingEngine, StreamingOptions], +) -> Callable[[StreamingOptions], StreamingEngine]: """ - Return a factory that yields a shared :class:`StreamingEngine`. + Return a function that, when called, produces a :class:`StreamingEngine`. Parameters ---------- - _streaming_engine_param - Parametrized engine descriptor controlling backend and block size mode. - streaming_engines - Session-scoped engine collection to look up the shared engine in. + _unconfigured_engine + Session-scoped engine selected by pytest parametrization. Returns ------- - Factory function that returns the shared :class:`StreamingEngine`. + Factory function that returns the shared :class:`StreamingEngine` when + provided with :class:`StreamingOptions`. """ + engine, base = _unconfigured_engine - def factory(options: StreamingOptions | None = None) -> StreamingEngine: - return build_streaming_engine( - _streaming_engine_param, streaming_engines, options + def factory(options: StreamingOptions) -> StreamingEngine: + return configure_streaming_engine( + engine, + merge_streaming_options(base, options), ) return factory @@ -199,43 +194,39 @@ def factory(options: StreamingOptions | None = None) -> StreamingEngine: @pytest.fixture def streaming_engine( - streaming_engine_factory: Callable[..., StreamingEngine], + _unconfigured_engine: tuple[StreamingEngine, StreamingOptions], ) -> StreamingEngine: """ - Return a default-configured :class:`StreamingEngine`. + Return the shared configured :class:`StreamingEngine`. - Inherits the parametrization of :func:`streaming_engine_factory`, so + Inherits the parametrization of ``_unconfigured_engine``, so tests using this fixture run once per ``(backend, blocksize_mode)`` combination. Parameters ---------- - streaming_engine_factory - Factory fixture used to construct streaming engines. + _unconfigured_engine + Session-scoped engine selected by pytest parametrization. Returns ------- - A streaming engine created with the parametrized baseline and no - per-test overrides. + A streaming engine configured with the parametrized baseline. """ - return streaming_engine_factory() + engine, options = _unconfigured_engine + return configure_streaming_engine(engine, options) @pytest.fixture def engine( - request: pytest.FixtureRequest, - _all_engine_param: EngineFixtureParam, + _unconfigured_engine: tuple[pl.GPUEngine, StreamingOptions | None], ) -> pl.GPUEngine: """ Return a :class:`polars.GPUEngine` for each engine variant under test. Parameters ---------- - request - Pytest fixture request object used to access dependent fixtures. - _all_engine_param - Parametrized engine descriptor covering both in-memory and streaming - variants. + _unconfigured_engine + Session-scoped engine selected by pytest parametrization. Returns ------- @@ -246,11 +237,13 @@ def engine( For tests that require a :class:`StreamingEngine` only, use the :func:`streaming_engine` fixture instead. """ - if _all_engine_param.engine_name == "in-memory": - return pl.GPUEngine(executor="in-memory", raise_on_fail=True) + engine, options = _unconfigured_engine + if options is None: + return engine + from cudf_polars.experimental.rapidsmpf.frontend.core import StreamingEngine - engines: StreamingEngines = request.getfixturevalue("streaming_engines") - return build_streaming_engine(_all_engine_param, engines) + assert isinstance(engine, StreamingEngine) + return configure_streaming_engine(engine, options) @pytest.fixture @@ -268,23 +261,11 @@ def engine_raise_on_fail() -> pl.GPUEngine: from ``.collect()``. Uses the in-memory executor so errors are not wrapped by a streaming task group. """ - # TODO: We should be testing will all supported engine variants + # TODO: We should be testing with all supported engine variants return pl.GPUEngine(executor="in-memory", raise_on_fail=True) -def pytest_addoption(parser): - parser.addoption( - "--executor", - action="store", - default="streaming", - choices=("in-memory", "streaming"), - help="Executor to use for GPUEngine.", - ) - - -def pytest_configure(config): - import cudf_polars.testing.asserts - +def pytest_configure(config: pytest.Config): config.addinivalue_line( "markers", "skip_on_streaming_engine(reason, *, engine=None): skip the test for " @@ -294,19 +275,43 @@ def pytest_configure(config): # Ray's internal subprocess management leaks `/dev/null` file handles, and # distributed's shutdown leaves unclosed sockets. Under Python 3.14 + - # pytest 9, these surface as unraisable `ResourceWarning`s and — combined - # with `filterwarnings = ["error", ...]` in pyproject.toml — fail + # pytest 9, these surface as unraisable `ResourceWarning`s and, combined + # with `filterwarnings = ["error", ...]` in pyproject.toml, fail # otherwise-unrelated tests when the GC finalizer happens to fire during # them. With `pytest-xdist --dist=worksteal`, the leak can land in any # test that shares a worker with a ray/dask test, so the suppression must # apply globally rather than per-module. config.addinivalue_line("filterwarnings", "ignore::ResourceWarning") - cudf_polars.testing.asserts.DEFAULT_EXECUTOR = config.getoption("--executor") +def pytest_generate_tests(metafunc: pytest.Metafunc): + """Parametrize the shared engine fixture without cartesian products.""" + fixtures = set(metafunc.fixturenames) + if "_engine_param" not in fixtures: + return -def pytest_collection_modifyitems(items): - """Apply ``skip_on_streaming_engine`` markers to streaming ``engine`` items.""" + if "spmd_engine" in fixtures or "spmd_engine_factory" in fixtures: + engines = ["spmd"] + elif "streaming_engine" in fixtures or "streaming_engine_factory" in fixtures: + engines = STREAMING_ENGINE_FIXTURE_PARAMS + elif "engine" in fixtures: + engines = ALL_ENGINE_FIXTURE_PARAMS + else: + raise AssertionError("Unknown engine fixture") + + metafunc.parametrize( + "_engine_param", + engines, + indirect=True, + ids=engines, + scope="session", + ) + + +def pytest_collection_modifyitems( + session: pytest.Session, config: pytest.Config, items: list[pytest.Item] +): + """Apply ``skip_on_streaming_engine`` markers to streaming engine items.""" for item in items: marker = item.get_closest_marker("skip_on_streaming_engine") if marker is None: @@ -314,12 +319,7 @@ def pytest_collection_modifyitems(items): callspec = getattr(item, "callspec", None) if callspec is None: continue - # Tests bind to either ``engine`` (parametrized via ``_all_engine_param``) - # or ``streaming_engine`` / ``streaming_engine_factory`` (parametrized via - # ``_streaming_engine_param``). Check both. - engine_param = callspec.params.get("_all_engine_param") or callspec.params.get( - "_streaming_engine_param" - ) + engine_param = callspec.params.get("_engine_param") if engine_param is None or engine_param == "in-memory": continue engine_filter = marker.kwargs.get("engine") diff --git a/python/cudf_polars/tests/experimental/test_default_singleton_engine.py b/python/cudf_polars/tests/experimental/test_default_singleton_engine.py index 93c63d2fb4d..d481484d532 100644 --- a/python/cudf_polars/tests/experimental/test_default_singleton_engine.py +++ b/python/cudf_polars/tests/experimental/test_default_singleton_engine.py @@ -4,11 +4,10 @@ Tests for :class:`DefaultSingletonEngine`. Every test body runs inside a worker spawned by the module-scoped -``proc_pool`` fixture. This isolates us from the session-scoped -``streaming_engines`` fixture in :file:`conftest.py`, which creates an -``SPMDEngine`` that lives for the entire pytest session and would -otherwise trip the "no other engine alive when default is created" -guardrail in :class:`DefaultSingletonEngine`. +``proc_pool`` fixture. This isolates us from any session-scoped shared +streaming engine that may be live in the parent pytest process and would +otherwise trip the "no other engine alive when default is created" guardrail +in :class:`DefaultSingletonEngine`. """ from __future__ import annotations diff --git a/python/cudf_polars/tests/experimental/test_metadata.py b/python/cudf_polars/tests/experimental/test_metadata.py index eb59e4b4641..2a2391a3136 100644 --- a/python/cudf_polars/tests/experimental/test_metadata.py +++ b/python/cudf_polars/tests/experimental/test_metadata.py @@ -582,22 +582,24 @@ def test_from_keys_order_scheme_single_rank(spmd_engine): assert result_rev_int.inter_rank_scheme is None -def test_remap_partitioning_order_scheme_select(spmd_engine, engine): +def test_remap_partitioning_order_scheme_select(spmd_engine): part = Partitioning( inter_rank=_make_order_scheme(spmd_engine.context, key_indices=(0,)), local="inherit", ) + engine = pl.GPUEngine(executor="in-memory", raise_on_fail=True) result = maybe_remap_partitioning(_make_select_ir(engine, ("b", "a")), part) assert result is not None assert isinstance(result.inter_rank, OrderScheme) assert result.inter_rank.keys[0].column_index == 1 -def test_remap_partitioning_order_scheme_drops_key(spmd_engine, engine): +def test_remap_partitioning_order_scheme_drops_key(spmd_engine): part = Partitioning( inter_rank=_make_order_scheme(spmd_engine.context, key_indices=(0,)), local="inherit", ) + engine = pl.GPUEngine(executor="in-memory", raise_on_fail=True) result = maybe_remap_partitioning(_make_select_ir(engine, ("b",)), part) assert result is not None assert result.inter_rank is None diff --git a/python/cudf_polars/tests/experimental/test_sink.py b/python/cudf_polars/tests/experimental/test_sink.py index e53bf3f4029..2ed3ddc71cd 100644 --- a/python/cudf_polars/tests/experimental/test_sink.py +++ b/python/cudf_polars/tests/experimental/test_sink.py @@ -91,15 +91,14 @@ def test_sink_parquet_directory( assert len(list(check_path.iterdir())) == expected_file_count -def test_sink_parquet_raises(streaming_engines): +def test_sink_parquet_raises(df: pl.LazyFrame, tmp_path, streaming_engine_factory): """No streaming-engine cluster supports ``sink_to_directory=False``.""" - from cudf_polars.utils.config import Cluster, StreamingExecutor - - for name in streaming_engines: - with pytest.raises( - ValueError, match=f"The {name} cluster requires sink_to_directory=True" - ): - StreamingExecutor(cluster=Cluster(name), sink_to_directory=False) + engine = streaming_engine_factory(StreamingOptions(sink_to_directory=False)) + with pytest.raises( + pl.exceptions.ComputeError, + match=r"ValueError: The [^ ]+ cluster requires sink_to_directory=True", + ): + df.sink_parquet(tmp_path / "test_sink_gpu.parquet", engine=engine) @pytest.mark.parametrize("include_header", [True, False]) diff --git a/python/cudf_polars/tests/test_config.py b/python/cudf_polars/tests/test_config.py index 0eed940ff59..09fc2710aa8 100644 --- a/python/cudf_polars/tests/test_config.py +++ b/python/cudf_polars/tests/test_config.py @@ -38,7 +38,7 @@ from cudf_polars.utils.cuda_stream import get_cuda_stream -def test_polars_verbose_warns(monkeypatch): +def test_polars_verbose_warns(engine, monkeypatch): def raise_unimplemented(self, *args): raise NotImplementedError("We don't support this") @@ -55,7 +55,7 @@ def raise_unimplemented(self, *args): ), ): # And ensure that collecting issues the correct warning. - assert_gpu_result_equal(q) + assert_gpu_result_equal(q, engine=engine) def test_unsupported_config_raises(): @@ -105,13 +105,30 @@ def test_invalid_memory_resource_raises(mr, monkeypatch): q.collect(engine=pl.GPUEngine(memory_resource=mr)) +@pytest.fixture +def clear_memory_resource_cache(): + """ + Clear the cudf_polars.callback.default_memory_resource cache before and after a test. + + This function caches memory resources for the duration of the process. Any test that + creates a pool (e.g. ``CudaAsyncMemoryResource``) should use this fixture to ensure that + the pool is freed after the test. + """ + cudf_polars.callback.default_memory_resource.cache_clear() + try: + yield + finally: + cudf_polars.callback.default_memory_resource.cache_clear() + + @pytest.mark.skipif( not _is_concurrent_managed_access_supported(), reason="managed memory not supported", ) @pytest.mark.parametrize("enable_managed_memory", ["1", "0"]) -@pytest.mark.usefixtures("clear_memory_resource_cache") -def test_cudf_polars_enable_disable_managed_memory(monkeypatch, enable_managed_memory): +def test_cudf_polars_enable_disable_managed_memory( + monkeypatch, enable_managed_memory, clear_memory_resource_cache +): q = pl.LazyFrame({"a": [1, 2, 3]}) with monkeypatch.context() as monkeycontext: diff --git a/python/cudf_polars/tests/test_executors.py b/python/cudf_polars/tests/test_executors.py index 562c3510d58..be73ff24f56 100644 --- a/python/cudf_polars/tests/test_executors.py +++ b/python/cudf_polars/tests/test_executors.py @@ -10,7 +10,7 @@ from cudf_polars.testing.asserts import assert_gpu_result_equal -def test_executor_basics(streaming_engine_factory): +def test_executor_basics(streaming_engine): df = pl.LazyFrame( { "a": pl.Series([[1, 2], [3]], dtype=pl.List(pl.Int8())), @@ -26,10 +26,10 @@ def test_executor_basics(streaming_engine_factory): } ) - assert_gpu_result_equal(df, engine=streaming_engine_factory()) + assert_gpu_result_equal(df, engine=streaming_engine) -def test_cudf_cache_evaluate(): +def test_cudf_cache_evaluate(engine): ldf = pl.DataFrame( { "a": [1, 2, 3, 4, 5, 6, 7], @@ -38,10 +38,10 @@ def test_cudf_cache_evaluate(): ).lazy() ldf2 = ldf.select((pl.col("a") + pl.col("b")).alias("c"), pl.col("a")) query = pl.concat([ldf, ldf2], how="diagonal") - assert_gpu_result_equal(query, executor="in-memory") + assert_gpu_result_equal(query, engine=engine) -def test_dask_experimental_map_function_get_hashable(streaming_engine_factory): +def test_dask_experimental_map_function_get_hashable(streaming_engine): df = pl.LazyFrame( { "a": pl.Series([11, 12, 13], dtype=pl.UInt16), @@ -51,7 +51,7 @@ def test_dask_experimental_map_function_get_hashable(streaming_engine_factory): } ) q = df.unpivot(index="d") - assert_gpu_result_equal(q, engine=streaming_engine_factory()) + assert_gpu_result_equal(q, engine=streaming_engine) def test_unknown_executor(): @@ -61,7 +61,9 @@ def test_unknown_executor(): pl.exceptions.ComputeError, match="ValueError: Unknown executor 'unknown-executor'", ): - assert_gpu_result_equal(df, executor="unknown-executor") + assert_gpu_result_equal( + df, engine=pl.GPUEngine(executor="unknown-executor", raise_on_fail=True) + ) @pytest.mark.parametrize("executor", [None, "in-memory", "streaming"]) @@ -76,5 +78,6 @@ def test_unknown_executor_options(executor): engine=pl.GPUEngine( executor=executor, executor_options={"foo": None}, + raise_on_fail=True, ) ) diff --git a/python/cudf_polars/tests/test_sink.py b/python/cudf_polars/tests/test_sink.py index b2ffbc8fa31..e4231c8acd1 100644 --- a/python/cudf_polars/tests/test_sink.py +++ b/python/cudf_polars/tests/test_sink.py @@ -126,14 +126,14 @@ def test_sink_parquet_compression_type(df, tmp_path, compression, compression_le "compression": compression, "compression_level": compression_level, }, - executor="in-memory", + engine=pl.GPUEngine(executor="in-memory", raise_on_fail=True), ) elif compression in {"snappy", "lz4", "uncompressed"}: assert_sink_result_equal( df, tmp_path / "compression.parquet", write_kwargs={"compression": compression}, - executor="in-memory", + engine=pl.GPUEngine(executor="in-memory", raise_on_fail=True), ) else: assert_sink_ir_translation_raises( diff --git a/python/cudf_polars/tests/test_tracing.py b/python/cudf_polars/tests/test_tracing.py index 97e5b4640f7..22cbdfa31d4 100644 --- a/python/cudf_polars/tests/test_tracing.py +++ b/python/cudf_polars/tests/test_tracing.py @@ -13,8 +13,6 @@ import polars as pl -import cudf_polars.testing.asserts - structlog = pytest.importorskip("structlog") @@ -40,11 +38,14 @@ def test_trace_basic( import rmm q = pl.DataFrame({"a": [1, 2, 3]}).lazy().select(pl.col("a").sum()) - q.collect(engine=pl.GPUEngine(memory_resource=rmm.mr.ManagedMemoryResource())) + q.collect( + engine=pl.GPUEngine( + executor="streaming", memory_resource=rmm.mr.ManagedMemoryResource() + ) + ) """) env = { - "CUDF_POLARS__EXECUTOR": cudf_polars.testing.asserts.DEFAULT_EXECUTOR, "CUDF_POLARS_LOG_TRACES": "1", } diff --git a/python/cudf_polars/tests/testing/test_engine_utils.py b/python/cudf_polars/tests/testing/test_engine_utils.py index 346a11acf2e..ff8ceeb215a 100644 --- a/python/cudf_polars/tests/testing/test_engine_utils.py +++ b/python/cudf_polars/tests/testing/test_engine_utils.py @@ -6,6 +6,7 @@ from cudf_polars.testing.engine_utils import ( EngineFixtureParam, create_streaming_options, + merge_streaming_options, ) @@ -45,7 +46,7 @@ def test_create_streaming_options_overrides_merge(): from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions overrides = StreamingOptions(max_rows_per_partition=999) - merged = create_streaming_options("medium", overrides) + merged = merge_streaming_options(create_streaming_options("medium"), overrides) # Override wins. assert merged.max_rows_per_partition == 999 # Untouched baseline field is preserved.