Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,20 @@ def _worker_evaluate(
mp_ctx: _WorkerContext = getattr(dask_worker, f"_cudf_polars_mp_context_{uid}")
if mp_ctx.ctx is None or mp_ctx.comm is None or mp_ctx.py_executor is None:
raise RuntimeError("_setup_worker must be called before _worker_evaluate")
return evaluate_on_rank(
# Always collect the final metadata so we can suppress duplicated outputs
# on non-root ranks. The client concatenates per-rank results, so without
# this dedup an output marked ``duplicated=True`` would be N-counted.
df, metadata = evaluate_on_rank(
mp_ctx.ctx,
mp_ctx.comm,
mp_ctx.py_executor,
ir,
config_options,
collect_metadata=collect_metadata,
collect_metadata=True,
)
if mp_ctx.comm.rank != 0 and metadata and metadata[-1].duplicated:
df = df.clear()
return df, metadata if collect_metadata else None


def evaluate_pipeline_dask_mode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,26 @@ def evaluate_polars_ir(
# object store (pickle / Arrow IPC). The DataFrame is already on CPU at
# this point (to_polars() copies the result off-GPU), so no GPU memory
# crosses process boundaries.
return evaluate_on_rank(
# Always collect the final metadata so we can suppress duplicated
# outputs on non-root ranks. The client concatenates per-rank results,
# so without this dedup an output marked ``duplicated=True`` would be
# N-counted.
df, metadata = evaluate_on_rank(
self._ctx,
self._comm,
self._py_executor,
ir,
config_options,
collect_metadata=collect_metadata,
collect_metadata=True,
)
if (
self._comm is not None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can self._comm possibly be None if we got here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should instead raise at the beginning of the function when `self._comm` is None (as is done for `self._ctx is None`).

and self._comm.rank != 0
and metadata
and metadata[-1].duplicated
):
df = df.clear()
return df, metadata if collect_metadata else None

def _run(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
return func(*args, **kwargs)
Expand Down
12 changes: 1 addition & 11 deletions python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,19 +205,9 @@ async def _collect_small_side_for_broadcast(
for s_id in range(len(chunks)):
inserter.insert(s_id, chunks.pop(0))
stream = ir_context.get_cuda_stream()
gathered = await allgather.extract_concatenated(stream)
# When every rank inserted zero chunks, the AllGather has no schema
# to infer and returns a 0-column table. Substitute a properly typed
# empty table for the small side so downstream joins still match the
# expected schema.
table = (
empty_table_chunk(ir, context, stream).table_view()
if gathered.num_columns() == 0 and len(ir.schema) > 0
else gathered
)
dfs = [
DataFrame.from_table(
table,
await allgather.extract_concatenated(stream),
list(ir.schema.keys()),
list(ir.schema.values()),
stream,
Expand Down
12 changes: 1 addition & 11 deletions python/cudf_polars/cudf_polars/experimental/rapidsmpf/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,20 +732,10 @@ async def metadata_drain_node(
):
# Drain metadata channel (we don't need it after this point)
metadata = await recv_metadata(ch_in, context)
send_empty = metadata.duplicated and comm.rank != 0
if metadata_collector is not None:
metadata_collector.append(metadata)

# Forward non-duplicated data messages
while (msg := await ch_in.recv(context)) is not None:
if not send_empty:
await ch_out.send(context, msg)

# Send empty data if needed
if send_empty:
stream = ir_context.get_cuda_stream()
await ch_out.send(
context, Message(0, empty_table_chunk(ir, context, stream))
)
await ch_out.send(context, msg)

await ch_out.drain(context)
46 changes: 17 additions & 29 deletions python/cudf_polars/cudf_polars/experimental/rapidsmpf/union.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
)

if TYPE_CHECKING:
from rapidsmpf.communicator.communicator import Communicator
from rapidsmpf.streaming.core.channel import Channel
from rapidsmpf.streaming.core.context import Context

Expand All @@ -35,7 +34,6 @@
@define_actor()
async def union_node(
context: Context,
comm: Communicator,
ir: Union,
ir_context: IRExecutionContext,
ch_out: Channel[TableChunk],
Expand All @@ -48,9 +46,6 @@ async def union_node(
----------
context
The rapidsmpf context.
comm
The communicator. Used to suppress duplicated children's chunks on
non-root ranks so they aren't emitted twice cluster-wide.
ir
The Union IR node.
ir_context
Expand All @@ -66,19 +61,14 @@ async def union_node(
# Merge and forward metadata.
# Union loses partitioning/ordering info since sources may differ.
# TODO: Warn users that Union does NOT preserve order?
total_local_count = 0
duplicated = True
metadata = await gather_in_task_group(
*(recv_metadata(ch, context) for ch in chs_in)
)
# When a child has duplicated=True, every rank has produced the same
# data and only rank 0 should forward it -- otherwise the downstream
# client-side concat would over-count by `nranks - 1` for each
# duplicated chunk.
skip = tuple(meta.duplicated and comm.rank != 0 for meta in metadata)
total_local_count = sum(
0 if drop else meta.local_count
for meta, drop in zip(metadata, skip, strict=True)
)
duplicated = all(meta.duplicated for meta in metadata)
for meta in metadata:
total_local_count += meta.local_count
duplicated = duplicated and meta.duplicated
await send_metadata(
ch_out,
context,
Expand All @@ -89,22 +79,21 @@ async def union_node(
)

seq_num_offset = 0
for ch_in, drop in zip(chs_in, skip, strict=True):
for ch_in in chs_in:
num_ch_chunks = 0
while (msg := await ch_in.recv(context)) is not None:
if not drop:
await ch_out.send(
context,
Message(
msg.sequence_number + seq_num_offset,
TableChunk.from_message(
msg, br=context.br()
).make_available_and_spill(
context.br(), allow_overbooking=True
),
num_ch_chunks += 1
await ch_out.send(
context,
Message(
msg.sequence_number + seq_num_offset,
TableChunk.from_message(
msg, br=context.br()
).make_available_and_spill(
context.br(), allow_overbooking=True
),
)
num_ch_chunks += 1
),
)
seq_num_offset += num_ch_chunks

await ch_out.drain(context)
Expand All @@ -127,7 +116,6 @@ def _(
nodes[ir] = [
union_node(
rec.state["context"],
rec.state["comm"],
ir,
rec.state["ir_context"],
channels[ir].reserve_input_slot(),
Expand Down
8 changes: 2 additions & 6 deletions python/cudf_polars/cudf_polars/experimental/select.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,17 +431,13 @@ def _(
)
named_expr = expr.NamedExpr(ir.exprs[0].name or "len", lit_expr)

# Use Empty as the input so the streaming network's metadata flows
# `duplicated=True` end-to-end. Without that, every rank emits the
# literal once and the client concatenates N copies.
input_ir: IR = Empty({})
new_node = Select(
{named_expr.name: named_expr.value.dtype},
[named_expr],
should_broadcast=True,
df=input_ir,
df=child,
)
partition_info[input_ir] = partition_info[new_node] = PartitionInfo(count=1)
partition_info[new_node] = PartitionInfo(count=1)
return new_node, partition_info

if not any(
Expand Down
10 changes: 10 additions & 0 deletions python/cudf_polars/cudf_polars/testing/engine_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
STREAMING_ENGINE_FIXTURE_PARAMS: list[str] = []
if importlib.util.find_spec("rapidsmpf") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.extend(["spmd", "spmd-small"])
from rapidsmpf.bootstrap import is_running_with_rrun as _is_running_with_rrun

if not _is_running_with_rrun(): # pragma: no cover
if importlib.util.find_spec("distributed") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.append("dask")
if importlib.util.find_spec("ray") is not None:
STREAMING_ENGINE_FIXTURE_PARAMS.append("ray")
del _is_running_with_rrun
ALL_ENGINE_FIXTURE_PARAMS = ["in-memory", *STREAMING_ENGINE_FIXTURE_PARAMS]


Expand Down Expand Up @@ -94,6 +102,7 @@ def create_streaming_options(
dynamic_planning={},
target_partition_size=1_000_000,
raise_on_fail=True,
allow_gpu_sharing=True,
)
case "small":
baseline = StreamingOptions(
Expand All @@ -102,6 +111,7 @@ def create_streaming_options(
target_partition_size=10,
raise_on_fail=True,
fallback_mode=StreamingFallbackMode.SILENT,
allow_gpu_sharing=True,
)
case _: # pragma: no cover
raise ValueError(f"Unknown blocksize_mode: {blocksize_mode!r}")
Expand Down
15 changes: 15 additions & 0 deletions python/cudf_polars/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ def streaming_engines() -> Generator[StreamingEngines, None, None]:
)

engines: dict[str, StreamingEngine] = {"spmd": SPMDEngine(comm=comm)}

if "dask" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover
from cudf_polars.experimental.rapidsmpf.frontend.dask import DaskEngine

engines["dask"] = DaskEngine(engine_options={"allow_gpu_sharing": True})

if "ray" in STREAMING_ENGINE_FIXTURE_PARAMS: # pragma: no cover
from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine

engines["ray"] = RayEngine(
num_ranks=4,
engine_options={"allow_gpu_sharing": True},
ray_init_options={"include_dashboard": False},
)

try:
yield engines
finally:
Expand Down
9 changes: 0 additions & 9 deletions python/cudf_polars/tests/experimental/test_dataframescan.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,6 @@ def test_parallel_dataframescan(df, streaming_engine_factory, max_rows_per_parti
assert count == 1


@pytest.mark.xfail(
reason=(
"Multi-rank Union interleaves child outputs across ranks: client "
"receives [rank0_A, rank0_B, rank1_A, rank1_B] instead of the "
"polars-CPU [A, B]. Tracked in "
"https://github.com/rapidsai/cudf/issues/22376."
),
strict=False,
)
def test_dataframescan_concat(df, streaming_engine_factory):
streaming_engine = streaming_engine_factory(
StreamingOptions(max_rows_per_partition=1_000),
Expand Down
19 changes: 19 additions & 0 deletions python/cudf_polars/tests/experimental/test_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,25 @@ def test_run(spmd_engine: SPMDEngine) -> None:
assert result == [os.getpid()]


def test_sort_slice_over_union_of_duplicated_streams(
    spmd_engine: SPMDEngine,
) -> None:
    """
    Sort+head over a concat of two group-by branches yields both groups.

    Each branch is a single-row frame aggregated with ``group_by``; the
    branches are concatenated, sorted by ``score`` and limited with
    ``head(10)``. The collected result must contain exactly one row per
    branch ("alice" and "bob").
    """
    branches = [
        pl.LazyFrame({"name": [name], "score": [score]})
        .group_by("name")
        .agg(pl.col("score").sum())
        for name, score in (("alice", 1.0), ("bob", 2.0))
    ]
    query = pl.concat(branches).sort("score").head(10)
    collected = query.collect(engine=spmd_engine)
    names = collected["name"].to_list()
    assert sorted(names) == ["alice", "bob"]


def test_reset_keeps_comm_alive(comm: Communicator) -> None:
"""``_reset`` must not rebuild the communicator."""
with SPMDEngine(
Expand Down
16 changes: 16 additions & 0 deletions python/cudf_polars/tests/experimental/test_union.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,19 @@ def test_union_shared_fanout_no_deadlock(streaming_engine):
project = df.select("key", "val")
q = pl.concat([gb, project])
assert_gpu_result_equal(q, engine=streaming_engine, check_row_order=False)


def test_sort_slice_over_union_of_duplicated_streams(streaming_engine):
    """
    Compare GPU and CPU results for sort+head over a two-branch concat.

    Each branch is a single-row frame aggregated with ``group_by``; the
    branches are concatenated, sorted by ``score`` and limited with
    ``head(10)``. Row order is not part of the comparison.
    """
    branches = [
        pl.LazyFrame({"name": [name], "score": [score]})
        .group_by("name")
        .agg(pl.col("score").sum())
        for name, score in (("alice", 1.0), ("bob", 2.0))
    ]
    query = pl.concat(branches).sort("score").head(10)
    assert_gpu_result_equal(query, engine=streaming_engine, check_row_order=False)
9 changes: 2 additions & 7 deletions python/cudf_polars/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,13 +501,8 @@ def test_groupby_sum_decimal_null_group(engine: pl.GPUEngine) -> None:


@pytest.mark.xfail(
raises=(AssertionError, pl.exceptions.SchemaError),
reason=(
"https://github.com/rapidsai/cudf/issues/19610 — in-memory engine "
"fails with AssertionError (wrong values); multi-rank streaming "
"fails earlier with SchemaError (literal agg yields a divergent "
"schema after cross-rank concat)."
),
raises=AssertionError,
reason="https://github.com/rapidsai/cudf/issues/19610",
)
def test_groupby_literal_agg(engine: pl.GPUEngine):
df = pl.LazyFrame({"c0": [True, False]})
Expand Down
Loading