2 changes: 1 addition & 1 deletion ci/run_cudf_polars_experimental_pytests.sh
@@ -14,7 +14,7 @@ echo "Running the full cudf-polars test suite with both the in-memory and spmd e
timeout 10m python -m pytest --cache-clear "$@" tests --ignore=tests/experimental/legacy

echo "Running experimental legacy tests with the 'rapidsmpf' runtime and a 'distributed' cluster"
timeout 10m python -m pytest --cache-clear "$@" "tests/experimental/legacy" \
timeout 10m python -m pytest --cache-clear "$@" "tests/experimental/legacy" -v \
--executor streaming \
--cluster distributed \
--runtime rapidsmpf
2 changes: 1 addition & 1 deletion ci/test_cudf_polars_experimental.sh
@@ -28,7 +28,7 @@ rapids-pip-retry install \
-v \
--prefer-binary \
--constraint "${PIP_CONSTRAINT}" \
"$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental,rapidsmpf]" \
"$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental,rapidsmpf,ray]" \
"$(echo "${LIBCUDF_WHEELHOUSE}"/libcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)" \
"$(echo "${PYLIBCUDF_WHEELHOUSE}"/pylibcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)"

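Reviewer note: a quick way to sanity-check that the new `ray` extra resolved after the install step above. This snippet is illustrative only (not part of the CI script); the `ray>=2.0` floor it asserts comes from the `dependencies.yaml` change below.

```python
# Illustrative check, not part of this PR's CI script: verify the wheel's
# new "ray" extra pulled in a Ray release satisfying the declared floor.
from importlib.metadata import version

ray_version = version("ray")
assert int(ray_version.split(".")[0]) >= 2, (
    f"expected ray>=2.0 from the 'ray' extra, found {ray_version}"
)
```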
13 changes: 13 additions & 0 deletions dependencies.yaml
@@ -354,6 +354,14 @@ files:
    includes:
      - depends_on_rapidsmpf
      - pyarrow_run
+  py_run_cudf_polars_ray:
+    output: pyproject
+    pyproject_dir: python/cudf_polars
+    extras:
+      table: project.optional-dependencies
+      key: ray
+    includes:
+      - depends_on_ray
  py_test_cudf_polars:
    output: pyproject
    pyproject_dir: python/cudf_polars
@@ -1260,6 +1268,11 @@ dependencies:
          - matrix:
            packages:
              - *rapidsmpf_unsuffixed
+  depends_on_ray:
+    common:
+      - output_types: [conda, requirements, pyproject]
+        packages:
+          - ray>=2.0
  depends_on_rapids_logger:
    common:
      - output_types: [conda, requirements, pyproject]
110 changes: 101 additions & 9 deletions python/cudf_polars/cudf_polars/testing/engine_utils.py
@@ -6,6 +6,7 @@
from __future__ import annotations

import importlib.util
+import os
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal

@@ -21,8 +22,25 @@
STREAMING_ENGINE_FIXTURE_PARAMS: list[str] = []
if importlib.util.find_spec("rapidsmpf") is not None:
    STREAMING_ENGINE_FIXTURE_PARAMS.extend(["spmd", "spmd-small"])
+    # ``"ray"`` is gated additionally on ``ray`` being installed and not
+    # running under ``rrun`` (``RayEngine`` raises if it is).
+    if importlib.util.find_spec("ray") is not None:  # pragma: no cover
+        from rapidsmpf.bootstrap import is_running_with_rrun as _is_running_with_rrun
+
+        if not _is_running_with_rrun():
+            STREAMING_ENGINE_FIXTURE_PARAMS.append("ray")
+        del _is_running_with_rrun
ALL_ENGINE_FIXTURE_PARAMS = ["in-memory", *STREAMING_ENGINE_FIXTURE_PARAMS]

+# Default rank count for test-side ``RayEngine`` constructions.
+# Multi-rank exercises the cross-rank UCXX barrier inside
+# :meth:`RankActor.reset` and the multi-actor evaluate path. All
+# test-side ``RayEngine(...)`` calls should pass ``num_ranks=NUM_RANKS``
+# (paired with ``engine_options={"allow_gpu_sharing": True}``) so they
+# coexist on a single GPU with the cached engine in
+# :func:`build_streaming_engine`.
+NUM_RANKS: int = 1


@dataclass
class EngineFixtureParam:
Expand Down Expand Up @@ -110,6 +128,16 @@ def create_streaming_options(
    return StreamingOptions(**{**baseline.to_dict(), **overrides.to_dict()})


+# Single-slot module-level cache for the most recently built
+# :class:`RayEngine`. Subsequent ``build_streaming_engine`` calls for the
+# ``"ray"`` backend reuse this engine via :meth:`RayEngine._reset` instead
+# of paying the full actor-fork + UCXX bootstrap cost again. Released by
+# :func:`shutdown_streaming_engine_cache`, which test session-teardown
+# fixtures must call before ``ray.shutdown()`` so the engine's actor
+# handles aren't invalidated against a torn-down cluster.
+_cached_ray_engine: StreamingEngine | None = None


def build_streaming_engine(
    param: EngineFixtureParam,
    spmd_comm: Communicator,
@@ -118,12 +146,19 @@
"""
Build a :class:`StreamingEngine` from an engine fixture parameter.

For ``param.engine_name == "ray"`` a single :class:`RayEngine` is
cached at module scope and reused via :meth:`RayEngine._reset` on
subsequent calls — this amortizes the multi-second actor-fork +
UCXX-bootstrap cost across all tests in a session. Other backends
are constructed fresh on every call.

Parameters
----------
param
Decoded engine fixture parameter describing the backend and block size mode.
spmd_comm
Communicator used when constructing an :class:`SPMDEngine`.
Communicator used when constructing an :class:`SPMDEngine`. Unused
for the ``"ray"`` branch.
options
Optional streaming options to merge on top of the baseline selected by
``param.blocksize_mode``.
@@ -132,21 +167,78 @@
    -------
    A streaming engine matching ``param``.
    """
-    from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine
-
    streaming_options = create_streaming_options(param.blocksize_mode, options)
+    common_kwargs: dict[str, Any] = {
+        "rapidsmpf_options": streaming_options.to_rapidsmpf_options(),
+        "executor_options": streaming_options.to_executor_options(),
+        "engine_options": streaming_options.to_engine_options(),
+    }
    match param.engine_name:
        case "spmd":
-            return SPMDEngine(
-                comm=spmd_comm,
-                rapidsmpf_options=streaming_options.to_rapidsmpf_options(),
-                executor_options=streaming_options.to_executor_options(),
-                engine_options=streaming_options.to_engine_options(),
-            )
+            from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine
+
+            return SPMDEngine(comm=spmd_comm, **common_kwargs)
+        case "ray":  # pragma: no cover
+            return _build_or_reset_ray_engine(common_kwargs)
+        case _:  # pragma: no cover
+            raise AssertionError(f"Unknown streaming backend: {param.engine_name!r}")


+def _build_or_reset_ray_engine(  # pragma: no cover
+    common_kwargs: dict[str, Any],
+) -> StreamingEngine:
+    """
+    Return the cached :class:`RayEngine`, ``_reset``-ed to ``common_kwargs``.
+
+    Builds a fresh engine on the first call. On subsequent calls reuses
+    the cached engine and applies ``common_kwargs`` via
+    :meth:`RayEngine._reset`, preserving the rank actors and the UCXX
+    communicator. ``allow_gpu_sharing=True`` is injected on every call
+    because :class:`StreamingEngine` re-validates the
+    ``num_ranks > 1`` invariant on every reset.
+    """
+    global _cached_ray_engine  # noqa: PLW0603
+    # ``_reset`` replaces engine state in full, so ``allow_gpu_sharing``
+    # must be re-asserted on every call to satisfy the
+    # ``num_ranks > 1`` validation in ``StreamingEngine.__init__``.
+    engine_options = {
+        **common_kwargs.get("engine_options", {}),
+        "allow_gpu_sharing": True,
+    }
+    common_kwargs = {**common_kwargs, "engine_options": engine_options}
+    if _cached_ray_engine is None:
+        from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine
+
+        # Prevent Ray from overriding ``CUDA_VISIBLE_DEVICES`` to ``""``
+        # when a worker process starts with zero visible GPUs (e.g. the
+        # test driver itself). In a future Ray release this becomes the
+        # default; setting it eagerly here keeps test behaviour stable
+        # across Ray versions.
+        os.environ.setdefault("RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO", "0")
+        _cached_ray_engine = RayEngine(
+            **common_kwargs,
+            num_ranks=NUM_RANKS,
+            ray_init_options={"include_dashboard": False},
+        )
+    else:
+        _cached_ray_engine._reset(**common_kwargs)  # type: ignore[attr-defined]
+    return _cached_ray_engine
+
+
+def shutdown_streaming_engine_cache() -> None:
+    """
+    Release the cached :class:`RayEngine`, if any.
+
+    Test session-teardown fixtures **must** call this before
+    ``ray.shutdown()`` so the engine's actor handles are released
+    against a still-live cluster.
+    """
+    global _cached_ray_engine  # noqa: PLW0603
+    if _cached_ray_engine is not None:  # pragma: no cover
+        _cached_ray_engine.shutdown()
+        _cached_ray_engine = None


def get_blocksize_mode(obj: pl.GPUEngine) -> Literal["medium", "small"]:
    """
    Infer the block size mode for a GPU engine.
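Reviewer note: the cache and teardown contracts in `engine_utils.py` imply a fixed ordering at session teardown. Below is a minimal sketch of a conftest fixture honoring that ordering; the fixture name and conftest placement are illustrative assumptions, while `ray.is_initialized()` and `ray.shutdown()` are standard Ray APIs.

```python
# Hypothetical conftest.py sketch (not part of this PR) showing the
# documented teardown order for the cached RayEngine.
import pytest

from cudf_polars.testing.engine_utils import shutdown_streaming_engine_cache


@pytest.fixture(scope="session", autouse=True)
def _streaming_engine_cache_teardown():
    yield
    # Release the cached engine's actor handles while the Ray cluster is
    # still live...
    shutdown_streaming_engine_cache()
    # ...and only then tear down Ray itself.
    import ray

    if ray.is_initialized():
        ray.shutdown()
```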
3 changes: 3 additions & 0 deletions python/cudf_polars/pyproject.toml
@@ -63,6 +63,9 @@ rapidsmpf = [
"pyarrow>=19.0.0,<24",
"rapidsmpf==26.6.*,>=0.0.0a0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
ray = [
"ray>=2.0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.

[project.urls]
Homepage = "https://github.com/rapidsai/cudf"