[cudf_polars] Reorganize package layout#22491
Conversation
|
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
|
Note Reviews pausedUse the following commands to manage reviews:
Use the checkboxes below for quick actions:
✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
| def lower_dataframescan_rapidsmpf( | ||
| ir: DataFrameScan, rec: LowerIRTransformer | ||
| ) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: | ||
| """Lower a DataFrameScan node for the RapidsMPF streaming runtime.""" | ||
| config_options = rec.state["config_options"] | ||
|
|
||
| # NOTE: We calculate the expected partition count | ||
| # to help trigger fallback warnings in lower_ir_graph. | ||
| # The generate_ir_sub_network logic is NOT required | ||
| # to obey this partition count. However, the count | ||
| # WILL match after an IO operation (for now). | ||
| rows_per_partition = config_options.executor.max_rows_per_partition | ||
| nrows = max(ir.df.shape()[0], 1) | ||
| count = math.ceil(nrows / rows_per_partition) | ||
|
|
||
| return ir, {ir: PartitionInfo(count=count)} |
There was a problem hiding this comment.
Removed lower_dataframescan_rapidsmpf from streaming/actor_graph/io.py (formerly experimental/rapidsmpf/io.py).
Its body are inlined verbatim into the existing @lower_ir_node.register(DataFrameScan) implementation in streaming/io.py (formerly experimental/io.py).
| def lower_scan_rapidsmpf( | ||
| ir: Scan, rec: LowerIRTransformer | ||
| ) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: | ||
| """Lower a Scan node for the RapidsMPF streaming runtime.""" | ||
| config_options = rec.state["config_options"] | ||
| if ( | ||
| ir.typ in ("csv", "parquet", "ndjson") | ||
| and ir.n_rows == -1 | ||
| and ir.skip_rows == 0 | ||
| and ir.row_index is None | ||
| ): | ||
| # NOTE: We calculate the expected partition count | ||
| # to help trigger fallback warnings in lower_ir_graph. | ||
| # The generate_ir_sub_network logic is NOT required | ||
| # to obey this partition count. However, the count | ||
| # WILL match after an IO operation (for now). | ||
| plan = scan_partition_plan(ir, rec.state["stats"], config_options) | ||
| paths = list(ir.paths) | ||
| if plan.flavor == IOPartitionFlavor.SPLIT_FILES: | ||
| count = plan.factor * len(paths) | ||
| else: | ||
| count = math.ceil(len(paths) / plan.factor) | ||
|
|
||
| return ir, {ir: PartitionInfo(count=count, io_plan=plan)} | ||
| else: | ||
| plan = IOPartitionPlan( | ||
| flavor=IOPartitionFlavor.SINGLE_READ, factor=len(ir.paths) | ||
| ) | ||
| return ir, {ir: PartitionInfo(count=1, io_plan=plan)} |
There was a problem hiding this comment.
Removed lower_scan_rapidsmpf from streaming/actor_graph/io.py (formerly experimental/rapidsmpf/io.py).
Its body are inlined verbatim into the existing @lower_ir_node.register(Scan) implementation in streaming/io.py (formerly experimental/io.py).
9f3df89 to
019cc0d
Compare
019cc0d to
774de38
Compare
|
/merge |
This PR is pure moving/renaming.
### New layout
```
cudf_polars/
callback.py
containers/
dsl/
engine/ ← user-facing GPU engine classes (Streaming/Ray/Dask/SPMD/DefaultSingleton)
streaming/ ← multi-partition execution layer (formerly "experimental")
actor_graph/ ← RapidsMPF-backed runtime
collectives/ ← RapidsMPF collective communication primitives
benchmarks/
utils.py ← consolidated benchmark utilities (formerly split between utils.py shim and utils_new_frontends.py)
pdsds.py
...
base.py
dispatch.py
parallel.py
groupby.py
io.py
join.py
...
testing/
typing/
utils/
```
Engine entry points move from deeply nested experimental paths to top-level imports:
```
cudf_polars.experimental.rapidsmpf.frontend.options → cudf_polars.engine.options
cudf_polars.experimental.rapidsmpf.frontend.spmd → cudf_polars.engine.spmd
cudf_polars.experimental.rapidsmpf.frontend.ray → cudf_polars.engine.ray
cudf_polars.experimental.rapidsmpf.frontend.dask → cudf_polars.engine.dask
cudf_polars.experimental.rapidsmpf.frontend.core → cudf_polars.engine.core
```
Benchmarks is now under `streaming`:
```
python -m cudf_polars.streaming.benchmarks.pdsh
```
Authors:
- Mads R. B. Kristensen (https://github.com/madsbk)
Approvers:
- Lawrence Mitchell (https://github.com/wence-)
- Peter Andreas Entschev (https://github.com/pentschev)
- Bradley Dice (https://github.com/bdice)
- Matthew Murray (https://github.com/Matt711)
URL: rapidsai#22491
- Follow up to #22491 - Moves the `collectives` module under `actor_graph` to break circular dependencies. The "collectives" are **mostly** used to build the actor graph anyway. **Note**: Before merging this, I'd like to get confirmation that others see circular-import errors locally. E.g. ``` pytest -v python/cudf_polars/tests/streaming/test_groupby.py ... E ImportError: cannot import name 'ShuffleManager' from partially initialized module 'cudf_polars.streaming.collectives.shuffle' (most likely due to a circular import) (/raid/rzamora/rapids-26.06/cudf/python/cudf_polars/cudf_polars/streaming/collectives/shuffle.py) ``` Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - Matthew Murray (https://github.com/Matt711) - Mads R. B. Kristensen (https://github.com/madsbk) URL: #22578
Description
This PR is pure moving/renaming.
New layout
Engine entry points move from deeply nested experimental paths to top-level imports:
Benchmarks is now under
streaming: