From 438549cc3844857a94749aca072f80ab47463b8a Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 19 May 2026 15:32:43 +0200 Subject: [PATCH 1/5] Overhaul cudf-polars docs for new streaming multi-GPU engines --- dependencies.yaml | 1 + docs/cudf/source/conf.py | 18 + docs/cudf/source/cudf_polars/api.md | 70 +++- docs/cudf/source/cudf_polars/dask_engine.md | 199 +++++++++++ .../cudf_polars/default_singleton_engine.md | 111 +++++++ .../cudf/source/cudf_polars/engine_options.md | 176 ---------- docs/cudf/source/cudf_polars/engines.md | 94 ++++++ .../source/cudf_polars/in_memory_engine.md | 37 +++ docs/cudf/source/cudf_polars/index.md | 104 ++++++ docs/cudf/source/cudf_polars/index.rst | 56 ---- docs/cudf/source/cudf_polars/options.md | 129 ++++++++ docs/cudf/source/cudf_polars/other_engines.md | 35 ++ docs/cudf/source/cudf_polars/profiling.md | 184 ++++++++++ docs/cudf/source/cudf_polars/spmd_engine.md | 211 ++++++++++++ .../source/cudf_polars/streaming_execution.md | 140 -------- docs/cudf/source/cudf_polars/usage.md | 313 +++++++----------- python/cudf_polars/cudf_polars/engine/dask.py | 4 +- .../engine/default_singleton_engine.py | 11 +- python/cudf_polars/cudf_polars/engine/ray.py | 22 +- python/cudf_polars/cudf_polars/engine/spmd.py | 8 +- 20 files changed, 1336 insertions(+), 587 deletions(-) create mode 100644 docs/cudf/source/cudf_polars/dask_engine.md create mode 100644 docs/cudf/source/cudf_polars/default_singleton_engine.md delete mode 100644 docs/cudf/source/cudf_polars/engine_options.md create mode 100644 docs/cudf/source/cudf_polars/engines.md create mode 100644 docs/cudf/source/cudf_polars/in_memory_engine.md create mode 100644 docs/cudf/source/cudf_polars/index.md delete mode 100644 docs/cudf/source/cudf_polars/index.rst create mode 100644 docs/cudf/source/cudf_polars/options.md create mode 100644 docs/cudf/source/cudf_polars/other_engines.md create mode 100644 docs/cudf/source/cudf_polars/profiling.md create mode 100644 docs/cudf/source/cudf_polars/spmd_engine.md delete mode 100644 docs/cudf/source/cudf_polars/streaming_execution.md diff --git a/dependencies.yaml b/dependencies.yaml index fe9e4e7bdb2..f5dbf41c861 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -161,6 +161,7 @@ files: - depends_on_pylibcudf - depends_on_libcudf - depends_on_cudf_polars + - depends_on_ray - docs - py_version py_build_cudf: diff --git a/docs/cudf/source/conf.py b/docs/cudf/source/conf.py index 6c58e57db74..e4152bfe37e 100644 --- a/docs/cudf/source/conf.py +++ b/docs/cudf/source/conf.py @@ -625,12 +625,30 @@ def on_missing_reference(app, env, node, contnode): ("py:class", "Axis"), ("py:class", "ArrowLike"), ("py:class", "ExecutorType"), + # cudf-polars: bare rapidsmpf type names appear in autodoc'd signatures + # because they are imported under ``if TYPE_CHECKING:`` and rendered as + # unqualified strings in type annotations. The ``rapidsmpf.*`` regex below + # only matches fully qualified targets, so the bare leaf names are listed + # explicitly here. + ("py:class", "Statistics"), + ("py:class", "Communicator"), + ("py:class", "Options"), + # polars aliases that don't match the public intersphinx targets. + ("py:class", "pl.DataFrame"), + ("py:class", "polars.LazyFrame"), + ("py:class", "polars.DataFrame"), + ("py:class", "polars.dataframe.frame.DataFrame"), ] # Temporarily disable nitpick warnings for pandas: https://github.com/pandas-dev/pandas/issues/64584 nitpick_ignore_regex = [ ("py:.*", "pandas.*"), ("py:.*", "pd.*"), ("ref.*", ".*pandas.*"), + # External libs without configured intersphinx inventories. + ("py:.*", r"rapidsmpf(\..*)?"), + ("py:.*", r"ray(\..*)?"), + ("py:.*", r"distributed(\..*)?"), + ("py:.*", r"dask_cuda(\..*)?"), ] diff --git a/docs/cudf/source/cudf_polars/api.md b/docs/cudf/source/cudf_polars/api.md index 823954a3b08..9f89053a584 100644 --- a/docs/cudf/source/cudf_polars/api.md +++ b/docs/cudf/source/cudf_polars/api.md @@ -1,17 +1,73 @@ (cudf-polars-api)= -# API +# API Reference -For the most part, the public API of `cudf-polars` is the polars API. +For the most part, the public API of `cudf-polars` is the Polars API itself. This page +documents the additional classes and functions that `cudf-polars` exposes for the streaming +multi-GPU engines. + +## Streaming engines + +```{eval-rst} +.. autoclass:: cudf_polars.engine.ray.RayEngine + :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks + :show-inheritance: + +.. autoclass:: cudf_polars.engine.dask.DaskEngine + :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks + :show-inheritance: + +.. autoclass:: cudf_polars.engine.spmd.SPMDEngine + :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks, rank, comm, context + :show-inheritance: + +.. autoclass:: cudf_polars.engine.default_singleton_engine.DefaultSingletonEngine + :members: get_or_create, shutdown + :show-inheritance: +``` + +The three engines share a common base class: + +```{eval-rst} +.. autoclass:: cudf_polars.engine.core.StreamingEngine + :members: gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks + :show-inheritance: + +.. autoclass:: cudf_polars.engine.core.ClusterInfo + :members: +``` + +## Configuration + +```{eval-rst} +.. autoclass:: cudf_polars.engine.options.StreamingOptions + :members: from_dict, to_dict, to_rapidsmpf_options, to_executor_options, to_engine_options + +.. autodata:: cudf_polars.engine.options.UNSPECIFIED + +.. autoclass:: cudf_polars.engine.hardware_binding.HardwareBindingPolicy + :members: + +.. autofunction:: cudf_polars.engine.hardware_binding.bind_to_gpu +``` + +## SPMD helpers + +```{eval-rst} +.. autofunction:: cudf_polars.engine.spmd.allgather_polars_dataframe + +.. autofunction:: cudf_polars.streaming.collectives.common.reserve_op_id +``` + +## Internal configuration objects + +These dataclasses back the `engine_options` surfaced by `pl.GPUEngine` and `StreamingOptions`. +Most users interact with them through `StreamingOptions` fields rather than directly. ```{eval-rst} .. automodule:: cudf_polars.utils.config :members: - Cluster, - ConfigOptions, - CUDAStreamPoolConfig, DynamicPlanningOptions, - ExecutorType, - InMemoryExecutor, + MemoryResourceConfig, ParquetOptions, StreamingExecutor, StreamingFallbackMode, diff --git a/docs/cudf/source/cudf_polars/dask_engine.md b/docs/cudf/source/cudf_polars/dask_engine.md new file mode 100644 index 00000000000..87aab8f64e3 --- /dev/null +++ b/docs/cudf/source/cudf_polars/dask_engine.md @@ -0,0 +1,199 @@ +(cudf-polars-dask-engine)= +# Dask + +{class}`~cudf_polars.engine.dask.DaskEngine` runs the streaming executor +on a [Dask distributed][dask-distributed] cluster: one Dask worker per GPU, coordinated by a +single client process. Partitions are streamed through the query plan and collective operations +(shuffles, allgathers, joins) run across workers over a shared UCXX communicator. On startup, +each worker is pinned to the CPU cores and NUMA node closest to its GPU (see +[Pre-configured GPU clusters](#pre-configured-gpu-clusters) below). + +```python +import polars as pl +from cudf_polars.engine.dask import DaskEngine + +with DaskEngine() as engine: + result = ( + pl.scan_parquet("/data/dataset/*.parquet") + .filter(pl.col("amount") > 100) + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine=engine) + ) + print(result) +``` + +With no arguments, {class}`~cudf_polars.engine.dask.DaskEngine` creates a +`distributed.LocalCluster` with one worker per visible GPU, a `distributed.Client`, and +bootstraps a UCXX communicator across all workers. On exit, everything it created is torn down. + +```{note} +`.collect()` pulls the full result back to the driver process. For large distributed outputs, +prefer `.sink_*()` or aggregate/sample inside the query before `.collect()`. See +[Result collection](engines.md#result-collection). +``` + +## Configuring `DaskEngine` + +For custom configuration, build +{class}`~cudf_polars.engine.options.StreamingOptions` and use +`DaskEngine.from_options()`: + +```python +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.dask import DaskEngine + +opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent") + +with DaskEngine.from_options(opts) as engine: + result = pl.scan_parquet("/data/dataset/*.parquet").collect(engine=engine) +``` + +See {doc}`options` for the available fields. + +## Bring your own Dask client + +Pass an existing `distributed.Client` via `dask_client=` to attach to an already-running +scheduler: + +```python +from distributed import Client +import polars as pl +from cudf_polars.engine.dask import DaskEngine + +with Client("scheduler-address:8786") as dc: + with DaskEngine(dask_client=dc) as engine: + result = pl.scan_parquet("/data/*.parquet").collect(engine=engine) +``` + +When you supply the client, {class}`~cudf_polars.engine.dask.DaskEngine` +leaves it (and the cluster) alone on exit. + +(pre-configured-gpu-clusters)= +### Pre-configured GPU clusters + +Some Dask launchers, notably `dask_cuda.LocalCUDACluster`, already pin CPU affinity and set +`CUDA_VISIBLE_DEVICES` per worker. Disable the built-in hardware binding via +{class}`~cudf_polars.engine.hardware_binding.HardwareBindingPolicy` +to avoid having both layers fight over each worker's affinity (the second to run wins, which +makes the resulting placement non-deterministic): + +```python +from dask_cuda import LocalCUDACluster +from distributed import Client +from cudf_polars.engine.dask import DaskEngine +from cudf_polars.engine.hardware_binding import ( + HardwareBindingPolicy, +) + +with Client(LocalCUDACluster()) as dc, DaskEngine( + dask_client=dc, + engine_options={ + "hardware_binding": HardwareBindingPolicy(enabled=False), + }, +) as engine: + ... +``` + +### Manually launched workers + +When launching workers yourself (for example on a multi-node HPC cluster), use the built-in nanny +preload to assign one GPU per worker. The preload sets `CUDA_VISIBLE_DEVICES` on each worker +before the process spawns: + +```bash +# On each node — launch one worker per GPU with a single thread each: +dask worker SCHEDULER_ADDRESS:8786 --nworkers N --nthreads 1 \ + --preload-nanny cudf_polars.engine.dask +``` + +Then connect from the client: + +```python +import polars as pl +from distributed import Client +from cudf_polars.engine.dask import DaskEngine + +with Client("SCHEDULER_ADDRESS:8786") as dc: + with DaskEngine(dask_client=dc) as engine: + result = pl.scan_parquet("/data/*.parquet").collect(engine=engine) +``` + +Hardware binding (CPU affinity, NUMA, network) is handled automatically by +{class}`~cudf_polars.engine.dask.DaskEngine`; the nanny preload only +deals with GPU assignment. + +See the [Dask CLI deployment guide][dask-cli] for more on `dask worker` options. + +#### Using `dask-cuda-worker` + +As an alternative to the built-in nanny preload, you can launch workers with +[`dask-cuda-worker`][dask-cuda-worker] from the [dask-cuda][dask-cuda] project. It launches one +worker per visible GPU and installs a set of plugins on every worker: a `CPUAffinity` plugin +that pins the worker to the NUMA node of its GPU, an `RMMSetup` plugin, and a nanny preload that +configures UCX. + +`DaskEngine` sets up the same things for its own streaming runtime, so the two need to be +coordinated or they will fight: + +* **CPU affinity is unconditional in `dask-cuda-worker`** — the `CPUAffinity` plugin is always + installed and there is no CLI flag to turn it off. Pass + `hardware_binding=HardwareBindingPolicy(enabled=False)` to `DaskEngine` so it does not try to + re-pin affinity on top of dask-cuda's binding. +* **Do not pass `--rmm-pool-size`, `--rmm-managed-memory`, or similar RMM flags** to + `dask-cuda-worker`. Let `DaskEngine` own the memory resource via its `memory_resource_config` + (see {doc}`options`); otherwise two different memory resources will be installed on the same + worker. +* **Do not pass `--enable-tcp-over-ucx`, `--enable-infiniband`, `--enable-nvlink`, or + `--enable-rdmacm`** to `dask-cuda-worker`. `DaskEngine` bootstraps its own UCXX communicator + and will select transports itself; enabling them on both sides can produce inconsistent UCX + configuration across the cluster. + +```bash +# On each node — GPU assignment + CPU affinity only (no RMM, no UCX flags): +dask-cuda-worker SCHEDULER_ADDRESS:8786 +``` + +```python +import polars as pl +from distributed import Client +from cudf_polars.engine.dask import DaskEngine +from cudf_polars.engine.hardware_binding import ( + HardwareBindingPolicy, +) + +with Client("SCHEDULER_ADDRESS:8786") as dc: + with DaskEngine( + dask_client=dc, + engine_options={ + # dask-cuda-worker always pins CPU affinity; disable DaskEngine's + # binding so the two don't conflict. + "hardware_binding": HardwareBindingPolicy(enabled=False), + }, + ) as engine: + result = pl.scan_parquet("/data/*.parquet").collect(engine=engine) +``` + +## Cluster diagnostics + +{meth}`~cudf_polars.engine.dask.DaskEngine.gather_cluster_info` returns +placement information for every worker: + +```python +with DaskEngine() as engine: + print(f"cluster has {engine.nranks} workers") + for info in engine.gather_cluster_info(): + print( + f"hostname={info['hostname']}, pid={info['pid']}, " + f"CUDA_VISIBLE_DEVICES={info['cuda_visible_devices']}" + ) +``` + +{class}`~cudf_polars.engine.dask.DaskEngine` raises `RuntimeError` if +created inside an `rrun` cluster. + +[dask-distributed]: https://distributed.dask.org/ +[dask-cli]: https://docs.dask.org/en/latest/deploying-cli.html +[dask-cuda]: https://docs.rapids.ai/api/dask-cuda/nightly/ +[dask-cuda-worker]: https://docs.rapids.ai/api/dask-cuda/nightly/quickstart/#dask-cuda-worker diff --git a/docs/cudf/source/cudf_polars/default_singleton_engine.md b/docs/cudf/source/cudf_polars/default_singleton_engine.md new file mode 100644 index 00000000000..5b8148c7ae9 --- /dev/null +++ b/docs/cudf/source/cudf_polars/default_singleton_engine.md @@ -0,0 +1,111 @@ +(cudf-polars-default-singleton-engine)= +# Default `engine="gpu"` + +`.collect(engine="gpu")` (and `engine=pl.GPUEngine()`) is the API users invoke when they don't +construct a streaming engine explicitly. It runs the same streaming executor as the explicit +engines (Ray, Dask, SPMD), conceptually similar to +[Polars' own streaming engine](https://docs.pola.rs/user-guide/concepts/streaming/) but on the +GPU. Under the hood it's backed by +{class}`~cudf_polars.engine.default_singleton_engine.DefaultSingletonEngine`, +a process-wide singleton specialization of +{class}`~cudf_polars.engine.spmd.SPMDEngine`. At most one live +instance exists per process; it is created lazily on first use and torn down at interpreter +exit. Ray is the showcased explicit engine (see {doc}`usage`); this page documents what +`engine="gpu"` does *without* you having to construct anything. + +```{important} +`engine="gpu"` is meant for trivial setup: single-GPU execution with no +configuration or engine object to manage. +For any non-trivial workflow, construct an engine explicitly. To tune +options, use +{meth}`RayEngine.from_options(...) `. +`engine="gpu"` accepts no options, so settings such as +`spill_to_pinned_memory=True` for spill-heavy workloads require an +explicit engine. See {doc}`usage` and {doc}`options`. +``` + +## What you get without an explicit engine + +When the user just writes: + +```python +import polars as pl + +result = ( + pl.scan_parquet("/data/*.parquet") + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine="gpu") +) +``` + +cudf-polars uses +{class}`~cudf_polars.engine.default_singleton_engine.DefaultSingletonEngine` +under the hood. No cluster is set up, the rapidsmpf `Context` is bootstrapped on first use, +and subsequent `.collect()` calls in the same process reuse it. + +## Explicit handle + +If you genuinely want the singleton (for example in tests or scripts that need to call +`.shutdown()` deterministically) you can obtain it via the factory: + +```python +from cudf_polars.engine.default_singleton_engine import ( + DefaultSingletonEngine, +) + +engine = DefaultSingletonEngine.get_or_create() +result = query.collect(engine=engine) +``` + +`get_or_create()` is idempotent: calling it again returns the same instance. + +For anything beyond defaults, prefer an explicit engine. See {doc}`usage`. + +## Lifecycle + +The singleton is bootstrapped once per process. The rapidsmpf `Context`, RMM adaptor, and +Python thread-pool executor are reused across every `.collect()` call. + +Shutdown is automatic: the engine registers an `atexit` hook that tears it down at interpreter +exit. To shut it down explicitly (for example to release resources before constructing a +multi-GPU engine), call the static method: + +```python +from cudf_polars.engine.default_singleton_engine import ( + DefaultSingletonEngine, +) + +DefaultSingletonEngine.shutdown() +``` + +`shutdown()` is idempotent (calling it twice is safe) and a no-op if no live engine exists. + +## Mutual exclusion with explicit engines + +`DefaultSingletonEngine`, {class}`~cudf_polars.engine.ray.RayEngine`, +{class}`~cudf_polars.engine.dask.DaskEngine`, and +{class}`~cudf_polars.engine.spmd.SPMDEngine` cannot coexist in the same +process. Concretely: + +- Constructing `RayEngine` / `DaskEngine` / `SPMDEngine` while the singleton is alive raises + `RuntimeError`. +- `DefaultSingletonEngine.get_or_create()` raises `RuntimeError` if any explicit streaming + engine is alive. + +Recommended pattern: pick one engine for the lifetime of the program. If you need to switch, +shut down the active engine first: + +```python +DefaultSingletonEngine.shutdown() +explicit_engine = SPMDEngine.from_options(opts) +``` + +## No options + +`DefaultSingletonEngine.get_or_create()` takes no arguments. To tune `StreamingOptions` such +as `spill_to_pinned_memory`, `fallback_mode`, `max_rows_per_partition`, or any rapidsmpf +runtime knob, construct an explicit +{class}`~cudf_polars.engine.ray.RayEngine` via +{meth}`RayEngine.from_options(...) `. +See {doc}`options` for the available fields. diff --git a/docs/cudf/source/cudf_polars/engine_options.md b/docs/cudf/source/cudf_polars/engine_options.md deleted file mode 100644 index ba6085275b8..00000000000 --- a/docs/cudf/source/cudf_polars/engine_options.md +++ /dev/null @@ -1,176 +0,0 @@ -# GPUEngine Configuration Options - -The `polars.GPUEngine` object may be configured in several different ways. - -## Executor - -`cudf-polars` includes multiple *executors*, backends that take a Polars query and execute it to produce the result (either an in-memory `polars.DataFrame` from `.collect()` or one or more files with `.sink_`). These can be specified with the `executor` option when you create the `GPUEngine`. - -```python -import polars as pl - -engine = pl.GPUEngine(executor="streaming") -query = ... - -result = query.collect(engine=engine) -``` - -The `streaming` executor is the default executor as of RAPIDS 25.08, and is -equivalent to passing `engine="gpu"` or `engine=pl.GPUEngine()` to `collect`. At -a high-level, the `streaming` executor works by breaking inputs (in-memory -DataFrames or parquet files) into multiple pieces and streaming those pieces -through the series of operations needed to produce the final result. - -We also provide an `in-memory` executor. This executor is often faster when the -underlying data fits comfortably in device memory, because the overhead of splitting -inputs and executing them in batches is less beneficial at this scale. With that said, -this executor must rely on [Unified Virtual Memory] (UVM) if the input and intermediate -data do not fit in device memory. The `in-memory` executor can be used with - -```python -engine = pl.GPUEngine(executor="in-memory") -``` - -In general, we recommend starting with the default `streaming` executor, because -it scales significantly better than `in-memory`. The `streaming` executor includes -several configuration options, which can be provided with the `executor_options` -key when constructing the `GPUEngine`: - -```python -engine = pl.GPUEngine( - executor="streaming", # the default - executor_options={ - "max_rows_per_partition": 500_000, - } -) -``` - -You can configure the default value for configuration options through -environment variables with the prefix `CUDF_POLARS__EXECUTOR__{option_name}`. -For example, the environment variable -`CUDF_POLARS__EXECUTOR__MAX_ROWS_PER_PARTITION` will set the default -`max_rows_per_partition` to use if it isn't overridden through -`executor_options`. - -For boolean options, like `sink_to_directory`, the values `{"1", "true", "yes", "y"}` -are considered `True` and `{"0", "false", "no", "n"}` are considered `False`. - -See [Configuration Reference](#cudf-polars-api) for a full list of options, and -[Streaming Execution](#cudf-polars-streaming) for more on the streaming executor, -including multi-GPU execution. - -## Parquet Reader Options - -Reading large parquet files can use a large amount of memory, especially when the files are compressed. This may lead to out of memory errors for some workflows. To mitigate this, the "chunked" parquet reader may be selected. When enabled, parquet files are read in chunks, limiting the peak memory usage at the cost of a small drop in performance. - -To configure the parquet reader, we provide a dictionary of options to the `parquet_options` keyword of the `GPUEngine` object. Valid keys and values are: -- `chunked` indicates that chunked parquet reading is to be used. By default, chunked reading is turned on. -- [`chunk_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum size per chunk. By default, the maximum chunk size is unlimited. -- [`pass_read_limit`](https://docs.rapids.ai/api/libcudf/legacy/classcudf_1_1io_1_1chunked__parquet__reader#aad118178b7536b7966e3325ae1143a1a) controls the maximum memory used for decompression. The default pass read limit is 16GiB. - -For example, to select the chunked reader with custom values for `pass_read_limit` and `chunk_read_limit`: -```python -engine = GPUEngine( - parquet_options={ - 'chunked': True, - 'chunk_read_limit': int(1e9), - 'pass_read_limit': int(4e9) - } -) -result = query.collect(engine=engine) -``` -Note that passing `chunked: False` disables chunked reading entirely, and thus `chunk_read_limit` and `pass_read_limit` will have no effect. - -You can configure the default value for configuration options through -environment variables with the prefix -`CUDF_POLARS__PARQUET_OPTIONS__{option_name}`. For example, the environment -variable `CUDF_POLARS__PARQUET_OPTIONS__CHUNKED=0` will set the default -`chunked` to `False`. - -## CUDA Stream Policy - -By default, all CUDA operations in `cudf-polars` are launched on the default -stream. You can configure `cudf-polars` to use multiple CUDA streams, which may -improve performance by overlapping data transfers and kernel launches. - -This behavior is configured by the `cuda_stream_policy` keyword or -`CUDF_POLARS__CUDA_STREAM_POLICY` environment variable. The valid options are - -* `default`: use the default CUDA stream for all kernel launches and memory operations -* `new`: create a new CUDA stream when necessary (e.g. when reading from a file or loading an in-memory `polars.LazyFrame` object, - or when performing a join where the inputs might be on different streams) -* `pool`: use an RMM stream pool (only supported with the rapidsmpf runtime) - -The ``rapidsmpf`` runtime for the streaming executor also supports using a CUDA Stream Pool. - -```python -engine = GPUEngine( - executor="streaming", - executor_options={ - "runtime": "rapidsmpf", - }, - cuda_stream_policy="pool", -) -``` - -Or provide a dictionary with configuration options for the pool, like `cuda_stream_pool={"pool_size": 16}`. -You can also set the `CUDF_POLARS__CUDA_STREAM_POLICY` environment variable the JSON encoded configuration dictionary. - -This stream pool is shared between cudf-polars and rapidsmpf. - -## Memory Resource - -All GPU memory allocations made by cudf-polars use an RMM Memory Resource object from {mod}`rmm.mr`. You can specify -the memory resource to use by: - -1. Passing a concrete `MemoryResource` instance to {class}`~polars.lazyframe.engine_config.GPUEngine`. -2. Passing the configuration options for a Memory Resource as the `memory_resource_config` keyword argument to {class}`~polars.lazyframe.engine_config.GPUEngine`. -3. Relying on the default behavior, which creates a memory resource for you (details below). - -By default, cudf-polars will create a new RMM Memory Resource for you, which is cached and reused -for each query. The type of that memory resource is hardware-dependent. GPUs that support [Unified Virtual Memory] memory, -use a {class}`rmm.mr.ManagedMemoryResource` wrapped in a {class}`rmm.mr.PoolMemoryResource` and {class}`rmm.mr.PrefetchResourceAdaptor`. -Otherwise, {class}`rmm.mr.CudaAsyncMemoryResource` is used. - -Set `POLARS_GPU_ENABLE_CUDA_MANAGED_MEMORY=0` to disabled managed memory and use {class}`rmm.mr.CudaAsyncMemoryResource` instead. - -Alternatively, you can customize the pool by passing the configuration for an RMM Memory Resource object as `memory_resource_config` -when creating your {class}`~polars.lazyframe.engine_config.GPUEngine`: - -```python -memory_resource_config = { - "qualname": "rmm.mr.CudaAsyncMemoryResource", - "options": { - "initial_pool_size": "100 MiB", - } -} - -engine = pl.GPUEngine(memory_resource_config=memory_resource_config) -``` - -This lets you control things like the initial pool size or release threshold. - -Finally, for maximum flexibility, you can create your own memory resource object and pass it into the {class}`~polars.lazyframe.engine_config.GPUEngine`: - -```python -import polars as pl -import rmm - -mr = rmm.mr.CudaAsyncMemoryResource() -engine = pl.GPUEngine(memory_resource=mr) -``` - -Passing a concrete memory resource takes precedence over passing the `memory_resource_config` options, -which takes precedence over the default memory resource. - -Note that providing a concrete memory resource isn't an option with the distributed scheduler, -because the concrete memory resource is only valid for the process in which it was created. - -## Disabling CUDA Managed Memory - -By default the `in-memory` executor will use [CUDA managed memory](https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#unified-memory-introduction) with RMM's pool allocator. On systems that don't support managed memory, a non-managed asynchronous pool -allocator is used. -Managed memory can be turned off by setting `POLARS_GPU_ENABLE_CUDA_MANAGED_MEMORY` to `0`. System requirements for managed memory can be found [here]( -https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#system-requirements-for-unified-memory). - -[Unified Virtual Memory]: https://developer.nvidia.com/blog/unified-memory-cuda-beginners/ diff --git a/docs/cudf/source/cudf_polars/engines.md b/docs/cudf/source/cudf_polars/engines.md new file mode 100644 index 00000000000..0434ebcf192 --- /dev/null +++ b/docs/cudf/source/cudf_polars/engines.md @@ -0,0 +1,94 @@ +(cudf-polars-engines)= +# Engines + +## What is an engine? + +`cudf-polars` executes Polars `LazyFrame` queries on GPU. You select GPU execution by passing an +`engine=` argument to `.collect()` or `.sink_*()`. The `engine` you pass decides *how* the +query runs: whether it streams through partitioned inputs or fits everything in device memory, +whether it runs in-process or distributes work across a cluster of GPU workers, and which +cluster backend coordinates those workers. + +## Execution modes + +### Streaming + +Streaming engines partition their inputs (Parquet files or in-memory `DataFrame`s) and process +those partitions through the query graph in chunks. This lets queries scale past device memory +and (on Ray, Dask, and SPMD) across multiple GPUs and multiple nodes. cudf-polars' streaming +executor is its own GPU implementation, but conceptually parallels +[Polars' CPU streaming engine](https://docs.pola.rs/user-guide/concepts/streaming/): the same +partition-and-stream model, just on the GPU. + +All four ways of running cudf-polars use this same streaming executor: +{class}`~cudf_polars.engine.ray.RayEngine`, +{class}`~cudf_polars.engine.dask.DaskEngine`, +{class}`~cudf_polars.engine.spmd.SPMDEngine`, and the default +`engine="gpu"` (backed internally by +{class}`~cudf_polars.engine.default_singleton_engine.DefaultSingletonEngine`). +They differ only in how their GPU worker(s) are provisioned. +{class}`~cudf_polars.engine.ray.RayEngine` with no arguments uses every +GPU visible to the process, so on a single node with N GPUs it runs the query on all N of them +without any extra configuration. Launching a multi-node cluster simply means pointing the +engine at that cluster; the user-facing code is the same. + +### In-memory + +The in-memory engine (`engine=pl.GPUEngine(executor="in-memory")`) is +the only non-streaming path. It runs the query on a single GPU, materializing intermediates in +device memory. Use it for small queries (data that fits in device memory), debugging, or when +you specifically need `LazyFrame.profile` support (see {doc}`profiling`). For production +workloads on any nontrivial dataset, use a streaming engine. See {doc}`in_memory_engine` for +details. + +## Cluster backends + +| Engine | Cluster model | Extra runtime dependency | Typical use | +| --------------------------------------------------------------------- | ------------------------------------------------------------------- | ------------------------ | --------------------------------------------------------------------------------- | +| {class}`~cudf_polars.engine.ray.RayEngine` | Single-client driver; one Ray actor per GPU | [Ray][ray-docs] | Works from a laptop to a cloud cluster. No separate cluster setup needed. | +| {class}`~cudf_polars.engine.dask.DaskEngine` | Single-client driver; one Dask worker per GPU | [Dask distributed][dask] | Teams with an existing Dask deployment or a preferred Dask launcher. | +| {class}`~cudf_polars.engine.spmd.SPMDEngine` | Same script runs once per GPU, joined by a communicator | UCXX (under `rrun`) | HPC / SPMD launchers such as `rrun`. Single-rank mode needs no cluster at all. | +| [`engine="gpu"`](default_singleton_engine.md) | Implicit process-wide singleton on one GPU; no cluster | None | Default when no engine is constructed. Short scripts and notebooks. No options. | + +All four approaches use the same execution model under the hood, so which to select depends +on your preferred deployment method, not performance tradeoffs. For any non-trivial workflow, +construct one of the first three engines explicitly (see {doc}`usage`); `engine="gpu"` is a +convenience and accepts no options, so it cannot be tuned. See +{doc}`default_singleton_engine` for details on the implementation that backs it. + +## Result collection + +`.collect()` returns a single `pl.DataFrame` on the **caller's process**. On the streaming +engines that has two flavors: + +- **`RayEngine` / `DaskEngine`** (single-client driver): every partition is pulled from the + cluster workers back to the driver and concatenated there. Convenient for results that fit + in driver memory but **a foot-gun for full distributed datasets**. E.g., calling `.collect()` + on a 1 TB query result sends 1 TB through your driver. Sink the result + (`.sink_parquet("path/")`, `.sink_csv(...)`, …) so each rank writes its own partition + directly, or reduce/sample the data inside the query before `.collect()`. +- **`SPMDEngine`** (one process per GPU): each rank's `.collect()` returns *that rank's* + local fragment. There is no driver to gather to. If you need a single concatenated + `pl.DataFrame` across ranks, call + {func}`~cudf_polars.engine.spmd.allgather_polars_dataframe` explicitly (see + [Collecting distributed results](spmd_engine.md#collecting-distributed-results)). If you + want to keep processing the data rank-by-rank, just stay in `SPMDEngine` and use its + MPI-style model: each rank already owns its fragment. +- **`engine="gpu"`**: single GPU, no cluster, so `.collect()` is the only sensible option. + +Rules of thumb for multi-machine `RayEngine` / `DaskEngine` runs: + +- For exports: prefer `.sink_*()` over `.collect()`. +- For analysis: aggregate, sample, or `limit()` the result inside the lazy query before + `.collect()` so the driver only sees a small DataFrame. +- For further distributed processing in Python: switch to `SPMDEngine` so each rank keeps + its fragment. + +## Where to go next + +- {doc}`usage` — tutorial that walks through running your first GPU query end-to-end. +- {doc}`other_engines` — per-engine reference pages for DaskEngine and SPMDEngine. +- {doc}`options` — the `StreamingOptions` configuration object and every field it surfaces. + +[ray-docs]: https://docs.ray.io/ +[dask]: https://distributed.dask.org/ diff --git a/docs/cudf/source/cudf_polars/in_memory_engine.md b/docs/cudf/source/cudf_polars/in_memory_engine.md new file mode 100644 index 00000000000..8ee82af3ae4 --- /dev/null +++ b/docs/cudf/source/cudf_polars/in_memory_engine.md @@ -0,0 +1,37 @@ +(cudf-polars-in-memory-engine)= +# In-memory engine + +The in-memory engine (`engine=pl.GPUEngine(executor="in-memory")`) is +the only non-streaming path in cudf-polars. It materializes the whole query in device memory +on a single GPU. + +For most workflows, prefer a streaming engine. Use the in-memory engine when: + +- The data comfortably fits in device memory and you want minimum setup. +- You need `LazyFrame.profile` (see {doc}`profiling`). +- You are debugging and want the simpler, non-streaming execution path. + +```python +result = query.collect(engine=pl.GPUEngine(executor="in-memory")) +``` + +This is the path documented in Polars' own [GPU support guide][polars-gpu]. By contrast, +`engine="gpu"` (or `engine=pl.GPUEngine()`) selects the default streaming path on a single GPU +(see {doc}`default_singleton_engine`). That default accepts no options, so for anything beyond +a quick script, construct an explicit engine. + +## Configuration + +The in-memory engine does not accept +{class}`~cudf_polars.engine.options.StreamingOptions`. Pass keyword +arguments to `pl.GPUEngine(...)` directly: + +```python +import polars as pl + +engine = pl.GPUEngine(executor="in-memory", parquet_options={"chunked": True}) +``` + +See the [Polars GPU support guide][polars-gpu] for the full in-memory usage story. + +[polars-gpu]: https://docs.pola.rs/user-guide/gpu-support/ diff --git a/docs/cudf/source/cudf_polars/index.md b/docs/cudf/source/cudf_polars/index.md new file mode 100644 index 00000000000..f98d3e2ecb4 --- /dev/null +++ b/docs/cudf/source/cudf_polars/index.md @@ -0,0 +1,104 @@ +# Polars GPU engine + +cuDF provides GPU-accelerated execution engines for Python users of the Polars Lazy API. The +engines support most of the core expressions and data types as well as a growing set of more +advanced dataframe manipulations and data file formats. When a GPU engine is selected, Polars +converts expressions into an optimized query plan and determines whether the plan is supported +on the GPU. If it is not, the execution transparently falls back to the standard Polars engine +and runs on the CPU. + +## Install + +Follow the [RAPIDS installation guide](https://docs.rapids.ai/install) and pick the +`cudf-polars` package for your CUDA and Python versions. For example, with conda: + +```bash +conda install -c rapidsai -c conda-forge -c nvidia cudf-polars +``` + +Or with pip (CUDA 13 wheels; use `cudf-polars-cu12` for CUDA 12): + +```bash +pip install cudf-polars-cu13 +``` + +## Quick start + +{class}`~cudf_polars.engine.ray.RayEngine` with no arguments uses +every GPU visible to the process, so the same code runs on one GPU and scales to multi-GPU / +multi-node setups automatically: + +```python +import polars as pl +from cudf_polars.engine.ray import RayEngine + +query = ( + pl.scan_parquet("/data/dataset/*.parquet") + .filter(pl.col("amount") > 100) + .group_by("customer_id") + .agg(pl.col("amount").sum()) +) + +with RayEngine() as engine: + result = query.collect(engine=engine) +``` + +See {doc}`usage` for the full tutorial, {doc}`engines` for a conceptual overview of the +available engines, and {doc}`options` for the +{class}`~cudf_polars.engine.options.StreamingOptions` configuration. + +## Benchmark + +```{note} +The following benchmarks were performed with the `POLARS_GPU_ENABLE_CUDA_MANAGED_MEMORY` +environment variable set to `"0"`. Using managed memory (the default) imposes a performance cost +in order to avoid out of memory errors. Peak performance can still be attained by setting the +environment variable to `0`. +``` + +We reproduced the [Polars Decision Support (PDS)](https://github.com/pola-rs/polars-benchmark) +benchmark to compare Polars GPU engine with the default CPU settings across several dataset sizes. +Here are the results: + +```{figure} ../_static/pds_benchmark_polars.png +:width: 600px +``` + +You can see up to 13x speedup using the GPU engine on the compute-heavy PDS queries involving +complex aggregation and join operations. Below are the speedups for the top performing queries: + +```{figure} ../_static/compute_heavy_queries_polars.png +:width: 1000px +``` + +*PDS-H benchmark | GPU: NVIDIA H100 PCIe | CPU: Intel Xeon W9-3495X (Sapphire Rapids) | Storage: +Local NVMe* + +You can reproduce the results by visiting the [Polars Decision Support (PDS) GitHub repository](https://github.com/pola-rs/polars-benchmark). + +## Learn More + +The GPU engine for Polars is now available in Open Beta and the engine is undergoing rapid development. +To learn more, visit the [GPU Support page](https://docs.pola.rs/user-guide/gpu-support/) on the Polars website. + +```{toctree} +:maxdepth: 1 +:caption: Contents: + +usage +engines +options +profiling +other_engines +api +``` + +## Launch on Google Colab + +```{figure} ../_static/colab.png +:width: 200px +:target: https://nvda.ws/4eKlWZW + +Try out the GPU engine for Polars in a free GPU notebook environment. +Sign in with your Google account and [launch the demo on Colab](https://nvda.ws/4eKlWZW). +``` diff --git a/docs/cudf/source/cudf_polars/index.rst b/docs/cudf/source/cudf_polars/index.rst deleted file mode 100644 index 40759a72fa6..00000000000 --- a/docs/cudf/source/cudf_polars/index.rst +++ /dev/null @@ -1,56 +0,0 @@ -Polars GPU engine -================= - -cuDF provides an in-memory, GPU-accelerated execution engine for Python users of the Polars Lazy API. -The engine supports most of the core expressions and data types as well as a growing set of more advanced dataframe manipulations -and data file formats. When using the GPU engine, Polars will convert expressions into an optimized query plan and determine -whether the plan is supported on the GPU. If it is not, the execution will transparently fall back to the standard Polars engine -and run on the CPU. This functionality is available in Open Beta, is undergoing rapid development, and is currently a single GPU implementation. - -Benchmark ---------- - -.. note:: - The following benchmarks were performed with the ``POLARS_GPU_ENABLE_CUDA_MANAGED_MEMORY`` environment variable set to ``"0"``. - Using managed memory (the default) imposes a performance cost in order to avoid out of memory errors. - Peak performance can still be attained by setting the environment variable to ``0``. - -We reproduced the `Polars Decision Support (PDS) `__ benchmark to compare Polars GPU engine with the default CPU settings across several dataset sizes. Here are the results: - -.. figure:: ../_static/pds_benchmark_polars.png - :width: 600px - - - -You can see up to 13x speedup using the GPU engine on the compute-heavy PDS queries involving complex aggregation and join operations. Below are the speedups for the top performing queries: - - -.. figure:: ../_static/compute_heavy_queries_polars.png - :width: 1000px - -:emphasis:`PDS-H benchmark | GPU: NVIDIA H100 PCIe | CPU: Intel Xeon W9-3495X (Sapphire Rapids) | Storage: Local NVMe` - -You can reproduce the results by visiting the `Polars Decision Support (PDS) GitHub repository `__. - -Learn More ----------- - -The GPU engine for Polars is now available in Open Beta and the engine is undergoing rapid development. To learn more, visit the `GPU Support page `__ on the Polars website. - -.. toctree:: - :maxdepth: 1 - :caption: Contents: - - usage - streaming_execution - engine_options - api - -Launch on Google Colab ----------------------- - -.. figure:: ../_static/colab.png - :width: 200px - :target: https://nvda.ws/4eKlWZW - - Try out the GPU engine for Polars in a free GPU notebook environment. Sign in with your Google account and `launch the demo on Colab `__. diff --git a/docs/cudf/source/cudf_polars/options.md b/docs/cudf/source/cudf_polars/options.md new file mode 100644 index 00000000000..a779f0c5ced --- /dev/null +++ b/docs/cudf/source/cudf_polars/options.md @@ -0,0 +1,129 @@ +(cudf-polars-options)= +# Configuration Options + +{class}`~cudf_polars.engine.options.StreamingOptions` is the recommended +way to configure the streaming engines (Ray, Dask, SPMD; the default `engine="gpu"` accepts no +options, see the note below). Build one and pass it to `RayEngine.from_options()` +to construct a {class}`~cudf_polars.engine.ray.RayEngine`: + +```python +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.ray import RayEngine + +opts = StreamingOptions( + num_streaming_threads=8, + fallback_mode="silent", + spill_device_limit="70%", +) + +with RayEngine.from_options(opts) as engine: + result = ( + pl.scan_parquet("/data/*.parquet") + .filter(pl.col("amount") > 100) + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine=engine) + ) +``` + +```{note} +`engine="gpu"` (the default when no engine is constructed) accepts no +{class}`~cudf_polars.engine.options.StreamingOptions`. Many of the +fields below have a noticeable runtime impact (for example `spill_to_pinned_memory=True` +significantly speeds up spill-heavy workflows), so to use any non-default value construct one +of the engines listed below. +``` + +{class}`~cudf_polars.engine.options.StreamingOptions` covers three +categories of fields: + +| Category | Scope | Env var prefix | +| ----------- | -------------------------------------------------------------------------------------- | ------------------------- | +| `rapidsmpf` | Streaming runtime, e.g. threads, CUDA streams, spilling, pinned memory, log level | `RAPIDSMPF_` | +| `executor` | Query execution and partitioning, e.g. `max_rows_per_partition`, `fallback_mode`, ... | `CUDF_POLARS__EXECUTOR__` | +| `engine` | `pl.GPUEngine` kwargs, e.g. Parquet, memory resource, CUDA streams, hardware binding | `CUDF_POLARS__` | + +The `engine` category surfaces the same tuning knobs as plain `pl.GPUEngine(...)`. For example, +`parquet_options` and `memory_resource_config`. Configure these settings through +{class}`~cudf_polars.engine.options.StreamingOptions` rather than +passing them to `pl.GPUEngine(...)` directly. + +The `rapidsmpf` category adds configuration for the streaming runtime that has no equivalent on the plain +`pl.GPUEngine`. See the [streaming runtime configuration reference][rapidsmpf-config] for the underlying +meaning of each `RAPIDSMPF_*` field. + +Every option has a corresponding environment variable. When an option is not set explicitly, its +value is read from the environment variable if present; otherwise the underlying library applies +its built-in default. Boolean environment variables accept `{"1", "true", "yes", "y"}` as true +and `{"0", "false", "no", "n"}` as false. + + +## Building from a dictionary + +{meth}`~cudf_polars.engine.options.StreamingOptions.from_dict` accepts a +flat dict of field names. Unknown keys raise `TypeError`; `None` values leave the field +unspecified: + +```python +opts = StreamingOptions.from_dict({ + "num_streaming_threads": 8, + "fallback_mode": "silent", +}) +``` + +This is convenient when options come from a config file or CLI. + +## Engine keyword arguments + +Each engine ({class}`~cudf_polars.engine.ray.RayEngine`, +{class}`~cudf_polars.engine.dask.DaskEngine`, or +{class}`~cudf_polars.engine.spmd.SPMDEngine`) accepts +`rapidsmpf_options`, `executor_options`, and `engine_options` as raw keyword arguments. +We recommend using this only when you need fine-grained control that doesn't fit the +{class}`~cudf_polars.engine.options.StreamingOptions` schema. +Otherwise, prefer the engine's `from_options` constructor with +{class}`~cudf_polars.engine.options.StreamingOptions`. + +For the in-memory engine, +{class}`~cudf_polars.engine.options.StreamingOptions` does not apply. +See {doc}`in_memory_engine` for how to configure it. + + +## Options Reference + +Environment variables follow these patterns: + +* `rapidsmpf`: `RAPIDSMPF_` (e.g. `RAPIDSMPF_NUM_STREAMING_THREADS`) +* `executor`: `CUDF_POLARS__EXECUTOR__` (e.g. `CUDF_POLARS__EXECUTOR__FALLBACK_MODE`) +* `engine`: `CUDF_POLARS__` (e.g. `CUDF_POLARS__RAISE_ON_FAIL`; nested prefixes for structured options) + +### Category: `rapidsmpf` + +See the [streaming runtime configuration reference][rapidsmpf-config] for the full list of fields and defaults. + +### Category: `executor` + +| Field | Description | Default | +|--------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|-------------| +| `num_py_executors` | Workers for the internal Python `ThreadPoolExecutor`. | `8` | +| `fallback_mode` | When an unsupported operation forces a fallback to CPU execution: `"warn"`, `"raise"`, `"silent"`. | `"warn"` | +| `max_rows_per_partition` | Maximum number of rows per partition. Only used for in-memory `DataFrame` sources, never for disk IO or dynamic planning. | `1_000_000` | +| `broadcast_limit` | Maximum number of bytes for broadcast joins. | auto | +| `target_partition_size` | Target partition size in bytes. Used for IO and dynamic planning. `0` means auto. | auto | +| `dynamic_planning` | Dynamic planning configuration, dict or {class}`~cudf_polars.utils.config.DynamicPlanningOptions`. `None` disables. | enabled | +| `sink_to_directory` | Whether `.sink_*()` writes its output as a directory. The `spmd`, `ray`, and `dask` engines always use `True`; passing `False` raises `ValueError`. | `True` | + +### Category: `engine` + +| Field | Description | Default | +|--------------------------|-------------------------------------------------------------------------------------------------------------------------------|---------------------------| +| `raise_on_fail` | Raise an error instead of falling back to CPU execution. | `False` | +| `parquet_options` | Parquet configuration, dict or {class}`~cudf_polars.utils.config.ParquetOptions`. | — | +| `memory_resource_config` | RMM configuration, dict or {class}`~cudf_polars.utils.config.MemoryResourceConfig`. | — | +| `cuda_stream_policy` | CUDA stream policy (`"default"`, `"pool"`, or a configuration dict). | — | +| `hardware_binding` | Hardware binding policy. Pass a {class}`~cudf_polars.engine.hardware_binding.HardwareBindingPolicy` for fine-grained control. | `HardwareBindingPolicy()` | +| `allow_gpu_sharing` | When `False` (default), the engine raises if multiple ranks share the same physical GPU. | `False` | + + +[rapidsmpf-config]: https://docs.rapids.ai/api/rapidsmpf/nightly/configuration/ diff --git a/docs/cudf/source/cudf_polars/other_engines.md b/docs/cudf/source/cudf_polars/other_engines.md new file mode 100644 index 00000000000..2c4e3acfa4a --- /dev/null +++ b/docs/cudf/source/cudf_polars/other_engines.md @@ -0,0 +1,35 @@ +(cudf-polars-other-engines)= +# Other Engines + +The examples in {doc}`usage` use +{class}`~cudf_polars.engine.ray.RayEngine`. The pages below cover +other ways to run cudf-polars: + +* **{doc}`dask_engine`** runs on a [Dask distributed][dask] cluster with one Dask worker per + GPU. Use this when you already have a Dask deployment or a preferred Dask launcher. +* **{doc}`spmd_engine`** is single program, multiple data: the same script runs once per GPU, + typically launched with `rrun`. Single-rank mode needs no external cluster at all. +* **{doc}`default_singleton_engine`** documents what `engine="gpu"` does under the hood + when no engine is constructed explicitly. Useful to *understand*; for any non-trivial + workflow we recommend constructing an explicit engine so you can pass + {class}`~cudf_polars.engine.options.StreamingOptions`. +* **{doc}`in_memory_engine`** (`engine=pl.GPUEngine(executor="in-memory")`) is the only non-streaming path. Suitable + for small queries (data that fits in device memory), debugging, or when you specifically + need `LazyFrame.profile`. + +See {doc}`engines` for the conceptual comparison with `RayEngine` (cluster model, runtime +dependencies, typical use), and {doc}`options` for the shared +{class}`~cudf_polars.engine.options.StreamingOptions` configuration +(the in-memory engine does not accept `StreamingOptions`). + +```{toctree} +:maxdepth: 1 +:hidden: + +dask_engine +spmd_engine +default_singleton_engine +in_memory_engine +``` + +[dask]: https://distributed.dask.org/ diff --git a/docs/cudf/source/cudf_polars/profiling.md b/docs/cudf/source/cudf_polars/profiling.md new file mode 100644 index 00000000000..a2e08586e65 --- /dev/null +++ b/docs/cudf/source/cudf_polars/profiling.md @@ -0,0 +1,184 @@ +(cudf-polars-profiling)= +# Profiling and Tracing + +## Streaming Statistics + +When a query runs on a streaming engine +({class}`~cudf_polars.engine.ray.RayEngine`, +{class}`~cudf_polars.engine.dask.DaskEngine`, +{class}`~cudf_polars.engine.spmd.SPMDEngine`, or the default +`engine="gpu"`), the underlying streaming runtime can record detailed per-rank statistics: +shuffle byte counts, allgather participation, memory-pool high-water marks, and more. See the +[underlying statistics reference][rapidsmpf-stats] for the full list of metrics. + +Statistics collection is off by default. Enable it by setting `statistics=True` on +{class}`~cudf_polars.engine.options.StreamingOptions` (or exporting +`RAPIDSMPF_STATISTICS=1`), then call `gather_statistics()` on the engine to pull the per-rank +records: + +```python +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.ray import RayEngine + +opts = StreamingOptions(statistics=True) + +with RayEngine.from_options(opts) as engine: + result = ( + pl.scan_parquet("/data/*.parquet") + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine=engine) + ) + + per_rank = engine.gather_statistics(clear=True) + for rank, stats in enumerate(per_rank): + print(f"rank {rank}:\n{stats}") +``` + +`gather_statistics(*, clear=False)` returns a list of `rapidsmpf.statistics.Statistics` objects, +one per rank, in rank order. Passing `clear=True` resets each rank's counters after the gather — +useful when you want to scope statistics to a single query. + +Use `global_statistics(*, clear=False)` when you only need the cluster-wide picture. It gathers +and merges the per-rank statistics into a single `Statistics` (counts and values summed, maxima +reduced with `max`). Capture it inside the engine context, then print after exit: + +```python +with RayEngine.from_options(opts) as engine: + result = pl.scan_parquet("/data/*.parquet").collect(engine=engine) + total = engine.global_statistics(clear=True) +print(total) +``` + + +## GPU Profiling + +For streaming queries, we recommend profiling with [NVIDIA NSight Systems][nsight]; `cudf-polars` +includes [nvtx][nvtx] annotations to help you understand where time is being spent. Streaming +engines do not support `LazyFrame.profile`, since `profile` requires a single in-memory pass. + +If you specifically need [`LazyFrame.profile`](https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.profile.html), +the in-memory engine supports it. This is useful for small queries during development: + +```python +import polars as pl +q = pl.scan_parquet("ny-taxi/2024/*.parquet").filter(pl.col("total_amount") > 15.0) +profile = q.profile(engine=pl.GPUEngine(executor="in-memory")) +``` + +The result is `(result_df, timings_df)`; see the Polars docs link above for the schema. + +## Tracing + +cudf-polars can optionally trace execution of each node in the query plan. To enable tracing, set +the environment variable ``CUDF_POLARS_LOG_TRACES`` to a true value ("1", "true", "y", "yes") +before starting your process. + +cudf-polars logs traces at three scopes (levels): + +1. `plan`: These generally happen once per query. This will include things like the (serialized) + query plan. +2. `actor`: (streaming engines only). There will be roughly one `actor` trace per node in the + logical plan. +3. `evaluate_ir_node`: Logs the evaluation of a physical node in the query plan. Note that one + logical node might expand to more than one physical nodes. + +Each trace includes a `scope` key indicating which level that trace belongs to. `actor`-scoped +nodes will be nested under a `plan`-scoped node. When using a streaming engine, +`evaluate_ir_node`-scoped nodes will be nested under an `actor`-scoped node. + +### Schemas + +The different scopes have different schemas. Fields in **bold** are required / always present. + +#### scope=plan + +| Field Name | Type | Description | +| ---------- | ----- | ----------- | +| **scope** | Literal["plan"] | The string literal `"plan"`. Useful for distinguishing from other types of traces. | +| **cudf_polars_query_id** | UUID4 | A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. | +| **plan** | `PlanObject` | A serialized representation of the query plan. | +| **event** | String | A message like "Query Plan" | + +#### scope=actor + +`actor`-scoped traces only appear when running on a streaming engine. + +| Field Name | Type | Description | +| ---------- | ----- | ----------- | +| **scope** | Literal["actor"] | The string literal `"actor"`. Useful for distinguishing from other types of traces. | +| **cudf_polars_query_id** | UUID4 | A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. | +| **start** | int | A nanosecond-resolution counter indicating when the actor started. Note: actors generally start early in the query and suspend waiting for data. | +| **stop** | int | A nanosecond-resolution counter indicating when the actor completed. | +| **event** | String | A message like "Streaming Actor". | +| **actor_ir_type** | String | The type of the actor, like `"Scan"`. | +| **actor_ir_id** | int | A unique identifier for the actor. All traces logged under this actor will include this value. | +| chunk_count | int | A counter for how many table chunks have been processed by this actor at the time of logging. | +| duplicated | bool | Whether the output rows are duplicated across ranks (e.g. after an allgather). | +| row_count | int | Total row count produced by this node during execution. | + +#### scope=evaluate_ir_node + +| Field Name | Type | Description | +| ---------- | ----- | ----------- | +| **scope** | `Literal["evaluate_ir_node"]` | The string literal `"evaluate_ir_node"`. Useful for distinguishing from other types of traces. | +| **cudf_polars_query_id** | UUID4 | A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. | +| **type** | string | The name of the IR node | +| **start** | int | A nanosecond-precision counter indicating when this node started executing | +| **stop** | int | A nanosecond-precision counter indicating when this node finished executing | +| **overhead_duration** | int | The overhead, in nanoseconds, added by tracing | +| `count_frames_{phase}` | int | The number of dataframes for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_DATAFRAMES=0`. | +| `frames_{phase}` | `list[dict]` | A list with dictionaries with "shape" and "size" fields, one per input dataframe, for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_DATAFRAMES=0`. | +| `total_bytes_{phase}` | int | The sum of the size (in bytes) of the dataframes for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | +| `rmm_current_bytes_{phase}` | int | The current number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | +| `rmm_current_count_{phase}` | int | The current number of allocations made by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | +| `rmm_peak_bytes_{phase}` | int | The peak number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | +| `rmm_peak_count_{phase}` | int | The peak number of allocations made by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | +| `rmm_total_bytes_{phase}` | int | The total number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | +| `rmm_total_count_{phase}` | int | The total number of allocations made by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | +| `nvml_current_bytes_{phase}` | int | The device memory usage of this process, as reported by NVML, for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | +| actor_ir_id | int | A unique identifier for the parent actor (streaming engines only). | + +Setting `CUDF_POLARS_LOG_TRACES=1` enables all the metrics. Depending on the query, the overhead +from collecting the memory or dataframe metrics can be measurable. You can disable some metrics +through additional environment variables. For example, to disable the memory related metrics, set: + +```bash +CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 +``` + +And to disable the memory and dataframe metrics, which essentially leaves just the duration +metrics, set +```bash +CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 CUDF_POLARS_LOG_TRACES_DATAFRAMES=0 +``` + +Note that tracing still needs to be enabled with `CUDF_POLARS_LOG_TRACES=1`. + +The implementation uses [structlog] to build log records. You can configure the output using +structlog's [configuration][structlog-configure] and enrich the records with +[context variables][structlog-context]. + +```python +>>> df = pl.DataFrame({"a": ["a", "a", "b"], "b": [1, 2, 3]}).lazy() +>>> df.group_by("a").agg(pl.col("b").min().alias("min"), pl.col("b").max().alias("max")).collect(engine=pl.GPUEngine(executor="in-memory")) +2025-09-10 07:44:01 [info ] Execute IR count_frames_input=0 count_frames_output=1 ... type=DataFrameScan +2025-09-10 07:44:01 [info ] Execute IR count_frames_input=1 count_frames_output=1 ... type=GroupBy +shape: (2, 3) +┌─────┬─────┬─────┐ +│ a ┆ min ┆ max │ +│ --- ┆ --- ┆ --- │ +│ str ┆ i64 ┆ i64 │ +╞═════╪═════╪═════╡ +│ b ┆ 3 ┆ 3 │ +│ a ┆ 1 ┆ 2 │ +└─────┴─────┴─────┘ +``` + +[nsight]: https://developer.nvidia.com/nsight-systems +[nvtx]: https://nvidia.github.io/NVTX/ +[rapidsmpf-stats]: https://docs.rapids.ai/api/rapidsmpf/nightly/statistics/ +[structlog]: https://www.structlog.org/ +[structlog-configure]: https://www.structlog.org/en/stable/configuration.html +[structlog-context]: https://www.structlog.org/en/stable/contextvars.html diff --git a/docs/cudf/source/cudf_polars/spmd_engine.md b/docs/cudf/source/cudf_polars/spmd_engine.md new file mode 100644 index 00000000000..4fdef37f17b --- /dev/null +++ b/docs/cudf/source/cudf_polars/spmd_engine.md @@ -0,0 +1,211 @@ +(cudf-polars-spmd-engine)= +# SPMD + +{class}`~cudf_polars.engine.spmd.SPMDEngine` runs the streaming executor +in [SPMD][spmd-wiki] mode: the same Python script runs once per GPU, and each process owns its +local data fragment. Collective operations (shuffles, allgathers, joins) coordinate across +processes to produce a globally consistent result. + +On startup, `SPMDEngine` pins the process to the CPU cores and NUMA node closest to its GPU. +Under `rrun` this binding is delegated to the launcher; outside `rrun` (single-process mode) +`SPMDEngine` performs it itself. See +{class}`~cudf_polars.engine.hardware_binding.HardwareBindingPolicy` +to override this behaviour. + +## Single-GPU setup + +To use {class}`~cudf_polars.engine.spmd.SPMDEngine` on a single GPU, +create the engine and run your Python script as normal. You still get the full streaming +executor (partitioned inputs, spilling, scaling past device memory); you just don't need any +multi-process coordination: + +```python +# python my_script.py +import polars as pl +from cudf_polars.engine.spmd import SPMDEngine + +with SPMDEngine() as engine: + result = ( + pl.scan_parquet("/data/dataset/*.parquet") + .filter(pl.col("amount") > 100) + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine=engine) + ) +``` + +With a single rank, the [Query symmetry requirement](#query-symmetry-requirement) and +[Collecting distributed results](#collecting-distributed-results) steps below do not apply, +`collect()` returns the full result directly. + +## Multi-GPU with `rrun` + +To run on more than one GPU, the same Python script must be launched collectively, and all +processes must be informed that they are participating in the cluster. This is the role of the +`rrun` launcher: it starts one process per GPU, +{class}`~cudf_polars.engine.spmd.SPMDEngine` detects this and bootstraps +a UCXX communicator across all ranks. + +When the same script is launched without `rrun`, `SPMDEngine` falls back to a single-process, +single-GPU communicator that requires no external communication library. This mode is useful +for local development, unit tests, and single-GPU pipelines (see [Single-GPU setup](#single-gpu-setup) above). + +```python +# multi-GPU launch: rrun -n 4 python my_script.py +# single-GPU: python my_script.py +import polars as pl +from cudf_polars.engine.spmd import SPMDEngine + +with SPMDEngine() as engine: + result = ( + pl.scan_parquet("/data/dataset/*.parquet") + .filter(pl.col("amount") > 100) + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine=engine) + ) +``` + +File-based sources (`scan_parquet`, `scan_csv`, …) are automatically partitioned so that each +rank reads a different file or row-group range. In-memory `DataFrame` objects are already +rank-local, so each rank processes its own copy. + +## Configuring `SPMDEngine` + +For custom configuration, build a +{class}`~cudf_polars.engine.options.StreamingOptions` and use +`SPMDEngine.from_options()`: + +```python +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.spmd import SPMDEngine + +opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent") + +with SPMDEngine.from_options(opts) as engine: + result = pl.scan_parquet("/data/dataset/*.parquet").collect(engine=engine) +``` + +See {doc}`options` for the available fields. + +{class}`~cudf_polars.engine.spmd.SPMDEngine` exposes a few properties +that are useful in SPMD code: + +* `engine.nranks` / `engine.rank` — cluster size and local rank index. +* `engine.comm` — the active `rapidsmpf.communicator.Communicator`. +* `engine.context` — the active `rapidsmpf.streaming.core.context.Context`. + +## Query symmetry requirement + +All ranks must execute the **same sequence of queries in the same order**. Collective operations +are matched using internal operation IDs; if one rank executes a collective that another rank +does not, the program will deadlock. + +In practice: + +* Avoid rank-conditional `collect()` or `sink*()` calls. +* Avoid branches that change the query graph. +* Keep the driver script deterministic. + +```python +# OK — every rank runs the same query in the same order. +with SPMDEngine() as engine: + result = ( + pl.scan_parquet("/data/*.parquet") + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine=engine) + ) +``` + +```python +# DEADLOCKS — rank 0 issues a group_by collective the other ranks never see. +with SPMDEngine() as engine: + df = pl.scan_parquet("/data/*.parquet") + if engine.rank == 0: # don't do this + df = df.group_by("customer_id").agg(pl.col("amount").sum()) + result = df.collect(engine=engine) +``` + +## Collecting distributed results + +Unlike `RayEngine` / `DaskEngine`, where `.collect()` gathers every partition to the driver, +here each rank's `.collect()` returns *its own* fragment. If you want to keep processing the +data rank-by-rank, just use that fragment directly; if you need a single concatenated view, +use the helper below. + +`collect()` returns a rank-local result. Use +{func}`~cudf_polars.engine.spmd.allgather_polars_dataframe` to assemble +the full dataset on every rank: + +```python +from cudf_polars.streaming.collectives.common import reserve_op_id +from cudf_polars.engine.spmd import ( + SPMDEngine, + allgather_polars_dataframe, +) + +with SPMDEngine() as engine: + result = pl.scan_parquet("/data/*.parquet").collect(engine=engine) + + with reserve_op_id() as op_id: + full = allgather_polars_dataframe( + engine=engine, + local_df=result, + op_id=op_id, + ) +``` + +`op_id` identifies the collective across ranks — all ranks must pass the same value. +{func}`~cudf_polars.streaming.collectives.common.reserve_op_id` draws from the same +pool that cudf-polars uses internally for shuffle and join collectives, so there is no risk of +collision. Do not pass hardcoded integers: they may silently collide with an ID reserved by an +active collective inside `collect()`. + +The result is a `pl.DataFrame` containing rows from all ranks in rank order (rank 0 first, then +rank 1, …, rank N-1). + +## Reusing a communicator + +By default {class}`~cudf_polars.engine.spmd.SPMDEngine` bootstraps a new +UCXX communicator on every construction. When running multiple engines in sequence (for example +in a test suite or interactive session), repeated bootstrapping is unnecessary and can race on +the file-based coordination layer shared by all ranks. + +Pass a pre-created communicator via `comm=` to skip the bootstrap entirely. The engine does +**not** close the communicator on shutdown — the caller retains ownership and can reuse it +across multiple {class}`~cudf_polars.engine.spmd.SPMDEngine` lifetimes: + +```python +from rapidsmpf import bootstrap +from rapidsmpf.progress_thread import ProgressThread +from cudf_polars.engine.spmd import SPMDEngine + +# Bootstrap once. +comm = bootstrap.create_ucxx_comm(progress_thread=ProgressThread()) + +# Reuse across multiple engine lifetimes — no re-bootstrap between them. +with SPMDEngine(comm=comm) as engine: + result1 = df1.lazy().collect(engine=engine) + +with SPMDEngine(comm=comm) as engine: + result2 = df2.lazy().collect(engine=engine) +``` + +## Cluster diagnostics + +{meth}`~cudf_polars.engine.spmd.SPMDEngine.gather_cluster_info` returns +placement information for every rank: + +```python +with SPMDEngine() as engine: + if engine.rank == 0: + for i, info in enumerate(engine.gather_cluster_info()): + print( + f"rank {i}: hostname={info['hostname']}, pid={info['pid']}, " + f"CUDA_VISIBLE_DEVICES={info['cuda_visible_devices']}" + ) +``` + +[spmd-wiki]: https://en.wikipedia.org/wiki/Single_program,_multiple_data diff --git a/docs/cudf/source/cudf_polars/streaming_execution.md b/docs/cudf/source/cudf_polars/streaming_execution.md deleted file mode 100644 index b599401a874..00000000000 --- a/docs/cudf/source/cudf_polars/streaming_execution.md +++ /dev/null @@ -1,140 +0,0 @@ -(cudf-polars-streaming)= -# Streaming Execution - -The streaming executors work best when the inputs to your query come -from parquet files. That is, start with `scan_parquet`, not existing -Polars `DataFrame`s or CSV files. - -## Single GPU streaming - -The simplest case, requiring no additional dependencies, is the -`single` cluster option. An appropriate engine is: - -```python -engine = pl.GPUEngine() -``` - -This uses the default single-GPU *cluster* and is equivalent to -`pl.GPUEngine(executor="streaming", executor_options={"cluster": "single"})`, -or simply passing `engine="gpu"` to `.collect()`. - -When executed with this engine, any parquet inputs are split into -"partitions" that are streamed through the query graph. We try to -pick a good default for the typical partition size (based on the -amount of GPU memory available), however, it might not be optimal. You -can configure the execution by providing more options to the executor. -For example, to split input parquet files into 125 MB chunks: - -```python -engine = pl.GPUEngine( - executor="streaming", - executor_options={ - "target_partition_size": 125_000_000 # 125 MB - } -) -``` - -Use the executor option `max_rows_per_partition` to control how in-memory -``DataFrame`` inputs are split into multiple partitions. - -You may find, at the cost of higher memory footprint, that a larger value gives -better performance. - -````{note} -If part of a query does not run in streaming mode, but _does_ run -using the in-memory GPU engine, then we automatically concatenate the -inputs for that operation into a single partition, and effectively -fall back to the in-memory engine. - -The `fallback_mode` option can be used to raise an exception when -this fallback occurs or silence the warning instead: - - - engine = pl.GPUEngine( - executor="streaming", - executor_options={ - "fallback_mode": "raise", - } - ) -```` - -## Multi GPU streaming - -```{note} -The distributed cluster is considered experimental and might change without warning. -``` - -Streaming utilising multiple GPUs simultaneously is supported by -setting the `"cluster"` to `"distributed"`: -```python -engine = pl.GPUEngine( - executor="streaming", - executor_options={"cluster": "distributed"}, -) -``` - -Unlike the single GPU executor, this does require a number of -additional dependencies. We currently require -[Dask](https://www.dask.org/) and -[Dask-CUDA](https://docs.rapids.ai/api/dask-cuda/nightly/) to be -installed. In addition, we recommend that Dask Distributed plugin of -[UCXX](https://github.com/rapidsai/ucxx) and -[RapidsMPF](https://github.com/rapidsai/rapidsmpf) are installed to -take advantage of any high-performance networking. - -To quickly install all of these dependencies into a conda environment, -you can run: - -``` -conda install -c rapidsai -c conda-forge \ - cudf-polars rapidsmpf dask-cuda distributed-ucxx -``` - - -````{note} -Identically to the single-GPU streaming case, if part of a query does -not support execution with multiple partitions, but is supported by -the in-memory GPU engine, we concatenate the inputs and execute using -a single partition. -```` - -The multi-GPU engine uses the currently active Dask client to carry -out the partitioned execution, so for multi-GPU we would use something -like - -```python -from dask_cuda import LocalCUDACluster - -... - -client = LocalCUDACluster(...).get_client() - -q = ... -engine = pl.GPUEngine( - executor="streaming", - executor_options={"cluster": "distributed"}, -) -result = q.collect(engine=engine) -``` - -````{warning} -If you request a `"distributed"` cluster but do not have a cluster -deployed, `collect`ing the query will fail. -```` - -### Streaming sink operations - -When the `"distributed"` cluster option is active, sink operations like -`df.sink_parquet("my_path")` will always produce a directory containing -one or more files. It is not currently possible to disable this behavior. - -When the `"single"` cluster option is active, sink operations will -generate a single file by default. However, you may opt into the -distributed sink behavior by adding `{"sink_to_directory": True}` -to your `executor_options` dictionary. - -## Get Started - -The experimental streaming GPU executor is now available. For a quick -walkthrough of a multi-GPU example workflow and performance on a real dataset, -check out the [multi-GPU Polars demo](https://github.com/rapidsai-community/showcase/blob/main/accelerated_data_processing_examples/multi_gpu_polars_demo.ipynb). diff --git a/docs/cudf/source/cudf_polars/usage.md b/docs/cudf/source/cudf_polars/usage.md index 934cce9a9f0..12abf563325 100644 --- a/docs/cudf/source/cudf_polars/usage.md +++ b/docs/cudf/source/cudf_polars/usage.md @@ -1,230 +1,171 @@ +(cudf-polars-usage)= # Usage -`cudf-polars` enables GPU acceleration for Polars' LazyFrame API by executing logical plans with cuDF and pylibcudf. It requires minimal code changes and works by specifying a GPU engine during execution. +`cudf-polars` runs your Polars `LazyFrame` queries on GPU. You select GPU execution by passing +an `engine=` argument to `.collect()` or `.sink_*()`. See {doc}`engines` for the conceptual +picture; this page walks through running your first query. -For a high-level overview of GPU support in Polars, see the [Polars GPU support guide](https://docs.pola.rs/user-guide/gpu-support/). +We always recommend constructing an engine object and using them in a context manager to ensure proper resource cleanup. The engine constructor is +where you specify {class}`~cudf_polars.engine.options.StreamingOptions` +such as `spill_to_pinned_memory` or `fallback_mode`. Ray is the showcased example below; see also +{doc}`other_engines`. -## Getting Started - -Use `cudf-polars` by calling `.collect(engine="gpu")` or `.sink_(engine="gpu")` on a LazyFrame: +## Your first GPU query ```python import polars as pl +from cudf_polars.engine.ray import RayEngine + +query = ( + pl.scan_parquet("/data/dataset/*.parquet") + .filter(pl.col("amount") > 100) + .group_by("customer_id") + .agg(pl.col("amount").sum()) +) + +with RayEngine() as engine: + result = query.collect(engine=engine) +print(result) +``` + +{class}`~cudf_polars.engine.ray.RayEngine` with no arguments uses every +GPU visible to the process, so the example above runs on one GPU if that's all that's available +and scales automatically to every GPU on the node otherwise. It also attaches to an existing +Ray cluster if one is already running (see [Attaching to an existing Raycluster](#attaching-to-an-existing-ray-cluster)). + +```{note} +The examples on this page use {class}`~cudf_polars.engine.ray.RayEngine`. `cudf-polars` supports +multiple engines for GPU execution. See {doc}`other_engines` for alternatives, or {doc}`engines` for a conceptual overview of when to pick which. +``` -q = pl.scan_parquet("ny-taxi/2024/*.parquet").filter(pl.col("total_amount") > 15.0) -result = q.collect(engine="gpu") +```{note} +`.collect()` pulls the full result back to the driver process. For large distributed outputs, +prefer `.sink_*()` or aggregate/sample inside the query before `.collect()`. See +[Result collection](engines.md#result-collection). ``` -Alternatively, you can create a `GPUEngine` instance with custom configuration: +## Configuring `RayEngine` + +The same `from_options()` / `StreamingOptions` pattern shown here works for every streaming +engine — see {doc}`other_engines` for the DaskEngine and SPMDEngine variants. + +{class}`~cudf_polars.engine.ray.RayEngine` with no arguments starts a +local [Ray][ray-docs] cluster and creates one GPU worker per visible GPU. + +For custom configuration, build a +{class}`~cudf_polars.engine.options.StreamingOptions` and use +`RayEngine.from_options()`: ```python import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.ray import RayEngine -engine = pl.GPUEngine(raise_on_fail=True) +opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent") -q = pl.scan_parquet("ny-taxi/2024/*.parquet").filter(pl.col("total_amount") > 15.0) -result = q.collect(engine=engine) +with RayEngine.from_options(opts) as engine: + result = pl.scan_parquet("/data/dataset/*.parquet").collect(engine=engine) ``` -With `raise_on_fail=True`, the query will raise an exception if it cannot be run on the GPU instead of transparently falling back to polars CPU. See more [engine options](engine_options.md). +See {doc}`options` for the available fields. -## GPU Profiling - -The `streaming` executor does not support profiling query execution through the `LazyFrame.profile` method. With the default `synchronous` scheduler for the `streaming` executor, we recommend using [NVIDIA NSight Systems](https://developer.nvidia.com/nsight-systems) to profile your queries. -cudf-polars includes [nvtx](https://nvidia.github.io/NVTX/) annotations to help you understand where time is being spent. +```{note} +`RayEngine` is an object you create and pass to `.collect(engine=engine)`. Prefer the +context-manager form so the Ray cluster and GPU workers are torn down automatically. +``` -With the `distributed` scheduler for the `streaming` executor, we recommend using Dask's [built-in diagnostics](https://docs.dask.org/en/stable/diagnostics-distributed.html). +## Attaching to an existing Ray cluster -Finally, the `"in-memory"` *does* support [`LazyFrame.profile`](https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.LazyFrame.profile.html). +For multi-node runs, start a Ray cluster separately (for example with `ray start` on each +node) and attach to it from your driver script. When Ray is already initialized, +{class}`~cudf_polars.engine.ray.RayEngine` connects to the running +cluster and leaves it untouched on exit: ```python +import ray import polars as pl -q = pl.scan_parquet("ny-taxi/2024/*.parquet").filter(pl.col("total_amount") > 15.0) -profile = q.profile(engine=pl.GPUEngine(executor="in-memory")) +from cudf_polars.engine.ray import RayEngine + +ray.init(address="auto") # attach to a running cluster +with RayEngine() as engine: + result = ( + pl.scan_parquet("s3://bucket/*.parquet") + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine=engine) + ) ``` -The result is a tuple containing 2 materialized DataFrames - the first with the query result and the second with profiling information of each node that is executed. -```python -print(profile[0]) -``` -``` -shape: (32_439_327, 19) -┌──────────┬──────────────────────┬───────────────────────┬─────────────────┬───┬───────────────────────┬──────────────┬──────────────────────┬─────────────┐ -│ VendorID ┆ tpep_pickup_datetime ┆ tpep_dropoff_datetime ┆ passenger_count ┆ … ┆ improvement_surcharge ┆ total_amount ┆ congestion_surcharge ┆ Airport_fee │ -│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │ -│ i32 ┆ datetime[μs] ┆ datetime[μs] ┆ i64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │ -╞══════════╪══════════════════════╪═══════════════════════╪═════════════════╪═══╪═══════════════════════╪══════════════╪══════════════════════╪═════════════╡ -│ 2 ┆ 2024-01-01 00:57:55 ┆ 2024-01-01 01:17:43 ┆ 1 ┆ … ┆ 1.0 ┆ 22.7 ┆ 2.5 ┆ 0.0 │ -│ 1 ┆ 2024-01-01 00:03:00 ┆ 2024-01-01 00:09:36 ┆ 1 ┆ … ┆ 1.0 ┆ 18.75 ┆ 2.5 ┆ 0.0 │ -│ 1 ┆ 2024-01-01 00:17:06 ┆ 2024-01-01 00:35:01 ┆ 1 ┆ … ┆ 1.0 ┆ 31.3 ┆ 2.5 ┆ 0.0 │ -│ 1 ┆ 2024-01-01 00:36:38 ┆ 2024-01-01 00:44:56 ┆ 1 ┆ … ┆ 1.0 ┆ 17.0 ┆ 2.5 ┆ 0.0 │ -│ 1 ┆ 2024-01-01 00:46:51 ┆ 2024-01-01 00:52:57 ┆ 1 ┆ … ┆ 1.0 ┆ 16.1 ┆ 2.5 ┆ 0.0 │ -│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │ -│ 2 ┆ 2024-12-31 23:05:43 ┆ 2024-12-31 23:18:15 ┆ null ┆ … ┆ 1.0 ┆ 24.67 ┆ null ┆ null │ -│ 2 ┆ 2024-12-31 23:02:00 ┆ 2024-12-31 23:22:14 ┆ null ┆ … ┆ 1.0 ┆ 15.25 ┆ null ┆ null │ -│ 2 ┆ 2024-12-31 23:17:15 ┆ 2024-12-31 23:17:34 ┆ null ┆ … ┆ 1.0 ┆ 24.46 ┆ null ┆ null │ -│ 1 ┆ 2024-12-31 23:14:53 ┆ 2024-12-31 23:35:13 ┆ null ┆ … ┆ 1.0 ┆ 32.88 ┆ null ┆ null │ -│ 1 ┆ 2024-12-31 23:15:33 ┆ 2024-12-31 23:36:29 ┆ null ┆ … ┆ 1.0 ┆ 28.57 ┆ null ┆ null │ -└──────────┴──────────────────────┴───────────────────────┴─────────────────┴───┴───────────────────────┴──────────────┴──────────────────────┴─────────────┘ -``` +{class}`~cudf_polars.engine.ray.RayEngine` creates one rank per GPU in +the Ray cluster and bootstraps a UCXX communicator across them. It raises `RuntimeError` if +created inside an `rrun` cluster or if no GPUs are available. -```python -print(profile[1]) -``` -``` -shape: (3, 3) -┌────────────────────┬───────┬────────┐ -│ node ┆ start ┆ end │ -│ --- ┆ --- ┆ --- │ -│ str ┆ u64 ┆ u64 │ -╞════════════════════╪═══════╪════════╡ -│ optimization ┆ 0 ┆ 416 │ -│ gpu-ir-translation ┆ 416 ┆ 741 │ -│ Scan ┆ 813 ┆ 233993 │ -└────────────────────┴───────┴────────┘ -``` +## Manual Engine Lifetime Control -## Tracing - -cudf-polars can optionally trace execution of each node in the query plan. -To enable tracing, set the environment variable ``CUDF_POLARS_LOG_TRACES`` to a -true value ("1", "true", "y", "yes") before starting your process. - -cudf-polars logs traces at three scopes (levels): - -1. `plan`: These generally happen once per query. This will include things - like the (serialized) query plan. -2. `actor`: (rapidsmpf runtime only). There will be roughly one `actor` - trace per node in the logical plan. -3. `evaluate_ir_node`: Logs the evaluation of a physical node in the query plan. - Note that one logical node might expand to more than one physical nodes. - -Each trace includes a `scope` key indicating which level that trace belongs to. -`actor`-scoped nodes will be nested under a `plan`-scoped node. When using the -rapidsmpf runtime, `evaluate_ir_node`-scoped nodes will -be nested under an `actor`-scoped node. - -### Schemas - -The different scopes have different schemas. Fields in **bold** are required / always present. - -#### scope=plan - -| Field Name | Type | Description | -| ---------- | ----- | ----------- | -| **scope** | Literal["plan"] | The string literal `"plan"`. Useful for distinguishing from other types of traces. | -| **cudf_polars_query_id** | UUID4 | A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. | -| **plan** | `PlanObject` | A serialized representation of the query plan. See #TODO below | -| **event** | String | A message like "Query Plan" | - -#### scope=actor - -`actor`-scoped traces will only appear with the rapidsmpf runtime. - -| Field Name | Type | Description | -| ---------- | ----- | ----------- | -| **scope** | Literal["actor"] | The string literal `"actor"`. Useful for distinguishing from other types of traces. | -| **cudf_polars_query_id** | UUID4 | A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. | -| **start** | int | A nanosecond-resolution counter indicating when the actor started. Note: actors generally start early in the query and suspend waiting for data. | -| **stop** | int | A nanosecond-resolution counter indicating when the actor completed. | -| **event** | String | A message like "Streaming Actor". | -| **actor_ir_type** | String | The type of the actor, like `"Scan"`. | -| **actor_ir_id** | int | A unique identifier for the actor. All traces logged under this actor will include this value. | -| chunk_count | int | A counter for how many table chunks have been processed by this actor at the time of logging. | -| duplicated | bool | Whether the output rows are duplicated across ranks (e.g. after an allgather). | -| row_count | int | Total row count produced by this node during execution. | - -#### scope=evaluate_ir_node - -| Field Name | Type | Description | -| ---------- | ----- | ----------- | -| **scope** | `Literal["evaluate_ir_node"]` | The string literal `"evaluate_ir_node"`. Useful for distinguishing from other types of traces. | -| **cudf_polars_query_id** | UUID4 | A unique identifier for the polars query being executed. All traces logged as part of this query use this ID. | -| **type** | string | The name of the IR node | -| **start** | int | A nanosecond-precision counter indicating when this node started executing | -| **stop** | int | A nanosecond-precision counter indicating when this node finished executing | -| **overhead_duration** | int | The overhead, in nanoseconds, added by tracing | -| `count_frames_{phase}` | int | The number of dataframes for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_DATAFRAMES=0`. | -| `frames_{phase}` | `list[dict]` | A list with dictionaries with "shape" and "size" fields, one per input dataframe, for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_DATAFRAMES=0`. | -| `total_bytes_{phase}` | int | The sum of the size (in bytes) of the dataframes for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | -| `rmm_current_bytes_{phase}` | int | The current number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | -| `rmm_current_count_{phase}` | int | The current number of allocations made by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | -| `rmm_peak_bytes_{phase}` | int | The peak number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | -| `rmm_peak_count_{phase}` | int | The peak number of allocations made by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | -| `rmm_total_bytes_{phase}` | int | The total number of bytes allocated by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | -| `rmm_total_count_{phase}` | int | The total number of allocations made by RMM Memory Resource used by cudf-polars for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | -| `nvml_current_bytes_{phase}` | int | The device memory usage of this process, as reported by NVML, for the input / output `phase`. This metric can be disabled by setting `CUDF_POLARS_LOG_TRACES_MEMORY=0`. | -| actor_ir_id | int | A unique identifier for the parent actor (rapidsmpf runtime only). | - -Setting `CUDF_POLARS_LOG_TRACES=1` enables all the metrics. Depending on the query, the overhead -from collecting the memory or dataframe metrics can be measurable. You can disable some metrics -through additional environment variables. For example, do disable the memory related metrics, set: +When you need to control the engine lifetime explicitly. For example, in a Jupyter notebook +where a `with` block cannot span multiple cells, construct `RayEngine` once and reuse it, +then call `engine.shutdown()` when you are done: -``` -CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 -``` +```python +# Cell 1: start the engine +from cudf_polars.engine.ray import RayEngine -And to disable the memory and dataframe metrics, which essentially leaves just -the duration metrics, set -``` -CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 CUDF_POLARS_LOG_TRACES_DATAFRAMES=0 +engine = RayEngine() ``` -Note that tracing still needs to be enabled with `CUDF_POLARS_LOG_TRACES=1`. - -The implementation uses [structlog] to build log records. You can configure the -output using structlog's [configuration][structlog-configure] and enrich the -records with [context variables][structlog-context]. +```python +# Cell 2: run a query +import polars as pl -``` ->>> df = pl.DataFrame({"a": ["a", "a", "b"], "b": [1, 2, 3]}).lazy() ->>> df.group_by("a").agg(pl.col("b").min().alias("min"), pl.col("b").max().alias("max")).collect(engine="gpu") -2025-09-10 07:44:01 [info ] Execute IR count_frames_input=0 count_frames_output=1 ... type=DataFrameScan -2025-09-10 07:44:01 [info ] Execute IR count_frames_input=1 count_frames_output=1 ... type=GroupBy -shape: (2, 3) -┌─────┬─────┬─────┐ -│ a ┆ min ┆ max │ -│ --- ┆ --- ┆ --- │ -│ str ┆ i64 ┆ i64 │ -╞═════╪═════╪═════╡ -│ b ┆ 3 ┆ 3 │ -│ a ┆ 1 ┆ 2 │ -└─────┴─────┴─────┘ +result = ( + pl.scan_parquet("/data/*.parquet") + .group_by("customer_id") + .agg(pl.col("amount").sum()) + .collect(engine=engine) +) +result ``` -### Serialized Query Plan +```python +# Cell 3: run another query reusing the same engine +other = pl.scan_parquet("/data/other/*.parquet").collect(engine=engine) +``` -The query plan is serialized with the following schema: +```python +# Final cell: tear everything down +engine.shutdown() +``` -| Field Name | Type | Description | -| ---------- | ---- | ----------- | -| roots | `list` | A list of string node IDs for the root nodes | -| nodes | `Mapping` | A mapping from string node ID to the Node | -| partition_info | `Mapping` | A mapping from string node ID to the Node | +`engine.shutdown()` stops the GPU worker processes (rank actors) and, if the engine started Ray itself, +also calls `ray.shutdown()`. It is idempotent, so calling it twice is safe. -`nodes` and `partition_info` are flat: they contain every node in the query plan. +## Sink behavior -`IRNode` objects have the following schema: +When a streaming engine is used, sink operations such as `df.sink_parquet("my_path")` always produce +a directory containing one or more files. It is not currently possible to disable this behavior, and +setting `sink_to_directory=False` raises a `ValueError`. -| Field Name | Type | Description | -| ---------- | ---- | ----------- | -| id | `str` | The string node ID. This is unique within the query plan | -| children | `list` | The node IDs of this node's children nodes. Each child node ID is also available in the plan's `nodes` field. | -| schema | `Mapping` | A mapping from column name to (string) data type identifier. | -| properties | `Mapping` | Additional properties, unique to each node type. | -| type | `str` | The name of the IR node. | +The in-memory engine, by contrast, follows standard Polars semantics and writes to a single file at +the specified path. -`PartitionInfo` objects have the following schema: +## Cluster diagnostics -| Field Name | Type | Description | -| ---------- | ---- | ----------- | -| count | int | The number of partitions for this node | -| partitioned_on | `list[str]` | The columns this node is partitioned on. | +{meth}`~cudf_polars.engine.ray.RayEngine.gather_cluster_info` returns +a list of {class}`~cudf_polars.engine.core.ClusterInfo` — one per rank +actor — with fields `hostname`, `pid`, `cuda_visible_devices`, and `gpu_uuid`: +```python +with RayEngine() as engine: + print(f"cluster has {engine.nranks} ranks") + for i, info in enumerate(engine.gather_cluster_info()): + print( + f"rank {i}: hostname={info.hostname}, pid={info.pid}, " + f"cuda_visible_devices={info.cuda_visible_devices}, " + f"gpu_uuid={info.gpu_uuid}" + ) +# rank 0: hostname=node-0, pid=12345, cuda_visible_devices=0, gpu_uuid=GPU-abc123... +# rank 1: hostname=node-0, pid=12346, cuda_visible_devices=1, gpu_uuid=GPU-def456... +``` -[nvml]: https://developer.nvidia.com/management-library-nvml -[rmm-stats]: https://docs.rapids.ai/api/rmm/stable/guide/#memory-statistics-and-profiling -[structlog]: https://www.structlog.org/ -[structlog-configure]: https://www.structlog.org/en/stable/configuration.html -[structlog-context]: https://www.structlog.org/en/stable/contextvars.html +[ray-docs]: https://docs.ray.io/ diff --git a/python/cudf_polars/cudf_polars/engine/dask.py b/python/cudf_polars/cudf_polars/engine/dask.py index 509980fc2c2..fcb0b9ce280 100644 --- a/python/cudf_polars/cudf_polars/engine/dask.py +++ b/python/cudf_polars/cudf_polars/engine/dask.py @@ -761,7 +761,7 @@ def from_options( dask_client: distributed.Client | None = None, ) -> DaskEngine: """ - Create a :class:`DaskEngine` from a :class:`StreamingOptions` object. + Create a :class:`DaskEngine` from a :class:`~cudf_polars.engine.options.StreamingOptions` object. This is the recommended way to construct a ``DaskEngine`` for typical use. All RapidsMPF, executor, and engine options are read from @@ -806,7 +806,7 @@ def gather_cluster_info(self) -> list[ClusterInfo]: Returns ------- - List of :class:`ClusterInfo`, one per rank. + List of :class:`~cudf_polars.engine.core.ClusterInfo`, one per rank. """ return list(self._dask_ctx.client.run(ClusterInfo.local).values()) diff --git a/python/cudf_polars/cudf_polars/engine/default_singleton_engine.py b/python/cudf_polars/cudf_polars/engine/default_singleton_engine.py index 73f08730df6..fe3f0369810 100644 --- a/python/cudf_polars/cudf_polars/engine/default_singleton_engine.py +++ b/python/cudf_polars/cudf_polars/engine/default_singleton_engine.py @@ -1,6 +1,6 @@ # SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 -"""Single-GPU, single-instance specialization of :class:`SPMDEngine`.""" +"""Single-GPU, single-instance specialization of :class:`~cudf_polars.engine.spmd.SPMDEngine`.""" from __future__ import annotations @@ -221,7 +221,7 @@ def check_no_live_default_singleton(self_engine: Any) -> None: class DefaultSingletonEngine(SPMDEngine): """ - Process-wide single-GPU singleton specialization of :class:`SPMDEngine`. + Process-wide single-GPU singleton specialization of :class:`~cudf_polars.engine.spmd.SPMDEngine`. At most one live instance exists per process. Use :meth:`get_or_create` to obtain it and :meth:`shutdown` to tear it down. @@ -230,7 +230,8 @@ class DefaultSingletonEngine(SPMDEngine): executor, and engine settings from the environment. Users needing custom configuration should construct an engine explicitly. - See :class:`RayEngine`, :class:`DaskEngine`, and :class:`SPMDEngine`. + See :class:`~cudf_polars.engine.ray.RayEngine`, :class:`~cudf_polars.engine.dask.DaskEngine`, + and :class:`~cudf_polars.engine.spmd.SPMDEngine`. Examples -------- @@ -275,7 +276,7 @@ def get_or_create(cls) -> DefaultSingletonEngine: Raises ------ RuntimeError - If any other :class:`StreamingEngine` is currently alive. + If any other :class:`~cudf_polars.engine.core.StreamingEngine` is currently alive. """ with _state.lock: if _state.instance is not None: @@ -292,7 +293,7 @@ def shutdown() -> None: Submits teardown to the dedicated worker thread, the same thread that constructed the rapidsmpf ``Context``, and waits up to - :data:`SHUTDOWN_TIMEOUT_SECONDS`. + ``SHUTDOWN_TIMEOUT_SECONDS`` seconds. """ with _state.lock: instance = _state.instance diff --git a/python/cudf_polars/cudf_polars/engine/ray.py b/python/cudf_polars/cudf_polars/engine/ray.py index 532e863bdf7..25b5206b514 100644 --- a/python/cudf_polars/cudf_polars/engine/ray.py +++ b/python/cudf_polars/cudf_polars/engine/ray.py @@ -470,15 +470,15 @@ class RayEngine(StreamingEngine): Hardware binding is disabled implicitly but the caller must pass ``engine_options={"allow_gpu_sharing": True}`` explicitly to acknowledge the multi-tenant GPU semantics. - .. note:: - Oversubscription does not increase throughput. When multiple - ranks share a GPU, they compete for the same compute and - memory resources, which may increase memory pressure and - reduce overall performance. This option is primarily useful - for testing multi-rank code paths on machines with fewer - GPUs than ranks, and for downstream projects that need to - validate distributed logic in resource-constrained CI - environments. + + Note, oversubscription does not increase throughput. When multiple + ranks share a GPU, they compete for the same compute and + memory resources, which may increase memory pressure and + reduce overall performance. This option is primarily useful + for testing multi-rank code paths on machines with fewer + GPUs than ranks, and for downstream projects that need to + validate distributed logic in resource-constrained CI + environments. Raises ------ @@ -671,7 +671,7 @@ def from_options( ray_init_options: dict[str, object] | None = None, ) -> RayEngine: """ - Create a :class:`RayEngine` from a :class:`StreamingOptions` object. + Create a :class:`RayEngine` from a :class:`~cudf_polars.engine.options.StreamingOptions` object. This is the recommended way to construct a ``RayEngine`` for typical use. All RapidsMPF, executor, and engine options are read from @@ -725,7 +725,7 @@ def gather_cluster_info(self) -> list[ClusterInfo]: Returns ------- - List of :class:`ClusterInfo`, one per rank. + List of :class:`~cudf_polars.engine.core.ClusterInfo`, one per rank. """ return ray.get([rank.get_info.remote() for rank in self.rank_actors]) diff --git a/python/cudf_polars/cudf_polars/engine/spmd.py b/python/cudf_polars/cudf_polars/engine/spmd.py index 10aae6ab1b3..efe66a01805 100644 --- a/python/cudf_polars/cudf_polars/engine/spmd.py +++ b/python/cudf_polars/cudf_polars/engine/spmd.py @@ -141,7 +141,7 @@ def allgather_polars_dataframe( Rank-local DataFrame to contribute. op_id Operation ID for this AllGather collective. Must be identical on every - rank. For example, use :func:`reserve_op_id` to obtain a collision-free + rank. For example, use :func:`~cudf_polars.streaming.collectives.common.reserve_op_id` to obtain a collision-free ID from the same pool used internally by cudf-polars. Avoid passing hardcoded integers. @@ -309,7 +309,7 @@ class SPMDEngine(StreamingEngine): time, before RMM and communicator initialisation, so that CPU affinity, NUMA memory policy, and ``UCX_NET_DEVICES`` are set as early as possible. By default, binding is skipped under ``rrun`` (which already performs its own binding), - see :attr:`HardwareBindingPolicy.skip_under_rrun`. + see ``HardwareBindingPolicy.skip_under_rrun``. Examples -------- @@ -425,7 +425,7 @@ def _cleanup_ctx(self) -> None: @classmethod def from_options(cls, options: StreamingOptions) -> SPMDEngine: """ - Create an :class:`SPMDEngine` from a :class:`StreamingOptions` object. + Create an :class:`SPMDEngine` from a :class:`~cudf_polars.engine.options.StreamingOptions` object. This is the recommended way to construct an ``SPMDEngine`` for typical use. All RapidsMPF, executor, and engine options are read from @@ -590,7 +590,7 @@ def gather_cluster_info(self) -> list[ClusterInfo]: Returns ------- - List of :class:`ClusterInfo`, one per rank. + List of :class:`~cudf_polars.engine.core.ClusterInfo`, one per rank. """ data = json.dumps(dataclasses.asdict(ClusterInfo.local())).encode() with reserve_op_id() as op_id: From 75cfe03afed6f1ffdaa0cab1be7969c40ac5d0b9 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 19 May 2026 18:40:46 +0200 Subject: [PATCH 2/5] reviews --- docs/cudf/source/cudf_polars/dask_engine.md | 13 ++++---- .../cudf_polars/default_singleton_engine.md | 4 +-- docs/cudf/source/cudf_polars/engines.md | 30 +++++++++---------- docs/cudf/source/cudf_polars/options.md | 20 +++++++------ docs/cudf/source/cudf_polars/spmd_engine.md | 20 ++++++------- docs/cudf/source/cudf_polars/usage.md | 19 ++++++------ 6 files changed, 53 insertions(+), 53 deletions(-) diff --git a/docs/cudf/source/cudf_polars/dask_engine.md b/docs/cudf/source/cudf_polars/dask_engine.md index 87aab8f64e3..074508abbde 100644 --- a/docs/cudf/source/cudf_polars/dask_engine.md +++ b/docs/cudf/source/cudf_polars/dask_engine.md @@ -28,7 +28,7 @@ With no arguments, {class}`~cudf_polars.engine.dask.DaskEngine` creates a bootstraps a UCXX communicator across all workers. On exit, everything it created is torn down. ```{note} -`.collect()` pulls the full result back to the driver process. For large distributed outputs, +`.collect()` pulls the full result back to the client process. For large distributed outputs, prefer `.sink_*()` or aggregate/sample inside the query before `.collect()`. See [Result collection](engines.md#result-collection). ``` @@ -103,7 +103,7 @@ preload to assign one GPU per worker. The preload sets `CUDA_VISIBLE_DEVICES` on before the process spawns: ```bash -# On each node — launch one worker per GPU with a single thread each: +# On each node, launch one worker per GPU with a single thread each: dask worker SCHEDULER_ADDRESS:8786 --nworkers N --nthreads 1 \ --preload-nanny cudf_polars.engine.dask ``` @@ -137,10 +137,9 @@ configures UCX. `DaskEngine` sets up the same things for its own streaming runtime, so the two need to be coordinated or they will fight: -* **CPU affinity is unconditional in `dask-cuda-worker`** — the `CPUAffinity` plugin is always - installed and there is no CLI flag to turn it off. Pass - `hardware_binding=HardwareBindingPolicy(enabled=False)` to `DaskEngine` so it does not try to - re-pin affinity on top of dask-cuda's binding. +* **CPU affinity is unconditional in `dask-cuda-worker`**, the `CPUAffinity` plugin is always + installed and there is no CLI flag to turn it off. Pass `hardware_binding=HardwareBindingPolicy(enabled=False)` + to `DaskEngine` so it does not try to re-pin affinity on top of dask-cuda's binding. * **Do not pass `--rmm-pool-size`, `--rmm-managed-memory`, or similar RMM flags** to `dask-cuda-worker`. Let `DaskEngine` own the memory resource via its `memory_resource_config` (see {doc}`options`); otherwise two different memory resources will be installed on the same @@ -151,7 +150,7 @@ coordinated or they will fight: configuration across the cluster. ```bash -# On each node — GPU assignment + CPU affinity only (no RMM, no UCX flags): +# On each node, GPU assignment + CPU affinity only (no RMM, no UCX flags): dask-cuda-worker SCHEDULER_ADDRESS:8786 ``` diff --git a/docs/cudf/source/cudf_polars/default_singleton_engine.md b/docs/cudf/source/cudf_polars/default_singleton_engine.md index 5b8148c7ae9..6ec4f2b87de 100644 --- a/docs/cudf/source/cudf_polars/default_singleton_engine.md +++ b/docs/cudf/source/cudf_polars/default_singleton_engine.md @@ -1,7 +1,7 @@ (cudf-polars-default-singleton-engine)= # Default `engine="gpu"` -`.collect(engine="gpu")` (and `engine=pl.GPUEngine()`) is the API users invoke when they don't +`.collect(engine="gpu")` (and `engine=pl.GPUEngine()`) is the API you invoke when you don't construct a streaming engine explicitly. It runs the same streaming executor as the explicit engines (Ray, Dask, SPMD), conceptually similar to [Polars' own streaming engine](https://docs.pola.rs/user-guide/concepts/streaming/) but on the @@ -26,7 +26,7 @@ explicit engine. See {doc}`usage` and {doc}`options`. ## What you get without an explicit engine -When the user just writes: +When you just write: ```python import polars as pl diff --git a/docs/cudf/source/cudf_polars/engines.md b/docs/cudf/source/cudf_polars/engines.md index 0434ebcf192..e833a71ea1b 100644 --- a/docs/cudf/source/cudf_polars/engines.md +++ b/docs/cudf/source/cudf_polars/engines.md @@ -43,12 +43,12 @@ details. ## Cluster backends -| Engine | Cluster model | Extra runtime dependency | Typical use | -| --------------------------------------------------------------------- | ------------------------------------------------------------------- | ------------------------ | --------------------------------------------------------------------------------- | -| {class}`~cudf_polars.engine.ray.RayEngine` | Single-client driver; one Ray actor per GPU | [Ray][ray-docs] | Works from a laptop to a cloud cluster. No separate cluster setup needed. | -| {class}`~cudf_polars.engine.dask.DaskEngine` | Single-client driver; one Dask worker per GPU | [Dask distributed][dask] | Teams with an existing Dask deployment or a preferred Dask launcher. | -| {class}`~cudf_polars.engine.spmd.SPMDEngine` | Same script runs once per GPU, joined by a communicator | UCXX (under `rrun`) | HPC / SPMD launchers such as `rrun`. Single-rank mode needs no cluster at all. | -| [`engine="gpu"`](default_singleton_engine.md) | Implicit process-wide singleton on one GPU; no cluster | None | Default when no engine is constructed. Short scripts and notebooks. No options. | +| Engine | Cluster model | Extra runtime dependency | Typical use | +| --------------------------------------------- | --------------------------------------------------------| ------------------------ | ------------------------------------------------------------------------------- | +| {class}`~cudf_polars.engine.ray.RayEngine` | Single client; one Ray actor per GPU | [Ray][ray-docs] | Works from a laptop to a cloud cluster. No separate cluster setup needed. | +| {class}`~cudf_polars.engine.dask.DaskEngine` | Single client; one Dask worker per GPU | [Dask distributed][dask] | Teams with an existing Dask deployment or a preferred Dask launcher. | +| {class}`~cudf_polars.engine.spmd.SPMDEngine` | Same script runs once per GPU, joined by a communicator | UCXX (under `rrun`) | HPC / SPMD launchers such as `rrun`. Single-rank mode needs no cluster at all. | +| [`engine="gpu"`](default_singleton_engine.md) | Implicit process-wide singleton on one GPU; no cluster | None | Default when no engine is constructed. Short scripts and notebooks. No options. | All four approaches use the same execution model under the hood, so which to select depends on your preferred deployment method, not performance tradeoffs. For any non-trivial workflow, @@ -61,14 +61,14 @@ convenience and accepts no options, so it cannot be tuned. See `.collect()` returns a single `pl.DataFrame` on the **caller's process**. On the streaming engines that has two flavors: -- **`RayEngine` / `DaskEngine`** (single-client driver): every partition is pulled from the - cluster workers back to the driver and concatenated there. Convenient for results that fit - in driver memory but **a foot-gun for full distributed datasets**. E.g., calling `.collect()` - on a 1 TB query result sends 1 TB through your driver. Sink the result +- **`RayEngine` / `DaskEngine`** (single client): every partition is pulled from the + cluster workers back to the client and concatenated there. This is convenient for small + results but does not scale to large queries. E.g., calling `.collect()` on a 1 TB query + result sends 1 TB through your client. Sink the result (`.sink_parquet("path/")`, `.sink_csv(...)`, …) so each rank writes its own partition directly, or reduce/sample the data inside the query before `.collect()`. - **`SPMDEngine`** (one process per GPU): each rank's `.collect()` returns *that rank's* - local fragment. There is no driver to gather to. If you need a single concatenated + local fragment. There is no client to gather to. If you need a single concatenated `pl.DataFrame` across ranks, call {func}`~cudf_polars.engine.spmd.allgather_polars_dataframe` explicitly (see [Collecting distributed results](spmd_engine.md#collecting-distributed-results)). If you @@ -80,15 +80,15 @@ Rules of thumb for multi-machine `RayEngine` / `DaskEngine` runs: - For exports: prefer `.sink_*()` over `.collect()`. - For analysis: aggregate, sample, or `limit()` the result inside the lazy query before - `.collect()` so the driver only sees a small DataFrame. + `.collect()` so the client only sees a small DataFrame. - For further distributed processing in Python: switch to `SPMDEngine` so each rank keeps its fragment. ## Where to go next -- {doc}`usage` — tutorial that walks through running your first GPU query end-to-end. -- {doc}`other_engines` — per-engine reference pages for DaskEngine and SPMDEngine. -- {doc}`options` — the `StreamingOptions` configuration object and every field it surfaces. +- {doc}`usage`: tutorial that walks through running your first GPU query end-to-end. +- {doc}`other_engines`: per-engine reference pages for DaskEngine and SPMDEngine. +- {doc}`options`: the `StreamingOptions` configuration object and every field it surfaces. [ray-docs]: https://docs.ray.io/ [dask]: https://distributed.dask.org/ diff --git a/docs/cudf/source/cudf_polars/options.md b/docs/cudf/source/cudf_polars/options.md index a779f0c5ced..0a1de90dae2 100644 --- a/docs/cudf/source/cudf_polars/options.md +++ b/docs/cudf/source/cudf_polars/options.md @@ -40,18 +40,19 @@ categories of fields: | Category | Scope | Env var prefix | | ----------- | -------------------------------------------------------------------------------------- | ------------------------- | -| `rapidsmpf` | Streaming runtime, e.g. threads, CUDA streams, spilling, pinned memory, log level | `RAPIDSMPF_` | | `executor` | Query execution and partitioning, e.g. `max_rows_per_partition`, `fallback_mode`, ... | `CUDF_POLARS__EXECUTOR__` | | `engine` | `pl.GPUEngine` kwargs, e.g. Parquet, memory resource, CUDA streams, hardware binding | `CUDF_POLARS__` | +| `rapidsmpf` | Streaming runtime, e.g. threads, CUDA streams, spilling, pinned memory, log level | `RAPIDSMPF_` | The `engine` category surfaces the same tuning knobs as plain `pl.GPUEngine(...)`. For example, `parquet_options` and `memory_resource_config`. Configure these settings through {class}`~cudf_polars.engine.options.StreamingOptions` rather than passing them to `pl.GPUEngine(...)` directly. -The `rapidsmpf` category adds configuration for the streaming runtime that has no equivalent on the plain -`pl.GPUEngine`. See the [streaming runtime configuration reference][rapidsmpf-config] for the underlying -meaning of each `RAPIDSMPF_*` field. +The `rapidsmpf` category adds lower-level configuration for the streaming runtime that has no equivalent on +the plain `pl.GPUEngine`. Most users will not need to touch these directly. See the +[streaming runtime configuration reference][rapidsmpf-config] for the underlying meaning of each +`RAPIDSMPF_*` field. Every option has a corresponding environment variable. When an option is not set explicitly, its value is read from the environment variable if present; otherwise the underlying library applies @@ -94,13 +95,9 @@ See {doc}`in_memory_engine` for how to configure it. Environment variables follow these patterns: -* `rapidsmpf`: `RAPIDSMPF_` (e.g. `RAPIDSMPF_NUM_STREAMING_THREADS`) * `executor`: `CUDF_POLARS__EXECUTOR__` (e.g. `CUDF_POLARS__EXECUTOR__FALLBACK_MODE`) * `engine`: `CUDF_POLARS__` (e.g. `CUDF_POLARS__RAISE_ON_FAIL`; nested prefixes for structured options) - -### Category: `rapidsmpf` - -See the [streaming runtime configuration reference][rapidsmpf-config] for the full list of fields and defaults. +* `rapidsmpf`: `RAPIDSMPF_` (e.g. `RAPIDSMPF_NUM_STREAMING_THREADS`) ### Category: `executor` @@ -125,5 +122,10 @@ See the [streaming runtime configuration reference][rapidsmpf-config] for the fu | `hardware_binding` | Hardware binding policy. Pass a {class}`~cudf_polars.engine.hardware_binding.HardwareBindingPolicy` for fine-grained control. | `HardwareBindingPolicy()` | | `allow_gpu_sharing` | When `False` (default), the engine raises if multiple ranks share the same physical GPU. | `False` | +### Category: `rapidsmpf` + +Lower-level streaming runtime knobs. Most users will not need to touch these directly. See the +[streaming runtime configuration reference][rapidsmpf-config] for the full list of fields and defaults. + [rapidsmpf-config]: https://docs.rapids.ai/api/rapidsmpf/nightly/configuration/ diff --git a/docs/cudf/source/cudf_polars/spmd_engine.md b/docs/cudf/source/cudf_polars/spmd_engine.md index 4fdef37f17b..04abb94c73a 100644 --- a/docs/cudf/source/cudf_polars/spmd_engine.md +++ b/docs/cudf/source/cudf_polars/spmd_engine.md @@ -92,9 +92,9 @@ See {doc}`options` for the available fields. {class}`~cudf_polars.engine.spmd.SPMDEngine` exposes a few properties that are useful in SPMD code: -* `engine.nranks` / `engine.rank` — cluster size and local rank index. -* `engine.comm` — the active `rapidsmpf.communicator.Communicator`. -* `engine.context` — the active `rapidsmpf.streaming.core.context.Context`. +* `engine.nranks` / `engine.rank`: cluster size and local rank index. +* `engine.comm`: the active `rapidsmpf.communicator.Communicator`. +* `engine.context`: the active `rapidsmpf.streaming.core.context.Context`. ## Query symmetry requirement @@ -106,10 +106,10 @@ In practice: * Avoid rank-conditional `collect()` or `sink*()` calls. * Avoid branches that change the query graph. -* Keep the driver script deterministic. +* Keep the client script deterministic. ```python -# OK — every rank runs the same query in the same order. +# OK: every rank runs the same query in the same order. with SPMDEngine() as engine: result = ( pl.scan_parquet("/data/*.parquet") @@ -120,7 +120,7 @@ with SPMDEngine() as engine: ``` ```python -# DEADLOCKS — rank 0 issues a group_by collective the other ranks never see. +# DEADLOCKS: rank 0 issues a group_by collective the other ranks never see. with SPMDEngine() as engine: df = pl.scan_parquet("/data/*.parquet") if engine.rank == 0: # don't do this @@ -130,7 +130,7 @@ with SPMDEngine() as engine: ## Collecting distributed results -Unlike `RayEngine` / `DaskEngine`, where `.collect()` gathers every partition to the driver, +Unlike `RayEngine` / `DaskEngine`, where `.collect()` gathers every partition to the client, here each rank's `.collect()` returns *its own* fragment. If you want to keep processing the data rank-by-rank, just use that fragment directly; if you need a single concatenated view, use the helper below. @@ -157,7 +157,7 @@ with SPMDEngine() as engine: ) ``` -`op_id` identifies the collective across ranks — all ranks must pass the same value. +`op_id` identifies the collective across ranks. All ranks must pass the same value. {func}`~cudf_polars.streaming.collectives.common.reserve_op_id` draws from the same pool that cudf-polars uses internally for shuffle and join collectives, so there is no risk of collision. Do not pass hardcoded integers: they may silently collide with an ID reserved by an @@ -174,7 +174,7 @@ in a test suite or interactive session), repeated bootstrapping is unnecessary a the file-based coordination layer shared by all ranks. Pass a pre-created communicator via `comm=` to skip the bootstrap entirely. The engine does -**not** close the communicator on shutdown — the caller retains ownership and can reuse it +**not** close the communicator on shutdown. The caller retains ownership and can reuse it across multiple {class}`~cudf_polars.engine.spmd.SPMDEngine` lifetimes: ```python @@ -185,7 +185,7 @@ from cudf_polars.engine.spmd import SPMDEngine # Bootstrap once. comm = bootstrap.create_ucxx_comm(progress_thread=ProgressThread()) -# Reuse across multiple engine lifetimes — no re-bootstrap between them. +# Reuse across multiple engine lifetimes, no re-bootstrap between them. with SPMDEngine(comm=comm) as engine: result1 = df1.lazy().collect(engine=engine) diff --git a/docs/cudf/source/cudf_polars/usage.md b/docs/cudf/source/cudf_polars/usage.md index 12abf563325..9169bc65435 100644 --- a/docs/cudf/source/cudf_polars/usage.md +++ b/docs/cudf/source/cudf_polars/usage.md @@ -39,16 +39,13 @@ multiple engines for GPU execution. See {doc}`other_engines` for alternatives, o ``` ```{note} -`.collect()` pulls the full result back to the driver process. For large distributed outputs, +`.collect()` pulls the full result back to the client process. For large distributed outputs, prefer `.sink_*()` or aggregate/sample inside the query before `.collect()`. See [Result collection](engines.md#result-collection). ``` ## Configuring `RayEngine` -The same `from_options()` / `StreamingOptions` pattern shown here works for every streaming -engine — see {doc}`other_engines` for the DaskEngine and SPMDEngine variants. - {class}`~cudf_polars.engine.ray.RayEngine` with no arguments starts a local [Ray][ray-docs] cluster and creates one GPU worker per visible GPU. @@ -74,10 +71,13 @@ See {doc}`options` for the available fields. context-manager form so the Ray cluster and GPU workers are torn down automatically. ``` +The same `from_options()` / `StreamingOptions` pattern shown here works for every streaming +engine. See {doc}`other_engines` for the DaskEngine and SPMDEngine variants. + ## Attaching to an existing Ray cluster For multi-node runs, start a Ray cluster separately (for example with `ray start` on each -node) and attach to it from your driver script. When Ray is already initialized, +node) and attach to it from your client script. When Ray is already initialized, {class}`~cudf_polars.engine.ray.RayEngine` connects to the running cluster and leaves it untouched on exit: @@ -96,9 +96,8 @@ with RayEngine() as engine: ) ``` -{class}`~cudf_polars.engine.ray.RayEngine` creates one rank per GPU in -the Ray cluster and bootstraps a UCXX communicator across them. It raises `RuntimeError` if -created inside an `rrun` cluster or if no GPUs are available. +{class}`~cudf_polars.engine.ray.RayEngine` creates one rank per GPU in the Ray cluster. +It raises `RuntimeError` if no GPUs are available. ## Manual Engine Lifetime Control @@ -152,8 +151,8 @@ the specified path. ## Cluster diagnostics {meth}`~cudf_polars.engine.ray.RayEngine.gather_cluster_info` returns -a list of {class}`~cudf_polars.engine.core.ClusterInfo` — one per rank -actor — with fields `hostname`, `pid`, `cuda_visible_devices`, and `gpu_uuid`: +a list of {class}`~cudf_polars.engine.core.ClusterInfo`, one per rank +actor, with fields `hostname`, `pid`, `cuda_visible_devices`, and `gpu_uuid`: ```python with RayEngine() as engine: From 2a25d35ed6109367f203e9e5e4e74f5fd8a00a6f Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 19 May 2026 20:08:11 +0200 Subject: [PATCH 3/5] coderabbitai --- docs/cudf/source/cudf_polars/api.md | 2 +- docs/cudf/source/cudf_polars/profiling.md | 8 +++++++- docs/cudf/source/cudf_polars/usage.md | 8 ++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/cudf/source/cudf_polars/api.md b/docs/cudf/source/cudf_polars/api.md index 9f89053a584..53460fd7c6b 100644 --- a/docs/cudf/source/cudf_polars/api.md +++ b/docs/cudf/source/cudf_polars/api.md @@ -25,7 +25,7 @@ multi-GPU engines. :show-inheritance: ``` -The three engines share a common base class: +The engine classes share a common base class: ```{eval-rst} .. autoclass:: cudf_polars.engine.core.StreamingEngine diff --git a/docs/cudf/source/cudf_polars/profiling.md b/docs/cudf/source/cudf_polars/profiling.md index a2e08586e65..259a629e405 100644 --- a/docs/cudf/source/cudf_polars/profiling.md +++ b/docs/cudf/source/cudf_polars/profiling.md @@ -45,6 +45,12 @@ and merges the per-rank statistics into a single `Statistics` (counts and values reduced with `max`). Capture it inside the engine context, then print after exit: ```python +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.ray import RayEngine + +opts = StreamingOptions(statistics=True) + with RayEngine.from_options(opts) as engine: result = pl.scan_parquet("/data/*.parquet").collect(engine=engine) total = engine.global_statistics(clear=True) @@ -142,7 +148,7 @@ The different scopes have different schemas. Fields in **bold** are required / a Setting `CUDF_POLARS_LOG_TRACES=1` enables all the metrics. Depending on the query, the overhead from collecting the memory or dataframe metrics can be measurable. You can disable some metrics -through additional environment variables. For example, to disable the memory related metrics, set: +through additional environment variables. For example, to disable the memory-related metrics, set: ```bash CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 diff --git a/docs/cudf/source/cudf_polars/usage.md b/docs/cudf/source/cudf_polars/usage.md index 9169bc65435..1f83608d54a 100644 --- a/docs/cudf/source/cudf_polars/usage.md +++ b/docs/cudf/source/cudf_polars/usage.md @@ -5,7 +5,7 @@ an `engine=` argument to `.collect()` or `.sink_*()`. See {doc}`engines` for the conceptual picture; this page walks through running your first query. -We always recommend constructing an engine object and using them in a context manager to ensure proper resource cleanup. The engine constructor is +We always recommend constructing an engine object and using it in a context manager to ensure proper resource cleanup. The engine constructor is where you specify {class}`~cudf_polars.engine.options.StreamingOptions` such as `spill_to_pinned_memory` or `fallback_mode`. Ray is the showcased example below; see also {doc}`other_engines`. @@ -31,7 +31,7 @@ print(result) {class}`~cudf_polars.engine.ray.RayEngine` with no arguments uses every GPU visible to the process, so the example above runs on one GPU if that's all that's available and scales automatically to every GPU on the node otherwise. It also attaches to an existing -Ray cluster if one is already running (see [Attaching to an existing Raycluster](#attaching-to-an-existing-ray-cluster)). +Ray cluster if one is already running (see [Attaching to an existing Ray cluster](#attaching-to-an-existing-ray-cluster)). ```{note} The examples on this page use {class}`~cudf_polars.engine.ray.RayEngine`. `cudf-polars` supports @@ -101,8 +101,8 @@ It raises `RuntimeError` if no GPUs are available. ## Manual Engine Lifetime Control -When you need to control the engine lifetime explicitly. For example, in a Jupyter notebook -where a `with` block cannot span multiple cells, construct `RayEngine` once and reuse it, +When you need to control the engine lifetime explicitly, for example in a Jupyter notebook +where a `with` block cannot span multiple cells, construct a `RayEngine` once and reuse it, then call `engine.shutdown()` when you are done: ```python From 56d9d85759171bde2cd9e10d69ad7153e460a66e Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 20 May 2026 09:22:44 +0200 Subject: [PATCH 4/5] update paths --- docs/cudf/source/cudf_polars/api.md | 2 +- docs/cudf/source/cudf_polars/spmd_engine.md | 4 ++-- python/cudf_polars/cudf_polars/engine/spmd.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/cudf/source/cudf_polars/api.md b/docs/cudf/source/cudf_polars/api.md index 53460fd7c6b..6acff73d4f0 100644 --- a/docs/cudf/source/cudf_polars/api.md +++ b/docs/cudf/source/cudf_polars/api.md @@ -55,7 +55,7 @@ The engine classes share a common base class: ```{eval-rst} .. autofunction:: cudf_polars.engine.spmd.allgather_polars_dataframe -.. autofunction:: cudf_polars.streaming.collectives.common.reserve_op_id +.. autofunction:: cudf_polars.streaming.actor_graph.collectives.common.reserve_op_id ``` ## Internal configuration objects diff --git a/docs/cudf/source/cudf_polars/spmd_engine.md b/docs/cudf/source/cudf_polars/spmd_engine.md index 04abb94c73a..9174dd8c03e 100644 --- a/docs/cudf/source/cudf_polars/spmd_engine.md +++ b/docs/cudf/source/cudf_polars/spmd_engine.md @@ -140,7 +140,7 @@ use the helper below. the full dataset on every rank: ```python -from cudf_polars.streaming.collectives.common import reserve_op_id +from cudf_polars.streaming.actor_graph.collectives.common import reserve_op_id from cudf_polars.engine.spmd import ( SPMDEngine, allgather_polars_dataframe, @@ -158,7 +158,7 @@ with SPMDEngine() as engine: ``` `op_id` identifies the collective across ranks. All ranks must pass the same value. -{func}`~cudf_polars.streaming.collectives.common.reserve_op_id` draws from the same +{func}`~cudf_polars.streaming.actor_graph.collectives.common.reserve_op_id` draws from the same pool that cudf-polars uses internally for shuffle and join collectives, so there is no risk of collision. Do not pass hardcoded integers: they may silently collide with an ID reserved by an active collective inside `collect()`. diff --git a/python/cudf_polars/cudf_polars/engine/spmd.py b/python/cudf_polars/cudf_polars/engine/spmd.py index 2647c8f2ce3..18804f410ef 100644 --- a/python/cudf_polars/cudf_polars/engine/spmd.py +++ b/python/cudf_polars/cudf_polars/engine/spmd.py @@ -141,7 +141,7 @@ def allgather_polars_dataframe( Rank-local DataFrame to contribute. op_id Operation ID for this AllGather collective. Must be identical on every - rank. For example, use :func:`~cudf_polars.streaming.collectives.common.reserve_op_id` to obtain a collision-free + rank. For example, use :func:`~cudf_polars.streaming.actor_graph.collectives.common.reserve_op_id` to obtain a collision-free ID from the same pool used internally by cudf-polars. Avoid passing hardcoded integers. From de13973621de1f33e48fdf6a406bffa2f1dccab5 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 20 May 2026 09:27:05 +0200 Subject: [PATCH 5/5] cleanup --- docs/cudf/source/cudf_polars/dask_engine.md | 4 ++-- .../source/cudf_polars/default_singleton_engine.md | 8 +++----- docs/cudf/source/cudf_polars/engines.md | 10 +++++----- docs/cudf/source/cudf_polars/options.md | 7 +++---- docs/cudf/source/cudf_polars/other_engines.md | 11 +++++------ docs/cudf/source/cudf_polars/profiling.md | 4 ++-- docs/cudf/source/cudf_polars/spmd_engine.md | 11 +++++------ docs/cudf/source/cudf_polars/usage.md | 8 ++++---- 8 files changed, 29 insertions(+), 34 deletions(-) diff --git a/docs/cudf/source/cudf_polars/dask_engine.md b/docs/cudf/source/cudf_polars/dask_engine.md index 074508abbde..190968cc07a 100644 --- a/docs/cudf/source/cudf_polars/dask_engine.md +++ b/docs/cudf/source/cudf_polars/dask_engine.md @@ -142,11 +142,11 @@ coordinated or they will fight: to `DaskEngine` so it does not try to re-pin affinity on top of dask-cuda's binding. * **Do not pass `--rmm-pool-size`, `--rmm-managed-memory`, or similar RMM flags** to `dask-cuda-worker`. Let `DaskEngine` own the memory resource via its `memory_resource_config` - (see {doc}`options`); otherwise two different memory resources will be installed on the same + (see {doc}`options`) otherwise two different memory resources will be installed on the same worker. * **Do not pass `--enable-tcp-over-ucx`, `--enable-infiniband`, `--enable-nvlink`, or `--enable-rdmacm`** to `dask-cuda-worker`. `DaskEngine` bootstraps its own UCXX communicator - and will select transports itself; enabling them on both sides can produce inconsistent UCX + and will select transports itself. Enabling them on both sides can produce inconsistent UCX configuration across the cluster. ```bash diff --git a/docs/cudf/source/cudf_polars/default_singleton_engine.md b/docs/cudf/source/cudf_polars/default_singleton_engine.md index 6ec4f2b87de..05d3017d757 100644 --- a/docs/cudf/source/cudf_polars/default_singleton_engine.md +++ b/docs/cudf/source/cudf_polars/default_singleton_engine.md @@ -5,11 +5,9 @@ construct a streaming engine explicitly. It runs the same streaming executor as the explicit engines (Ray, Dask, SPMD), conceptually similar to [Polars' own streaming engine](https://docs.pola.rs/user-guide/concepts/streaming/) but on the -GPU. Under the hood it's backed by -{class}`~cudf_polars.engine.default_singleton_engine.DefaultSingletonEngine`, -a process-wide singleton specialization of -{class}`~cudf_polars.engine.spmd.SPMDEngine`. At most one live -instance exists per process; it is created lazily on first use and torn down at interpreter +GPU. Under the hood it's backed by {class}`~cudf_polars.engine.default_singleton_engine.DefaultSingletonEngine`, +a process-wide singleton specialization of {class}`~cudf_polars.engine.spmd.SPMDEngine`. At most one live +instance exists per process, which is created lazily on first use and torn down at interpreter exit. Ray is the showcased explicit engine (see {doc}`usage`); this page documents what `engine="gpu"` does *without* you having to construct anything. diff --git a/docs/cudf/source/cudf_polars/engines.md b/docs/cudf/source/cudf_polars/engines.md index e833a71ea1b..3bb83bfcbea 100644 --- a/docs/cudf/source/cudf_polars/engines.md +++ b/docs/cudf/source/cudf_polars/engines.md @@ -45,16 +45,16 @@ details. | Engine | Cluster model | Extra runtime dependency | Typical use | | --------------------------------------------- | --------------------------------------------------------| ------------------------ | ------------------------------------------------------------------------------- | -| {class}`~cudf_polars.engine.ray.RayEngine` | Single client; one Ray actor per GPU | [Ray][ray-docs] | Works from a laptop to a cloud cluster. No separate cluster setup needed. | -| {class}`~cudf_polars.engine.dask.DaskEngine` | Single client; one Dask worker per GPU | [Dask distributed][dask] | Teams with an existing Dask deployment or a preferred Dask launcher. | +| {class}`~cudf_polars.engine.ray.RayEngine` | Single client, one Ray actor per GPU | [Ray][ray-docs] | Works from a laptop to a cloud cluster. No separate cluster setup needed. | +| {class}`~cudf_polars.engine.dask.DaskEngine` | Single client, one Dask worker per GPU | [Dask distributed][dask] | Teams with an existing Dask deployment or a preferred Dask launcher. | | {class}`~cudf_polars.engine.spmd.SPMDEngine` | Same script runs once per GPU, joined by a communicator | UCXX (under `rrun`) | HPC / SPMD launchers such as `rrun`. Single-rank mode needs no cluster at all. | | [`engine="gpu"`](default_singleton_engine.md) | Implicit process-wide singleton on one GPU; no cluster | None | Default when no engine is constructed. Short scripts and notebooks. No options. | All four approaches use the same execution model under the hood, so which to select depends on your preferred deployment method, not performance tradeoffs. For any non-trivial workflow, -construct one of the first three engines explicitly (see {doc}`usage`); `engine="gpu"` is a -convenience and accepts no options, so it cannot be tuned. See -{doc}`default_singleton_engine` for details on the implementation that backs it. +construct one of the first three engines explicitly (see {doc}`usage`). `engine="gpu"` is a +convenience and accepts no options, so it cannot be tuned. See {doc}`default_singleton_engine` +for details on the implementation that backs it. ## Result collection diff --git a/docs/cudf/source/cudf_polars/options.md b/docs/cudf/source/cudf_polars/options.md index 0a1de90dae2..20ff7ce1b4a 100644 --- a/docs/cudf/source/cudf_polars/options.md +++ b/docs/cudf/source/cudf_polars/options.md @@ -2,7 +2,7 @@ # Configuration Options {class}`~cudf_polars.engine.options.StreamingOptions` is the recommended -way to configure the streaming engines (Ray, Dask, SPMD; the default `engine="gpu"` accepts no +way to configure the streaming engines (Ray, Dask, SPMD. The default `engine="gpu"` accepts no options, see the note below). Build one and pass it to `RayEngine.from_options()` to construct a {class}`~cudf_polars.engine.ray.RayEngine`: @@ -62,9 +62,8 @@ and `{"0", "false", "no", "n"}` as false. ## Building from a dictionary -{meth}`~cudf_polars.engine.options.StreamingOptions.from_dict` accepts a -flat dict of field names. Unknown keys raise `TypeError`; `None` values leave the field -unspecified: +{meth}`~cudf_polars.engine.options.StreamingOptions.from_dict` accepts a flat dict of field names. +Unknown keys raise `TypeError` and `None` values leave the field unspecified: ```python opts = StreamingOptions.from_dict({ diff --git a/docs/cudf/source/cudf_polars/other_engines.md b/docs/cudf/source/cudf_polars/other_engines.md index 2c4e3acfa4a..d09b031aa18 100644 --- a/docs/cudf/source/cudf_polars/other_engines.md +++ b/docs/cudf/source/cudf_polars/other_engines.md @@ -9,12 +9,11 @@ other ways to run cudf-polars: GPU. Use this when you already have a Dask deployment or a preferred Dask launcher. * **{doc}`spmd_engine`** is single program, multiple data: the same script runs once per GPU, typically launched with `rrun`. Single-rank mode needs no external cluster at all. -* **{doc}`default_singleton_engine`** documents what `engine="gpu"` does under the hood - when no engine is constructed explicitly. Useful to *understand*; for any non-trivial - workflow we recommend constructing an explicit engine so you can pass - {class}`~cudf_polars.engine.options.StreamingOptions`. -* **{doc}`in_memory_engine`** (`engine=pl.GPUEngine(executor="in-memory")`) is the only non-streaming path. Suitable - for small queries (data that fits in device memory), debugging, or when you specifically +* **{doc}`default_singleton_engine`** documents what `engine="gpu"` does under the hood when no + engine is constructed explicitly. Useful to *understand*, but for any non-trivial workflow we + recommend constructing an explicit engine so you can pass {class}`~cudf_polars.engine.options.StreamingOptions`. +* **{doc}`in_memory_engine`** (`engine=pl.GPUEngine(executor="in-memory")`) is the only non-streaming + path. Suitable for small queries (data that fits in device memory), debugging, or when you specifically need `LazyFrame.profile`. See {doc}`engines` for the conceptual comparison with `RayEngine` (cluster model, runtime diff --git a/docs/cudf/source/cudf_polars/profiling.md b/docs/cudf/source/cudf_polars/profiling.md index 259a629e405..ed19c8dd178 100644 --- a/docs/cudf/source/cudf_polars/profiling.md +++ b/docs/cudf/source/cudf_polars/profiling.md @@ -60,7 +60,7 @@ print(total) ## GPU Profiling -For streaming queries, we recommend profiling with [NVIDIA NSight Systems][nsight]; `cudf-polars` +For streaming queries, we recommend profiling with [NVIDIA NSight Systems][nsight]. `cudf-polars` includes [nvtx][nvtx] annotations to help you understand where time is being spent. Streaming engines do not support `LazyFrame.profile`, since `profile` requires a single in-memory pass. @@ -73,7 +73,7 @@ q = pl.scan_parquet("ny-taxi/2024/*.parquet").filter(pl.col("total_amount") > 15 profile = q.profile(engine=pl.GPUEngine(executor="in-memory")) ``` -The result is `(result_df, timings_df)`; see the Polars docs link above for the schema. +The result is `(result_df, timings_df)`, see the Polars docs link above for the schema. ## Tracing diff --git a/docs/cudf/source/cudf_polars/spmd_engine.md b/docs/cudf/source/cudf_polars/spmd_engine.md index 9174dd8c03e..a15e727ab53 100644 --- a/docs/cudf/source/cudf_polars/spmd_engine.md +++ b/docs/cudf/source/cudf_polars/spmd_engine.md @@ -14,10 +14,9 @@ to override this behaviour. ## Single-GPU setup -To use {class}`~cudf_polars.engine.spmd.SPMDEngine` on a single GPU, -create the engine and run your Python script as normal. You still get the full streaming -executor (partitioned inputs, spilling, scaling past device memory); you just don't need any -multi-process coordination: +To use {class}`~cudf_polars.engine.spmd.SPMDEngine` on a single GPU, create the engine and +run your Python script as normal. You still get the full streaming executor (partitioned inputs, +spilling, scaling past device memory), you just don't need any multi-process coordination: ```python # python my_script.py @@ -99,7 +98,7 @@ that are useful in SPMD code: ## Query symmetry requirement All ranks must execute the **same sequence of queries in the same order**. Collective operations -are matched using internal operation IDs; if one rank executes a collective that another rank +are matched using internal operation IDs. If one rank executes a collective that another rank does not, the program will deadlock. In practice: @@ -132,7 +131,7 @@ with SPMDEngine() as engine: Unlike `RayEngine` / `DaskEngine`, where `.collect()` gathers every partition to the client, here each rank's `.collect()` returns *its own* fragment. If you want to keep processing the -data rank-by-rank, just use that fragment directly; if you need a single concatenated view, +data rank-by-rank, just use that fragment directly. If you need a single concatenated view, use the helper below. `collect()` returns a rank-local result. Use diff --git a/docs/cudf/source/cudf_polars/usage.md b/docs/cudf/source/cudf_polars/usage.md index 1f83608d54a..6bcd4220774 100644 --- a/docs/cudf/source/cudf_polars/usage.md +++ b/docs/cudf/source/cudf_polars/usage.md @@ -3,11 +3,11 @@ `cudf-polars` runs your Polars `LazyFrame` queries on GPU. You select GPU execution by passing an `engine=` argument to `.collect()` or `.sink_*()`. See {doc}`engines` for the conceptual -picture; this page walks through running your first query. +picture, this page walks through running your first query. -We always recommend constructing an engine object and using it in a context manager to ensure proper resource cleanup. The engine constructor is -where you specify {class}`~cudf_polars.engine.options.StreamingOptions` -such as `spill_to_pinned_memory` or `fallback_mode`. Ray is the showcased example below; see also +We always recommend constructing an engine object and using it in a context manager to ensure proper +resource cleanup. The engine constructor is where you specify {class}`~cudf_polars.engine.options.StreamingOptions` +such as `spill_to_pinned_memory` or `fallback_mode`. Ray is the showcased example below, see also {doc}`other_engines`. ## Your first GPU query