diff --git a/ci/run_cudf_polars_experimental_pytests.sh b/ci/run_cudf_polars_experimental_pytests.sh
index 3056bd22595..2177cdd1cb1 100755
--- a/ci/run_cudf_polars_experimental_pytests.sh
+++ b/ci/run_cudf_polars_experimental_pytests.sh
@@ -14,7 +14,7 @@ echo "Running the full cudf-polars test suite with both the in-memory and spmd e
 timeout 10m python -m pytest --cache-clear "$@" tests --ignore=tests/experimental/legacy
 
 echo "Running experimental legacy tests with the 'rapidsmpf' runtime and a 'distributed' cluster"
-timeout 10m python -m pytest --cache-clear "$@" "tests/experimental/legacy" \
+timeout 10m python -m pytest --cache-clear "$@" "tests/experimental/legacy" -v \
     --executor streaming \
     --cluster distributed \
     --runtime rapidsmpf
diff --git a/ci/test_cudf_polars_experimental.sh b/ci/test_cudf_polars_experimental.sh
index fdeddb60904..b5bfa821351 100755
--- a/ci/test_cudf_polars_experimental.sh
+++ b/ci/test_cudf_polars_experimental.sh
@@ -28,7 +28,7 @@ rapids-pip-retry install \
     -v \
     --prefer-binary \
     --constraint "${PIP_CONSTRAINT}" \
-    "$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental,rapidsmpf]" \
+    "$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental,rapidsmpf,ray]" \
    "$(echo "${LIBCUDF_WHEELHOUSE}"/libcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)" \
    "$(echo "${PYLIBCUDF_WHEELHOUSE}"/pylibcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)"
diff --git a/dependencies.yaml b/dependencies.yaml
index 2ffd2e9318f..f4858400805 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -354,6 +354,14 @@ files:
     includes:
       - depends_on_rapidsmpf
       - pyarrow_run
+  py_run_cudf_polars_ray:
+    output: pyproject
+    pyproject_dir: python/cudf_polars
+    extras:
+      table: project.optional-dependencies
+      key: ray
+    includes:
+      - depends_on_ray
   py_test_cudf_polars:
     output: pyproject
     pyproject_dir: python/cudf_polars
@@ -1260,6 +1268,11 @@ dependencies:
           - matrix:
             packages:
              - *rapidsmpf_unsuffixed
+  depends_on_ray:
+    common:
+      - output_types: [conda, requirements, pyproject]
+        packages:
+          - ray>=2.0
   depends_on_rapids_logger:
     common:
       - output_types: [conda, requirements, pyproject]
         packages:
diff --git a/python/cudf_polars/cudf_polars/testing/engine_utils.py b/python/cudf_polars/cudf_polars/testing/engine_utils.py
index ec216dc6d88..c1b0cb6ed9c 100644
--- a/python/cudf_polars/cudf_polars/testing/engine_utils.py
+++ b/python/cudf_polars/cudf_polars/testing/engine_utils.py
@@ -6,6 +6,7 @@
 from __future__ import annotations
 
 import importlib.util
+import os
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, Literal
 
@@ -21,8 +22,25 @@
 STREAMING_ENGINE_FIXTURE_PARAMS: list[str] = []
 if importlib.util.find_spec("rapidsmpf") is not None:
     STREAMING_ENGINE_FIXTURE_PARAMS.extend(["spmd", "spmd-small"])
+    # ``"ray"`` is gated additionally on ``ray`` being installed and not
+    # running under ``rrun`` (``RayEngine`` raises if it is).
+    if importlib.util.find_spec("ray") is not None:  # pragma: no cover
+        from rapidsmpf.bootstrap import is_running_with_rrun as _is_running_with_rrun
+
+        if not _is_running_with_rrun():
+            STREAMING_ENGINE_FIXTURE_PARAMS.append("ray")
+        del _is_running_with_rrun
 
 ALL_ENGINE_FIXTURE_PARAMS = ["in-memory", *STREAMING_ENGINE_FIXTURE_PARAMS]
 
+# Default rank count for test-side ``RayEngine`` constructions. Multi-
+# rank exercises the cross-rank UCXX barrier inside
+# :meth:`RankActor.reset` and the multi-actor evaluate path. All
+# test-side ``RayEngine(...)`` calls should pass ``num_ranks=NUM_RANKS``
+# (paired with ``engine_options={"allow_gpu_sharing": True}``) so they
+# coexist on a single GPU with the cached engine in
+# :func:`build_streaming_engine`.
+NUM_RANKS: int = 1
+
 
 @dataclass
 class EngineFixtureParam:
@@ -110,6 +128,16 @@ def create_streaming_options(
     return StreamingOptions(**{**baseline.to_dict(), **overrides.to_dict()})
 
 
+# Single-slot module-level cache for the most recently built
+# :class:`RayEngine`. Subsequent ``build_streaming_engine`` calls for the
+# ``"ray"`` backend reuse this engine via :meth:`RayEngine._reset` instead
+# of paying the full actor-fork + UCXX bootstrap cost again. Released by
+# :func:`shutdown_streaming_engine_cache`, which test session-teardown
+# fixtures must call before ``ray.shutdown()`` so the engine's actor
+# handles aren't invalidated against a torn-down cluster.
+_cached_ray_engine: StreamingEngine | None = None
+
+
 def build_streaming_engine(
     param: EngineFixtureParam,
     spmd_comm: Communicator,
@@ -118,12 +146,19 @@ def build_streaming_engine(
     """
     Build a :class:`StreamingEngine` from an engine fixture parameter.
 
+    For ``param.engine_name == "ray"`` a single :class:`RayEngine` is
+    cached at module scope and reused via :meth:`RayEngine._reset` on
+    subsequent calls — this amortizes the multi-second actor-fork +
+    UCXX-bootstrap cost across all tests in a session. Other backends
+    are constructed fresh on every call.
+
     Parameters
     ----------
     param
         Decoded engine fixture parameter describing the backend and block size mode.
     spmd_comm
-        Communicator used when constructing an :class:`SPMDEngine`.
+        Communicator used when constructing an :class:`SPMDEngine`. Unused
+        for the ``"ray"`` branch.
     options
         Optional streaming options to merge on top of the baseline selected
         by ``param.blocksize_mode``.
@@ -132,21 +167,78 @@ def build_streaming_engine(
     Returns
     -------
     A streaming engine matching ``param``.
     """
-    from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine
-
     streaming_options = create_streaming_options(param.blocksize_mode, options)
+    common_kwargs: dict[str, Any] = {
+        "rapidsmpf_options": streaming_options.to_rapidsmpf_options(),
+        "executor_options": streaming_options.to_executor_options(),
+        "engine_options": streaming_options.to_engine_options(),
+    }
     match param.engine_name:
         case "spmd":
-            return SPMDEngine(
-                comm=spmd_comm,
-                rapidsmpf_options=streaming_options.to_rapidsmpf_options(),
-                executor_options=streaming_options.to_executor_options(),
-                engine_options=streaming_options.to_engine_options(),
-            )
+            from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine
+
+            return SPMDEngine(comm=spmd_comm, **common_kwargs)
+        case "ray":  # pragma: no cover
+            return _build_or_reset_ray_engine(common_kwargs)
         case _:  # pragma: no cover
             raise AssertionError(f"Unknown streaming backend: {param.engine_name!r}")
 
 
+def _build_or_reset_ray_engine(  # pragma: no cover
+    common_kwargs: dict[str, Any],
+) -> StreamingEngine:
+    """
+    Return the cached :class:`RayEngine`, ``_reset``-ed to ``common_kwargs``.
+
+    Builds a fresh engine on the first call. On subsequent calls reuses
+    the cached engine and applies ``common_kwargs`` via
+    :meth:`RayEngine._reset` — preserving the rank actors and the UCXX
+    communicator. ``allow_gpu_sharing=True`` is injected on every call
+    because :class:`StreamingEngine` re-validates the
+    ``num_ranks > 1`` invariant on every reset.
+    """
+    global _cached_ray_engine  # noqa: PLW0603
+    # ``_reset`` replaces engine state in full, so ``allow_gpu_sharing``
+    # must be re-asserted on every call to satisfy the
+    # ``num_ranks > 1`` validation in ``StreamingEngine.__init__``.
+    engine_options = {
+        **common_kwargs.get("engine_options", {}),
+        "allow_gpu_sharing": True,
+    }
+    common_kwargs = {**common_kwargs, "engine_options": engine_options}
+    if _cached_ray_engine is None:
+        from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine
+
+        # Prevent Ray from overriding ``CUDA_VISIBLE_DEVICES`` to ``""``
+        # when a worker process starts with zero visible GPUs (e.g. the
+        # test driver itself). In a future Ray release this becomes the
+        # default; setting it eagerly here keeps test behaviour stable
+        # across Ray versions.
+        os.environ.setdefault("RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO", "0")
+        _cached_ray_engine = RayEngine(
+            **common_kwargs,
+            num_ranks=NUM_RANKS,
+            ray_init_options={"include_dashboard": False},
+        )
+    else:
+        _cached_ray_engine._reset(**common_kwargs)  # type: ignore[attr-defined]
+    return _cached_ray_engine
+
+
+def shutdown_streaming_engine_cache() -> None:
+    """
+    Release the cached :class:`RayEngine`, if any.
+
+    Test session-teardown fixtures **must** call this before
+    ``ray.shutdown()`` so the engine's actor handles are released
+    against a still-live cluster.
+    """
+    global _cached_ray_engine  # noqa: PLW0603
+    if _cached_ray_engine is not None:  # pragma: no cover
+        _cached_ray_engine.shutdown()
+        _cached_ray_engine = None
+
+
 def get_blocksize_mode(obj: pl.GPUEngine) -> Literal["medium", "small"]:
     """
     Infer the block size mode for a GPU engine.
diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml
index d48793f0541..848c7dc3a8c 100644
--- a/python/cudf_polars/pyproject.toml
+++ b/python/cudf_polars/pyproject.toml
@@ -63,6 +63,9 @@ rapidsmpf = [
     "pyarrow>=19.0.0,<24",
     "rapidsmpf==26.6.*,>=0.0.0a0",
 ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
+ray = [
+    "ray>=2.0",
+] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
 
 [project.urls]
 Homepage = "https://github.com/rapidsai/cudf"
diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py
index e62d4ce6f86..384d57a6a28 100644
--- a/python/cudf_polars/tests/conftest.py
+++ b/python/cudf_polars/tests/conftest.py
@@ -15,6 +15,7 @@
     STREAMING_ENGINE_FIXTURE_PARAMS,
     EngineFixtureParam,
     build_streaming_engine,
+    shutdown_streaming_engine_cache,
 )
 
 if TYPE_CHECKING:
@@ -87,6 +88,49 @@ def spmd_comm() -> Communicator:
     return single_communicator(Options(get_environment_variables()), ProgressThread())
 
 
+@pytest.fixture(scope="session")
+def ray_cluster() -> Generator[None, None, None]:
+    """Session-scoped Ray cluster — one ``ray.init`` shared by all ray tests.
+
+    Skipped if ``ray`` is not installed or the suite is running under
+    ``rrun`` (``RayEngine`` refuses to run inside rrun).
+
+    Yields nothing — this fixture is a side-effect setup gate. The cached
+    :class:`RayEngine` (built by ``build_streaming_engine``) lives until
+    this fixture's teardown runs ``shutdown_streaming_engine_cache``
+    *before* ``ray.shutdown()`` so the actor handles aren't invalidated
+    against a torn-down cluster.
+    """
+    pytest.importorskip("ray")
+    import os
+    import tempfile
+
+    import ray
+    from rapidsmpf.bootstrap import is_running_with_rrun
+
+    if is_running_with_rrun():
+        pytest.skip("RayEngine cannot be created inside an rrun cluster")
+
+    if not ray.is_initialized():
+        # Prevent Ray from overriding ``CUDA_VISIBLE_DEVICES`` to ``""``
+        # when a worker process starts with zero visible GPUs (e.g. the
+        # test driver itself). Future Ray versions error if this isn't
+        # set explicitly. Must be set *before* ``ray.init``.
+        os.environ.setdefault("RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO", "0")
+        # Per-process temp dir disables ``/tmp/ray/session_latest``-based
+        # auto-discovery, so concurrent pytest runs (xdist, parallel CI
+        # invocations, repeated local runs) don't accidentally connect
+        # to each other's clusters.
+        temp_dir = tempfile.mkdtemp(prefix=f"ray-pytest-{os.getpid()}-")
+        ray.init(include_dashboard=False, _temp_dir=temp_dir)
+    try:
+        yield
+    finally:
+        # Release the cached RayEngine before tearing down the cluster.
+        shutdown_streaming_engine_cache()
+        ray.shutdown()
+
+
 @pytest.fixture(params=STREAMING_ENGINE_FIXTURE_PARAMS)
 def _streaming_engine_param(request: pytest.FixtureRequest) -> EngineFixtureParam:
     """Parametrization helper to run tests for each streaming engine variant."""
@@ -101,6 +145,7 @@ def _all_engine_param(request: pytest.FixtureRequest) -> EngineFixtureParam:
 
 @pytest.fixture
 def streaming_engine_factory(
+    request: pytest.FixtureRequest,
     _streaming_engine_param: EngineFixtureParam,
     spmd_comm: Communicator,
 ) -> Generator[Callable[..., StreamingEngine], None, None]:
@@ -108,10 +153,16 @@ def streaming_engine_factory(
     Yield a factory that constructs :class:`StreamingEngine` instances for tests.
 
     The fixture is parametrized over :data:`STREAMING_ENGINE_FIXTURE_PARAMS`.
-    Created engines are tracked and automatically shut down after the test.
+    SPMD-backend engines are created fresh per call and shut down at test
+    end. Ray-backend engines come from the session-scoped cache inside
+    :func:`build_streaming_engine` (via :meth:`RayEngine._reset`); the cache
+    owns their lifetime, so the per-test teardown skips ray engines.
 
     Parameters
     ----------
+    request
+        Pytest fixture request used to lazily resolve ``ray_cluster`` only
+        for ray-parametrized variants.
     _streaming_engine_param
         Parametrized engine descriptor controlling backend and block size mode.
     spmd_comm
@@ -123,6 +174,9 @@ def streaming_engine_factory(
     factory accepts optional :class:`StreamingOptions`, which are merged
     on top of the parametrized blocksize baseline.
     """
+    if _streaming_engine_param.engine_name == "ray":
+        request.getfixturevalue("ray_cluster")
+
     engines: list[StreamingEngine] = []
 
     def factory(options: StreamingOptions | None = None) -> StreamingEngine:
@@ -132,8 +186,12 @@ def factory(options: StreamingOptions | None = None) -> StreamingEngine:
 
     yield factory
 
-    for engine in reversed(engines):
-        engine.shutdown()
+    # RayEngine instances are owned by the session-level cache (one
+    # cached engine reused via ``_reset`` across all ray tests); only
+    # shut down per-call engines for non-ray backends.
+    if _streaming_engine_param.engine_name != "ray":
+        for engine in reversed(engines):
+            engine.shutdown()
 
 
 @pytest.fixture
@@ -189,12 +247,18 @@ def engine(
         yield pl.GPUEngine(executor="in-memory", raise_on_fail=True)
         return
 
+    if _all_engine_param.engine_name == "ray":
+        request.getfixturevalue("ray_cluster")
+
     spmd_comm: Communicator = request.getfixturevalue("spmd_comm")
    engine = build_streaming_engine(_all_engine_param, spmd_comm)
    try:
        yield engine
    finally:
-        engine.shutdown()
+        # RayEngine instances are owned by the session-level cache;
+        # SPMDEngine instances are owned by this fixture call.
+        if _all_engine_param.engine_name != "ray":
+            engine.shutdown()
 
 
 @pytest.fixture
@@ -246,10 +310,14 @@ def pytest_configure(config):
 
     config.addinivalue_line(
         "markers",
-        "skip_on_streaming_engine(reason): skip the test for streaming "
-        '``engine`` variants (e.g. ``"spmd"``, ``"spmd-small"``) while '
-        "still letting the in-memory variant run. Use this to track features "
-        "that have no multi-partition implementation",
+        "skip_on_streaming_engine(reason, *, backend=None): skip the test "
+        "for streaming ``engine`` variants while still letting the in-memory "
+        "variant run. Use this to track features that have no "
+        "multi-partition implementation. By default, all streaming variants "
+        '(e.g. ``"spmd"``, ``"spmd-small"``, ``"ray"``) are skipped; pass '
+        '``backend="ray"`` (a single backend name) or '
+        '``backend=("ray", ...)`` (a sequence of names) to skip only '
+        "those backends.",
     )
 
     # Ray's internal subprocess management leaks `/dev/null` file handles, and
@@ -283,7 +351,21 @@ def pytest_configure(config):
 
 
 def pytest_collection_modifyitems(items):
-    """Apply ``skip_on_streaming_engine`` markers to streaming ``engine`` items."""
+    """Apply ``skip_on_streaming_engine`` markers during test collection.
+
+    The ``skip_on_streaming_engine`` marker supports an optional
+    ``backend=`` keyword:
+
+    - If ``backend`` is ``None`` (default), the test is skipped for all
+      streaming engine variants.
+    - If ``backend`` is specified (e.g. ``"ray"`` or ``("ray",)``), the
+      test is skipped only for matching variants, including suffixed
+      forms such as ``"ray-small"``.
+
+    The skip reason is taken from the first positional argument if
+    provided, otherwise from the ``reason=`` keyword, and defaults to
+    ``"unsupported on streaming engine"``.
+    """
     for item in items:
         marker = item.get_closest_marker("skip_on_streaming_engine")
         if marker is None:
@@ -291,9 +373,24 @@ def pytest_collection_modifyitems(items):
         callspec = getattr(item, "callspec", None)
         if callspec is None:
             continue
-        engine_param = callspec.params.get("_all_engine_param")
+        # Cover both fixture paths: ``_all_engine_param`` for the
+        # ``engine`` fixture, ``_streaming_engine_param`` for the
+        # ``streaming_engine`` / ``streaming_engine_factory`` fixtures.
+        engine_param = callspec.params.get("_all_engine_param") or callspec.params.get(
+            "_streaming_engine_param"
+        )
         if engine_param is None or engine_param == "in-memory":
             continue
+        backend_filter = marker.kwargs.get("backend")
+        if backend_filter is not None:
+            engine_backend = engine_param.removesuffix("-small")
+            backends = (
+                (backend_filter,)
+                if isinstance(backend_filter, str)
+                else tuple(backend_filter)
+            )
+            if engine_backend not in backends:
+                continue
         reason = (
             marker.args[0]
             if marker.args
diff --git a/python/cudf_polars/tests/experimental/test_all_gather_host_data.py b/python/cudf_polars/tests/experimental/test_all_gather_host_data.py
index aad7b341676..4b69a5ec812 100644
--- a/python/cudf_polars/tests/experimental/test_all_gather_host_data.py
+++ b/python/cudf_polars/tests/experimental/test_all_gather_host_data.py
@@ -61,8 +61,6 @@ def test_gather_cluster_info(streaming_engine) -> None:
         assert isinstance(info.gpu_uuid, str)
     # Each rank runs in its own process.
     assert len({info.pid for info in infos}) == streaming_engine.nranks
-    # Without allow_gpu_sharing, all UUIDs must be unique (enforced at init).
-    assert len({info.gpu_uuid for info in infos}) == streaming_engine.nranks
 
 
 def test_cluster_info_cuda_visible_devices(monkeypatch) -> None:
diff --git a/python/cudf_polars/tests/experimental/test_filter.py b/python/cudf_polars/tests/experimental/test_filter.py
index 4fb11df691c..e31c809fc34 100644
--- a/python/cudf_polars/tests/experimental/test_filter.py
+++ b/python/cudf_polars/tests/experimental/test_filter.py
@@ -36,6 +36,10 @@ def test_filter_pointwise(df, engine):
     assert_gpu_result_equal(query, engine=engine)
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_filter_non_pointwise(df, engine):
     query = df.filter(pl.col("a") > pl.col("a").max())
     with pytest.warns(
diff --git a/python/cudf_polars/tests/experimental/test_groupby.py b/python/cudf_polars/tests/experimental/test_groupby.py
index 8d6ac5927e9..ec5c2f73ec5 100644
--- a/python/cudf_polars/tests/experimental/test_groupby.py
+++ b/python/cudf_polars/tests/experimental/test_groupby.py
@@ -131,6 +131,10 @@ def test_groupby_std_var_ddof(df, engine, agg, ddof):
 
 
 @pytest.mark.parametrize("fallback_mode", ["silent", "raise", "warn", "foo"])
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_groupby_fallback(df, fallback_mode, streaming_engine_factory):
     streaming_engine = streaming_engine_factory(
         StreamingOptions(fallback_mode=fallback_mode),
@@ -290,6 +294,11 @@ def test_groupby_count_type_mismatch(df, streaming_engine_factory):
     assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False)
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "patch.object only takes effect in the test process; ray actors run "
+    "the un-patched ShuffleManager.Inserter.insert_hash",
+    backend="ray",
+)
 def test_shuffle_reduce_insert_finished_called_on_oom(streaming_engine_factory):
     streaming_engine = streaming_engine_factory(
         StreamingOptions(target_partition_size=10, max_rows_per_partition=5),
diff --git a/python/cudf_polars/tests/experimental/test_hstack.py b/python/cudf_polars/tests/experimental/test_hstack.py
index 17dede9dddc..066e308c8dd 100644
--- a/python/cudf_polars/tests/experimental/test_hstack.py
+++ b/python/cudf_polars/tests/experimental/test_hstack.py
@@ -65,6 +65,10 @@ def test_cse_agg_select(df, engine):
     assert_gpu_result_equal(q, engine=engine)
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_hstack_non_scalar_cse_fallback(df, engine):
     # Non-scalar CSE (head(5)) skips the CSE transform, falling through to the
     # non-pointwise HStack fallback in lower_ir_node.register(HStack).
diff --git a/python/cudf_polars/tests/experimental/test_io_multirank.py b/python/cudf_polars/tests/experimental/test_io_multirank.py
index e1265602304..daf5f8f4b39 100644
--- a/python/cudf_polars/tests/experimental/test_io_multirank.py
+++ b/python/cudf_polars/tests/experimental/test_io_multirank.py
@@ -13,6 +13,7 @@
 
 from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine
 from cudf_polars.testing.asserts import assert_sink_result_equal
+from cudf_polars.testing.engine_utils import NUM_RANKS
 from cudf_polars.utils.config import Cluster, StreamingExecutor
 
 if TYPE_CHECKING:
@@ -67,6 +68,8 @@ def engine(
 
     with RayEngine(
         executor_options=executor_options,
+        engine_options={"allow_gpu_sharing": True},
+        num_ranks=NUM_RANKS,
         ray_init_options={"include_dashboard": False},
     ) as eng:
         yield eng
diff --git a/python/cudf_polars/tests/experimental/test_join.py b/python/cudf_polars/tests/experimental/test_join.py
index 6a09ff95ef5..67fdb81b296 100644
--- a/python/cudf_polars/tests/experimental/test_join.py
+++ b/python/cudf_polars/tests/experimental/test_join.py
@@ -90,6 +90,10 @@ def test_join_then_shuffle(left, right, streaming_engine_factory):
 
 @pytest.mark.parametrize("reverse", [True, False])
 @pytest.mark.parametrize("max_rows_per_partition", [3, 9])
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_join_conditional(reverse, max_rows_per_partition, streaming_engine_factory):
     streaming_engine = streaming_engine_factory(
         StreamingOptions(
@@ -156,6 +160,10 @@ def test_join(left, right, how, reverse, streaming_engine_factory, options):
 
 
 @pytest.mark.parametrize("zlice", [(0, 2), (2, 2), (-2, None)])
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_join_and_slice(zlice, streaming_engine_factory):
     streaming_engine = streaming_engine_factory(
         StreamingOptions(
@@ -220,6 +228,10 @@ def test_bloom_filter_join(how, streaming_engine_factory):
 @pytest.mark.parametrize(
     "maintain_order", ["left_right", "right_left", "left", "right"]
 )
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_join_maintain_order_fallback_streaming(
     left, right, maintain_order, streaming_engine_factory
 ):
diff --git a/python/cudf_polars/tests/experimental/test_metadata.py b/python/cudf_polars/tests/experimental/test_metadata.py
index 618087a27c5..a7500dc3630 100644
--- a/python/cudf_polars/tests/experimental/test_metadata.py
+++ b/python/cudf_polars/tests/experimental/test_metadata.py
@@ -63,6 +63,11 @@ def right() -> pl.LazyFrame:
         ),
     ],
 )
+@pytest.mark.skip_on_streaming_engine(
+    "evaluate_logical_plan is the SPMD evaluation path and doesn't dispatch "
+    "to ray actors",
+    backend="ray",
+)
 def test_rapidsmpf_join_metadata(
     left: pl.LazyFrame,
     right: pl.LazyFrame,
diff --git a/python/cudf_polars/tests/experimental/test_parallel.py b/python/cudf_polars/tests/experimental/test_parallel.py
index 42365a113e2..f0404b3996b 100644
--- a/python/cudf_polars/tests/experimental/test_parallel.py
+++ b/python/cudf_polars/tests/experimental/test_parallel.py
@@ -54,6 +54,10 @@ def test_rename_concat(streaming_engine) -> None:
     assert_gpu_result_equal(q, engine=streaming_engine)
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_fallback_on_concat_zlice(streaming_engine_factory) -> None:
     # Pin ``fallback_mode="warn"`` so the spmd-small baseline (which sets
     # ``SILENT``) doesn't suppress the warning this test asserts on.
diff --git a/python/cudf_polars/tests/experimental/test_ray.py b/python/cudf_polars/tests/experimental/test_ray.py
index 7365be733b3..ae8982c0c6b 100644
--- a/python/cudf_polars/tests/experimental/test_ray.py
+++ b/python/cudf_polars/tests/experimental/test_ray.py
@@ -21,6 +21,10 @@
 if TYPE_CHECKING:
     from collections.abc import Iterator
 
+# Local override: this module's tests exercise the multi-rank code path
+# (cross-rank UCX barrier inside ``RankActor.reset``, multi-actor
+# ``evaluate``), independent of ``engine_utils.NUM_RANKS`` (which is the
+# default for the broader test fixture matrix).
 NUM_RANKS = 2
diff --git a/python/cudf_polars/tests/experimental/test_rolling.py b/python/cudf_polars/tests/experimental/test_rolling.py
index 37de6f7f8a1..42fc8edfd66 100644
--- a/python/cudf_polars/tests/experimental/test_rolling.py
+++ b/python/cudf_polars/tests/experimental/test_rolling.py
@@ -19,6 +19,10 @@ def engine(streaming_engine_factory):
     )
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_rolling_datetime(request, engine):
     if not POLARS_VERSION_LT_136:
         request.applymarker(
@@ -46,6 +50,10 @@ def test_rolling_datetime(request, engine):
     assert_gpu_result_equal(q, engine=engine)
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_over_in_filter_unsupported(streaming_engine_factory) -> None:
     engine = streaming_engine_factory(
         StreamingOptions(max_rows_per_partition=1, fallback_mode="warn"),
diff --git a/python/cudf_polars/tests/experimental/test_select.py b/python/cudf_polars/tests/experimental/test_select.py
index 264f8b5aab1..f882e595273 100644
--- a/python/cudf_polars/tests/experimental/test_select.py
+++ b/python/cudf_polars/tests/experimental/test_select.py
@@ -54,6 +54,10 @@ def test_select(df, engine):
 
 
 @pytest.mark.parametrize("fallback_mode", ["silent", "raise", "warn", "foo"])
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_select_reduce_fallback(df, streaming_engine_factory, fallback_mode):
     engine = streaming_engine_factory(
         StreamingOptions(max_rows_per_partition=3, fallback_mode=fallback_mode),
@@ -84,6 +88,10 @@ def test_select_reduce_fallback(df, streaming_engine_factory, fallback_mode):
     assert_gpu_result_equal(query, engine=engine)
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_select_fill_null_with_strategy(df, engine):
     q = df.select(pl.col("a").forward_fill())
 
@@ -183,6 +191,10 @@ def test_select_mean_with_decimals(engine):
     assert_gpu_result_equal(q, engine=engine, check_dtypes=not POLARS_VERSION_LT_134)
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_select_with_len(engine):
     # https://github.com/pola-rs/polars/issues/25592
     df1 = pl.LazyFrame({"c0": [1] * 4})
diff --git a/python/cudf_polars/tests/experimental/test_statistics.py b/python/cudf_polars/tests/experimental/test_statistics.py
index 965449b80f0..31f4caff951 100644
--- a/python/cudf_polars/tests/experimental/test_statistics.py
+++ b/python/cudf_polars/tests/experimental/test_statistics.py
@@ -12,6 +12,7 @@
 from rapidsmpf.statistics import Statistics
 
 from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine
+from cudf_polars.testing.engine_utils import NUM_RANKS
 
 if TYPE_CHECKING:
     from collections.abc import Iterator
@@ -56,6 +57,10 @@ def engine(
     with RayEngine(
         rapidsmpf_options=rapidsmpf_options,
         executor_options=executor_options,
+        # See test_io_multirank.py for the rationale: coexist with
+        # the cached engine in ``build_streaming_engine``.
+        engine_options={"allow_gpu_sharing": True},
+        num_ranks=NUM_RANKS,
         ray_init_options={"include_dashboard": False},
     ) as engine:
         yield engine
diff --git a/python/cudf_polars/tests/experimental/test_unique.py b/python/cudf_polars/tests/experimental/test_unique.py
index 49d2b580300..6bd90d1b139 100644
--- a/python/cudf_polars/tests/experimental/test_unique.py
+++ b/python/cudf_polars/tests/experimental/test_unique.py
@@ -68,6 +68,10 @@ def test_unique_fallback(df, streaming_engine_factory):
 
 @pytest.mark.parametrize("maintain_order", [True, False])
 @pytest.mark.parametrize("cardinality", [{}, {"y": 0.5}])
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_unique_select(df, streaming_engine_factory, maintain_order, cardinality):
     engine = streaming_engine_factory(
         StreamingOptions(
@@ -104,6 +108,10 @@ def test_unique_head_tail(keep, zlice, streaming_engine_factory):
     )
 
 
+@pytest.mark.skip_on_streaming_engine(
+    "Worker-emitted warnings aren't visible to pytest.warns",
+    backend="ray",
+)
 def test_unique_complex_slice_fallback(df, engine):
     """Test that unique with complex slice (offset >= 1) falls back correctly."""
     # unique().slice(offset=5, length=10) has zlice[0] >= 1, triggering fallback
diff --git a/python/cudf_polars/tests/testing/test_engine_utils.py b/python/cudf_polars/tests/testing/test_engine_utils.py
index faf113502d6..1ed94acf93c 100644
--- a/python/cudf_polars/tests/testing/test_engine_utils.py
+++ b/python/cudf_polars/tests/testing/test_engine_utils.py
@@ -3,6 +3,9 @@
 from __future__ import annotations
 
+import os
+from typing import TYPE_CHECKING
+
 import pytest
 
 from cudf_polars.testing.engine_utils import (
@@ -10,6 +13,11 @@
     create_streaming_options,
 )
 
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+    from rapidsmpf.communicator.communicator import Communicator
+
 
 def test_engine_fixture_param_in_memory():
     param = EngineFixtureParam("in-memory")
@@ -55,3 +63,146 @@ def test_create_streaming_options_overrides_merge():
     assert merged.max_rows_per_partition == 999
     # Untouched baseline field is preserved.
     assert merged.target_partition_size == 1_000_000
+
+
+# ---------------------------------------------------------------------------
+# Single-slot RayEngine cache: ``build_streaming_engine`` reuses one
+# :class:`RayEngine` across consecutive ray-backend calls via
+# :meth:`RayEngine._reset` to amortize actor-fork + UCXX bootstrap.
+# ---------------------------------------------------------------------------
+
+ray = pytest.importorskip("ray")
+from rapidsmpf.bootstrap import is_running_with_rrun  # noqa: E402
+
+from cudf_polars.experimental.rapidsmpf.frontend.options import (  # noqa: E402
+    StreamingOptions,
+)
+from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine  # noqa: E402
+from cudf_polars.testing.engine_utils import (  # noqa: E402
+    NUM_RANKS,
+    build_streaming_engine,
+    shutdown_streaming_engine_cache,
+)
+
+_ray_cache_skip = pytest.mark.skipif(
+    is_running_with_rrun(),
+    reason="RayEngine must not be created from within an rrun cluster",
+)
+
+
+@pytest.fixture
+def clean_cache() -> Iterator[None]:
+    """Ensure a pristine cache before and after each test."""
+    shutdown_streaming_engine_cache()
+    try:
+        yield
+    finally:
+        shutdown_streaming_engine_cache()
+
+
+@pytest.fixture(scope="module")
+def _ray_session() -> Iterator[None]:
+    """One ``ray.init`` for the whole module so individual tests don't pay
+    the bootstrap cost more than once.
+    """
+    import tempfile
+
+    if not ray.is_initialized():
+        os.environ.setdefault("RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO", "0")
+        # Per-process temp dir isolates this module's ray cluster from
+        # any concurrent pytest invocations sharing ``/tmp/ray``.
+        temp_dir = tempfile.mkdtemp(prefix=f"ray-pytest-{os.getpid()}-")
+        ray.init(include_dashboard=False, _temp_dir=temp_dir)
+    try:
+        yield
+    finally:
+        # Release any cached engine before tearing down the cluster so
+        # the engine's actor handles are released against a still-live
+        # cluster (the same teardown-order rule as the conftest).
+        shutdown_streaming_engine_cache()
+        ray.shutdown()
+
+
+_RAY_PARAM = EngineFixtureParam(full_name="ray")
+_SPMD_PARAM = EngineFixtureParam(full_name="spmd")
+
+
+@_ray_cache_skip
+@pytest.mark.usefixtures("_ray_session", "clean_cache")
+def test_first_call_builds_fresh_ray_engine() -> None:
+    """The first ``build_streaming_engine`` call for ``"ray"`` builds an engine."""
+    engine = build_streaming_engine(_RAY_PARAM, spmd_comm=None)
+    assert isinstance(engine, RayEngine)
+    # Built with the cache's ``NUM_RANKS`` baseline.
+    assert engine.nranks == NUM_RANKS
+
+
+@_ray_cache_skip
+@pytest.mark.usefixtures("_ray_session", "clean_cache")
+def test_second_call_reuses_cached_engine() -> None:
+    """Subsequent ray calls return the same engine and same actor processes."""
+    e1 = build_streaming_engine(_RAY_PARAM, spmd_comm=None)
+    assert isinstance(e1, RayEngine)
+    pids_before = e1._run(os.getpid)
+    actors_before = list(e1.rank_actors)
+
+    e2 = build_streaming_engine(
+        _RAY_PARAM,
+        spmd_comm=None,
+        options=StreamingOptions(max_rows_per_partition=42),
+    )
+    assert isinstance(e2, RayEngine)
+
+    assert e2 is e1
+    actors_after = list(e2.rank_actors)
+    pids_after = e2._run(os.getpid)
+    assert all(a is b for a, b in zip(actors_before, actors_after, strict=True))
+    assert pids_before == pids_after
+
+
+@_ray_cache_skip
+@pytest.mark.usefixtures("_ray_session", "clean_cache")
+def test_reset_propagates_options() -> None:
+    """The polars-layer config reflects the most recent ``options`` arg."""
+    build_streaming_engine(
+        _RAY_PARAM,
+        spmd_comm=None,
+        options=StreamingOptions(max_rows_per_partition=10),
+    )
+    engine = build_streaming_engine(
+        _RAY_PARAM,
+        spmd_comm=None,
+        options=StreamingOptions(max_rows_per_partition=99),
+    )
+    assert engine.config["executor_options"]["max_rows_per_partition"] == 99
+
+
+@_ray_cache_skip
+@pytest.mark.usefixtures("_ray_session", "clean_cache")
+def test_shutdown_cache_clears_slot() -> None:
+    """``shutdown_streaming_engine_cache`` releases the slot; next call rebuilds."""
+    e1 = build_streaming_engine(_RAY_PARAM, spmd_comm=None)
+    assert isinstance(e1, RayEngine)
+    pids_before = e1._run(os.getpid)
+
+    shutdown_streaming_engine_cache()
+
+    e2 = build_streaming_engine(_RAY_PARAM, spmd_comm=None)
+    assert isinstance(e2, RayEngine)
+    assert e2 is not e1
+    pids_after = e2._run(os.getpid)
+    # New engine = new actor processes.
+    assert set(pids_before).isdisjoint(set(pids_after))
+
+
+@_ray_cache_skip
+@pytest.mark.usefixtures("clean_cache")
+def test_spmd_branch_does_not_use_cache(spmd_comm: Communicator) -> None:
+    """``"spmd"`` returns a fresh engine each call — no cache leakage."""
+    e1 = build_streaming_engine(_SPMD_PARAM, spmd_comm=spmd_comm)
+    e2 = build_streaming_engine(_SPMD_PARAM, spmd_comm=spmd_comm)
+    try:
+        assert e1 is not e2
+    finally:
+        e1.shutdown()
+        e2.shutdown()
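
Usage sketch for the extended marker (illustrative only, not part of the patch above): the decorator below follows the `backend=` semantics documented in `pytest_configure`, while the test name, query, and fixture contents are hypothetical stand-ins for the suite's `df`/`engine` fixtures.

```python
import polars as pl
import pytest

from cudf_polars.testing.asserts import assert_gpu_result_equal


@pytest.mark.skip_on_streaming_engine(
    "Worker-emitted warnings aren't visible to pytest.warns",
    backend=("ray",),  # sequence form; backend="ray" is the single-name equivalent
)
def test_example_ray_only_skip(df, engine):  # hypothetical test
    # Skipped only for the "ray" engine variant (and suffixed forms such as
    # "ray-small"); the "in-memory", "spmd", and "spmd-small" variants still run.
    assert_gpu_result_equal(df.select(pl.col("a")), engine=engine)
```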