diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py index cc79fdb6b3c..47412f1b296 100644 --- a/python/cudf_polars/tests/conftest.py +++ b/python/cudf_polars/tests/conftest.py @@ -21,6 +21,7 @@ from collections.abc import Callable, Generator from cudf_polars.engine.core import StreamingEngine + from cudf_polars.engine.dask import DaskEngine from cudf_polars.engine.options import StreamingOptions from cudf_polars.engine.spmd import SPMDEngine @@ -164,6 +165,15 @@ def factory(options: StreamingOptions) -> SPMDEngine: return factory +@pytest.fixture +def dask_engine( + _unconfigured_engine: tuple[DaskEngine, StreamingOptions], +) -> DaskEngine: + """Return the shared configured :class:`DaskEngine`.""" + engine, options = _unconfigured_engine + return configure_streaming_engine(engine, options) + + @pytest.fixture def streaming_engine_factory( _unconfigured_engine: tuple[StreamingEngine, StreamingOptions], @@ -294,6 +304,8 @@ def pytest_generate_tests(metafunc: pytest.Metafunc): if "spmd_engine" in fixtures or "spmd_engine_factory" in fixtures: engines = ["spmd"] + elif "dask_engine" in fixtures: + engines = ["dask"] elif "streaming_engine" in fixtures or "streaming_engine_factory" in fixtures: engines = STREAMING_ENGINE_FIXTURE_PARAMS elif "engine" in fixtures: diff --git a/python/cudf_polars/tests/streaming/test_dask.py b/python/cudf_polars/tests/streaming/test_dask.py index 18b3edc248f..e4d450714f1 100644 --- a/python/cudf_polars/tests/streaming/test_dask.py +++ b/python/cudf_polars/tests/streaming/test_dask.py @@ -4,6 +4,7 @@ from __future__ import annotations +import logging import os from typing import TYPE_CHECKING @@ -24,17 +25,6 @@ from collections.abc import Iterator -@pytest.fixture(scope="module") -def engine() -> Iterator[DaskEngine]: - """Create one Dask/GPU cluster shared across the test module.""" - with DaskEngine( - # Small partition size so tests exercise the multi-partition code path - # deterministically, regardless of input size. - executor_options={"max_rows_per_partition": 10}, - ) as engine: - yield engine - - pytestmark = [ pytest.mark.skipif( is_running_with_rrun(), @@ -43,127 +33,92 @@ def engine() -> Iterator[DaskEngine]: ] +@pytest.fixture(scope="module") +def dask_client() -> Iterator[distributed.Client]: # type: ignore[name-defined] + # Use for DaskEngine constructor tests to avoid re-creating the cluster + # Otherwise, use session-scoped DaskEngine in conftest + with ( + distributed.LocalCluster( + n_workers=1, + threads_per_worker=1, + silence_logs=logging.WARNING, + ) as cluster, + distributed.Client(cluster) as client, + ): + yield client + + # --------------------------------------------------------------------------- -# GPU tests — share a single Dask cluster for the whole module +# GPU tests — reuse the session-scoped Dask cluster from conftest # --------------------------------------------------------------------------- -def test_from_options() -> None: +def test_from_options(dask_client: distributed.Client) -> None: # type: ignore[name-defined] """DaskEngine.from_options with default StreamingOptions creates a valid engine.""" opts = StreamingOptions(fallback_mode="silent") - with DaskEngine.from_options(opts) as engine: + with DaskEngine.from_options(opts, dask_client=dask_client) as engine: assert engine.nranks >= 1 -def test_yields_engine(engine: DaskEngine) -> None: +def test_yields_engine(dask_engine: DaskEngine) -> None: """DaskEngine is a GPUEngine with at least one rank.""" - assert isinstance(engine, pl.GPUEngine) - assert engine.nranks >= 1 + assert isinstance(dask_engine, pl.GPUEngine) + assert dask_engine.nranks >= 1 -def test_executor_options_forwarded(engine: DaskEngine) -> None: +def test_executor_options_forwarded(dask_engine: DaskEngine) -> None: """Reserved executor_options keys are injected into the engine config.""" - opts = engine.config["executor_options"] + opts = dask_engine.config["executor_options"] assert opts["cluster"] == "dask" assert isinstance(opts["dask_context"], DaskContext) -def test_gather_cluster_info(engine: DaskEngine) -> None: +def test_gather_cluster_info(dask_engine: DaskEngine) -> None: """gather_cluster_info returns one ClusterInfo per rank with expected fields.""" - infos = engine.gather_cluster_info() - assert len(infos) == engine.nranks + infos = dask_engine.gather_cluster_info() + assert len(infos) == dask_engine.nranks for info in infos: assert isinstance(info.pid, int) assert isinstance(info.hostname, str) # Each worker runs in its own process. - assert len({info.pid for info in infos}) == engine.nranks + assert len({info.pid for info in infos}) == dask_engine.nranks -def test_worker_host_memory_limit(engine: DaskEngine) -> None: +def test_worker_host_memory_limit(dask_engine: DaskEngine) -> None: """Memory limit is respected.""" - scheduler_info = engine._dask_ctx.client.scheduler_info(n_workers=-1) + scheduler_info = dask_engine._dask_ctx.client.scheduler_info(n_workers=-1) worker = next(iter(scheduler_info["workers"].values())) assert worker["memory_limit"] == distributed.system.MEMORY_LIMIT -def test_from_options_creates_engine() -> None: +def test_from_options_creates_engine(dask_client: distributed.Client) -> None: # type: ignore[name-defined] """DaskEngine.from_options produces a working engine and runs a query.""" opts = StreamingOptions(max_rows_per_partition=10, fallback_mode="silent") - with DaskEngine.from_options(opts) as eng: + with DaskEngine.from_options(opts, dask_client=dask_client) as eng: assert isinstance(eng, pl.GPUEngine) assert eng.nranks >= 1 lf = pl.LazyFrame({"a": [1, 2, 3]}) assert_gpu_result_equal(lf, engine=eng, check_row_order=False) -def test_scan(engine: DaskEngine) -> None: - """Input rows are partitioned across workers; total output equals input.""" - lf = pl.LazyFrame({"a": [1, 2, 3]}) - assert_gpu_result_equal(lf, engine=engine, check_row_order=False) - - -def test_filter(engine: DaskEngine) -> None: - """Filter is applied correctly across all workers.""" - lf = pl.LazyFrame({"a": [1, 2, 3, 4, 5]}) - assert_gpu_result_equal( - lf.filter(pl.col("a") > 3), engine=engine, check_row_order=False - ) - - -def test_group_by(engine: DaskEngine) -> None: - """Group-by produces the correct aggregation across all ranks.""" - # max_rows_per_partition=10 (set on the module fixture) gives each rank - # exactly 5 partitions, so the multi-partition path is always exercised. - n, n_keys = engine.nranks * 50, 5 - keys = [str(i % n_keys) for i in range(n)] - vals = list(range(n)) - lf = pl.LazyFrame({"key": keys, "val": vals}) - assert_gpu_result_equal( - lf.group_by("key").agg(pl.col("val").sum()), - engine=engine, - check_row_order=False, - ) - - -def test_join(engine: DaskEngine) -> None: - """Hash join between two tables produces the correct result across all ranks.""" - # max_rows_per_partition=10 (set on the module fixture) gives each rank - # exactly 5 partitions, so the multi-partition path is always exercised. - n = engine.nranks * 50 - lf_left = pl.LazyFrame({"key": list(range(n)), "val_left": list(range(n))}) - lf_right = pl.LazyFrame( - {"key": list(range(n)), "val_right": [x * 2 for x in range(n)]} - ) - assert_gpu_result_equal( - lf_left.join(lf_right, on="key"), - engine=engine, - check_row_order=False, - ) - - -def test_empty_dataframe(engine: DaskEngine) -> None: - """An empty LazyFrame produces an empty result with the correct schema.""" - lf = pl.LazyFrame( - {"a": pl.Series([], dtype=pl.Int32), "b": pl.Series([], dtype=pl.Float64)} - ) - assert_gpu_result_equal(lf, engine=engine) - - -def test_run(engine: DaskEngine) -> None: - result = engine._run(os.getpid) - assert len(set(result)) == engine.nranks +def test_run(dask_engine: DaskEngine) -> None: + result = dask_engine._run(os.getpid) + assert len(set(result)) == dask_engine.nranks @pytest.fixture(scope="module") -def reset_engine() -> Iterator[DaskEngine]: - """Module-scoped engine for reset tests — independent of ``engine``. +def reset_engine(dask_client: distributed.Client) -> Iterator[DaskEngine]: # type: ignore[name-defined] + """Module-scoped engine for reset tests — independent of ``dask_engine``. These tests exercise :meth:`DaskEngine._reset` (which mutates the engine in-place). A dedicated fixture keeps those mutations from - leaking into the other tests. + leaking into the conftest-shared ``dask_engine``. + + Note: Do not use this fixture if you call _reset with a new dask_client. """ with DaskEngine( executor_options={"max_rows_per_partition": 10}, + dask_client=dask_client, ) as e: yield e @@ -209,9 +164,12 @@ def test_reset_collects_after_options_change(reset_engine: DaskEngine) -> None: ) -def test_reset_after_shutdown_raises() -> None: +def test_reset_after_shutdown_raises(dask_client: distributed.Client) -> None: # type: ignore[name-defined] """``shutdown`` is idempotent; ``_reset`` after shutdown raises every time.""" - engine = DaskEngine(executor_options={"max_rows_per_partition": 10}) + engine = DaskEngine( + dask_client=dask_client, + executor_options={"max_rows_per_partition": 10}, + ) engine.shutdown() engine.shutdown() # idempotent with pytest.raises(RuntimeError, match="shut-down"):