Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/cudf_polars/cudf_polars/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.ir import IRExecutionContext
from cudf_polars.streaming.actor_graph.collectives import ReserveOpIDs
from cudf_polars.streaming.actor_graph.collectives.common import reserve_op_id
from cudf_polars.streaming.actor_graph.core import generate_network
from cudf_polars.streaming.actor_graph.tracing import log_query_plan
from cudf_polars.streaming.actor_graph.utils import empty_table_chunk
from cudf_polars.streaming.base import StatsCollector
from cudf_polars.streaming.collectives import ReserveOpIDs
from cudf_polars.streaming.collectives.common import reserve_op_id
from cudf_polars.streaming.parallel import lower_ir_graph
from cudf_polars.streaming.statistics import collect_statistics
from cudf_polars.streaming.utils import _concat
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/engine/spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
HardwareBindingPolicy,
bind_to_gpu,
)
from cudf_polars.streaming.actor_graph.collectives.common import reserve_op_id
from cudf_polars.streaming.actor_graph.utils import set_memory_resource
from cudf_polars.streaming.collectives.common import reserve_op_id
from cudf_polars.utils.config import (
MemoryResourceConfig,
SPMDContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

from __future__ import annotations

import cudf_polars.streaming.actor_graph.collectives.shuffle
import cudf_polars.streaming.actor_graph.collectives.sort

# Side-effect imports: each module registers
# ``@generate_ir_sub_network.register(...)`` handlers at import time so the
# dispatch table is populated before any query is evaluated.
Expand All @@ -13,8 +16,6 @@
import cudf_polars.streaming.actor_graph.join
import cudf_polars.streaming.actor_graph.over
import cudf_polars.streaming.actor_graph.repartition
import cudf_polars.streaming.actor_graph.union
import cudf_polars.streaming.collectives.shuffle
import cudf_polars.streaming.collectives.sort # noqa: F401
import cudf_polars.streaming.actor_graph.union # noqa: F401

__all__: list[str] = []
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Collective operations for the RapidsMPF streaming runtime."""
"""Collective operations for building a RapidsMPF actor graph."""

from __future__ import annotations

from cudf_polars.streaming.collectives.common import (
from cudf_polars.streaming.actor_graph.collectives.common import (
ReserveOpIDs,
reserve_op_id,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from cudf_polars.dsl.expr import Col, NamedExpr
from cudf_polars.dsl.ir import Empty, Sort
from cudf_polars.dsl.utils.naming import names_to_indices, unique_names
from cudf_polars.streaming.actor_graph.collectives.allgather import AllGatherManager
from cudf_polars.streaming.actor_graph.collectives.shuffle import ShuffleManager
from cudf_polars.streaming.actor_graph.dispatch import generate_ir_sub_network
from cudf_polars.streaming.actor_graph.nodes import (
default_node_single,
Expand All @@ -47,8 +49,6 @@
replay_buffered_channel,
send_metadata,
)
from cudf_polars.streaming.collectives.allgather import AllGatherManager
from cudf_polars.streaming.collectives.shuffle import ShuffleManager
from cudf_polars.streaming.repartition import Repartition
from cudf_polars.streaming.sort import (
_get_final_sort_boundaries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from cudf_polars.dsl.expr import Col, NamedExpr
from cudf_polars.dsl.ir import IR, Distinct, GroupBy, Select
from cudf_polars.dsl.utils.naming import unique_names
from cudf_polars.streaming.actor_graph.collectives.shuffle import ShuffleManager
from cudf_polars.streaming.actor_graph.dispatch import (
generate_ir_sub_network,
)
Expand All @@ -43,7 +44,6 @@
send_metadata,
shutdown_on_error,
)
from cudf_polars.streaming.collectives.shuffle import ShuffleManager
from cudf_polars.streaming.groupby import combine, decompose
from cudf_polars.streaming.repartition import Repartition

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from cudf_polars.containers import DataFrame
from cudf_polars.dsl.ir import IR, Join
from cudf_polars.dsl.utils.naming import names_to_indices
from cudf_polars.streaming.actor_graph.collectives.allgather import AllGatherManager
from cudf_polars.streaming.actor_graph.collectives.shuffle import _global_shuffle
from cudf_polars.streaming.actor_graph.dispatch import (
generate_ir_sub_network,
)
Expand All @@ -51,8 +53,6 @@
send_metadata,
shutdown_on_error,
)
from cudf_polars.streaming.collectives.allgather import AllGatherManager
from cudf_polars.streaming.collectives.shuffle import _global_shuffle
from cudf_polars.streaming.repartition import Repartition
from cudf_polars.streaming.utils import _concat

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
from cudf_polars.dsl.expressions.base import ExecutionContext
from cudf_polars.dsl.utils.naming import unique_names
from cudf_polars.dsl.utils.reshape import broadcast
from cudf_polars.streaming.actor_graph.collectives.shuffle import (
LocalRepartitioner,
ShuffleManager,
)
from cudf_polars.streaming.actor_graph.dispatch import generate_ir_sub_network
from cudf_polars.streaming.actor_graph.utils import (
ChannelManager,
Expand All @@ -81,10 +85,6 @@
send_metadata,
shutdown_on_error,
)
from cudf_polars.streaming.collectives.shuffle import (
LocalRepartitioner,
ShuffleManager,
)
from cudf_polars.streaming.over import Over, _build_over_groupby_irs

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)

from cudf_polars.containers import DataFrame
from cudf_polars.streaming.actor_graph.collectives.allgather import AllGatherManager
from cudf_polars.streaming.actor_graph.dispatch import generate_ir_sub_network
from cudf_polars.streaming.actor_graph.nodes import shutdown_on_error
from cudf_polars.streaming.actor_graph.utils import (
Expand All @@ -25,7 +26,6 @@
recv_metadata,
send_metadata,
)
from cudf_polars.streaming.collectives.allgather import AllGatherManager
from cudf_polars.streaming.repartition import Repartition
from cudf_polars.streaming.utils import _concat

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
from cudf_polars.dsl.ir import Cache, Filter, GroupBy, HStack, Join, Projection, Select
from cudf_polars.dsl.tracing import Scope
from cudf_polars.dsl.utils.naming import names_to_indices
from cudf_polars.streaming.actor_graph.collectives.allgather import AllGatherManager
from cudf_polars.streaming.actor_graph.tracing import ActorTracer
from cudf_polars.streaming.collectives.allgather import AllGatherManager
from cudf_polars.streaming.utils import _concat
from cudf_polars.utils.dtypes import make_empty_column

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ def run_polars_spmd(
from cudf_polars.engine.spmd import (
allgather_polars_dataframe,
)
from cudf_polars.streaming.collectives.common import reserve_op_id
from cudf_polars.streaming.actor_graph.collectives.common import reserve_op_id

def _allgather_result(df: pl.DataFrame) -> pl.DataFrame:
with reserve_op_id() as op_id:
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/tests/streaming/test_allgather.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import pylibcudf as plc

from cudf_polars.dsl.ir import IRExecutionContext
from cudf_polars.streaming.actor_graph.collectives.allgather import AllGatherManager
from cudf_polars.streaming.actor_graph.utils import allgather_reduce
from cudf_polars.streaming.collectives.allgather import AllGatherManager


async def _test_allgather(engine) -> None:
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/tests/streaming/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import polars as pl

from cudf_polars.engine.options import StreamingOptions
from cudf_polars.streaming.collectives.shuffle import ShuffleManager
from cudf_polars.streaming.actor_graph.collectives.shuffle import ShuffleManager
from cudf_polars.testing.asserts import assert_gpu_result_equal


Expand Down
8 changes: 4 additions & 4 deletions python/cudf_polars/tests/streaming/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
from cudf_polars.dsl import expr
from cudf_polars.dsl.ir import GroupBy, HStack, Projection, Select, Sort
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.streaming.actor_graph.collectives.sort import (
_is_already_sorted,
_sort_to_order_keys,
)
from cudf_polars.streaming.actor_graph.core import evaluate_logical_plan
from cudf_polars.streaming.actor_graph.utils import (
NormalizedPartitioning,
maybe_remap_partitioning,
)
from cudf_polars.streaming.collectives.sort import (
_is_already_sorted,
_sort_to_order_keys,
)
from cudf_polars.utils.config import ConfigOptions


Expand Down
10 changes: 5 additions & 5 deletions python/cudf_polars/tests/streaming/test_shuffler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
from cudf_polars.containers import DataFrame, DataType
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.spmd import allgather_polars_dataframe
from cudf_polars.streaming.actor_graph.utils import (
_is_already_partitioned,
)
from cudf_polars.streaming.collectives.common import reserve_op_id
from cudf_polars.streaming.collectives.shuffle import (
from cudf_polars.streaming.actor_graph.collectives.common import reserve_op_id
from cudf_polars.streaming.actor_graph.collectives.shuffle import (
LocalRepartitioner,
ShuffleManager,
)
from cudf_polars.streaming.actor_graph.utils import (
_is_already_partitioned,
)
from cudf_polars.testing.asserts import assert_gpu_result_equal


Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/tests/streaming/test_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
SPMDEngine,
allgather_polars_dataframe,
)
from cudf_polars.streaming.collectives.common import reserve_op_id
from cudf_polars.streaming.actor_graph.collectives.common import reserve_op_id
from cudf_polars.testing.asserts import assert_gpu_result_equal
from cudf_polars.utils.config import MemoryResourceConfig

Expand Down
Loading