Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def _callback(
else:
return df, timer.timings
elif config_options.executor.name == "streaming":
from cudf_polars.experimental.parallel import evaluate_streaming
from cudf_polars.streaming.parallel import evaluate_streaming

if timer is not None:
msg = textwrap.dedent("""\
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/containers/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def serialize(

To enable Dask support, the Dask serializers must be registered first:

>>> from cudf_polars.experimental.dask_serialize import register
>>> from cudf_polars.streaming.dask_serialize import register
>>> register()

Returns
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/containers/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def serialize(

To enable Dask support, the Dask serializers must be registered first:

>>> from cudf_polars.experimental.dask_serialize import register
>>> from cudf_polars.streaming.dask_serialize import register
>>> register()

Parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.ir import IRExecutionContext
from cudf_polars.experimental.base import StatsCollector
from cudf_polars.experimental.parallel import lower_ir_graph
from cudf_polars.experimental.rapidsmpf.collectives import ReserveOpIDs
from cudf_polars.experimental.rapidsmpf.collectives.common import reserve_op_id
from cudf_polars.experimental.rapidsmpf.core import generate_network
from cudf_polars.experimental.rapidsmpf.tracing import log_query_plan
from cudf_polars.experimental.rapidsmpf.utils import empty_table_chunk
from cudf_polars.experimental.statistics import collect_statistics
from cudf_polars.experimental.utils import _concat
from cudf_polars.streaming.actor_graph.core import generate_network
from cudf_polars.streaming.actor_graph.tracing import log_query_plan
from cudf_polars.streaming.actor_graph.utils import empty_table_chunk
from cudf_polars.streaming.base import StatsCollector
from cudf_polars.streaming.collectives import ReserveOpIDs
from cudf_polars.streaming.collectives.common import reserve_op_id
from cudf_polars.streaming.parallel import lower_ir_graph
from cudf_polars.streaming.statistics import collect_statistics
from cudf_polars.streaming.utils import _concat
from cudf_polars.utils.config import get_total_device_memory

if TYPE_CHECKING:
Expand All @@ -47,8 +47,8 @@
from rapidsmpf.streaming.cudf.channel_metadata import ChannelMetadata

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.parallel import ConfigOptions
from cudf_polars.streaming.base import PartitionInfo
from cudf_polars.streaming.parallel import ConfigOptions
from cudf_polars.utils.config import StreamingExecutor


Expand Down Expand Up @@ -174,7 +174,7 @@ def __init__(
):
# Refuse to construct if a ``DefaultSingletonEngine`` is alive
# (no-op for the singleton itself).
from cudf_polars.experimental.rapidsmpf.frontend.default_singleton_engine import (
from cudf_polars.engine.default_singleton_engine import (
check_no_live_default_singleton,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@

import rmm.mr

from cudf_polars.experimental.rapidsmpf.frontend.core import (
from cudf_polars.engine.core import (
ClusterInfo,
StreamingEngine,
check_reserved_keys,
evaluate_on_rank,
resolve_rapidsmpf_options,
)
from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import (
from cudf_polars.engine.hardware_binding import (
HardwareBindingPolicy,
bind_to_gpu,
)
Expand All @@ -48,9 +48,9 @@
from rapidsmpf.streaming.cudf.channel_metadata import ChannelMetadata

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.parallel import ConfigOptions
from cudf_polars.experimental.rapidsmpf.frontend.core import T
from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.engine.core import T
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.streaming.parallel import ConfigOptions
from cudf_polars.utils.config import StreamingExecutor


Expand Down Expand Up @@ -87,7 +87,7 @@ def dask_setup(nanny: distributed.Nanny) -> None:
Usage::

dask worker SCHEDULER:8786 --nworkers N --nthreads 1 \
--preload-nanny cudf_polars.experimental.rapidsmpf.frontend.dask
--preload-nanny cudf_polars.engine.dask

Parameters
----------
Expand Down Expand Up @@ -566,9 +566,7 @@ class DaskEngine(StreamingEngine):
and sets ``CUDA_VISIBLE_DEVICES`` per worker), disable some or all of the
built-in binding to avoid conflicts:

>>> from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import (
... HardwareBindingPolicy,
... )
>>> from cudf_polars.engine.hardware_binding import HardwareBindingPolicy
>>> with DaskEngine( # doctest: +SKIP
... dask_client=dc,
... engine_options={
Expand All @@ -581,7 +579,7 @@ class DaskEngine(StreamingEngine):
one GPU per worker before the worker process spawns::

dask worker SCHEDULER:8786 --nworkers N --nthreads 1 \
--preload-nanny cudf_polars.experimental.rapidsmpf.frontend.dask
--preload-nanny cudf_polars.engine.dask

Then connect from the client::

Expand Down Expand Up @@ -785,9 +783,7 @@ def from_options(

Examples
--------
>>> from cudf_polars.experimental.rapidsmpf.frontend.options import (
... StreamingOptions,
... )
>>> from cudf_polars.engine.options import StreamingOptions
>>> opts = StreamingOptions(num_streaming_threads=4, fallback_mode="silent")
>>> with DaskEngine.from_options(opts) as engine: # doctest: +SKIP
... result = pl.LazyFrame({"a": [1, 2, 3]}).collect(engine=engine)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
)
from rapidsmpf.progress_thread import ProgressThread

from cudf_polars.experimental.rapidsmpf.frontend.core import (
from cudf_polars.engine.core import (
resolve_rapidsmpf_options,
)
from cudf_polars.experimental.rapidsmpf.frontend.spmd import SPMDEngine
from cudf_polars.engine.spmd import SPMDEngine

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from rapidsmpf.config import Options
from rapidsmpf.utils.string import parse_boolean

from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import (
from cudf_polars.engine.hardware_binding import (
HardwareBindingPolicy,
)
from cudf_polars.utils.config import MemoryResourceConfig
Expand Down Expand Up @@ -273,8 +273,7 @@ class StreamingOptions:
Env: ``CUDF_POLARS__CUDA_STREAM_POLICY``.
Category: engine.
hardware_binding
Hardware binding policy. Pass a
:class:`~cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.HardwareBindingPolicy`
Hardware binding policy. Pass a :class:`~cudf_polars.engine.hardware_binding.HardwareBindingPolicy`
instance for fine-grained control.
Env: ``CUDF_POLARS__HARDWARE_BINDING`` (JSON object,
e.g. ``'{"enabled": false}'``).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@

import rmm.mr

from cudf_polars.experimental.rapidsmpf.frontend.core import (
from cudf_polars.engine.core import (
ClusterInfo,
StreamingEngine,
check_reserved_keys,
evaluate_on_rank,
resolve_rapidsmpf_options,
)
from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import (
from cudf_polars.engine.hardware_binding import (
HardwareBindingPolicy,
bind_to_gpu,
)
Expand All @@ -46,9 +46,9 @@
from ray.actor import ActorHandle

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.parallel import ConfigOptions
from cudf_polars.experimental.rapidsmpf.frontend.core import T
from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.engine.core import T
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.streaming.parallel import ConfigOptions
from cudf_polars.utils.config import StreamingExecutor


Expand Down Expand Up @@ -163,10 +163,9 @@ class RankActor:

Notes
-----
Calls :func:`~cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind_to_gpu`
at construction time, before RMM and communicator initialisation, so that
CPU affinity, NUMA memory policy, and ``UCX_NET_DEVICES`` are set as early
as possible.
Calls :func:`~cudf_polars.engine.hardware_binding.bind_to_gpu` at construction
time, before RMM and communicator initialisation, so that CPU affinity, NUMA
memory policy, and ``UCX_NET_DEVICES`` are set as early as possible.
"""

def __init__(
Expand Down Expand Up @@ -432,11 +431,10 @@ class RayEngine(StreamingEngine):
Creates a RapidsMPF Ray cluster and returns an engine that can be passed
to ``LazyFrame.collect(engine=engine)``.

Prefer :meth:`from_options` for typical use — pass a
:class:`~cudf_polars.experimental.rapidsmpf.frontend.options.StreamingOptions`
instance for a unified, typed interface. The ``__init__`` parameters
(``rapidsmpf_options``, ``executor_options``, ``engine_options``) are
intended for advanced use when fine-grained control is needed.
Prefer :meth:`from_options` for typical use. Pass a :class:`~cudf_polars.engine.options.StreamingOptions`
instance for a unified, typed interface. The ``__init__`` parameters (``rapidsmpf_options``,
``executor_options``, ``engine_options``) are intended for advanced use when
fine-grained control is needed.

Prefer the context-manager form in scripts: it guarantees that actors and
Ray are shut down even if an exception is raised. In interactive environments
Expand Down Expand Up @@ -696,9 +694,7 @@ def from_options(

Examples
--------
>>> from cudf_polars.experimental.rapidsmpf.frontend.options import (
... StreamingOptions,
... )
>>> from cudf_polars.engine.options import StreamingOptions
>>> opts = StreamingOptions(num_streaming_threads=4, fallback_mode="silent")
>>> with RayEngine.from_options(opts) as engine: # doctest: +SKIP
... result = pl.LazyFrame({"a": [1, 2, 3]}).collect(engine=engine)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
from pylibcudf.contiguous_split import pack

from cudf_polars.containers import DataFrame, DataType
from cudf_polars.experimental.rapidsmpf.collectives.common import reserve_op_id
from cudf_polars.experimental.rapidsmpf.frontend.core import (
from cudf_polars.engine.core import (
ClusterInfo,
StreamingEngine,
all_gather_host_data,
check_reserved_keys,
evaluate_on_rank,
resolve_rapidsmpf_options,
)
from cudf_polars.experimental.rapidsmpf.frontend.hardware_binding import (
from cudf_polars.engine.hardware_binding import (
HardwareBindingPolicy,
bind_to_gpu,
)
from cudf_polars.experimental.rapidsmpf.utils import set_memory_resource
from cudf_polars.streaming.actor_graph.utils import set_memory_resource
from cudf_polars.streaming.collectives.common import reserve_op_id
from cudf_polars.utils.config import (
MemoryResourceConfig,
SPMDContext,
Expand All @@ -58,9 +58,9 @@
import polars as pl

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.parallel import ConfigOptions
from cudf_polars.experimental.rapidsmpf.frontend.core import T
from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.engine.core import T
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.streaming.parallel import ConfigOptions
from cudf_polars.utils.config import StreamingExecutor


Expand Down Expand Up @@ -204,11 +204,10 @@ class SPMDEngine(StreamingEngine):
such as shuffles, all-gathers, and joins, coordinate across ranks to produce
a globally consistent result.

Prefer :meth:`from_options` for typical use — pass a
:class:`~cudf_polars.experimental.rapidsmpf.frontend.options.StreamingOptions`
instance for a unified, typed interface. The ``__init__`` parameters
(``rapidsmpf_options``, ``executor_options``, ``engine_options``) are
intended for advanced use when fine-grained control is needed.
Prefer :meth:`from_options` for typical use. Pass a :class:`~cudf_polars.engine.options.StreamingOptions`
instance for a unified, typed interface. The ``__init__`` parameters (``rapidsmpf_options``,
``executor_options``, ``engine_options``) are intended for advanced use when
fine-grained control is needed.

This class is the primary entry point for SPMD execution. It:

Expand Down Expand Up @@ -277,7 +276,7 @@ class SPMDEngine(StreamingEngine):

Every rank must issue the *same* sequence of Polars queries in the *same*
order. Collective operations (shuffles, all-gathers, joins) are matched
across ranks by a monotonically increasing operation ID if one rank calls
across ranks by a monotonically increasing operation ID; if one rank calls
a collective that another rank does not, all ranks will deadlock. This means
your driver script must be fully deterministic: avoid rank-conditional
``collect`` calls, early exits, or any branching that would cause different
Expand All @@ -297,8 +296,7 @@ class SPMDEngine(StreamingEngine):
executor_options
Executor-specific options (e.g. ``max_rows_per_partition``).
engine_options
Engine-specific keyword arguments (e.g. ``raise_on_fail``,
``parquet_options``).
Engine-specific keyword arguments (e.g. ``raise_on_fail``, ``parquet_options``).

Raises
------
Expand All @@ -307,13 +305,11 @@ class SPMDEngine(StreamingEngine):

Notes
-----
Calls
:func:`~cudf_polars.experimental.rapidsmpf.frontend.hardware_binding.bind_to_gpu`
at construction time, before RMM and communicator initialisation, so that
CPU affinity, NUMA memory policy, and ``UCX_NET_DEVICES`` are set as early
as possible. By default, binding is skipped under ``rrun`` (which already
performs its own binding) — see
:attr:`HardwareBindingPolicy.skip_under_rrun`.
Calls :func:`~cudf_polars.engine.hardware_binding.bind_to_gpu` at construction
time, before RMM and communicator initialisation, so that CPU affinity, NUMA
memory policy, and ``UCX_NET_DEVICES`` are set as early as possible. By default,
binding is skipped under ``rrun`` (which already performs its own binding);
see :attr:`HardwareBindingPolicy.skip_under_rrun`.

Examples
--------
Expand Down Expand Up @@ -447,9 +443,7 @@ def from_options(cls, options: StreamingOptions) -> SPMDEngine:

Examples
--------
>>> from cudf_polars.experimental.rapidsmpf.frontend.options import (
... StreamingOptions,
... )
>>> from cudf_polars.engine.options import StreamingOptions
>>> opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent")
>>> with SPMDEngine.from_options(opts) as engine: # doctest: +SKIP
... result = df.lazy().collect(engine=engine)
Expand Down
8 changes: 0 additions & 8 deletions python/cudf_polars/cudf_polars/experimental/__init__.py

This file was deleted.

27 changes: 0 additions & 27 deletions python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py

This file was deleted.

Loading
Loading