
[DO NOT MERGE] PDS DS ALL #22469

Open

quasiben wants to merge 75 commits into rapidsai:main from quasiben:pds-ds-all

Conversation

@quasiben
Member

Matt711 and others added 30 commits April 17, 2026 01:58
@quasiben quasiben requested a review from a team as a code owner May 11, 2026 22:49
@quasiben quasiben requested a review from madsbk May 11, 2026 22:49
@copy-pr-bot

copy-pr-bot Bot commented May 11, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@github-actions github-actions Bot added the Python (Affects Python cuDF API.) and cudf-polars (Issues specific to cudf-polars) labels May 11, 2026
@GPUtester GPUtester moved this to In Progress in cuDF Python May 11, 2026
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 11, 2026


📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Added support for Polars over() window expressions in the streaming SPMD execution engine, enabling advanced analytic queries with partition-by and order-by clauses.
  • Bug Fixes

    • Improved result comparison logic with enhanced floating-point tie-breaking for more accurate result validation.

Walkthrough

This PR adds streaming window expression (over()) support to cudf_polars via a new Over IR node and refactors 19 PDS-DS benchmark queries to optimize join cardinality through early dimension-table filtering and semi-joins. It enhances assertion tie-breaking, extends the RapidsMPF runtime infrastructure for Over execution, and provides comprehensive single- and multi-rank test coverage.

Changes

PDS-DS Benchmark Query Optimizations

| Layer / File(s) | Summary |
| --- | --- |
| Single-filter dimension queries<br>q2.py, q8.py, q43.py, q52.py, q55.py | Pre-filter date_dim or item into reduced lookup tables and use semi-joins before main join chains, replacing post-join filtering (see the sketch after this table). |
| Multi-filter dimension queries<br>q14.py, q17.py, q18.py, q23.py, q25.py, q29.py | Pre-filter multiple dimensions (date_dim, item, store, customer_demographics) and use semi-joins plus derived key-pair tables to constrain downstream joins before expensive operations. |
| GroupBy-centric refactors<br>q44.py, q53.py, q63.py, q67.py, q76.py, q88.py, q9.py, q98.py | Introduce bucket-based aggregation, simplified benchmark joins, and conditional-sum logic; pre-filter dimensions and use semi-joins. |
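As a sketch of the shared pre-filter + semi-join pattern referenced above (table and column names here are made-up stand-ins, not the benchmark's actual inputs):

```python
import polars as pl

# Hypothetical fact and dimension tables; the real queries operate on the
# TPC-DS tables loaded by the benchmark harness.
sales = pl.LazyFrame({"sold_date_sk": [1, 2, 2, 3], "price": [1.0, 2.0, 3.0, 4.0]})
date_dim = pl.LazyFrame({"d_date_sk": [1, 2, 3], "d_year": [1999, 2000, 2001]})

# Pre-filter the dimension into a small lookup table...
date_dim_prefilter = date_dim.filter(pl.col("d_year") == 2000)

# ...then semi-join so the fact table shrinks before any expensive join chain.
sales_reduced = sales.join(
    date_dim_prefilter, left_on="sold_date_sk", right_on="d_date_sk", how="semi"
)
print(sales_reduced.collect())  # keeps only the rows with sold_date_sk == 2
```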

Assertion Enhancement

| Layer / File(s) | Summary |
| --- | --- |
| Float-based tie-breaking<br>asserts.py | Retry failed equality assertions by sorting on non-float columns followed by float columns; raise ValidationError with context messages (sketched below). |
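A hedged sketch of the tie-breaking idea (not the actual asserts.py code): sort both frames on exactly-comparable columns first, so that float columns only break the remaining ties.

```python
import polars as pl

def sort_for_comparison(df: pl.DataFrame) -> pl.DataFrame:
    # Non-float columns give a deterministic primary order; float columns only
    # break the remaining ties, so float jitter cannot reshuffle keyed rows.
    non_float_cols = [c for c, t in df.schema.items() if not t.is_float()]
    float_cols = [c for c, t in df.schema.items() if t.is_float()]
    return df.sort(non_float_cols + float_cols)
```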

Over IR and Expression Decomposition

| Layer / File(s) | Summary |
| --- | --- |
| Over IR node<br>experimental/over.py | New Over IR node with schema, key_indices, scalar/non-scalar flag, and expressions; evaluates as a broadcasting Select; includes aggregation decomposition for scalar paths (illustrated below). |
| GroupedWindow decomposition and fusion<br>experimental/expressions.py, experimental/select.py | Route GroupedWindow expressions through _decompose_grouped_window_node; fuse compatible Over nodes via _fuse_over_nodes. |
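Illustratively (this is a plain-Polars restatement of the scalar decomposition, not the node's actual code path), a scalar window aggregation is equivalent to a group-by aggregation broadcast back onto the input via a join:

```python
import polars as pl

df = pl.DataFrame({"g": [1, 1, 2], "x": [1.0, 2.0, 3.0]})

# The window expression...
via_over = df.select(pl.col("g"), pl.col("x").sum().over("g").alias("s"))

# ...matches aggregating per key and broadcasting the result back with a join.
totals = df.group_by("g").agg(pl.col("x").sum().alias("s"))
via_join = df.join(totals, on="g", how="left").select("g", "s")

assert via_over.sort("g").equals(via_join.sort("g"))
```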

RapidsMPF Streaming Runtime

| Layer / File(s) | Summary |
| --- | --- |
| Core fanout and collectives<br>rapidsmpf/core.py, rapidsmpf/collectives/common.py | Add Over to unbounded-fanout determination and the collective-operation set; allocate two IDs for the forward/return shuffle phases. |
| Shared utilities<br>rapidsmpf/utils.py | Introduce TableSizeStats, _make_hash_shuffle_metadata, and _sample_chunks for reusable shuffle-metadata construction and sampling. |
| GroupBy and Join refactoring<br>rapidsmpf/groupby.py, rapidsmpf/join.py | Use the _make_hash_shuffle_metadata helper; replace JoinSideStats with TableSizeStats; import _sample_chunks from utils. |
| Over streaming actor<br>rapidsmpf/over.py | Implement over_actor with a scalar broadcast path (per-chunk aggregates) and a non-scalar shuffle path (forward hash-shuffle, local evaluation, return shuffle with origin stamping); the scalar path is sketched below. |
| Tracing TODOs<br>rapidsmpf/tracing.py | Document future API changes toward a TableChunk-based interface. |
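The scalar broadcast path can be pictured with a single-process Polars sketch (names are illustrative; in the real actor the combine step is an allgather of partial aggregates across ranks):

```python
import polars as pl

chunks = [
    pl.DataFrame({"g": [1, 2], "x": [1.0, 2.0]}),
    pl.DataFrame({"g": [1, 2], "x": [3.0, 4.0]}),
]

# Per-chunk partial aggregates (cheap, computed locally)...
partials = [c.group_by("g").agg(pl.col("x").sum()) for c in chunks]

# ..."allgather" and combine into global per-key totals...
totals = pl.concat(partials).group_by("g").agg(pl.col("x").sum().alias("x_sum"))

# ...then broadcast-join the totals back onto every chunk.
result = [c.join(totals, on="g", how="left") for c in chunks]
```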

Test Coverage

| Layer / File(s) | Summary |
| --- | --- |
| Single-rank over() tests<br>tests/experimental/test_rolling.py | Eight parametrized tests covering aggregations, ranking, order_by, with_columns, colliding names, mixed keys, many partitions, empty input, unsupported keys, and already-partitioned scenarios. |
| Multi-rank SPMD tests<br>tests/experimental/test_spmd.py | Two tests validating scalar/non-scalar over() across two ranks with same-rank and cross-rank key routing, per-rank isolation, and allgather correctness. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Suggested labels

Python, improvement, non-breaking, cudf-polars

Suggested reviewers

  • TomAugspurger
  • msarahan
  • bdice
  • pentschev
  • mroeschke
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 inconclusive)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Title check | ❓ Inconclusive | The title '[DO NOT MERGE] PDS DS ALL' is vague and uses non-descriptive terms that don't convey meaningful information about the changeset. | Replace with a clear, specific title that describes the main change, such as 'Add Over IR node and distributed window expression support' or similar. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description check | ✅ Passed | The description mentions that it merges final PRs for completing PDS-DS and lists specific PR numbers, which is related to the changeset scope. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 82.89%, which is sufficient; the required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (1)
python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q25.py (1)

121-121: ⚡ Quick win

Deduplicate the reused semi-join key set.

sr_customer_item is only used for how="semi" joins, so duplicate (sr_customer_sk, sr_item_sk) pairs do not affect results. Keeping duplicates here can still enlarge the build/shuffle side of both downstream joins and weakens the early-reduction benefit of this rewrite.

Proposed change

```diff
-    sr_customer_item = store_returns_filtered.select(["sr_customer_sk", "sr_item_sk"])
+    sr_customer_item = store_returns_filtered.select(
+        ["sr_customer_sk", "sr_item_sk"]
+    ).unique()
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q25.py`
at line 121, sr_customer_item currently holds (sr_customer_sk, sr_item_sk) pairs
with possible duplicates which are only used for how="semi" joins; to reduce
build/shuffle and enable early reduction, deduplicate that key set right after
selection by applying the dataframe's dedupe operation (e.g., drop_duplicates /
unique) on the selected columns so sr_customer_item becomes the distinct set of
(sr_customer_sk, sr_item_sk) before downstream semi-joins; update the code that
creates sr_customer_item (originating from store_returns_filtered.select([...]))
to call the appropriate drop_duplicates/unique method on that selection.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 54ce9b2e-48f3-41d3-bd96-8145924001a1

📥 Commits

Reviewing files that changed from the base of the PR and between b71adc2 and ac4c9d1.

📒 Files selected for processing (32)
  • python/cudf_polars/cudf_polars/experimental/benchmarks/asserts.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q14.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q17.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q18.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q2.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q23.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q25.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q29.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q43.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q44.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q52.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q53.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q55.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q63.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q67.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q76.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q8.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q88.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q9.py
  • python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q98.py
  • python/cudf_polars/cudf_polars/experimental/expressions.py
  • python/cudf_polars/cudf_polars/experimental/over.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/collectives/common.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/core.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/groupby.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/join.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/over.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/tracing.py
  • python/cudf_polars/cudf_polars/experimental/rapidsmpf/utils.py
  • python/cudf_polars/cudf_polars/experimental/select.py
  • python/cudf_polars/tests/experimental/test_rolling.py
  • python/cudf_polars/tests/experimental/test_spmd.py

Comment on lines +165 to 174

```python
wscs.join(date_dim_prefilter, left_on="sold_date_sk", right_on="d_date_sk")
.group_by("d_week_seq")
.agg(
    [
        pl.when(pl.col("d_day_name") == day)
        .then(pl.col("sales_price"))
        .otherwise(None)
        .sum()
        .alias(name)
        for day, name in zip(days, day_cols, strict=True)
```


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve SQL NULL semantics for weekdays with no sales.

This drops the old null-guard path, so a week with no matching rows for a given d_day_name can now aggregate to 0 in Polars instead of NULL like the DuckDB SUM(CASE ... ELSE NULL END) reference. That changes the final ratio columns for sparse weeks.

♻️ Proposed fix

```diff
     wswscs = (
         wscs.join(date_dim_prefilter, left_on="sold_date_sk", right_on="d_date_sk")
         .group_by("d_week_seq")
         .agg(
             [
-                pl.when(pl.col("d_day_name") == day)
-                .then(pl.col("sales_price"))
-                .otherwise(None)
-                .sum()
+                pl.when((pl.col("d_day_name") == day).any())
+                .then(
+                    pl.col("sales_price")
+                    .filter(pl.col("d_day_name") == day)
+                    .sum()
+                )
+                .otherwise(None)
                 .alias(name)
                 for day, name in zip(days, day_cols, strict=True)
             ]
         )
     )
```
In current Polars, does `group_by(...).agg(pl.when(condition).then(value).otherwise(None).sum())` return `0` or `null` when no rows in a group satisfy the condition?

As per coding guidelines, logic errors producing wrong results must be prevented and edge cases like all-null groups should be handled explicitly.
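A minimal repro of the concern (assuming recent Polars behavior, where sum() over an all-null selection returns 0):

```python
import polars as pl

df = pl.DataFrame({"g": [1, 1], "d": ["Mon", "Mon"], "x": [1.0, 2.0]})
out = df.group_by("g").agg(
    pl.when(pl.col("d") == "Tue").then(pl.col("x")).otherwise(None).sum().alias("tue")
)
# No row matches "Tue", yet sum() yields 0.0 rather than the SQL-style NULL.
print(out)  # tue == 0.0
```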

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q2.py`
around lines 165 - 174, The aggregation currently produces 0 for weeks with no
matching day rows; update the group_by(...).agg(...) logic (the list
comprehension that builds per-day aggregates in the
wscs.join(...).group_by("d_week_seq").agg(...) pipeline) so each day column is
produced as a sum paired with a non-null count, then set the day's sum to NULL
when its non-null count == 0; in practice, for each day entry emitted by the
comprehension produce both the conditional sum and a conditional count (e.g.,
pl.when(...).then(1).otherwise(0)), and replace/emit NULL for the sum where that
count is zero so the Polars result matches the SQL SUM(CASE ... ELSE NULL END)
NULL semantics.

Comment on lines 119 to 130

```diff
 ascending_rank = (
     item_profits.with_columns(
-        [pl.col("avg(ss_net_profit)").rank(method="ordinal").alias("rnk")]
+        pl.col("avg_profit").rank(method="ordinal").alias("rnk")
     )
     .filter(pl.col("rnk") < 11)
     .select(["ss_item_sk", "rnk"])
 )

 # Step 4: Create descending ranking (best to worst)
 descending_rank = (
     item_profits.with_columns(
-        [
-            pl.col("avg(ss_net_profit)")
-            .rank(method="ordinal", descending=True)
-            .alias("rnk")
-        ]
+        pl.col("avg_profit").rank(method="ordinal", descending=True).alias("rnk")
     )
```


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In Polars, how does Expr.rank(method="ordinal") handle ties, and which method matches SQL RANK() semantics for tied values?

💡 Result:

In Polars, Expr.rank(method="ordinal") assigns a distinct rank to every value based on the order they occur in the Series; it does not give tied values the same rank. Specifically, the docs state that for method="ordinal" “All values are given a distinct rank, corresponding to the order that the values occur in the Series.” [1] SQL RANK semantics (ties get the same rank, with gaps in subsequent ranks) correspond to Polars method="min" (a.k.a. “competition” ranking), because Polars defines method="min" as assigning the minimum rank that would be assigned to all tied values, and this is what SQL RANK does for ties. [1][2]



Use method="min" for SQL-compatible tie handling in rankings.

rank(method="ordinal") assigns distinct ranks to all values including ties, which differs from SQL RANK() that keeps tied values at the same rank. This causes tied items to move across the < 11 cutoff inconsistently with the duckdb_impl reference. Switch to method="min", which implements SQL-style competition ranking where tied values receive the same (minimum) rank.

Suggested fix

```diff
     ascending_rank = (
         item_profits.with_columns(
-            pl.col("avg_profit").rank(method="ordinal").alias("rnk")
+            pl.col("avg_profit").rank(method="min").alias("rnk")
         )
         .filter(pl.col("rnk") < 11)
         .select(["ss_item_sk", "rnk"])
     )

     descending_rank = (
         item_profits.with_columns(
-            pl.col("avg_profit").rank(method="ordinal", descending=True).alias("rnk")
+            pl.col("avg_profit").rank(method="min", descending=True).alias("rnk")
         )
         .filter(pl.col("rnk") < 11)
         .select(["ss_item_sk", "rnk"])
     )
```
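A quick illustration of the two tie-handling modes (assuming current Polars semantics):

```python
import polars as pl

s = pl.Series("avg_profit", [10.0, 10.0, 20.0])
print(s.rank(method="ordinal").to_list())  # [1, 2, 3]: ties broken by position
print(s.rank(method="min").to_list())      # [1, 1, 3]: SQL RANK(), ties share a rank
```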
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q44.py`
around lines 119 - 130, Change the ranking from ordinal to SQL-style competition
ranking in both places where item_profits is ranked: in ascending_rank and
descending_rank. Replace rank(method="ordinal") with rank(method="min") on
pl.col("avg_profit") so tied avg_profit values get the same rnk (alias "rnk")
and the subsequent .filter(pl.col("rnk") < 11) behavior matches the duckdb_impl
reference; ensure both the ascending_rank and descending_rank pipelines use
method="min".

Comment on lines 175 to 178

```diff
 [
     pl.len().cast(pl.Int64).alias("sales_cnt"),
-    pl.when(pl.col("ext_sales_price").count() > 0)
-    .then(pl.col("ext_sales_price").sum())
-    .otherwise(None)
-    .alias("sales_amt"),
+    pl.col("ext_sales_price").sum().alias("sales_amt"),
 ]
```


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In Polars (version around 1.30), for a group where all values are NULL, what does pl.col("x").sum() return in group_by().agg(...)? Please cite docs/examples.

💡 Result:

In Polars (around v1.30), if you do a group_by aggregation and the entire group for column x is NULL, pl.col("x").sum() returns 0 (an identity/neutral element), not NULL. This is explicitly documented for Expr.sum: "If there are no non-null values, then the output is 0." [1] You can also see this behavior discussed/illustrated in Polars issues, where all-NULL groups in group_by().agg(pl.col(...).sum()) produce 0 rather than None/NULL (e.g., "When summing all null values, it return zero." with a group_by example showing 0 output for an all-None group). [2] If you want an all-NULL group to return NULL instead, the documented workaround is to condition on non-null presence (e.g., using null_count/count) and return None when there are no non-null values. [3]



Preserve SQL NULL semantics for sales_amt aggregation.

Line 177 uses plain sum() which returns 0 for all-NULL groups, diverging from SQL semantics (which returns NULL). Add an explicit null check to return NULL when no non-null values exist in a group.

Proposed fix

```diff
             .agg(
                 [
                     pl.len().cast(pl.Int64).alias("sales_cnt"),
-                    pl.col("ext_sales_price").sum().alias("sales_amt"),
+                    pl.when(pl.col("ext_sales_price").count() > 0)
+                    .then(pl.col("ext_sales_price").sum())
+                    .otherwise(None)
+                    .alias("sales_amt"),
                 ]
             )
```
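The guard works because Polars count() excludes nulls while len() does not; a minimal check, consistent with the docs cited above:

```python
import polars as pl

df = pl.DataFrame({"g": [1, 1, 2], "x": [1.0, 2.0, None]})
print(
    df.group_by("g", maintain_order=True).agg(
        pl.col("x").sum().alias("plain"),  # 0.0 for the all-null group g=2
        pl.when(pl.col("x").count() > 0)   # count() ignores nulls
        .then(pl.col("x").sum())
        .otherwise(None)
        .alias("guarded"),                 # null for the all-null group
    )
)
```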
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q76.py`
around lines 175 - 178, Replace the plain sum() aggregation for sales_amt with a
conditional that returns NULL when a group has no non-null ext_sales_price
values: detect “all-NULL” using an expression like
pl.col("ext_sales_price").drop_nulls().count() == 0 (or comparing null_count to
group count) and use
pl.when(...).then(None).otherwise(pl.col("ext_sales_price").sum()).alias("sales_amt");
update the aggregation array that currently contains
pl.col("ext_sales_price").sum().alias("sales_amt") accordingly.

Comment on lines +538 to +540

```python
    modulus = min(
        max(comm.nranks, total_bytes // max(1, target_partition_size)),
        max(1, total_count),
```

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Round the forward-shuffle partition estimate up.

total_bytes // target_partition_size rounds down, so slightly-over-target samples still choose too few partitions. That lets the forward shuffle build partitions larger than the configured target size and weakens the protection this path is trying to provide.

Suggested fix

```diff
-    modulus = min(
-        max(comm.nranks, total_bytes // max(1, target_partition_size)),
-        max(1, total_count),
-    )
+    size_target = max(1, target_partition_size)
+    estimated_partitions = (total_bytes + size_target - 1) // size_target
+    modulus = min(
+        max(comm.nranks, estimated_partitions),
+        max(1, total_count),
+    )
```
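With hypothetical numbers, the difference between the two roundings:

```python
# Hypothetical sizes: 1.5 GiB of sampled bytes against a 1 GiB target.
total_bytes = 1_610_612_736
target = 1_073_741_824

print(total_bytes // target)                 # 1: floor division undershoots
print((total_bytes + target - 1) // target)  # 2: ceiling keeps partitions at/under target
print(-(-total_bytes // target))             # 2: equivalent ceiling idiom
```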
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cudf_polars/cudf_polars/experimental/rapidsmpf/over.py` around lines
538 - 540, The modulus calculation uses integer floor division (total_bytes //
target_partition_size) which underestimates needed partitions; change it to
round up so slightly-over-target samples pick an extra partition — compute the
forward-shuffle partition estimate using ceiling division (e.g.,
ceil(total_bytes / target_partition_size) or (total_bytes +
target_partition_size - 1) // target_partition_size) before taking min/max with
comm.nranks and total_count in the modulus expression (affecting the modulus
variable and the forward-shuffle partitioning logic).

Comment on lines +130 to +145
@pytest.mark.parametrize(
"engine",
[{"executor_options": {"max_rows_per_partition": 2}}],
indirect=True,
)
def test_over_noncol_key_fallback(engine, expr) -> None:
# Non-Col and mixed Col/expr partition-by keys are not yet supported for
# multi-partition streaming and should fall back to single-partition.
df = pl.LazyFrame(
{
"g": [1, 1, 2, 2, 2, 1],
"x": [1, 2, 3, 4, 5, 6],
}
)
with pytest.warns(UserWarning, match=r"not supported for multiple partitions"):
assert_gpu_result_equal(df.select(expr), engine=engine)


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In pytest, when using @pytest.mark.parametrize("engine", [...], indirect=True), does the fixture need to accept request and read request.param for the parametrized value to affect the fixture?

💡 Result:

Yes. With indirect parametrization, pytest passes the parametrized value to the fixture via the fixture request object, and the fixture must accept request and use request.param to read/use that value. Pytest's docs show an indirect fixture defined as def fixt(request): return request.param * 3, and the example parametrizes "fixt" with indirect=True, meaning the fixture receives the values before passing them to the test. [1] If your fixture ignores request.param (or doesn't take request at all), then it cannot vary its behavior based on the indirect parametrized value; the only way for the fixture to "see" the indirectly parametrized argument is through request.param (the fixture request object has an optional param attribute for indirectly parametrized fixtures). [2][3] In practice: use @pytest.mark.parametrize("engine", [...], indirect=True) when engine is the name of a fixture, and define the fixture like @pytest.fixture def engine(request): return request.param so it actually uses the parametrized value. Sources: [1][2][3].



🏁 Script executed:

```bash
# First, find the test file
find . -path "*cudf_polars/tests/experimental/test_rolling.py" -type f
```

Repository: rapidsai/cudf

Length of output: 113


🏁 Script executed:

```bash
# Once located, read the fixture definition (lines 16-20) and parametrization examples
head -250 "python/cudf_polars/tests/experimental/test_rolling.py" | tail -235 | cat -n
```

Repository: rapidsai/cudf

Length of output: 8829


The engine fixture must accept request and use request.param for indirect parametrization to work.

The fixture at lines 16–20 doesn't accept request, so it cannot read the parametrized values passed via indirect=True. As a result, all these tests ignore their custom executor options and run with the default max_rows_per_partition=3, defeating the purpose of the special-case scenarios (e.g., "many partitions" at max_rows_per_partition=1, broadcast_join_limit=-1 disable). Fix the fixture to accept request: FixtureRequest and use request.param when provided.

Also applies to: 148–167, 180–195, 206–245
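A sketch of the requested shape (the default options and merge strategy here are illustrative, not the file's actual code):

```python
import pytest

@pytest.fixture
def engine(request):
    # Defaults used when a test is not indirectly parametrized.
    options = {"executor_options": {"max_rows_per_partition": 3}}
    # Indirect parametrization delivers the value via request.param.
    if hasattr(request, "param"):
        options = {**options, **request.param}
    return options
```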

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cudf_polars/tests/experimental/test_rolling.py` around lines 130 -
145, The engine fixture is not reading indirect parameters so tests like
test_over_noncol_key_fallback ignore custom executor options; update the engine
fixture to accept a request (e.g., request: FixtureRequest) and when
request.param is present use that value as the fixture return/config (falling
back to the current default when absent). Locate the fixture named "engine" and
change its signature to accept the request, then replace hardcoded/default
options with request.param (or merge request.param into defaults) so
indirect=True in parametrize works for all tests listed.

Comment on lines +430 to +435

```python
        if nranks != 2:
            request.applymarker(
                pytest.mark.skip(
                    reason="key assignments are probed for exactly 2 ranks"
                )
            )
```


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In pytest, does calling request.applymarker(pytest.mark.skip(...)) inside a test immediately stop execution, or is pytest.skip(...) required to abort the current test at runtime?

💡 Result:

Calling request.applymarker(pytest.mark.skip(...)) inside a test does not immediately stop execution of the test. Markers like skip are evaluated during the collection phase of pytest (before test execution begins) to determine whether to run the test at all [1][2][3][4]. By the time the test function body executes and request (a fixture) becomes available, the decision to run the test has already been made, so dynamically applying a skip marker has no effect on skipping—it continues running [3]. To abort the current test at runtime from within the test function, pytest.skip(...) must be called, which raises a skip exception and immediately halts execution, marking the test as skipped [1][5].



🏁 Script executed:

```bash
head -n 520 python/cudf_polars/tests/experimental/test_spmd.py | tail -n +379 | cat -n
```

Repository: rapidsai/cudf

Length of output: 6610


Use pytest.skip() instead of request.applymarker(pytest.mark.skip(...)).

request.applymarker(pytest.mark.skip(...)) applies the skip marker during test collection, not at runtime. Once the test function executes, the marker has no effect. These tests will continue to index _SAME_RANK_KEYS[rank] and _CROSS_RANK_KEYS[rank] when nranks != 2, causing an IndexError instead of skipping.

Use pytest.skip() to immediately abort execution and mark the test as skipped:

Suggested fix

```diff
-        if nranks != 2:
-            request.applymarker(
-                pytest.mark.skip(
-                    reason="key assignments are probed for exactly 2 ranks"
-                )
-            )
+        if nranks != 2:
+            pytest.skip("key assignments are probed for exactly 2 ranks")
```

Also applies to: lines 486-491

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cudf_polars/tests/experimental/test_spmd.py` around lines 430 - 435,
Replace the collection-time skip marker usage with a runtime skip: where the
test checks "if nranks != 2:" and currently calls
"request.applymarker(pytest.mark.skip(...))", call "pytest.skip('reason...')"
instead so the test aborts immediately at runtime and does not proceed to index
_SAME_RANK_KEYS[rank] or _CROSS_RANK_KEYS[rank]; make this change for both
occurrences (the block using request.applymarker around nranks != 2 and the
similar block at the later lines) to ensure proper skipping.


Labels

  • cudf-polars — Issues specific to cudf-polars
  • Python — Affects Python cuDF API.

Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

4 participants