Merged
Commits (28)
1b165a8
Test Ray and Dask engine
madsbk May 4, 2026
7cdedd0
CI: install Ray
madsbk May 6, 2026
8feaf42
cleanup
madsbk May 6, 2026
e26e24b
CI: restrict UCX to TCP and CUDA transports
madsbk May 6, 2026
26daa0e
NUM_RANKS = 2
madsbk May 6, 2026
b294bf8
spmd_engine_factory
madsbk May 6, 2026
e5427fa
remove noise
madsbk May 6, 2026
75d234c
Add UCX-specific configs to cudf-polars CI runners
pentschev May 6, 2026
41d71b3
Remove UCX_TLS config from cudf-polars CI script
pentschev May 6, 2026
9a05eb8
Remove unnecessary blank line
pentschev May 6, 2026
0c26c09
Fix container option matrix
pentschev May 6, 2026
f3dc183
Merge remote-tracking branch 'upstream/main' into engine_reset-test-d…
pentschev May 6, 2026
1996620
ray>=2.55.1
madsbk May 6, 2026
f2005f3
Merge branch 'main' of github.com:rapidsai/cudf into engine_reset-tes…
madsbk May 6, 2026
673df5b
Merge branch 'engine_reset-test-dask-and-ray' of github.com:madsbk/cu…
madsbk May 6, 2026
27b631c
ray>=2.55.1
madsbk May 6, 2026
d280fab
Merge branch 'main' of github.com:rapidsai/cudf into engine_reset-tes…
madsbk May 7, 2026
5c4d952
Don't limit to spmd_engine, ignore warning locally instead
madsbk May 7, 2026
8c1819f
warns_on_spmd
madsbk May 7, 2026
d3904b7
cudf-polars: fix ConditionalJoin fallback on asymmetric partition counts
madsbk May 7, 2026
6482434
more warns_on_spmd
madsbk May 7, 2026
f79fb71
test_rapidsmpf_join_metadata(): https://github.com/rapidsai/cudf/pull…
madsbk May 7, 2026
cd78518
Merge branch 'main' of github.com:rapidsai/cudf into engine_reset-tes…
madsbk May 7, 2026
56fc913
xskip: multi-rank fallback
madsbk May 7, 2026
88f6b5d
cover
madsbk May 7, 2026
734df2e
test_over_in_filter_unsupported: fix warning
madsbk May 7, 2026
efc904b
Merge branch 'main' of github.com:rapidsai/cudf into engine_reset-tes…
madsbk May 7, 2026
83059fd
ConditionalJoin cleanup
madsbk May 7, 2026
1 change: 1 addition & 0 deletions .github/workflows/pr.yaml
@@ -503,6 +503,7 @@ jobs:
# (rapidsmpf compatibility already validated in rapidsmpf CI)
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: pull-request
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
Member

These changes are necessary to provide enough resources for UCX. For reference, we already do the same for UCXX and RapidsMPF; both need it for the same reason:

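Roughly, these options correspond to launching the test container with something like the following (a sketch; the image name and command are placeholders, not the actual CI invocation):

  docker run --cap-add CAP_SYS_PTRACE --shm-size=8g \
    --ulimit nofile=1000000:1000000 \
    <test-image> ci/test_cudf_polars_experimental.sh
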
script: "ci/test_cudf_polars_experimental.sh"
cudf-polars-polars-tests:
needs: [wheel-build-cudf-polars, changed-files]
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
@@ -174,6 +174,7 @@ jobs:
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: ${{ inputs.build_type }}
branch: ${{ inputs.branch }}
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
date: ${{ inputs.date }}
sha: ${{ inputs.sha }}
script: "ci/test_cudf_polars_experimental.sh"
2 changes: 1 addition & 1 deletion ci/run_cudf_polars_experimental_pytests.sh
@@ -10,5 +10,5 @@ set -euo pipefail
# Support invoking outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/

echo "Running the full cudf-polars test suite with both the in-memory and spmd engine"
echo "Running the full cudf-polars test suite"
python -m pytest --cache-clear "$@" tests
2 changes: 1 addition & 1 deletion ci/test_cudf_polars_experimental.sh
@@ -28,7 +28,7 @@ rapids-pip-retry install \
-v \
--prefer-binary \
--constraint "${PIP_CONSTRAINT}" \
"$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental]" \
"$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental,ray]" \
"$(echo "${LIBCUDF_WHEELHOUSE}"/libcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)" \
"$(echo "${PYLIBCUDF_WHEELHOUSE}"/pylibcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)"

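For local runs, the same change corresponds to installing the wheel with the new "ray" extra; a sketch, assuming the CUDA 12 package name (the exact suffix and wheel source depend on your environment):

  pip install "cudf-polars-cu12[test,experimental,ray]"
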
13 changes: 13 additions & 0 deletions dependencies.yaml
@@ -384,6 +384,14 @@ files:
key: experimental
includes:
- run_cudf_polars_experimental
py_run_cudf_polars_ray:
output: pyproject
pyproject_dir: python/cudf_polars
extras:
table: project.optional-dependencies
key: ray
includes:
- depends_on_ray
py_test_cudf_polars:
output: pyproject
pyproject_dir: python/cudf_polars
@@ -1290,6 +1298,11 @@ dependencies:
- matrix:
packages:
- *rapidsmpf_unsuffixed
depends_on_ray:
common:
- output_types: [conda, requirements, pyproject]
packages:
- ray>=2.55.1
Contributor

In conda/recipes/cudf-polars/recipe.yaml, we'll probably want to add a

  run_constraints:
    - ray >=2.55.1

to mirror this constraint.

Member Author

Ray is still optional; this is just for CI testing.

Contributor

@mroeschke May 7, 2026

Right, IIRC run-constraints is like specifying a version constraint for optional runtime dependencies (like what got added to the pyproject.toml)

https://rattler-build.prefix.dev/latest/reference/recipe_file/#run-constraints

Packages that are optional at runtime but must obey the supplied additional constraint if they are installed.

But this can be done in a follow-up since CI is currently green.
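For reference, a minimal sketch of what that follow-up could look like in conda/recipes/cudf-polars/recipe.yaml, assuming the rattler-build schema linked above (the surrounding requirements layout is an assumption):

  requirements:
    run_constraints:
      - ray >=2.55.1
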

Member Author

Ah, I didn't know that, thanks.
But yes, let's do it in a follow-up PR.

Contributor

Sure thing, opened #22414

depends_on_rapids_logger:
common:
- output_types: [conda, requirements, pyproject]
20 changes: 11 additions & 9 deletions python/cudf_polars/cudf_polars/experimental/join.py
@@ -164,20 +164,22 @@ def _(
left, pi_left = rec(left)
right, pi_right = rec(right)

# Fallback to single partition on the smaller table
# Fallback to single partition on the smaller table whenever either
# side has more than one partition.
left_count = pi_left[left].count
right_count = pi_right[right].count
output_count = max(left_count, right_count)
fallback_msg = "ConditionalJoin not supported for multiple partitions."
if left_count < right_count:
if left_count > 1 or dynamic_planning:
if output_count > 1 or dynamic_planning:
if left_count < right_count:
left = Repartition(left.schema, left)
pi_left[left] = PartitionInfo(count=1)
_fallback_inform(fallback_msg, config_options)
elif right_count > 1 or dynamic_planning:
right = Repartition(right.schema, right)
pi_right[right] = PartitionInfo(count=1)
_fallback_inform(fallback_msg, config_options)
else:
right = Repartition(right.schema, right)
pi_right[right] = PartitionInfo(count=1)
_fallback_inform(
"ConditionalJoin not supported for multiple partitions.",
config_options,
)

# Reconstruct and return
new_node = ir.reconstruct([left, right])
43 changes: 43 additions & 0 deletions python/cudf_polars/cudf_polars/testing/engine_utils.py
@@ -11,6 +11,7 @@

if TYPE_CHECKING:
from collections.abc import Mapping
from contextlib import AbstractContextManager

import polars as pl

@@ -21,6 +22,15 @@
STREAMING_ENGINE_FIXTURE_PARAMS: list[str] = []
if importlib.util.find_spec("rapidsmpf") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.extend(["spmd", "spmd-small"])
# ``DaskEngine`` and ``RayEngine`` both reject construction inside an
# ``rrun`` cluster.
from rapidsmpf.bootstrap import is_running_with_rrun as _is_running_with_rrun

if not _is_running_with_rrun(): # pragma: no cover
if importlib.util.find_spec("distributed") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.append("dask")
if importlib.util.find_spec("ray") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.append("ray")
ALL_ENGINE_FIXTURE_PARAMS = ["in-memory", *STREAMING_ENGINE_FIXTURE_PARAMS]


@@ -63,6 +73,34 @@ def is_streaming_engine(obj: Any) -> bool:
return isinstance(obj, StreamingEngine)


def warns_on_spmd( # pragma: no cover; rapidsmpf-only path
engine: Any,
*args: Any,
when: bool = True,
**kwargs: Any,
) -> AbstractContextManager[Any]:
"""
``pytest.warns(*args, **kwargs)`` on SPMD; ``nullcontext`` otherwise.

``pytest.warns`` only captures warnings emitted in the test process. On
multi-process backends (``DaskEngine``, ``RayEngine``) the fallback
warning fires on workers/actors and only appears in worker logs/stdout,
so the assertion is replaced with a passthrough on those backends.

The optional ``when`` kwarg lets callers compose an additional gate (e.g.
a parametrize value) without an outer ``if``.
"""
import contextlib

import pytest

from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine

if when and isinstance(engine, SPMDEngine):
return pytest.warns(*args, **kwargs)
return contextlib.nullcontext()


def create_streaming_options(
blocksize_mode: Literal["medium", "small"],
overrides: StreamingOptions | None = None,
@@ -87,13 +125,17 @@ def create_streaming_options(
from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.utils.config import StreamingFallbackMode

# ``allow_gpu_sharing=True`` is always set so the cached multi-rank
# engines (Dask workers, Ray actors with ``num_ranks > 1``) don't trip
# the UUID-collision guard on every ``_reset(...)``.
match blocksize_mode:
case "medium":
baseline = StreamingOptions(
max_rows_per_partition=50,
dynamic_planning={},
target_partition_size=1_000_000,
raise_on_fail=True,
allow_gpu_sharing=True,
)
case "small":
baseline = StreamingOptions(
Expand All @@ -102,6 +144,7 @@ def create_streaming_options(
target_partition_size=10,
raise_on_fail=True,
fallback_mode=StreamingFallbackMode.SILENT,
allow_gpu_sharing=True,
)
case _: # pragma: no cover
raise ValueError(f"Unknown blocksize_mode: {blocksize_mode!r}")
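A hedged usage sketch of the ``when`` gate on ``warns_on_spmd`` (test body and parametrization are illustrative, not from this PR; assumes the ``streaming_engine_factory`` fixture and a ``fallback_mode="warn"`` configuration so the fallback warning can actually fire):

import polars as pl
import pytest

from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.testing.asserts import assert_gpu_result_equal
from cudf_polars.testing.engine_utils import warns_on_spmd


@pytest.mark.parametrize("non_pointwise", [True, False])
def test_filter_fallback_warning(streaming_engine_factory, non_pointwise):
    # Hypothetical test, not part of this PR.
    engine = streaming_engine_factory(
        StreamingOptions(max_rows_per_partition=3, fallback_mode="warn"),
    )
    df = pl.LazyFrame({"a": [1, 2, 3, 4, 5, 6]})
    q = (
        df.filter(pl.col("a") > pl.col("a").max())  # non-pointwise: falls back
        if non_pointwise
        else df.filter(pl.col("a") > 3)  # pointwise: no fallback
    )
    # ``when=non_pointwise`` gates the assertion without an outer ``if``: the
    # warning is only expected for the non-pointwise filter, and only on the
    # SPMD engine (Dask/Ray emit it in worker processes, out of reach of
    # ``pytest.warns``).
    with warns_on_spmd(
        engine,
        UserWarning,
        match="not supported for multiple partitions",
        when=non_pointwise,
    ):
        assert_gpu_result_equal(q, engine=engine)
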
3 changes: 3 additions & 0 deletions python/cudf_polars/pyproject.toml
@@ -63,6 +63,9 @@ rapidsmpf = [
"pyarrow>=19.0.0,<24",
"rapidsmpf==26.6.*,>=0.0.0a0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
ray = [
"ray>=2.55.1",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.

[project.urls]
Homepage = "https://github.com/rapidsai/cudf"
72 changes: 67 additions & 5 deletions python/cudf_polars/tests/conftest.py
@@ -31,6 +31,12 @@
StreamingEngines: TypeAlias = Mapping[str, StreamingEngine]


# Number of ranks for multi-rank streaming engines that share one GPU
# (currently ``RayEngine``). Single-GPU dev hosts and CI runners require
# ``allow_gpu_sharing=True`` to oversubscribe one device across actors.
NUM_RANKS = 2

@pytest.fixture(params=[False, True], ids=["no_nulls", "nulls"], scope="session")
def with_nulls(request):
return request.param
@@ -89,6 +95,27 @@ def streaming_engines() -> Generator[StreamingEngines, None, None]:
)

engines: dict[str, StreamingEngine] = {"spmd": SPMDEngine(comm=comm)}

if "dask" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover
from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine

engines["dask"] = DaskEngine(engine_options={"allow_gpu_sharing": True})

if "ray" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover
from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine

# Always pin ``num_ranks`` so the cached engine has a deterministic
# actor count regardless of how many GPUs the host happens to have;
# otherwise ``RayEngine`` defaults to ``get_num_gpus_in_ray_cluster()``
# and tests that depend on rank-count behavior (e.g. fast-count
# parquet, concat) become non-portable. Pinning ``num_ranks`` requires
# ``allow_gpu_sharing=True`` (production guard).
engines["ray"] = RayEngine(
num_ranks=NUM_RANKS,
engine_options={"allow_gpu_sharing": True},
ray_init_options={"include_dashboard": False},
)

try:
yield engines
finally:
Expand All @@ -108,6 +135,28 @@ def spmd_engine(streaming_engines: StreamingEngines) -> SPMDEngine:
return engine


@pytest.fixture
def spmd_engine_factory(
streaming_engines: StreamingEngines,
) -> Callable[..., SPMDEngine]:
"""
Return a factory that yields the shared :class:`SPMDEngine`.

Use this in place of :func:`streaming_engine_factory` for tests that
must run on SPMD only.
"""
from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine

param = EngineFixtureParam(full_name="spmd")

def factory(options: StreamingOptions | None = None) -> SPMDEngine:
engine = build_streaming_engine(param, streaming_engines, options)
assert isinstance(engine, SPMDEngine)
return engine

return factory


@pytest.fixture(params=STREAMING_ENGINE_FIXTURE_PARAMS)
def _streaming_engine_param(request: pytest.FixtureRequest) -> EngineFixtureParam:
"""Parametrization helper to run tests for each streaming engine variant."""
@@ -246,10 +295,9 @@ def pytest_configure(config):

config.addinivalue_line(
"markers",
"skip_on_streaming_engine(reason): skip the test for streaming "
'``engine`` variants (e.g. ``"spmd"``, ``"spmd-small"``) while '
"still letting the in-memory variant run. Use this to track features "
"that have no multi-partition implementation",
"skip_on_streaming_engine(reason, *, engine=None): skip the test for "
'streaming ``engine`` variants (e.g. ``"spmd"``, ``"spmd-small"``, '
'``"dask"``, ``"ray"``) while still allowing the in-memory variant to run.',
)

# Ray's internal subprocess management leaks `/dev/null` file handles, and
@@ -275,9 +323,23 @@ def pytest_collection_modifyitems(items):
callspec = getattr(item, "callspec", None)
if callspec is None:
continue
engine_param = callspec.params.get("_all_engine_param")
# Tests bind to either ``engine`` (parametrized via ``_all_engine_param``)
# or ``streaming_engine`` / ``streaming_engine_factory`` (parametrized via
# ``_streaming_engine_param``). Check both.
engine_param = callspec.params.get("_all_engine_param") or callspec.params.get(
"_streaming_engine_param"
)
if engine_param is None or engine_param == "in-memory":
continue
engine_filter = marker.kwargs.get("engine")
if engine_filter is not None:
if isinstance(engine_filter, str):
engine_filter = (engine_filter,)
# Strip the ``-small`` suffix so ``"spmd-small"`` matches
# ``engine=("spmd",)``.
engine_name = engine_param.removesuffix("-small")
if engine_name not in engine_filter:
continue
reason = (
marker.args[0]
if marker.args
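A hedged sketch of the extended marker in use (test name and reason are illustrative, not from this PR): with ``engine=("spmd",)`` the skip also covers the ``spmd-small`` variant via the suffix-stripping above, while the ``dask``, ``ray``, and in-memory variants still run.

import pytest


@pytest.mark.skip_on_streaming_engine(
    "feature has no multi-partition implementation yet",
    engine=("spmd",),  # also matches "spmd-small" after the suffix is stripped
)
def test_hypothetical_feature(engine):
    ...
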
@@ -59,8 +59,6 @@ def test_gather_cluster_info(streaming_engine) -> None:
assert isinstance(info.gpu_uuid, str)
# Each rank runs in its own process.
assert len({info.pid for info in infos}) == streaming_engine.nranks
# Without allow_gpu_sharing, all UUIDs must be unique (enforced at init).
assert len({info.gpu_uuid for info in infos}) == streaming_engine.nranks


def test_cluster_info_cuda_visible_devices(monkeypatch) -> None:
21 changes: 11 additions & 10 deletions python/cudf_polars/tests/experimental/test_dataframescan.py
@@ -60,19 +60,20 @@ def test_parallel_dataframescan(df, streaming_engine_factory, max_rows_per_parti
assert count == 1


@pytest.mark.xfail(
reason=(
"Multi-rank Union interleaves child outputs across ranks: client "
"receives [rank0_A, rank0_B, rank1_A, rank1_B] instead of the "
"polars-CPU [A, B]. Tracked in "
"https://github.com/rapidsai/cudf/issues/22376."
),
strict=False,
)
def test_dataframescan_concat(df, streaming_engine_factory):
def test_dataframescan_concat(request, df, streaming_engine_factory):
streaming_engine = streaming_engine_factory(
StreamingOptions(max_rows_per_partition=1_000),
)
if streaming_engine.nranks > 1:
# Multi-rank Union interleaves child outputs across ranks: client
# receives [rank0_A, rank0_B, rank1_A, rank1_B] instead of the
# polars-CPU [A, B].
request.applymarker(
pytest.mark.xfail(
reason="https://github.com/rapidsai/cudf/issues/22376",
strict=False,
)
)
df2 = pl.concat([df, df])
assert_gpu_result_equal(df2, engine=streaming_engine)

9 changes: 5 additions & 4 deletions python/cudf_polars/tests/experimental/test_filter.py
@@ -9,12 +9,11 @@

from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.testing.asserts import assert_gpu_result_equal
from cudf_polars.testing.engine_utils import warns_on_spmd


@pytest.fixture
def engine(streaming_engine_factory):
# ``fallback_mode="warn"`` overrides the small-blocksize baseline (which
# sets SILENT) so ``test_filter_non_pointwise`` can assert on the warning.
return streaming_engine_factory(
StreamingOptions(max_rows_per_partition=3, fallback_mode="warn"),
)
Expand All @@ -38,7 +37,9 @@ def test_filter_pointwise(df, engine):

def test_filter_non_pointwise(df, engine):
query = df.filter(pl.col("a") > pl.col("a").max())
with pytest.warns(
UserWarning, match="This filter is not supported for multiple partitions."
with warns_on_spmd(
engine,
UserWarning,
match="This filter is not supported for multiple partitions.",
):
assert_gpu_result_equal(query, engine=engine)
8 changes: 6 additions & 2 deletions python/cudf_polars/tests/experimental/test_groupby.py
@@ -131,8 +131,8 @@ def test_groupby_std_var_ddof(df, engine, agg, ddof):


@pytest.mark.parametrize("fallback_mode", ["silent", "raise", "warn", "foo"])
def test_groupby_fallback(df, fallback_mode, streaming_engine_factory):
streaming_engine = streaming_engine_factory(
def test_groupby_fallback(df, fallback_mode, spmd_engine_factory):
streaming_engine = spmd_engine_factory(
StreamingOptions(fallback_mode=fallback_mode),
)
match = "Failed to decompose groupby aggs"
@@ -287,6 +287,10 @@ def test_groupby_count_type_mismatch(df, streaming_engine_factory):
assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False)


@pytest.mark.skip_on_streaming_engine(
"patch.object on ShuffleManager.Inserter doesn't reach worker processes",
engine=("dask", "ray"),
)
def test_shuffle_reduce_insert_finished_called_on_oom(streaming_engine_factory):
streaming_engine = streaming_engine_factory(
StreamingOptions(target_partition_size=10, max_rows_per_partition=5),