Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions python/cudf_polars/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from collections.abc import Callable, Generator

from cudf_polars.engine.core import StreamingEngine
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.spmd import SPMDEngine

Expand Down Expand Up @@ -164,6 +165,15 @@ def factory(options: StreamingOptions) -> SPMDEngine:
return factory


@pytest.fixture
def dask_engine(
    _unconfigured_engine: tuple[DaskEngine, StreamingOptions],
) -> DaskEngine:
    """Provide the shared :class:`DaskEngine`, configured for the current test.

    ``_unconfigured_engine`` supplies an ``(engine, options)`` pair; both are
    passed to ``configure_streaming_engine`` and its result is returned.
    """
    shared_engine, streaming_options = _unconfigured_engine
    configured = configure_streaming_engine(shared_engine, streaming_options)
    return configured


@pytest.fixture
def streaming_engine_factory(
_unconfigured_engine: tuple[StreamingEngine, StreamingOptions],
Expand Down Expand Up @@ -294,6 +304,8 @@ def pytest_generate_tests(metafunc: pytest.Metafunc):

if "spmd_engine" in fixtures or "spmd_engine_factory" in fixtures:
engines = ["spmd"]
elif "dask_engine" in fixtures:
engines = ["dask"]
elif "streaming_engine" in fixtures or "streaming_engine_factory" in fixtures:
engines = STREAMING_ENGINE_FIXTURE_PARAMS
elif "engine" in fixtures:
Expand Down
134 changes: 46 additions & 88 deletions python/cudf_polars/tests/streaming/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import annotations

import logging
import os
from typing import TYPE_CHECKING

Expand All @@ -24,17 +25,6 @@
from collections.abc import Iterator


@pytest.fixture(scope="module")
def engine() -> Iterator[DaskEngine]:
    """Yield one :class:`DaskEngine` that is reused by the whole test module."""
    # A tiny partition size makes the tests take the multi-partition code
    # path deterministically, however small the input data is.
    executor_options = {"max_rows_per_partition": 10}
    with DaskEngine(executor_options=executor_options) as module_engine:
        yield module_engine


pytestmark = [
pytest.mark.skipif(
is_running_with_rrun(),
Expand All @@ -43,127 +33,92 @@ def engine() -> Iterator[DaskEngine]:
]


@pytest.fixture(scope="module")
def dask_client() -> Iterator[distributed.Client]:  # type: ignore[name-defined]
    """Yield a client attached to a one-worker, single-threaded local cluster.

    Meant for the ``DaskEngine`` constructor tests, so they can pass in an
    existing client instead of re-creating the cluster each time. Other tests
    should use the session-scoped ``DaskEngine`` from conftest.
    """
    cluster_kwargs = {
        "n_workers": 1,
        "threads_per_worker": 1,
        "silence_logs": logging.WARNING,
    }
    with (
        distributed.LocalCluster(**cluster_kwargs) as local_cluster,
        distributed.Client(local_cluster) as client,
    ):
        yield client


# ---------------------------------------------------------------------------
# GPU tests — share a single Dask cluster for the whole module
# GPU tests — reuse the session-scoped Dask cluster from conftest
# ---------------------------------------------------------------------------


def test_from_options() -> None:
def test_from_options(dask_client: distributed.Client) -> None: # type: ignore[name-defined]
"""DaskEngine.from_options with default StreamingOptions creates a valid engine."""
opts = StreamingOptions(fallback_mode="silent")
with DaskEngine.from_options(opts) as engine:
with DaskEngine.from_options(opts, dask_client=dask_client) as engine:
assert engine.nranks >= 1


def test_yields_engine(engine: DaskEngine) -> None:
def test_yields_engine(dask_engine: DaskEngine) -> None:
"""DaskEngine is a GPUEngine with at least one rank."""
assert isinstance(engine, pl.GPUEngine)
assert engine.nranks >= 1
assert isinstance(dask_engine, pl.GPUEngine)
assert dask_engine.nranks >= 1


def test_executor_options_forwarded(engine: DaskEngine) -> None:
def test_executor_options_forwarded(dask_engine: DaskEngine) -> None:
"""Reserved executor_options keys are injected into the engine config."""
opts = engine.config["executor_options"]
opts = dask_engine.config["executor_options"]
assert opts["cluster"] == "dask"
assert isinstance(opts["dask_context"], DaskContext)


def test_gather_cluster_info(engine: DaskEngine) -> None:
def test_gather_cluster_info(dask_engine: DaskEngine) -> None:
"""gather_cluster_info returns one ClusterInfo per rank with expected fields."""
infos = engine.gather_cluster_info()
assert len(infos) == engine.nranks
infos = dask_engine.gather_cluster_info()
assert len(infos) == dask_engine.nranks
for info in infos:
assert isinstance(info.pid, int)
assert isinstance(info.hostname, str)
# Each worker runs in its own process.
assert len({info.pid for info in infos}) == engine.nranks
assert len({info.pid for info in infos}) == dask_engine.nranks


def test_worker_host_memory_limit(engine: DaskEngine) -> None:
def test_worker_host_memory_limit(dask_engine: DaskEngine) -> None:
"""Memory limit is respected."""
scheduler_info = engine._dask_ctx.client.scheduler_info(n_workers=-1)
scheduler_info = dask_engine._dask_ctx.client.scheduler_info(n_workers=-1)
worker = next(iter(scheduler_info["workers"].values()))
assert worker["memory_limit"] == distributed.system.MEMORY_LIMIT


def test_from_options_creates_engine() -> None:
def test_from_options_creates_engine(dask_client: distributed.Client) -> None: # type: ignore[name-defined]
"""DaskEngine.from_options produces a working engine and runs a query."""
opts = StreamingOptions(max_rows_per_partition=10, fallback_mode="silent")
with DaskEngine.from_options(opts) as eng:
with DaskEngine.from_options(opts, dask_client=dask_client) as eng:
assert isinstance(eng, pl.GPUEngine)
assert eng.nranks >= 1
lf = pl.LazyFrame({"a": [1, 2, 3]})
assert_gpu_result_equal(lf, engine=eng, check_row_order=False)


def test_scan(engine: DaskEngine) -> None:
    """Collecting a small in-memory LazyFrame gives the expected rows.

    Row order is not checked.
    """
    frame = pl.LazyFrame({"a": [1, 2, 3]})
    assert_gpu_result_equal(frame, check_row_order=False, engine=engine)


def test_filter(engine: DaskEngine) -> None:
    """An ``a > 3`` filter gives the expected rows, ignoring row order."""
    frame = pl.LazyFrame({"a": [1, 2, 3, 4, 5]})
    filtered = frame.filter(pl.col("a") > 3)
    assert_gpu_result_equal(filtered, check_row_order=False, engine=engine)


def test_group_by(engine: DaskEngine) -> None:
    """Summing ``val`` per ``key`` gives the expected result, ignoring row order."""
    # 50 rows per rank: with max_rows_per_partition=10 (set on the module
    # fixture) that is 5 partitions per rank, so the multi-partition path
    # is always exercised.
    n_rows = engine.nranks * 50
    n_keys = 5
    frame = pl.LazyFrame(
        {
            "key": [str(row % n_keys) for row in range(n_rows)],
            "val": list(range(n_rows)),
        }
    )
    query = frame.group_by("key").agg(pl.col("val").sum())
    assert_gpu_result_equal(query, engine=engine, check_row_order=False)


def test_join(engine: DaskEngine) -> None:
    """Joining two frames on ``key`` gives the expected result, ignoring row order."""
    # 50 rows per rank: with max_rows_per_partition=10 (set on the module
    # fixture) that is 5 partitions per rank, so the multi-partition path
    # is always exercised.
    n_rows = engine.nranks * 50
    left = pl.LazyFrame(
        {
            "key": list(range(n_rows)),
            "val_left": list(range(n_rows)),
        }
    )
    right = pl.LazyFrame(
        {
            "key": list(range(n_rows)),
            "val_right": [value * 2 for value in range(n_rows)],
        }
    )
    joined = left.join(right, on="key")
    assert_gpu_result_equal(joined, engine=engine, check_row_order=False)


def test_empty_dataframe(engine: DaskEngine) -> None:
    """A zero-row LazyFrame with Int32 and Float64 columns passes the result check."""
    empty_columns = {
        "a": pl.Series([], dtype=pl.Int32),
        "b": pl.Series([], dtype=pl.Float64),
    }
    assert_gpu_result_equal(pl.LazyFrame(empty_columns), engine=engine)


def test_run(engine: DaskEngine) -> None:
result = engine._run(os.getpid)
assert len(set(result)) == engine.nranks
def test_run(dask_engine: DaskEngine) -> None:
result = dask_engine._run(os.getpid)
assert len(set(result)) == dask_engine.nranks


@pytest.fixture(scope="module")
def reset_engine() -> Iterator[DaskEngine]:
"""Module-scoped engine for reset tests — independent of ``engine``.
def reset_engine(dask_client: distributed.Client) -> Iterator[DaskEngine]: # type: ignore[name-defined]
"""Module-scoped engine for reset tests — independent of ``dask_engine``.

These tests exercise :meth:`DaskEngine._reset` (which mutates the
engine in-place). A dedicated fixture keeps those mutations from
leaking into the other tests.
leaking into the conftest-shared ``dask_engine``.

Note: Do not use this fixture if you call _reset with a new dask_client.
"""
with DaskEngine(
executor_options={"max_rows_per_partition": 10},
dask_client=dask_client,
) as e:
yield e

Expand Down Expand Up @@ -209,9 +164,12 @@ def test_reset_collects_after_options_change(reset_engine: DaskEngine) -> None:
)


def test_reset_after_shutdown_raises() -> None:
def test_reset_after_shutdown_raises(dask_client: distributed.Client) -> None: # type: ignore[name-defined]
"""``shutdown`` is idempotent; ``_reset`` after shutdown raises every time."""
engine = DaskEngine(executor_options={"max_rows_per_partition": 10})
engine = DaskEngine(
dask_client=dask_client,
executor_options={"max_rows_per_partition": 10},
)
engine.shutdown()
engine.shutdown() # idempotent
with pytest.raises(RuntimeError, match="shut-down"):
Expand Down
Loading