diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py
index b4300346132..9585e1b9467 100644
--- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py
+++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/dask.py
@@ -408,14 +408,20 @@ def _worker_evaluate(
     mp_ctx: _WorkerContext = getattr(dask_worker, f"_cudf_polars_mp_context_{uid}")
     if mp_ctx.ctx is None or mp_ctx.comm is None or mp_ctx.py_executor is None:
         raise RuntimeError("_setup_worker must be called before _worker_evaluate")
-    return evaluate_on_rank(
+    # Always collect the final metadata so we can suppress duplicated outputs
+    # on non-root ranks. The client concatenates per-rank results, so without
+    # this dedup an output marked ``duplicated=True`` would be N-counted.
+    df, metadata = evaluate_on_rank(
         mp_ctx.ctx,
         mp_ctx.comm,
         mp_ctx.py_executor,
         ir,
         config_options,
-        collect_metadata=collect_metadata,
+        collect_metadata=True,
     )
+    if mp_ctx.comm.rank != 0 and metadata and metadata[-1].duplicated:
+        df = df.clear()
+    return df, metadata if collect_metadata else None
 
 
 def evaluate_pipeline_dask_mode(
diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py
index efbb1db9ad4..e050a8faddb 100644
--- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py
+++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/frontend/ray.py
@@ -363,14 +363,26 @@ def evaluate_polars_ir(
         # object store (pickle / Arrow IPC). The DataFrame is already on CPU at
         # this point (to_polars() copies the result off-GPU), so no GPU memory
         # crosses process boundaries.
-        return evaluate_on_rank(
+        # Always collect the final metadata so we can suppress duplicated
+        # outputs on non-root ranks. The client concatenates per-rank results,
+        # so without this dedup an output marked ``duplicated=True`` would be
+        # N-counted.
+        df, metadata = evaluate_on_rank(
             self._ctx,
             self._comm,
             self._py_executor,
             ir,
             config_options,
-            collect_metadata=collect_metadata,
+            collect_metadata=True,
         )
+        if (
+            self._comm is not None
+            and self._comm.rank != 0
+            and metadata
+            and metadata[-1].duplicated
+        ):
+            df = df.clear()
+        return df, metadata if collect_metadata else None
 
     def _run(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
         return func(*args, **kwargs)
diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py
index b36b07342ce..abb2e7082f0 100644
--- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py
+++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py
@@ -205,19 +205,9 @@ async def _collect_small_side_for_broadcast(
     for s_id in range(len(chunks)):
         inserter.insert(s_id, chunks.pop(0))
     stream = ir_context.get_cuda_stream()
-    gathered = await allgather.extract_concatenated(stream)
-    # When every rank inserted zero chunks, the AllGather has no schema
-    # to infer and returns a 0-column table. Substitute a properly typed
-    # empty table for the small side so downstream joins still match the
-    # expected schema.
-    table = (
-        empty_table_chunk(ir, context, stream).table_view()
-        if gathered.num_columns() == 0 and len(ir.schema) > 0
-        else gathered
-    )
     dfs = [
         DataFrame.from_table(
-            table,
+            await allgather.extract_concatenated(stream),
             list(ir.schema.keys()),
             list(ir.schema.values()),
             stream,
diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/nodes.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/nodes.py
index 56509a93f06..fd83c5e092f 100644
--- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/nodes.py
+++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/nodes.py
@@ -732,20 +732,10 @@ async def metadata_drain_node(
 ):
     # Drain metadata channel (we don't need it after this point)
     metadata = await recv_metadata(ch_in, context)
-    send_empty = metadata.duplicated and comm.rank != 0
     if metadata_collector is not None:
         metadata_collector.append(metadata)
 
-    # Forward non-duplicated data messages
     while (msg := await ch_in.recv(context)) is not None:
-        if not send_empty:
-            await ch_out.send(context, msg)
-
-    # Send empty data if needed
-    if send_empty:
-        stream = ir_context.get_cuda_stream()
-        await ch_out.send(
-            context, Message(0, empty_table_chunk(ir, context, stream))
-        )
+        await ch_out.send(context, msg)
 
     await ch_out.drain(context)
diff --git a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/union.py b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/union.py
index 2484620234d..b4cb6a922b9 100644
--- a/python/cudf_polars/cudf_polars/experimental/rapidsmpf/union.py
+++ b/python/cudf_polars/cudf_polars/experimental/rapidsmpf/union.py
@@ -24,7 +24,6 @@
 )
 
 if TYPE_CHECKING:
-    from rapidsmpf.communicator.communicator import Communicator
     from rapidsmpf.streaming.core.channel import Channel
     from rapidsmpf.streaming.core.context import Context
 
@@ -35,7 +34,6 @@
 @define_actor()
 async def union_node(
     context: Context,
-    comm: Communicator,
     ir: Union,
     ir_context: IRExecutionContext,
     ch_out: Channel[TableChunk],
@@ -48,9 +46,6 @@
     ----------
     context
         The rapidsmpf context.
-    comm
-        The communicator. Used to suppress duplicated children's chunks on
-        non-root ranks so they aren't emitted twice cluster-wide.
     ir
         The Union IR node.
     ir_context
@@ -66,19 +61,14 @@
     # Merge and forward metadata.
     # Union loses partitioning/ordering info since sources may differ.
    # TODO: Warn users that Union does NOT preserve order?
+    total_local_count = 0
+    duplicated = True
     metadata = await gather_in_task_group(
         *(recv_metadata(ch, context) for ch in chs_in)
     )
-    # When a child has duplicated=True, every rank has produced the same
-    # data and only rank 0 should forward it -- otherwise the downstream
-    # client-side concat would over-count by `nranks - 1` for each
-    # duplicated chunk.
-    skip = tuple(meta.duplicated and comm.rank != 0 for meta in metadata)
-    total_local_count = sum(
-        0 if drop else meta.local_count
-        for meta, drop in zip(metadata, skip, strict=True)
-    )
-    duplicated = all(meta.duplicated for meta in metadata)
+    for meta in metadata:
+        total_local_count += meta.local_count
+        duplicated = duplicated and meta.duplicated
     await send_metadata(
         ch_out,
         context,
@@ -89,22 +79,21 @@
     )
 
     seq_num_offset = 0
-    for ch_in, drop in zip(chs_in, skip, strict=True):
+    for ch_in in chs_in:
         num_ch_chunks = 0
         while (msg := await ch_in.recv(context)) is not None:
-            if not drop:
-                await ch_out.send(
-                    context,
-                    Message(
-                        msg.sequence_number + seq_num_offset,
-                        TableChunk.from_message(
-                            msg, br=context.br()
-                        ).make_available_and_spill(
-                            context.br(), allow_overbooking=True
-                        ),
+            num_ch_chunks += 1
+            await ch_out.send(
+                context,
+                Message(
+                    msg.sequence_number + seq_num_offset,
+                    TableChunk.from_message(
+                        msg, br=context.br()
+                    ).make_available_and_spill(
+                        context.br(), allow_overbooking=True
                     ),
-                )
-            num_ch_chunks += 1
+                ),
+            )
         seq_num_offset += num_ch_chunks
 
     await ch_out.drain(context)
@@ -127,7 +116,6 @@ def _(
     nodes[ir] = [
         union_node(
             rec.state["context"],
-            rec.state["comm"],
             ir,
             rec.state["ir_context"],
             channels[ir].reserve_input_slot(),
diff --git a/python/cudf_polars/cudf_polars/experimental/select.py b/python/cudf_polars/cudf_polars/experimental/select.py
index 9ab30f9be13..25d0189fdf6 100644
--- a/python/cudf_polars/cudf_polars/experimental/select.py
+++ b/python/cudf_polars/cudf_polars/experimental/select.py
@@ -431,17 +431,13 @@ def _(
         )
         named_expr = expr.NamedExpr(ir.exprs[0].name or "len", lit_expr)
-        # Use Empty as the input so the streaming network's metadata flows
-        # `duplicated=True` end-to-end. Without that, every rank emits the
-        # literal once and the client concatenates N copies.
-        input_ir: IR = Empty({})
         new_node = Select(
             {named_expr.name: named_expr.value.dtype},
             [named_expr],
             should_broadcast=True,
-            df=input_ir,
+            df=child,
         )
-        partition_info[input_ir] = partition_info[new_node] = PartitionInfo(count=1)
+        partition_info[new_node] = PartitionInfo(count=1)
         return new_node, partition_info
 
     if not any(
diff --git a/python/cudf_polars/cudf_polars/testing/engine_utils.py b/python/cudf_polars/cudf_polars/testing/engine_utils.py
index c36bcf2ed27..4ca627562ea 100644
--- a/python/cudf_polars/cudf_polars/testing/engine_utils.py
+++ b/python/cudf_polars/cudf_polars/testing/engine_utils.py
@@ -21,6 +21,14 @@
 STREAMING_ENGINE_FIXTURE_PARAMS: list[str] = []
 if importlib.util.find_spec("rapidsmpf") is not None:
     STREAMING_ENGINE_FIXTURE_PARAMS.extend(["spmd", "spmd-small"])
+    from rapidsmpf.bootstrap import is_running_with_rrun as _is_running_with_rrun
+
+    if not _is_running_with_rrun():  # pragma: no cover
+        if importlib.util.find_spec("distributed") is not None:
+            STREAMING_ENGINE_FIXTURE_PARAMS.append("dask")
+        if importlib.util.find_spec("ray") is not None:
+            STREAMING_ENGINE_FIXTURE_PARAMS.append("ray")
+    del _is_running_with_rrun
 
 ALL_ENGINE_FIXTURE_PARAMS = ["in-memory", *STREAMING_ENGINE_FIXTURE_PARAMS]
 
@@ -94,6 +102,7 @@ def create_streaming_options(
                 dynamic_planning={},
                 target_partition_size=1_000_000,
                 raise_on_fail=True,
+                allow_gpu_sharing=True,
             )
         case "small":
             baseline = StreamingOptions(
@@ -102,6 +111,7 @@ def create_streaming_options(
                 dynamic_planning={},
                 target_partition_size=10,
                 raise_on_fail=True,
                 fallback_mode=StreamingFallbackMode.SILENT,
+                allow_gpu_sharing=True,
             )
         case _:  # pragma: no cover
             raise ValueError(f"Unknown blocksize_mode: {blocksize_mode!r}")
diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py
index b3d83b36d36..05e18da4eaa 100644
--- a/python/cudf_polars/tests/conftest.py
+++ b/python/cudf_polars/tests/conftest.py
@@ -89,6 +89,21 @@ def streaming_engines() -> Generator[StreamingEngines, None, None]:
     )
 
     engines: dict[str, StreamingEngine] = {"spmd": SPMDEngine(comm=comm)}
+
+    if "dask" in STREAMING_ENGINE_FIXTURE_PARAMS:  # pragma: no cover
+        from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine
+
+        engines["dask"] = DaskEngine(engine_options={"allow_gpu_sharing": True})
+
+    if "ray" in STREAMING_ENGINE_FIXTURE_PARAMS:  # pragma: no cover
+        from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine
+
+        engines["ray"] = RayEngine(
+            num_ranks=4,
+            engine_options={"allow_gpu_sharing": True},
+            ray_init_options={"include_dashboard": False},
+        )
+
     try:
         yield engines
     finally:
diff --git a/python/cudf_polars/tests/experimental/test_dataframescan.py b/python/cudf_polars/tests/experimental/test_dataframescan.py
index dbf22848824..57684734fea 100644
--- a/python/cudf_polars/tests/experimental/test_dataframescan.py
+++ b/python/cudf_polars/tests/experimental/test_dataframescan.py
@@ -60,15 +60,6 @@ def test_parallel_dataframescan(df, streaming_engine_factory, max_rows_per_parti
     assert count == 1
 
 
-@pytest.mark.xfail(
-    reason=(
-        "Multi-rank Union interleaves child outputs across ranks: client "
-        "receives [rank0_A, rank0_B, rank1_A, rank1_B] instead of the "
-        "polars-CPU [A, B]. Tracked in "
-        "https://github.com/rapidsai/cudf/issues/22376."
-    ),
-    strict=False,
-)
 def test_dataframescan_concat(df, streaming_engine_factory):
     streaming_engine = streaming_engine_factory(
         StreamingOptions(max_rows_per_partition=1_000),
diff --git a/python/cudf_polars/tests/experimental/test_spmd.py b/python/cudf_polars/tests/experimental/test_spmd.py
index 96ec5eab932..e3f87d44759 100644
--- a/python/cudf_polars/tests/experimental/test_spmd.py
+++ b/python/cudf_polars/tests/experimental/test_spmd.py
@@ -294,6 +294,25 @@ def test_run(spmd_engine: SPMDEngine) -> None:
     assert result == [os.getpid()]
 
 
+def test_sort_slice_over_union_of_duplicated_streams(
+    spmd_engine: SPMDEngine,
+) -> None:
+    """Sort+head over a concat of two group-by branches returns the global result on every rank."""
+    lf1 = (
+        pl.LazyFrame({"name": ["alice"], "score": [1.0]})
+        .group_by("name")
+        .agg(pl.col("score").sum())
+    )
+    lf2 = (
+        pl.LazyFrame({"name": ["bob"], "score": [2.0]})
+        .group_by("name")
+        .agg(pl.col("score").sum())
+    )
+    lf = pl.concat([lf1, lf2]).sort("score").head(10)
+    result = lf.collect(engine=spmd_engine)
+    assert sorted(result["name"].to_list()) == ["alice", "bob"]
+
+
 def test_reset_keeps_comm_alive(comm: Communicator) -> None:
     """``_reset`` must not rebuild the communicator."""
     with SPMDEngine(
diff --git a/python/cudf_polars/tests/experimental/test_union.py b/python/cudf_polars/tests/experimental/test_union.py
index 79a3ca64963..ecb8f1ebfe1 100644
--- a/python/cudf_polars/tests/experimental/test_union.py
+++ b/python/cudf_polars/tests/experimental/test_union.py
@@ -17,3 +17,19 @@ def test_union_shared_fanout_no_deadlock(streaming_engine):
     project = df.select("key", "val")
     q = pl.concat([gb, project])
     assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False)
+
+
+def test_sort_slice_over_union_of_duplicated_streams(streaming_engine):
+    # Sort+head over a concat of two group-by branches.
+    lf1 = (
+        pl.LazyFrame({"name": ["alice"], "score": [1.0]})
+        .group_by("name")
+        .agg(pl.col("score").sum())
+    )
+    lf2 = (
+        pl.LazyFrame({"name": ["bob"], "score": [2.0]})
+        .group_by("name")
+        .agg(pl.col("score").sum())
+    )
+    q = pl.concat([lf1, lf2]).sort("score").head(10)
+    assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False)
diff --git a/python/cudf_polars/tests/test_groupby.py b/python/cudf_polars/tests/test_groupby.py
index f14160a1043..a14177b9f0c 100644
--- a/python/cudf_polars/tests/test_groupby.py
+++ b/python/cudf_polars/tests/test_groupby.py
@@ -501,13 +501,8 @@ def test_groupby_sum_decimal_null_group(engine: pl.GPUEngine) -> None:
 
 
 @pytest.mark.xfail(
-    raises=(AssertionError, pl.exceptions.SchemaError),
-    reason=(
-        "https://github.com/rapidsai/cudf/issues/19610 — in-memory engine "
-        "fails with AssertionError (wrong values); multi-rank streaming "
-        "fails earlier with SchemaError (literal agg yields a divergent "
-        "schema after cross-rank concat)."
-    ),
+    raises=AssertionError,
+    reason="https://github.com/rapidsai/cudf/issues/19610",
 )
 def test_groupby_literal_agg(engine: pl.GPUEngine):
     df = pl.LazyFrame({"c0": [True, False]})