
[BUG][cudf-polars] Multi-rank Union (and pl.concat) interleaves child outputs across ranks, breaking row order #22376

@madsbk


Summary

When the streaming engine executes pl.concat([A, B]) (or any Union IR node) with N > 1 ranks, the resulting row order does not match Polars semantics. Polars guarantees that pl.concat([A, B]) yields all rows of A followed by all rows of B. Under multi-rank execution, the client instead observes rank-local concatenations interleaved at rank boundaries.

Reproducer

import polars as pl
from cudf_polars.experimental.rapidsmpf.frontend.ray import RayEngine
from cudf_polars.testing.asserts import assert_gpu_result_equal

with RayEngine(
    num_ranks=2,  # Works when `num_ranks=1`
    engine_options={"allow_gpu_sharing": True},
    executor_options={"max_rows_per_partition": 1_000},
) as streaming_engine:
    df = pl.LazyFrame({
        "x": range(30_000),
        "y": [1, 2, 3] * 10_000,
        "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 6_000,
    })
    df2 = pl.concat([df, df])
    assert_gpu_result_equal(df2, engine=streaming_engine, check_row_order=True)

Currently xfailed in:
python/cudf_polars/tests/experimental/test_dataframescan.py::test_dataframescan_concat

Observed vs. expected order (2 ranks, children A and B)

  • Expected (Polars CPU): A_rank0, A_rank1, B_rank0, B_rank1
  • Actual (streaming engine): A_rank0, B_rank0, A_rank1, B_rank1

Each rank correctly processes child A before B, but the client-side concatenation across ranks does not enforce a barrier between children. As a result, chunks from child A on rank 1 may arrive after chunks from child B on rank 0.
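The difference between the two orders can be reproduced with a small pure-Python model. This is a sketch, not the engine's code, and it rests on three assumptions. Each child's rows are split into contiguous slices, with rank 0 holding the first slice. Each rank emits its slice of A before its slice of B. The client appends whole rank outputs in rank order. `split_across_ranks`, `expected_order`, and `observed_order` are hypothetical helpers.

```python
def split_across_ranks(rows, num_ranks):
    """Split rows into num_ranks contiguous slices; rank 0 gets the first slice."""
    size = -(-len(rows) // num_ranks)  # ceiling division
    return [rows[r * size:(r + 1) * size] for r in range(num_ranks)]


def expected_order(children):
    """Polars semantics: every row of child 0, then every row of child 1, ..."""
    return [row for child in children for row in child]


def observed_order(children, num_ranks):
    """Rank-major order: each rank concatenates its local slices of the children
    (A then B), and the client appends rank 0's output, then rank 1's, ..."""
    per_child = [split_across_ranks(child, num_ranks) for child in children]
    return [
        row
        for rank in range(num_ranks)
        for slices in per_child
        for row in slices[rank]
    ]


A = [0, 1, 2, 3]
B = [10, 11, 12, 13]
print(expected_order([A, B]))     # [0, 1, 2, 3, 10, 11, 12, 13]
print(observed_order([A, B], 2))  # [0, 1, 10, 11, 2, 3, 12, 13]
print(observed_order([A, B], 1) == expected_order([A, B]))  # True
```

With a single rank the two orders coincide. This matches the reproducer passing with `num_ranks=1`.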

Root cause (sketch)

cudf_polars/experimental/rapidsmpf/union.py::union_node iterates over chs_in in order and forwards child[0] then child[1] per rank, but emits chunks asynchronously. The downstream collector concatenates chunks in arrival order across ranks. There is no cross-rank synchronization between children, and chunks carry no child_id metadata to allow reordering on the client.
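The collector appends chunks in arrival order, and chunks carry no child_id. The only ordering guarantee left is therefore the per-rank one, A before B on each rank. The toy model below assumes that each rank's chunks arrive in the order the rank emitted them. It enumerates every arrival order consistent with that assumption and counts how many match the Polars order. `interleavings` is a hypothetical helper, and the chunk labels follow the expected/actual list above.

```python
from itertools import combinations


def interleavings(stream_a, stream_b):
    """All merges of two sequences that preserve each sequence's internal order."""
    n = len(stream_a) + len(stream_b)
    results = []
    # Choose which output positions are taken by stream_a; stream_b fills the rest.
    for positions in combinations(range(n), len(stream_a)):
        taken = set(positions)
        it_a, it_b = iter(stream_a), iter(stream_b)
        results.append([next(it_a) if i in taken else next(it_b) for i in range(n)])
    return results


rank0 = ["A_rank0", "B_rank0"]  # rank 0 emits its A chunk, then its B chunk
rank1 = ["A_rank1", "B_rank1"]  # rank 1 does the same, independently
expected = ["A_rank0", "A_rank1", "B_rank0", "B_rank1"]

orders = interleavings(rank0, rank1)
print(len(orders))                         # 6
print(sum(o == expected for o in orders))  # 1
```

Under this model, only one of the six possible arrival orders yields Polars semantics. The client cannot tell which order it received, because nothing in a chunk identifies its child.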

Possible fixes

  1. Add a per-child cross-rank barrier in union_node: emit all chunks for child A, synchronize across ranks via the communicator, then proceed to child B. Simple, but serializes cross-child streaming.

  2. Tag chunks with child_id metadata and have the client-side collector group and concatenate per child before combining children. Preserves streaming overlap, but requires extending metadata on TableChunk/messages.
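Option (1) can be illustrated with threads standing in for ranks. In this sketch, `threading.Barrier` stands in for a communicator barrier, and a locked list stands in for the client-side collector. `run_union_with_barrier` is a hypothetical name, and the sketch does not reflect how `union_node` or the rapidsmpf communicator actually synchronize.

```python
import threading


def run_union_with_barrier(num_ranks, children):
    """Hypothetical model of option 1: every rank emits its chunk for child k,
    then waits at a cross-rank barrier before moving on to child k + 1."""
    collected = []                          # stands in for the client-side collector
    lock = threading.Lock()
    barrier = threading.Barrier(num_ranks)  # stands in for a communicator barrier

    def rank_main(rank):
        for child in children:
            with lock:
                collected.append(f"{child}_rank{rank}")
            barrier.wait()  # no rank starts child k + 1 until every rank finished child k

    threads = [threading.Thread(target=rank_main, args=(r,)) for r in range(num_ranks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return collected


out = run_union_with_barrier(2, ["A", "B"])
children_seen = [chunk.split("_")[0] for chunk in out]
print(children_seen)  # ['A', 'A', 'B', 'B']
```

The barrier guarantees that every child-A chunk is collected before any child-B chunk. Within a single child, however, chunks from different ranks still arrive in whatever order the scheduler produces. The collector would therefore still need to order them by rank. The barrier is also where the serialization cost comes from: no rank can begin child B while any rank is still working on child A.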

Option (2) is preferred unless profiling shows the barrier cost is negligible.
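Option (2) can be sketched as a collector that does not depend on arrival order. `TaggedChunk` and its fields (`child_id`, `rank`, `seq`) are hypothetical, and none of them are existing `TableChunk` attributes. The sketch also assumes that rank r holds rows that precede those of rank r + 1, as in the expected order listed above.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class TaggedChunk:
    """Hypothetical chunk carrying the ordering metadata proposed in option 2."""
    child_id: int  # which Union input produced the rows
    rank: int      # which rank produced the rows
    seq: int       # per-(child, rank) emission counter
    rows: tuple


def collect(chunks):
    """Group chunks by child, then order each child's chunks by (rank, seq).
    The result is independent of the order in which chunks arrived."""
    by_child = {}
    for chunk in chunks:
        by_child.setdefault(chunk.child_id, []).append(chunk)
    out = []
    for child_id in sorted(by_child):
        for chunk in sorted(by_child[child_id], key=lambda ch: (ch.rank, ch.seq)):
            out.extend(chunk.rows)
    return out


chunks = [
    TaggedChunk(child_id=0, rank=0, seq=0, rows=(0, 1)),
    TaggedChunk(child_id=0, rank=1, seq=0, rows=(2, 3)),
    TaggedChunk(child_id=1, rank=0, seq=0, rows=(10, 11)),
    TaggedChunk(child_id=1, rank=1, seq=0, rows=(12, 13)),
]
rng = random.Random(0)
for _ in range(5):
    arrival = chunks[:]
    rng.shuffle(arrival)  # simulate nondeterministic cross-rank arrival
    assert collect(arrival) == [0, 1, 2, 3, 10, 11, 12, 13]
```

This version buffers every chunk before emitting output, which keeps the model simple. A streaming collector could instead forward a chunk as soon as all chunks that precede it in (child_id, rank, seq) order have arrived. Ranks therefore never wait on each other, and only the client holds back out-of-order chunks.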

Scope

Affects any Union/pl.concat query under the rapidsmpf streaming engine with N > 1 ranks. Not specific to DataFrameScan.

Metadata


Assignees: No one assigned
Labels: bug (Something isn't working)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
