diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 41a4c734f21..844f20fe573 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -503,6 +503,7 @@ jobs: # (rapidsmpf compatibility already validated in rapidsmpf CI) matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) build_type: pull-request + container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000" script: "ci/test_cudf_polars_experimental.sh" cudf-polars-polars-tests: needs: [wheel-build-cudf-polars, changed-files] diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b4977f60def..a6b0b6f3326 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -174,6 +174,7 @@ jobs: matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) build_type: ${{ inputs.build_type }} branch: ${{ inputs.branch }} + container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000" date: ${{ inputs.date }} sha: ${{ inputs.sha }} script: "ci/test_cudf_polars_experimental.sh" diff --git a/ci/run_cudf_polars_experimental_pytests.sh b/ci/run_cudf_polars_experimental_pytests.sh index d0a4767bd99..da659c7b386 100755 --- a/ci/run_cudf_polars_experimental_pytests.sh +++ b/ci/run_cudf_polars_experimental_pytests.sh @@ -10,5 +10,5 @@ set -euo pipefail # Support invoking outside the script directory cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/ -echo "Running the full cudf-polars test suite with both the in-memory and spmd engine" +echo "Running the full cudf-polars test suite" python -m pytest --cache-clear "$@" tests diff --git a/ci/test_cudf_polars_experimental.sh b/ci/test_cudf_polars_experimental.sh index aa3abd66254..4b796ff4b94 100755 --- a/ci/test_cudf_polars_experimental.sh +++ b/ci/test_cudf_polars_experimental.sh @@ -28,7 +28,7 @@ rapids-pip-retry install \ -v \ --prefer-binary \ --constraint "${PIP_CONSTRAINT}" \ - "$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental]" \ + "$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental,ray]" \ "$(echo "${LIBCUDF_WHEELHOUSE}"/libcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)" \ "$(echo "${PYLIBCUDF_WHEELHOUSE}"/pylibcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)" diff --git a/dependencies.yaml b/dependencies.yaml index b1eb276befb..f4acc169263 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -384,6 +384,14 @@ files: key: experimental includes: - run_cudf_polars_experimental + py_run_cudf_polars_ray: + output: pyproject + pyproject_dir: python/cudf_polars + extras: + table: project.optional-dependencies + key: ray + includes: + - depends_on_ray py_test_cudf_polars: output: pyproject pyproject_dir: python/cudf_polars @@ -1290,6 +1298,11 @@ dependencies: - matrix: packages: - *rapidsmpf_unsuffixed + depends_on_ray: + common: + - output_types: [conda, requirements, pyproject] + packages: + - ray>=2.55.1 depends_on_rapids_logger: common: - output_types: [conda, requirements, pyproject] diff --git a/python/cudf_polars/cudf_polars/experimental/join.py b/python/cudf_polars/cudf_polars/experimental/join.py index cd5c514b45a..1682762c9e8 100644 --- a/python/cudf_polars/cudf_polars/experimental/join.py +++ 
b/python/cudf_polars/cudf_polars/experimental/join.py @@ -164,20 +164,22 @@ def _( left, pi_left = rec(left) right, pi_right = rec(right) - # Fallback to single partition on the smaller table + # Fallback to single partition on the smaller table whenever either + # side has more than one partition. left_count = pi_left[left].count right_count = pi_right[right].count output_count = max(left_count, right_count) - fallback_msg = "ConditionalJoin not supported for multiple partitions." - if left_count < right_count: - if left_count > 1 or dynamic_planning: + if output_count > 1 or dynamic_planning: + if left_count < right_count: left = Repartition(left.schema, left) pi_left[left] = PartitionInfo(count=1) - _fallback_inform(fallback_msg, config_options) - elif right_count > 1 or dynamic_planning: - right = Repartition(right.schema, right) - pi_right[right] = PartitionInfo(count=1) - _fallback_inform(fallback_msg, config_options) + else: + right = Repartition(right.schema, right) + pi_right[right] = PartitionInfo(count=1) + _fallback_inform( + "ConditionalJoin not supported for multiple partitions.", + config_options, + ) # Reconstruct and return new_node = ir.reconstruct([left, right]) diff --git a/python/cudf_polars/cudf_polars/testing/engine_utils.py b/python/cudf_polars/cudf_polars/testing/engine_utils.py index c36bcf2ed27..b0b640615f7 100644 --- a/python/cudf_polars/cudf_polars/testing/engine_utils.py +++ b/python/cudf_polars/cudf_polars/testing/engine_utils.py @@ -11,6 +11,7 @@ if TYPE_CHECKING: from collections.abc import Mapping + from contextlib import AbstractContextManager import polars as pl @@ -21,6 +22,15 @@ STREAMING_ENGINE_FIXTURE_PARAMS: list[str] = [] if importlib.util.find_spec("rapidsmpf") is not None: STREAMING_ENGINE_FIXTURE_PARAMS.extend(["spmd", "spmd-small"]) + # ``DaskEngine`` and ``RayEngine`` both reject construction inside an + # ``rrun`` cluster. + from rapidsmpf.bootstrap import is_running_with_rrun as _is_running_with_rrun + + if not _is_running_with_rrun(): # pragma: no cover + if importlib.util.find_spec("distributed") is not None: + STREAMING_ENGINE_FIXTURE_PARAMS.append("dask") + if importlib.util.find_spec("ray") is not None: + STREAMING_ENGINE_FIXTURE_PARAMS.append("ray") ALL_ENGINE_FIXTURE_PARAMS = ["in-memory", *STREAMING_ENGINE_FIXTURE_PARAMS] @@ -63,6 +73,34 @@ def is_streaming_engine(obj: Any) -> bool: return isinstance(obj, StreamingEngine) +def warns_on_spmd( # pragma: no cover; rapidsmpf-only path + engine: Any, + *args: Any, + when: bool = True, + **kwargs: Any, +) -> AbstractContextManager[Any]: + """ + ``pytest.warns(*args, **kwargs)`` on SPMD; ``nullcontext`` otherwise. + + ``pytest.warns`` only captures warnings emitted in the test process. On + multi-process backends (``DaskEngine``, ``RayEngine``) the fallback + warning fires on workers/actors and only appears in worker logs/stdout, + so the assertion is replaced with a passthrough on those backends. + + The optional ``when`` kwarg lets callers compose an additional gate (e.g. + a parametrize value) without an outer ``if``. 
+ """ + import contextlib + + import pytest + + from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine + + if when and isinstance(engine, SPMDEngine): + return pytest.warns(*args, **kwargs) + return contextlib.nullcontext() + + def create_streaming_options( blocksize_mode: Literal["medium", "small"], overrides: StreamingOptions | None = None, @@ -87,6 +125,9 @@ def create_streaming_options( from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.utils.config import StreamingFallbackMode + # ``allow_gpu_sharing=True`` is always set so the cached multi-rank + # engines (Dask workers, Ray actors with ``num_ranks > 1``) don't trip + # the UUID-collision guard on every ``_reset(...)``. match blocksize_mode: case "medium": baseline = StreamingOptions( @@ -94,6 +135,7 @@ def create_streaming_options( dynamic_planning={}, target_partition_size=1_000_000, raise_on_fail=True, + allow_gpu_sharing=True, ) case "small": baseline = StreamingOptions( @@ -102,6 +144,7 @@ def create_streaming_options( target_partition_size=10, raise_on_fail=True, fallback_mode=StreamingFallbackMode.SILENT, + allow_gpu_sharing=True, ) case _: # pragma: no cover raise ValueError(f"Unknown blocksize_mode: {blocksize_mode!r}") diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml index 47633e42364..7703cad7dad 100644 --- a/python/cudf_polars/pyproject.toml +++ b/python/cudf_polars/pyproject.toml @@ -63,6 +63,9 @@ rapidsmpf = [ "pyarrow>=19.0.0,<24", "rapidsmpf==26.6.*,>=0.0.0a0", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. +ray = [ + "ray>=2.55.1", +] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. [project.urls] Homepage = "https://github.com/rapidsai/cudf" diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py index b3d83b36d36..65445b683ae 100644 --- a/python/cudf_polars/tests/conftest.py +++ b/python/cudf_polars/tests/conftest.py @@ -31,6 +31,12 @@ StreamingEngines: TypeAlias = Mapping[str, StreamingEngine] +# Number of ranks for multi-rank streaming engines that share one GPU +# (currently ``RayEngine``). Single-GPU dev hosts and CI runners require +# ``allow_gpu_sharing=True`` to oversubscribe one device across actors. +NUM_RANKS = 2 + + @pytest.fixture(params=[False, True], ids=["no_nulls", "nulls"], scope="session") def with_nulls(request): return request.param @@ -89,6 +95,27 @@ def streaming_engines() -> Generator[StreamingEngines, None, None]: ) engines: dict[str, StreamingEngine] = {"spmd": SPMDEngine(comm=comm)} + + if "dask" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover + from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine + + engines["dask"] = DaskEngine(engine_options={"allow_gpu_sharing": True}) + + if "ray" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover + from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine + + # Always pin ``num_ranks`` so the cached engine has a deterministic + # actor count regardless of how many GPUs the host happens to have; + # otherwise ``RayEngine`` defaults to ``get_num_gpus_in_ray_cluster()`` + # and tests that depend on rank-count behavior (e.g. fast-count + # parquet, concat) become non-portable. Pinning ``num_ranks`` requires + # ``allow_gpu_sharing=True`` (production guard). 
+ engines["ray"] = RayEngine( + num_ranks=NUM_RANKS, + engine_options={"allow_gpu_sharing": True}, + ray_init_options={"include_dashboard": False}, + ) + try: yield engines finally: @@ -108,6 +135,28 @@ def spmd_engine(streaming_engines: StreamingEngines) -> SPMDEngine: return engine +@pytest.fixture +def spmd_engine_factory( + streaming_engines: StreamingEngines, +) -> Callable[..., SPMDEngine]: + """ + Return a factory that yields the shared :class:`SPMDEngine`. + + Use this in place of :func:`streaming_engine_factory` for tests that + must run on SPMD only. + """ + from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine + + param = EngineFixtureParam(full_name="spmd") + + def factory(options: StreamingOptions | None = None) -> SPMDEngine: + engine = build_streaming_engine(param, streaming_engines, options) + assert isinstance(engine, SPMDEngine) + return engine + + return factory + + @pytest.fixture(params=STREAMING_ENGINE_FIXTURE_PARAMS) def _streaming_engine_param(request: pytest.FixtureRequest) -> EngineFixtureParam: """Parametrization helper to run tests for each streaming engine variant.""" @@ -246,10 +295,9 @@ def pytest_configure(config): config.addinivalue_line( "markers", - "skip_on_streaming_engine(reason): skip the test for streaming " - '``engine`` variants (e.g. ``"spmd"``, ``"spmd-small"``) while ' - "still letting the in-memory variant run. Use this to track features " - "that have no multi-partition implementation", + "skip_on_streaming_engine(reason, *, engine=None): skip the test for " + 'streaming ``engine`` variants (e.g. ``"spmd"``, ``"spmd-small"``, ' + '``"dask"``, ``"ray"``) while still allowing the in-memory variant to run.', ) # Ray's internal subprocess management leaks `/dev/null` file handles, and @@ -275,9 +323,23 @@ def pytest_collection_modifyitems(items): callspec = getattr(item, "callspec", None) if callspec is None: continue - engine_param = callspec.params.get("_all_engine_param") + # Tests bind to either ``engine`` (parametrized via ``_all_engine_param``) + # or ``streaming_engine`` / ``streaming_engine_factory`` (parametrized via + # ``_streaming_engine_param``). Check both. + engine_param = callspec.params.get("_all_engine_param") or callspec.params.get( + "_streaming_engine_param" + ) if engine_param is None or engine_param == "in-memory": continue + engine_filter = marker.kwargs.get("engine") + if engine_filter is not None: + if isinstance(engine_filter, str): + engine_filter = (engine_filter,) + # Strip the ``-small`` suffix so ``"spmd-small"`` matches + # ``engine=("spmd",)``. + engine_name = engine_param.removesuffix("-small") + if engine_name not in engine_filter: + continue reason = ( marker.args[0] if marker.args diff --git a/python/cudf_polars/tests/experimental/test_all_gather_host_data.py b/python/cudf_polars/tests/experimental/test_all_gather_host_data.py index 8f09a82c4bd..c85598a8c64 100644 --- a/python/cudf_polars/tests/experimental/test_all_gather_host_data.py +++ b/python/cudf_polars/tests/experimental/test_all_gather_host_data.py @@ -59,8 +59,6 @@ def test_gather_cluster_info(streaming_engine) -> None: assert isinstance(info.gpu_uuid, str) # Each rank runs in its own process. assert len({info.pid for info in infos}) == streaming_engine.nranks - # Without allow_gpu_sharing, all UUIDs must be unique (enforced at init). 
- assert len({info.gpu_uuid for info in infos}) == streaming_engine.nranks def test_cluster_info_cuda_visible_devices(monkeypatch) -> None: diff --git a/python/cudf_polars/tests/experimental/test_dataframescan.py b/python/cudf_polars/tests/experimental/test_dataframescan.py index dbf22848824..fb263e20b94 100644 --- a/python/cudf_polars/tests/experimental/test_dataframescan.py +++ b/python/cudf_polars/tests/experimental/test_dataframescan.py @@ -60,19 +60,20 @@ def test_parallel_dataframescan(df, streaming_engine_factory, max_rows_per_parti assert count == 1 -@pytest.mark.xfail( - reason=( - "Multi-rank Union interleaves child outputs across ranks: client " - "receives [rank0_A, rank0_B, rank1_A, rank1_B] instead of the " - "polars-CPU [A, B]. Tracked in " - "https://github.com/rapidsai/cudf/issues/22376." - ), - strict=False, -) -def test_dataframescan_concat(df, streaming_engine_factory): +def test_dataframescan_concat(request, df, streaming_engine_factory): streaming_engine = streaming_engine_factory( StreamingOptions(max_rows_per_partition=1_000), ) + if streaming_engine.nranks > 1: + # Multi-rank Union interleaves child outputs across ranks: client + # receives [rank0_A, rank0_B, rank1_A, rank1_B] instead of the + # polars-CPU [A, B]. + request.applymarker( + pytest.mark.xfail( + reason="https://github.com/rapidsai/cudf/issues/22376", + strict=False, + ) + ) df2 = pl.concat([df, df]) assert_gpu_result_equal(df2, engine=streaming_engine) diff --git a/python/cudf_polars/tests/experimental/test_filter.py b/python/cudf_polars/tests/experimental/test_filter.py index 4fb11df691c..b8b4fb2749c 100644 --- a/python/cudf_polars/tests/experimental/test_filter.py +++ b/python/cudf_polars/tests/experimental/test_filter.py @@ -9,12 +9,11 @@ from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.testing.asserts import assert_gpu_result_equal +from cudf_polars.testing.engine_utils import warns_on_spmd @pytest.fixture def engine(streaming_engine_factory): - # ``fallback_mode="warn"`` overrides the small-blocksize baseline (which - # sets SILENT) so ``test_filter_non_pointwise`` can assert on the warning. return streaming_engine_factory( StreamingOptions(max_rows_per_partition=3, fallback_mode="warn"), ) @@ -38,7 +37,9 @@ def test_filter_pointwise(df, engine): def test_filter_non_pointwise(df, engine): query = df.filter(pl.col("a") > pl.col("a").max()) - with pytest.warns( - UserWarning, match="This filter is not supported for multiple partitions." 
+ with warns_on_spmd( + engine, + UserWarning, + match="This filter is not supported for multiple partitions.", ): assert_gpu_result_equal(query, engine=engine) diff --git a/python/cudf_polars/tests/experimental/test_groupby.py b/python/cudf_polars/tests/experimental/test_groupby.py index 03d87fe23e9..6ca11387da0 100644 --- a/python/cudf_polars/tests/experimental/test_groupby.py +++ b/python/cudf_polars/tests/experimental/test_groupby.py @@ -131,8 +131,8 @@ def test_groupby_std_var_ddof(df, engine, agg, ddof): @pytest.mark.parametrize("fallback_mode", ["silent", "raise", "warn", "foo"]) -def test_groupby_fallback(df, fallback_mode, streaming_engine_factory): - streaming_engine = streaming_engine_factory( +def test_groupby_fallback(df, fallback_mode, spmd_engine_factory): + streaming_engine = spmd_engine_factory( StreamingOptions(fallback_mode=fallback_mode), ) match = "Failed to decompose groupby aggs" @@ -287,6 +287,10 @@ def test_groupby_count_type_mismatch(df, streaming_engine_factory): assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False) +@pytest.mark.skip_on_streaming_engine( + "patch.object on ShuffleManager.Inserter doesn't reach worker processes", + engine=("dask", "ray"), +) def test_shuffle_reduce_insert_finished_called_on_oom(streaming_engine_factory): streaming_engine = streaming_engine_factory( StreamingOptions(target_partition_size=10, max_rows_per_partition=5), diff --git a/python/cudf_polars/tests/experimental/test_io_multirank.py b/python/cudf_polars/tests/experimental/test_io_multirank.py index 2208cc67316..bf9e8e70343 100644 --- a/python/cudf_polars/tests/experimental/test_io_multirank.py +++ b/python/cudf_polars/tests/experimental/test_io_multirank.py @@ -7,16 +7,15 @@ from typing import TYPE_CHECKING import pytest -from rapidsmpf.bootstrap import is_running_with_rrun import polars as pl -from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine +from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.testing.asserts import assert_sink_result_equal from cudf_polars.utils.config import Cluster, StreamingExecutor if TYPE_CHECKING: - from collections.abc import Iterator + from collections.abc import Callable from pathlib import Path from cudf_polars.experimental.rapidsmpf.frontend.core import StreamingEngine @@ -39,43 +38,14 @@ def df() -> pl.LazyFrame: ) -@pytest.fixture(params=["spmd", "ray", "dask"]) +@pytest.fixture def engine( - request: pytest.FixtureRequest, - spmd_engine: SPMDEngine, -) -> Iterator[StreamingEngine]: - """Yield each supported streaming engine.""" - backend = request.param - executor_options = {"max_rows_per_partition": 1_000} - - if backend == "spmd": - with SPMDEngine( - comm=spmd_engine.comm, - executor_options=executor_options, - ) as eng: - yield eng - return - - if is_running_with_rrun(): - pytest.skip(f"{backend}Engine must not be created from within an rrun cluster") - - if backend == "ray": - pytest.importorskip("ray", reason="ray is not installed") - from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine - - with RayEngine( - executor_options=executor_options, - ray_init_options={"include_dashboard": False}, - ) as eng: - yield eng - return - - assert backend == "dask" - pytest.importorskip("distributed", reason="distributed is not installed") - from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine - - with DaskEngine(executor_options=executor_options) as eng: - yield eng + streaming_engine_factory: Callable[..., StreamingEngine], 
+) -> StreamingEngine: + """Yield each supported streaming engine pinned to small partitions.""" + return streaming_engine_factory( + StreamingOptions(max_rows_per_partition=1_000), + ) def test_sink_parquet_directory( diff --git a/python/cudf_polars/tests/experimental/test_join.py b/python/cudf_polars/tests/experimental/test_join.py index 6a09ff95ef5..1b4635dd924 100644 --- a/python/cudf_polars/tests/experimental/test_join.py +++ b/python/cudf_polars/tests/experimental/test_join.py @@ -19,6 +19,7 @@ from cudf_polars.experimental.shuffle import Shuffle from cudf_polars.experimental.statistics import collect_statistics from cudf_polars.testing.asserts import assert_gpu_result_equal +from cudf_polars.testing.engine_utils import warns_on_spmd from cudf_polars.utils.config import ConfigOptions, StreamingExecutor @@ -103,12 +104,11 @@ def test_join_conditional(reverse, max_rows_per_partition, streaming_engine_fact if reverse: left, right = right, left q = left.join_where(right, pl.col("y") < pl.col("yy")) - if max_rows_per_partition == 3: - with pytest.warns( - UserWarning, match="ConditionalJoin not supported for multiple partitions." - ): - assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False) - else: + with warns_on_spmd( + streaming_engine, + UserWarning, + match="ConditionalJoin not supported for multiple partitions.", + ): assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False) @@ -156,7 +156,7 @@ def test_join(left, right, how, reverse, streaming_engine_factory, options): @pytest.mark.parametrize("zlice", [(0, 2), (2, 2), (-2, None)]) -def test_join_and_slice(zlice, streaming_engine_factory): +def test_join_and_slice(request, zlice, streaming_engine_factory): streaming_engine = streaming_engine_factory( StreamingOptions( max_rows_per_partition=3, @@ -164,6 +164,16 @@ def test_join_and_slice(zlice, streaming_engine_factory): fallback_mode="warn", ), ) + if streaming_engine.nranks > 1: + # The multi-rank fallback for slice doesn't preserve row order + # within equal-key groups, so the slice can pick different rows + # than the CPU baseline. + request.applymarker( + pytest.mark.xfail( + reason="https://github.com/rapidsai/cudf/issues/22405", + strict=False, + ) + ) left = pl.LazyFrame( { "a": [1, 2, 3, 1, None], @@ -181,23 +191,22 @@ def test_join_and_slice(zlice, streaming_engine_factory): q = left.join(right, on="a", how="inner").slice(*zlice) # Check that we get the correct row count # See: https://github.com/rapidsai/cudf/issues/19153 - if zlice in {(2, 2), (-2, None)}: - with pytest.warns( - UserWarning, match="This slice not supported for multiple partitions." 
- ): - assert q.collect(engine=streaming_engine).height == q.collect().height - else: + with warns_on_spmd( + streaming_engine, + UserWarning, + match="This slice not supported for multiple partitions.", + when=zlice in {(2, 2), (-2, None)}, + ): assert q.collect(engine=streaming_engine).height == q.collect().height # Need sort to match order after a join q = left.join(right, on="a", how="inner").sort(pl.col("a")).slice(*zlice) - if zlice == (2, 2): - with pytest.warns( - UserWarning, - match="This slice not supported for multiple partitions.", - ): - assert_gpu_result_equal(q, engine=streaming_engine) - else: + with warns_on_spmd( + streaming_engine, + UserWarning, + match="This slice not supported for multiple partitions.", + when=zlice == (2, 2), + ): assert_gpu_result_equal(q, engine=streaming_engine) @@ -232,7 +241,8 @@ def test_join_maintain_order_fallback_streaming( ) q = left.join(right, on="y", how="inner", maintain_order=maintain_order) - with pytest.warns( + with warns_on_spmd( + streaming_engine, UserWarning, match=r"Join\(maintain_order=.*\) not supported for multiple partitions\.", ): diff --git a/python/cudf_polars/tests/experimental/test_metadata.py b/python/cudf_polars/tests/experimental/test_metadata.py index 618087a27c5..791e33744cd 100644 --- a/python/cudf_polars/tests/experimental/test_metadata.py +++ b/python/cudf_polars/tests/experimental/test_metadata.py @@ -66,20 +66,30 @@ def right() -> pl.LazyFrame: def test_rapidsmpf_join_metadata( left: pl.LazyFrame, right: pl.LazyFrame, - streaming_engine_factory, + spmd_engine_factory, options, ) -> None: - streaming_engine = streaming_engine_factory(options) - config_options = ConfigOptions.from_polars_engine(streaming_engine) + # Pinned to SPMD: ``ChannelMetadata.__reduce_cython__`` can't pickle + # ``self._handle`` across worker/actor processes, so the + # ``metadata_collector`` round-trip fails on Dask and Ray. + # + # When https://github.com/rapidsai/cudf/pull/22394 lands, dedup of + # replicated outputs moves to the Dask/Ray frontends and the + # ``duplicated`` flag's semantics change to "every rank holds the + # data". Revisit the ``len(metadata_collector) == 1`` and + # ``metadata.duplicated is False`` assertions below, and reconsider + # whether this test can widen to ``streaming_engine_factory``. 
+ engine = spmd_engine_factory(options) + config_options = ConfigOptions.from_polars_engine(engine) broadcast_join_limit = config_options.executor.broadcast_join_limit q = left.join( right, on="y", how="left", ).filter(pl.col("x") > pl.col("zz")) - ir = Translator(q._ldf.visit(), streaming_engine).translate_ir() - left_count = left.collect(engine=streaming_engine).height - right_count = right.collect(engine=streaming_engine).height + ir = Translator(q._ldf.visit(), engine).translate_ir() + left_count = left.collect(engine=engine).height + right_count = right.collect(engine=engine).height metadata_collector = evaluate_logical_plan( ir, config_options, collect_metadata=True diff --git a/python/cudf_polars/tests/experimental/test_parallel.py b/python/cudf_polars/tests/experimental/test_parallel.py index 67fc372e2e4..a9a0ff63786 100644 --- a/python/cudf_polars/tests/experimental/test_parallel.py +++ b/python/cudf_polars/tests/experimental/test_parallel.py @@ -50,10 +50,10 @@ def test_rename_concat(streaming_engine) -> None: assert_gpu_result_equal(q, engine=streaming_engine) -def test_fallback_on_concat_zlice(streaming_engine_factory) -> None: +def test_fallback_on_concat_zlice(spmd_engine_factory) -> None: # Pin ``fallback_mode="warn"`` so the spmd-small baseline (which sets # ``SILENT``) doesn't suppress the warning this test asserts on. - streaming_engine = streaming_engine_factory(StreamingOptions(fallback_mode="warn")) + streaming_engine = spmd_engine_factory(StreamingOptions(fallback_mode="warn")) q = pl.concat( [ pl.LazyFrame({"a": [1, 2]}), diff --git a/python/cudf_polars/tests/experimental/test_rolling.py b/python/cudf_polars/tests/experimental/test_rolling.py index 37de6f7f8a1..ee3ae137e27 100644 --- a/python/cudf_polars/tests/experimental/test_rolling.py +++ b/python/cudf_polars/tests/experimental/test_rolling.py @@ -8,6 +8,7 @@ import polars as pl from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions +from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine from cudf_polars.testing.asserts import assert_gpu_result_equal from cudf_polars.utils.versions import POLARS_VERSION_LT_136 @@ -46,10 +47,20 @@ def test_rolling_datetime(request, engine): assert_gpu_result_equal(q, engine=engine) -def test_over_in_filter_unsupported(streaming_engine_factory) -> None: +def test_over_in_filter_unsupported(request, streaming_engine_factory) -> None: engine = streaming_engine_factory( StreamingOptions(max_rows_per_partition=1, fallback_mode="warn"), ) + if not isinstance(engine, SPMDEngine): + # On Dask/Ray the fallback warning fires on worker processes and is + # invisible to ``pytest.warns``; the multi-rank fallback also + # doesn't preserve row order. 
+ request.applymarker( + pytest.mark.xfail( + reason="https://github.com/rapidsai/cudf/issues/22405", + strict=False, + ) + ) q = pl.concat( [ pl.LazyFrame({"k": ["x", "y"], "v": [3, 2]}), diff --git a/python/cudf_polars/tests/experimental/test_select.py b/python/cudf_polars/tests/experimental/test_select.py index 264f8b5aab1..cef9f0f66cf 100644 --- a/python/cudf_polars/tests/experimental/test_select.py +++ b/python/cudf_polars/tests/experimental/test_select.py @@ -22,6 +22,7 @@ assert_gpu_result_equal, assert_ir_translation_raises, ) +from cudf_polars.testing.engine_utils import warns_on_spmd from cudf_polars.utils.versions import ( POLARS_VERSION_LT_132, POLARS_VERSION_LT_134, @@ -54,8 +55,8 @@ def test_select(df, engine): @pytest.mark.parametrize("fallback_mode", ["silent", "raise", "warn", "foo"]) -def test_select_reduce_fallback(df, streaming_engine_factory, fallback_mode): - engine = streaming_engine_factory( +def test_select_reduce_fallback(df, spmd_engine_factory, fallback_mode): + engine = spmd_engine_factory( StreamingOptions(max_rows_per_partition=3, fallback_mode=fallback_mode), ) match = "This selection is not supported for multiple partitions." @@ -84,13 +85,17 @@ def test_select_reduce_fallback(df, streaming_engine_factory, fallback_mode): assert_gpu_result_equal(query, engine=engine) -def test_select_fill_null_with_strategy(df, engine): +def test_select_fill_null_with_strategy(df, streaming_engine_factory): + engine = streaming_engine_factory( + StreamingOptions(max_rows_per_partition=3, fallback_mode="warn"), + ) q = df.select(pl.col("a").forward_fill()) if POLARS_VERSION_LT_132: assert_ir_translation_raises(q, NotImplementedError) else: - with pytest.warns( + with warns_on_spmd( + engine, UserWarning, match="fill_null with strategy other than 'zero' or 'one' is not supported for multiple partitions", ): @@ -183,15 +188,19 @@ def test_select_mean_with_decimals(engine): assert_gpu_result_equal(q, engine=engine, check_dtypes=not POLARS_VERSION_LT_134) -def test_select_with_len(engine): - # https://github.com/pola-rs/polars/issues/25592 +def test_select_with_len(streaming_engine_factory): + engine = streaming_engine_factory( + StreamingOptions(max_rows_per_partition=3, fallback_mode="warn"), + ) df1 = pl.LazyFrame({"c0": [1] * 4}) df2 = pl.LazyFrame({"c0": [2] * 4}) q = pl.concat([df1.join(df2, how="cross"), df1.with_columns(pl.lit(None))]).select( pl.len() ) - with pytest.warns( - UserWarning, match="Cross join not support for multiple partitions" + with warns_on_spmd( + engine, + UserWarning, + match="Cross join not support for multiple partitions", ): assert_gpu_result_equal(q, engine=engine) diff --git a/python/cudf_polars/tests/experimental/test_spilling.py b/python/cudf_polars/tests/experimental/test_spilling.py index 6aa11801132..7f79b911038 100644 --- a/python/cudf_polars/tests/experimental/test_spilling.py +++ b/python/cudf_polars/tests/experimental/test_spilling.py @@ -50,20 +50,20 @@ def create_test_table(nbytes: int, stream: Stream) -> plc.Table: ], ) def test_make_spill_function( - streaming_engine_factory, + spmd_engine_factory, *, pinned_memory: bool, spilled_host_mem_type: MemoryType, ) -> None: """Test that spilling prioritizes longest queues and newest messages.""" - engine = streaming_engine_factory(StreamingOptions(pinned_memory=pinned_memory)) + engine = spmd_engine_factory(StreamingOptions(pinned_memory=pinned_memory)) context = engine.context if spilled_host_mem_type == MemoryType.PINNED_HOST: - assert engine.context.br().pinned_mr is not None + assert 
context.br().pinned_mr is not None other_host_mem_type = MemoryType.HOST else: - assert engine.context.br().pinned_mr is None + assert context.br().pinned_mr is None other_host_mem_type = MemoryType.PINNED_HOST # Create 3 spillable message containers simulating fanout buffers diff --git a/python/cudf_polars/tests/experimental/test_statistics.py b/python/cudf_polars/tests/experimental/test_statistics.py index 82c121d5830..42014a02106 100644 --- a/python/cudf_polars/tests/experimental/test_statistics.py +++ b/python/cudf_polars/tests/experimental/test_statistics.py @@ -7,14 +7,12 @@ from typing import TYPE_CHECKING import pytest -from rapidsmpf.bootstrap import is_running_with_rrun -from rapidsmpf.config import Options from rapidsmpf.statistics import Statistics -from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine +from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions if TYPE_CHECKING: - from collections.abc import Iterator + from collections.abc import Callable from cudf_polars.experimental.rapidsmpf.frontend.core import StreamingEngine @@ -25,49 +23,14 @@ ] -@pytest.fixture(params=["spmd", "ray", "dask"]) +@pytest.fixture def engine( - request: pytest.FixtureRequest, - spmd_engine: SPMDEngine, -) -> Iterator[StreamingEngine]: + streaming_engine_factory: Callable[..., StreamingEngine], +) -> StreamingEngine: """Yield each supported streaming engine with statistics enabled.""" - backend = request.param - rapidsmpf_options = Options({"statistics": "True"}) - executor_options = {"max_rows_per_partition": 10} - - if backend == "spmd": - with SPMDEngine( - comm=spmd_engine.comm, - rapidsmpf_options=rapidsmpf_options, - executor_options=executor_options, - ) as engine: - yield engine - return - - if is_running_with_rrun(): - pytest.skip(f"{backend}Engine must not be created from within an rrun cluster") - - if backend == "ray": - pytest.importorskip("ray", reason="ray is not installed") - from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine - - with RayEngine( - rapidsmpf_options=rapidsmpf_options, - executor_options=executor_options, - ray_init_options={"include_dashboard": False}, - ) as engine: - yield engine - return - - assert backend == "dask" - pytest.importorskip("distributed", reason="distributed is not installed") - from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine - - with DaskEngine( - rapidsmpf_options=rapidsmpf_options, - executor_options=executor_options, - ) as engine: - yield engine + return streaming_engine_factory( + StreamingOptions(statistics=True, max_rows_per_partition=10), + ) def test_statistics(engine: StreamingEngine) -> None: diff --git a/python/cudf_polars/tests/experimental/test_unique.py b/python/cudf_polars/tests/experimental/test_unique.py index 6bb30624cb6..1a157c3fe21 100644 --- a/python/cudf_polars/tests/experimental/test_unique.py +++ b/python/cudf_polars/tests/experimental/test_unique.py @@ -10,13 +10,7 @@ from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions from cudf_polars.testing.asserts import assert_gpu_result_equal - - -@pytest.fixture -def engine(streaming_engine_factory): - return streaming_engine_factory( - StreamingOptions(fallback_mode="warn"), - ) +from cudf_polars.testing.engine_utils import warns_on_spmd @pytest.fixture(scope="module") @@ -77,11 +71,12 @@ def test_unique_head_tail(keep, zlice, streaming_engine_factory): ) -def test_unique_complex_slice_fallback(df, engine): +def test_unique_complex_slice_fallback(df, 
streaming_engine_factory): """Test that unique with complex slice (offset >= 1) falls back correctly.""" + engine = streaming_engine_factory(StreamingOptions(fallback_mode="warn")) # unique().slice(offset=5, length=10) has zlice[0] >= 1, triggering fallback q = df.unique(subset=("y",), keep="any").slice(5, 10) - with pytest.warns(UserWarning, match="Complex slice not supported"): + with warns_on_spmd(engine, UserWarning, match="Complex slice not supported"): result = q.collect(engine=engine) # Just verify the fallback produces valid output with expected shape assert result.shape == (10, 3)
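
The two testing helpers added in this change are easiest to read side by side in a small example. The sketch below is illustrative only and not part of the patch: it assumes a module-level ``df`` LazyFrame fixture with an integer column ``a`` plus the existing parametrized ``streaming_engine_factory`` fixture, and the test name and pointwise predicate are made up. It shows ``warns_on_spmd`` asserting the fallback warning only on the SPMD engine (where ``pytest.warns`` can actually observe it) and the ``when=`` keyword folding a parametrize condition into the same context manager, mirroring the ``test_filter_non_pointwise`` and ``test_join_and_slice`` changes above.

import pytest

import polars as pl

from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.testing.asserts import assert_gpu_result_equal
from cudf_polars.testing.engine_utils import warns_on_spmd


@pytest.mark.parametrize("aggregate_predicate", [False, True])
def test_filter_fallback_sketch(df, aggregate_predicate, streaming_engine_factory):
    # Small partitions plus fallback_mode="warn" so the multi-partition
    # fallback path actually emits a UserWarning on SPMD.
    engine = streaming_engine_factory(
        StreamingOptions(max_rows_per_partition=3, fallback_mode="warn"),
    )
    predicate = (
        pl.col("a") > pl.col("a").max() if aggregate_predicate else pl.col("a") > 1
    )
    q = df.filter(predicate)
    # On SPMD this is pytest.warns(...); on Dask/Ray the warning fires in
    # worker/actor processes, so the helper degrades to nullcontext. The
    # ``when=`` gate skips the assertion for the pointwise predicate.
    with warns_on_spmd(
        engine,
        UserWarning,
        match="This filter is not supported for multiple partitions.",
        when=aggregate_predicate,
    ):
        assert_gpu_result_equal(q, engine=engine)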
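
The widened ``skip_on_streaming_engine`` marker can target specific backends via its new ``engine=`` keyword, so a test keeps running on the in-memory and SPMD variants while only the multi-process backends are skipped (the collection hook strips a ``-small`` suffix, so ``engine=("spmd",)`` would also match ``spmd-small``). Another illustrative sketch under the same fixture assumptions as above; the test body and reason string are hypothetical.

import pytest

import polars as pl

from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.testing.asserts import assert_gpu_result_equal


@pytest.mark.skip_on_streaming_engine(
    "in-process monkeypatching does not reach Dask workers or Ray actors",
    engine=("dask", "ray"),
)
def test_worker_side_patching_sketch(df, streaming_engine_factory):
    # Collected and run for the "spmd" and "spmd-small" variants; skipped
    # when the parametrized engine is "dask" or "ray".
    engine = streaming_engine_factory(StreamingOptions(max_rows_per_partition=3))
    assert_gpu_result_equal(df.select(pl.col("a") + 1), engine=engine)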