Merged
28 commits
1b165a8
Test Ray and Dask engine
madsbk May 4, 2026
7cdedd0
CI: install Ray
madsbk May 6, 2026
8feaf42
cleanup
madsbk May 6, 2026
e26e24b
CI: restrict UCX to TCP and CUDA transports
madsbk May 6, 2026
26daa0e
NUM_RANKS = 2
madsbk May 6, 2026
b294bf8
spmd_engine_factory
madsbk May 6, 2026
e5427fa
remove noise
madsbk May 6, 2026
75d234c
Add UCX-specific configs to cudf-polars CI runners
pentschev May 6, 2026
41d71b3
Remove UCX_TLS config from cudf-polars CI script
pentschev May 6, 2026
9a05eb8
Remove unnecessary blank line
pentschev May 6, 2026
0c26c09
Fix container option matrix
pentschev May 6, 2026
f3dc183
Merge remote-tracking branch 'upstream/main' into engine_reset-test-d…
pentschev May 6, 2026
1996620
ray>=2.55.1
madsbk May 6, 2026
f2005f3
Merge branch 'main' of github.com:rapidsai/cudf into engine_reset-tes…
madsbk May 6, 2026
673df5b
Merge branch 'engine_reset-test-dask-and-ray' of github.com:madsbk/cu…
madsbk May 6, 2026
27b631c
ray>=2.55.1
madsbk May 6, 2026
d280fab
Merge branch 'main' of github.com:rapidsai/cudf into engine_reset-tes…
madsbk May 7, 2026
5c4d952
Don't limit to spmd_engine, ignore warning locally instead
madsbk May 7, 2026
8c1819f
warns_on_spmd
madsbk May 7, 2026
d3904b7
cudf-polars: fix ConditionalJoin fallback on asymmetric partition counts
madsbk May 7, 2026
6482434
more warns_on_spmd
madsbk May 7, 2026
f79fb71
test_rapidsmpf_join_metadata(): https://github.com/rapidsai/cudf/pull…
madsbk May 7, 2026
cd78518
Merge branch 'main' of github.com:rapidsai/cudf into engine_reset-tes…
madsbk May 7, 2026
56fc913
xskip: multi-rank fallback
madsbk May 7, 2026
88f6b5d
cover
madsbk May 7, 2026
734df2e
test_over_in_filter_unsupported: fix warning
madsbk May 7, 2026
efc904b
Merge branch 'main' of github.com:rapidsai/cudf into engine_reset-tes…
madsbk May 7, 2026
83059fd
ConditionalJoin cleanup
madsbk May 7, 2026
1 change: 1 addition & 0 deletions .github/workflows/pr.yaml
@@ -503,6 +503,7 @@ jobs:
# (rapidsmpf compatibility already validated in rapidsmpf CI)
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: pull-request
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
Member:
These changes are necessary to provide enough resources for UCX. For reference, we already do the same for UCXX and RapidsMPF, both need it for the same reason:

script: "ci/test_cudf_polars_experimental.sh"
cudf-polars-polars-tests:
needs: [wheel-build-cudf-polars, changed-files]
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
@@ -174,6 +174,7 @@ jobs:
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: ${{ inputs.build_type }}
branch: ${{ inputs.branch }}
container-options: "--cap-add CAP_SYS_PTRACE --shm-size=8g --ulimit=nofile=1000000:1000000"
date: ${{ inputs.date }}
sha: ${{ inputs.sha }}
script: "ci/test_cudf_polars_experimental.sh"
2 changes: 1 addition & 1 deletion ci/run_cudf_polars_experimental_pytests.sh
@@ -10,5 +10,5 @@ set -euo pipefail
# Support invoking outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/

echo "Running the full cudf-polars test suite with both the in-memory and spmd engine"
echo "Running the full cudf-polars test suite"
python -m pytest --cache-clear "$@" tests
2 changes: 1 addition & 1 deletion ci/test_cudf_polars_experimental.sh
@@ -28,7 +28,7 @@ rapids-pip-retry install \
-v \
--prefer-binary \
--constraint "${PIP_CONSTRAINT}" \
"$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental]" \
"$(echo "${CUDF_POLARS_WHEELHOUSE}"/cudf_polars_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)[test,experimental,ray]" \
"$(echo "${LIBCUDF_WHEELHOUSE}"/libcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)" \
"$(echo "${PYLIBCUDF_WHEELHOUSE}"/pylibcudf_"${RAPIDS_PY_CUDA_SUFFIX}"*.whl)"

13 changes: 13 additions & 0 deletions dependencies.yaml
@@ -384,6 +384,14 @@ files:
key: experimental
includes:
- run_cudf_polars_experimental
py_run_cudf_polars_ray:
output: pyproject
pyproject_dir: python/cudf_polars
extras:
table: project.optional-dependencies
key: ray
includes:
- depends_on_ray
py_test_cudf_polars:
output: pyproject
pyproject_dir: python/cudf_polars
@@ -1290,6 +1298,11 @@ dependencies:
- matrix:
packages:
- *rapidsmpf_unsuffixed
depends_on_ray:
common:
- output_types: [conda, requirements, pyproject]
packages:
- ray>=2.55.1
Contributor:
In conda/recipes/cudf-polars/recipe.yaml, we'll probably want to add a

  run_constraints:
    - ray >=2.55.1

to mirror this constraint

Member Author:
Ray is still optional, this is just for the CI testing

Contributor (@mroeschke, May 7, 2026):
Right, IIRC run-constraints is like specifying a version constraint for optional runtime dependencies (like what got added to the pyproject.toml)

https://rattler-build.prefix.dev/latest/reference/recipe_file/#run-constraints

Packages that are optional at runtime but must obey the supplied additional constraint if they are installed.

But can be done in a follow up since the CI is currently green

Member Author:
Ah, I didn't know that, thanks.
But yes, let's do it in a follow-up PR.

Contributor:
Sure thing, opened #22414

depends_on_rapids_logger:
common:
- output_types: [conda, requirements, pyproject]
7 changes: 4 additions & 3 deletions python/cudf_polars/cudf_polars/experimental/join.py
@@ -164,17 +164,18 @@ def _(
left, pi_left = rec(left)
right, pi_right = rec(right)

# Fallback to single partition on the smaller table
# Fallback to single partition on the smaller table whenever either
# side has more than one partition.
left_count = pi_left[left].count
right_count = pi_right[right].count
output_count = max(left_count, right_count)
fallback_msg = "ConditionalJoin not supported for multiple partitions."
if left_count < right_count:
if left_count > 1 or dynamic_planning:
if output_count > 1 or dynamic_planning:
Member Author:
Fixed a bug in the conditional-join fallback logic. We need to repartition the smaller table to a single broadcastable partition whenever either side has more than one partition.

Previously the fallback only triggered when the smaller side itself had multiple partitions, which breaks asymmetric cases like (left=2, right=1). In that situation the single right partition only exists on one rank, so peer ranks execute the conditional join against an empty right side and silently drop rows.

@TomAugspurger and @rjzamora, does that sound correct?
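
A minimal sketch (hypothetical helper names, not the actual cudf-polars planner; only the boolean logic mirrors the diff) of why the asymmetric (left=2, right=1) case slipped through the old check:

```python
# Old code gated the fallback on the *smaller* side's own partition count;
# the fix gates it on the output count (max of both sides).

def old_fallback(left_count: int, right_count: int, dynamic_planning: bool) -> bool:
    smaller = min(left_count, right_count)
    return smaller > 1 or dynamic_planning

def new_fallback(left_count: int, right_count: int, dynamic_planning: bool) -> bool:
    output_count = max(left_count, right_count)
    return output_count > 1 or dynamic_planning

# Asymmetric case from the comment, with dynamic planning disabled:
print(old_fallback(2, 1, False))  # False -> smaller side never repartitioned for broadcast
print(new_fallback(2, 1, False))  # True  -> smaller side repartitioned to one partition
```

With the old predicate, the single right partition lives on one rank only, so peer ranks join against an empty right side, which is exactly the silent row-drop described above.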

Contributor:
I'll defer to Rick on this...

Though if the new way is correct, then maybe the logic would be a bit more obvious if we combine the first two if conditions?

if (left_count < right_count) and (output_count > 1 or dynamic_planning): 
    left = Repartition(left.schema, left)
    pi_left[left] = PartitionInfo(count=1)
    _fallback_inform(fallback_msg, config_options)
elif output_count > 1 or dynamic_planning:
    right = Repartition(right.schema, right)
    pi_right[right] = PartitionInfo(count=1)
    _fallback_inform(fallback_msg, config_options)

Then the repeated condition will maybe be a bit clearer: that (left_count < right_count) is repeated is obvious, but the second condition, output_count > 1 or dynamic_planning, isn't.

At least I hope those are logically equivalent.
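
The "I hope those are logically equivalent" question can be settled by brute force; a short sketch (hypothetical standalone functions that return which side would be repartitioned, rather than mutating planner state) checking all small partition counts:

```python
from itertools import product

def branch_nested(left_count, right_count, dynamic_planning):
    """Which side is repartitioned under the nested form (as merged)."""
    output_count = max(left_count, right_count)
    if left_count < right_count:
        if output_count > 1 or dynamic_planning:
            return "left"
    elif output_count > 1 or dynamic_planning:
        return "right"
    return None

def branch_flattened(left_count, right_count, dynamic_planning):
    """Which side is repartitioned under the flattened form from the review."""
    output_count = max(left_count, right_count)
    if (left_count < right_count) and (output_count > 1 or dynamic_planning):
        return "left"
    elif output_count > 1 or dynamic_planning:
        return "right"
    return None

for lc, rc, dp in product(range(1, 5), range(1, 5), (False, True)):
    assert branch_nested(lc, rc, dp) == branch_flattened(lc, rc, dp), (lc, rc, dp)
print("equivalent on all checked inputs")
```

The only subtle case is left_count < right_count with the second condition false: the nested form stops after the inner `if`, and the flattened form falls through to an `elif` whose condition is also false, so both return `None`.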

Member:
I guess we are maybe disabling dynamic planning for some join tests? By default, dynamic-planning is always on, so we always repartition the smaller side.

Member Author:
I guess we are maybe disabling dynamic planning for some join tests? By default, dynamic-planning is always on, so we always repartition the smaller side.

Maybe, it only happens when running with multiple GPUs

Member Author:
Though if the new way is correct, then maybe the logic would be a bit more obvious if we combine the first two if conditions?

Yes, and we can even simplify a bit more. Done

left = Repartition(left.schema, left)
pi_left[left] = PartitionInfo(count=1)
_fallback_inform(fallback_msg, config_options)
elif right_count > 1 or dynamic_planning:
elif output_count > 1 or dynamic_planning:
right = Repartition(right.schema, right)
pi_right[right] = PartitionInfo(count=1)
_fallback_inform(fallback_msg, config_options)
43 changes: 43 additions & 0 deletions python/cudf_polars/cudf_polars/testing/engine_utils.py
@@ -11,6 +11,7 @@

if TYPE_CHECKING:
from collections.abc import Mapping
from contextlib import AbstractContextManager

import polars as pl

@@ -21,6 +22,15 @@
STREAMING_ENGINE_FIXTURE_PARAMS: list[str] = []
if importlib.util.find_spec("rapidsmpf") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.extend(["spmd", "spmd-small"])
# ``DaskEngine`` and ``RayEngine`` both reject construction inside an
# ``rrun`` cluster.
from rapidsmpf.bootstrap import is_running_with_rrun as _is_running_with_rrun

if not _is_running_with_rrun(): # pragma: no cover
if importlib.util.find_spec("distributed") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.append("dask")
if importlib.util.find_spec("ray") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.append("ray")
ALL_ENGINE_FIXTURE_PARAMS = ["in-memory", *STREAMING_ENGINE_FIXTURE_PARAMS]


@@ -63,6 +73,34 @@ def is_streaming_engine(obj: Any) -> bool:
return isinstance(obj, StreamingEngine)


def warns_on_spmd(
engine: Any,
*args: Any,
when: bool = True,
**kwargs: Any,
) -> AbstractContextManager[Any]:
"""
``pytest.warns(*args, **kwargs)`` on SPMD; ``nullcontext`` otherwise.

``pytest.warns`` only captures warnings emitted in the test process. On
multi-process backends (``DaskEngine``, ``RayEngine``) the fallback
warning fires on workers/actors and only appears in worker logs/stdout,
so the assertion is replaced with a passthrough on those backends.

The optional ``when`` kwarg lets callers compose an additional gate (e.g.
a parametrize value) without an outer ``if``.
"""
import contextlib

import pytest

from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine

if when and isinstance(engine, SPMDEngine):
return pytest.warns(*args, **kwargs)
return contextlib.nullcontext()


def create_streaming_options(
blocksize_mode: Literal["medium", "small"],
overrides: StreamingOptions | None = None,
@@ -87,13 +125,17 @@ def create_streaming_options(
from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.utils.config import StreamingFallbackMode

# ``allow_gpu_sharing=True`` is always set so the cached multi-rank
# engines (Dask workers, Ray actors with ``num_ranks > 1``) don't trip
# the UUID-collision guard on every ``_reset(...)``.
match blocksize_mode:
case "medium":
baseline = StreamingOptions(
max_rows_per_partition=50,
dynamic_planning={},
target_partition_size=1_000_000,
raise_on_fail=True,
allow_gpu_sharing=True,
)
case "small":
baseline = StreamingOptions(
Expand All @@ -102,6 +144,7 @@ def create_streaming_options(
target_partition_size=10,
raise_on_fail=True,
fallback_mode=StreamingFallbackMode.SILENT,
allow_gpu_sharing=True,
)
case _: # pragma: no cover
raise ValueError(f"Unknown blocksize_mode: {blocksize_mode!r}")
3 changes: 3 additions & 0 deletions python/cudf_polars/pyproject.toml
@@ -63,6 +63,9 @@ rapidsmpf = [
"pyarrow>=19.0.0,<24",
"rapidsmpf==26.6.*,>=0.0.0a0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
ray = [
"ray>=2.55.1",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.

[project.urls]
Homepage = "https://github.com/rapidsai/cudf"
72 changes: 67 additions & 5 deletions python/cudf_polars/tests/conftest.py
@@ -31,6 +31,12 @@
StreamingEngines: TypeAlias = Mapping[str, StreamingEngine]


# Number of ranks for multi-rank streaming engines that share one GPU
# (currently ``RayEngine``). Single-GPU dev hosts and CI runners require
# ``allow_gpu_sharing=True`` to oversubscribe one device across actors.
NUM_RANKS = 2


@pytest.fixture(params=[False, True], ids=["no_nulls", "nulls"], scope="session")
def with_nulls(request):
return request.param
@@ -89,6 +95,27 @@ def streaming_engines() -> Generator[StreamingEngines, None, None]:
)

engines: dict[str, StreamingEngine] = {"spmd": SPMDEngine(comm=comm)}

if "dask" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover
from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine

engines["dask"] = DaskEngine(engine_options={"allow_gpu_sharing": True})

if "ray" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover
from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine

# Always pin ``num_ranks`` so the cached engine has a deterministic
# actor count regardless of how many GPUs the host happens to have;
# otherwise ``RayEngine`` defaults to ``get_num_gpus_in_ray_cluster()``
# and tests that depend on rank-count behavior (e.g. fast-count
# parquet, concat) become non-portable. Pinning ``num_ranks`` requires
# ``allow_gpu_sharing=True`` (production guard).
engines["ray"] = RayEngine(
num_ranks=NUM_RANKS,
engine_options={"allow_gpu_sharing": True},
ray_init_options={"include_dashboard": False},
)

try:
yield engines
finally:
@@ -108,6 +135,28 @@
return engine


@pytest.fixture
def spmd_engine_factory(
streaming_engines: StreamingEngines,
) -> Callable[..., SPMDEngine]:
"""
Return a factory that yields the shared :class:`SPMDEngine`.

Use this in place of :func:`streaming_engine_factory` for tests that
must run on SPMD only.
"""
from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine

param = EngineFixtureParam(full_name="spmd")

def factory(options: StreamingOptions | None = None) -> SPMDEngine:
engine = build_streaming_engine(param, streaming_engines, options)
assert isinstance(engine, SPMDEngine)
return engine

return factory


@pytest.fixture(params=STREAMING_ENGINE_FIXTURE_PARAMS)
def _streaming_engine_param(request: pytest.FixtureRequest) -> EngineFixtureParam:
"""Parametrization helper to run tests for each streaming engine variant."""
@@ -246,10 +295,9 @@ def pytest_configure(config):

config.addinivalue_line(
"markers",
"skip_on_streaming_engine(reason): skip the test for streaming "
'``engine`` variants (e.g. ``"spmd"``, ``"spmd-small"``) while '
"still letting the in-memory variant run. Use this to track features "
"that have no multi-partition implementation",
"skip_on_streaming_engine(reason, *, engine=None): skip the test for "
'streaming ``engine`` variants (e.g. ``"spmd"``, ``"spmd-small"``, '
'``"dask"``, ``"ray"``) while still allowing the in-memory variant to run.',
)

# Ray's internal subprocess management leaks `/dev/null` file handles, and
@@ -275,9 +323,23 @@ def pytest_collection_modifyitems(items):
callspec = getattr(item, "callspec", None)
if callspec is None:
continue
engine_param = callspec.params.get("_all_engine_param")
# Tests bind to either ``engine`` (parametrized via ``_all_engine_param``)
# or ``streaming_engine`` / ``streaming_engine_factory`` (parametrized via
# ``_streaming_engine_param``). Check both.
engine_param = callspec.params.get("_all_engine_param") or callspec.params.get(
"_streaming_engine_param"
)
if engine_param is None or engine_param == "in-memory":
continue
engine_filter = marker.kwargs.get("engine")
if engine_filter is not None:
if isinstance(engine_filter, str):
engine_filter = (engine_filter,)
# Strip the ``-small`` suffix so ``"spmd-small"`` matches
# ``engine=("spmd",)``.
engine_name = engine_param.removesuffix("-small")
if engine_name not in engine_filter:
continue
reason = (
marker.args[0]
if marker.args
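
The engine-filter gating in the hook can be sketched in isolation (a hypothetical standalone function, not the actual conftest code, assuming the same parameter-naming convention):

```python
def should_skip(engine_param, engine_filter):
    """Mirror the hook's gating: ``None``/``"in-memory"`` params never skip;
    a ``str`` filter is treated as a one-element tuple; ``-small`` variants
    match their base engine name."""
    if engine_param is None or engine_param == "in-memory":
        return False
    if engine_filter is None:
        return True  # no engine kwarg: skip on every streaming variant
    if isinstance(engine_filter, str):
        engine_filter = (engine_filter,)
    return engine_param.removesuffix("-small") in engine_filter

print(should_skip("spmd-small", ("spmd",)))   # True
print(should_skip("dask", ("dask", "ray")))   # True
print(should_skip("spmd", ("dask", "ray")))   # False
print(should_skip("in-memory", None))         # False
```

This is why `skip_on_streaming_engine(..., engine=("dask", "ray"))` on `test_shuffle_reduce_insert_finished_called_on_oom` still lets the `spmd` and `spmd-small` variants run.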
@@ -59,8 +59,6 @@ def test_gather_cluster_info(streaming_engine) -> None:
assert isinstance(info.gpu_uuid, str)
# Each rank runs in its own process.
assert len({info.pid for info in infos}) == streaming_engine.nranks
# Without allow_gpu_sharing, all UUIDs must be unique (enforced at init).
assert len({info.gpu_uuid for info in infos}) == streaming_engine.nranks


def test_cluster_info_cuda_visible_devices(monkeypatch) -> None:
9 changes: 5 additions & 4 deletions python/cudf_polars/tests/experimental/test_filter.py
@@ -9,12 +9,11 @@

from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.testing.asserts import assert_gpu_result_equal
from cudf_polars.testing.engine_utils import warns_on_spmd


@pytest.fixture
def engine(streaming_engine_factory):
# ``fallback_mode="warn"`` overrides the small-blocksize baseline (which
# sets SILENT) so ``test_filter_non_pointwise`` can assert on the warning.
return streaming_engine_factory(
StreamingOptions(max_rows_per_partition=3, fallback_mode="warn"),
)
Expand All @@ -38,7 +37,9 @@ def test_filter_pointwise(df, engine):

def test_filter_non_pointwise(df, engine):
query = df.filter(pl.col("a") > pl.col("a").max())
with pytest.warns(
UserWarning, match="This filter is not supported for multiple partitions."
with warns_on_spmd(
engine,
UserWarning,
match="This filter is not supported for multiple partitions.",
):
assert_gpu_result_equal(query, engine=engine)
8 changes: 6 additions & 2 deletions python/cudf_polars/tests/experimental/test_groupby.py
@@ -131,8 +131,8 @@ def test_groupby_std_var_ddof(df, engine, agg, ddof):


@pytest.mark.parametrize("fallback_mode", ["silent", "raise", "warn", "foo"])
def test_groupby_fallback(df, fallback_mode, streaming_engine_factory):
streaming_engine = streaming_engine_factory(
def test_groupby_fallback(df, fallback_mode, spmd_engine_factory):
streaming_engine = spmd_engine_factory(
StreamingOptions(fallback_mode=fallback_mode),
)
match = "Failed to decompose groupby aggs"
@@ -287,6 +287,10 @@ def test_groupby_count_type_mismatch(df, streaming_engine_factory):
assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False)


@pytest.mark.skip_on_streaming_engine(
"patch.object on ShuffleManager.Inserter doesn't reach worker processes",
engine=("dask", "ray"),
)
def test_shuffle_reduce_insert_finished_called_on_oom(streaming_engine_factory):
streaming_engine = streaming_engine_factory(
StreamingOptions(target_partition_size=10, max_rows_per_partition=5),