1 change: 1 addition & 0 deletions dependencies.yaml
@@ -161,6 +161,7 @@ files:
- depends_on_pylibcudf
- depends_on_libcudf
- depends_on_cudf_polars
- depends_on_ray
- docs
- py_version
py_build_cudf:
18 changes: 18 additions & 0 deletions docs/cudf/source/conf.py
@@ -625,12 +625,30 @@ def on_missing_reference(app, env, node, contnode):
    ("py:class", "Axis"),
    ("py:class", "ArrowLike"),
    ("py:class", "ExecutorType"),
    # cudf-polars: bare rapidsmpf type names appear in autodoc'd signatures
    # because they are imported under ``if TYPE_CHECKING:`` and rendered as
    # unqualified strings in type annotations. The ``rapidsmpf.*`` regex below
    # only matches fully qualified targets, so the bare leaf names are listed
    # explicitly here.
    ("py:class", "Statistics"),
    ("py:class", "Communicator"),
    ("py:class", "Options"),
    # polars aliases that don't match the public intersphinx targets.
    ("py:class", "pl.DataFrame"),
    ("py:class", "polars.LazyFrame"),
    ("py:class", "polars.DataFrame"),
    ("py:class", "polars.dataframe.frame.DataFrame"),
]
# Temporarily disable nitpick warnings for pandas: https://github.com/pandas-dev/pandas/issues/64584
nitpick_ignore_regex = [
    ("py:.*", "pandas.*"),
    ("py:.*", "pd.*"),
    ("ref.*", ".*pandas.*"),
    # External libs without configured intersphinx inventories.
    ("py:.*", r"rapidsmpf(\..*)?"),
    ("py:.*", r"ray(\..*)?"),
    ("py:.*", r"distributed(\..*)?"),
    ("py:.*", r"dask_cuda(\..*)?"),
]
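
The comment in this hunk distinguishes bare leaf names from fully qualified targets. The sketch below illustrates why both lists are needed, assuming Sphinx applies each `nitpick_ignore_regex` pair as a whole-string match (as its documentation describes). `matches_regex_rules` is a hypothetical helper, not part of Sphinx or this PR, and the dotted target names are made up for illustration.

```python
import re

# Two of the patterns added to nitpick_ignore_regex in this change.
NITPICK_IGNORE_REGEX = [
    ("py:.*", r"rapidsmpf(\..*)?"),
    ("py:.*", r"ray(\..*)?"),
]


def matches_regex_rules(ref_type: str, target: str) -> bool:
    """Hypothetical helper: True if any (type, target) pattern pair matches in full."""
    return any(
        re.fullmatch(type_pat, ref_type) and re.fullmatch(target_pat, target)
        for type_pat, target_pat in NITPICK_IGNORE_REGEX
    )


# A fully qualified target (illustrative path) is covered by the regex.
print(matches_regex_rules("py:class", "rapidsmpf.statistics.Statistics"))
# A bare leaf name is not, which is why it needs an explicit nitpick_ignore entry.
print(matches_regex_rules("py:class", "Statistics"))
# A package that merely starts with "ray" is not swallowed by the pattern.
print(matches_regex_rules("py:class", "rayon.Pool"))
```

The optional `(\..*)?` group is what keeps the patterns from matching unrelated packages that share a prefix.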


70 changes: 63 additions & 7 deletions docs/cudf/source/cudf_polars/api.md
@@ -1,17 +1,73 @@
(cudf-polars-api)=
# API
# API Reference

For the most part, the public API of `cudf-polars` is the polars API.
For the most part, the public API of `cudf-polars` is the Polars API itself. This page
documents the additional classes and functions that `cudf-polars` exposes for the streaming
multi-GPU engines.

## Streaming engines

```{eval-rst}
.. autoclass:: cudf_polars.engine.ray.RayEngine
   :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks
   :show-inheritance:

.. autoclass:: cudf_polars.engine.dask.DaskEngine
   :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks
   :show-inheritance:

.. autoclass:: cudf_polars.engine.spmd.SPMDEngine
   :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks, rank, comm, context
   :show-inheritance:

.. autoclass:: cudf_polars.engine.default_singleton_engine.DefaultSingletonEngine
   :members: get_or_create, shutdown
   :show-inheritance:
```

The engine classes share a common base class:

```{eval-rst}
.. autoclass:: cudf_polars.engine.core.StreamingEngine
   :members: gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks
   :show-inheritance:

.. autoclass:: cudf_polars.engine.core.ClusterInfo
   :members:
```

## Configuration

```{eval-rst}
.. autoclass:: cudf_polars.engine.options.StreamingOptions
   :members: from_dict, to_dict, to_rapidsmpf_options, to_executor_options, to_engine_options

.. autodata:: cudf_polars.engine.options.UNSPECIFIED

.. autoclass:: cudf_polars.engine.hardware_binding.HardwareBindingPolicy
   :members:

.. autofunction:: cudf_polars.engine.hardware_binding.bind_to_gpu
```

## SPMD helpers

```{eval-rst}
.. autofunction:: cudf_polars.engine.spmd.allgather_polars_dataframe

.. autofunction:: cudf_polars.streaming.actor_graph.collectives.common.reserve_op_id
```

## Internal configuration objects

These dataclasses back the `engine_options` surfaced by `pl.GPUEngine` and `StreamingOptions`.
Most users interact with them through `StreamingOptions` fields rather than directly.

```{eval-rst}
.. automodule:: cudf_polars.utils.config
   :members:
      Cluster,
      ConfigOptions,
      CUDAStreamPoolConfig,
      DynamicPlanningOptions,
      ExecutorType,
      InMemoryExecutor,
      MemoryResourceConfig,
      ParquetOptions,
      StreamingExecutor,
      StreamingFallbackMode,
198 changes: 198 additions & 0 deletions docs/cudf/source/cudf_polars/dask_engine.md
@@ -0,0 +1,198 @@
(cudf-polars-dask-engine)=
# Dask

{class}`~cudf_polars.engine.dask.DaskEngine` runs the streaming executor
on a [Dask distributed][dask-distributed] cluster: one Dask worker per GPU, coordinated by a
single client process. Partitions are streamed through the query plan and collective operations
(shuffles, allgathers, joins) run across workers over a shared UCXX communicator. On startup,
each worker is pinned to the CPU cores and NUMA node closest to its GPU (see
[Pre-configured GPU clusters](#pre-configured-gpu-clusters) below).

```python
import polars as pl
from cudf_polars.engine.dask import DaskEngine

with DaskEngine() as engine:
    result = (
        pl.scan_parquet("/data/dataset/*.parquet")
        .filter(pl.col("amount") > 100)
        .group_by("customer_id")
        .agg(pl.col("amount").sum())
        .collect(engine=engine)
    )
print(result)
```

With no arguments, {class}`~cudf_polars.engine.dask.DaskEngine` creates a
`distributed.LocalCluster` with one worker per visible GPU and a `distributed.Client`, then
bootstraps a UCXX communicator across all workers. On exit, everything it created is torn down.

```{note}
`.collect()` pulls the full result back to the client process. For large distributed outputs,
prefer `.sink_*()` or aggregate/sample inside the query before `.collect()`. See
[Result collection](engines.md#result-collection).
```

## Configuring `DaskEngine`

For custom configuration, build
{class}`~cudf_polars.engine.options.StreamingOptions` and use
`DaskEngine.from_options()`:

```python
import polars as pl
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.dask import DaskEngine

opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent")

with DaskEngine.from_options(opts) as engine:
    result = pl.scan_parquet("/data/dataset/*.parquet").collect(engine=engine)
```

See {doc}`options` for the available fields.

## Bring your own Dask client

Pass an existing `distributed.Client` via `dask_client=` to attach to an already-running
scheduler:

```python
from distributed import Client
import polars as pl
from cudf_polars.engine.dask import DaskEngine

with Client("scheduler-address:8786") as dc:
    with DaskEngine(dask_client=dc) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
```

When you supply the client, {class}`~cudf_polars.engine.dask.DaskEngine`
leaves it (and the cluster) alone on exit.
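
This teardown rule can be pictured as an ownership flag: the engine closes only what it created. The sketch below is a simplified illustration of that pattern and is not `DaskEngine`'s actual implementation. `FakeClient` and `OwnedClientEngine` are hypothetical stand-ins so the example runs without a cluster.

```python
from __future__ import annotations


class FakeClient:
    """Hypothetical stand-in for a cluster client; records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class OwnedClientEngine:
    """Hypothetical engine that tears down only the client it created itself."""

    def __init__(self, client: FakeClient | None = None) -> None:
        # Remember whether the client was created here or supplied by the caller.
        self._owns_client = client is None
        self.client = FakeClient() if client is None else client

    def __enter__(self) -> OwnedClientEngine:
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Close the client only when this engine created it.
        if self._owns_client:
            self.client.close()


# Case 1: the engine creates its own client and closes it on exit.
with OwnedClientEngine() as engine:
    created = engine.client
print("engine-created client closed:", created.closed)

# Case 2: the caller supplies the client, so the engine leaves it open.
external = FakeClient()
with OwnedClientEngine(client=external):
    pass
print("caller-supplied client closed:", external.closed)
```

Recording ownership at construction time keeps the exit path simple: there is no need to guess later who started the cluster.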

(pre-configured-gpu-clusters)=
### Pre-configured GPU clusters

Some Dask launchers, notably `dask_cuda.LocalCUDACluster`, already pin CPU affinity and set
`CUDA_VISIBLE_DEVICES` per worker. Disable the built-in hardware binding via
{class}`~cudf_polars.engine.hardware_binding.HardwareBindingPolicy`
to avoid having both layers fight over each worker's affinity (the second to run wins, which
makes the resulting placement non-deterministic):

```python
from dask_cuda import LocalCUDACluster
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.hardware_binding import (
    HardwareBindingPolicy,
)

with Client(LocalCUDACluster()) as dc, DaskEngine(
    dask_client=dc,
    engine_options={
        "hardware_binding": HardwareBindingPolicy(enabled=False),
    },
) as engine:
    ...
```

### Manually launched workers

When launching workers yourself (for example on a multi-node HPC cluster), use the built-in nanny
preload to assign one GPU per worker. The preload sets `CUDA_VISIBLE_DEVICES` on each worker
before the process spawns:

```bash
# On each node, launch one worker per GPU with a single thread each:
dask worker SCHEDULER_ADDRESS:8786 --nworkers N --nthreads 1 \
    --preload-nanny cudf_polars.engine.dask
```

Then connect from the client:

```python
import polars as pl
from distributed import Client
from cudf_polars.engine.dask import DaskEngine

with Client("SCHEDULER_ADDRESS:8786") as dc:
    with DaskEngine(dask_client=dc) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
```

Hardware binding (CPU affinity, NUMA, network) is handled automatically by
{class}`~cudf_polars.engine.dask.DaskEngine`; the nanny preload only
deals with GPU assignment.

See the [Dask CLI deployment guide][dask-cli] for more on `dask worker` options.

#### Using `dask-cuda-worker`

As an alternative to the built-in nanny preload, you can launch workers with
[`dask-cuda-worker`][dask-cuda-worker] from the [dask-cuda][dask-cuda] project. It launches one
worker per visible GPU and installs a set of plugins on every worker: a `CPUAffinity` plugin
that pins the worker to the NUMA node of its GPU, an `RMMSetup` plugin, and a nanny preload that
configures UCX.

`DaskEngine` sets up the same things for its own streaming runtime, so the two need to be
coordinated or they will fight:

* **CPU affinity is unconditional in `dask-cuda-worker`.** The `CPUAffinity` plugin is always
installed and there is no CLI flag to turn it off. Pass
`"hardware_binding": HardwareBindingPolicy(enabled=False)` in `DaskEngine`'s `engine_options`
so it does not try to re-pin affinity on top of dask-cuda's binding.
* **Do not pass `--rmm-pool-size`, `--rmm-managed-memory`, or similar RMM flags** to
`dask-cuda-worker`. Let `DaskEngine` own the memory resource via its `memory_resource_config`
(see {doc}`options`); otherwise two different memory resources will be installed on the same
worker.
* **Do not pass `--enable-tcp-over-ucx`, `--enable-infiniband`, `--enable-nvlink`, or
`--enable-rdmacm`** to `dask-cuda-worker`. `DaskEngine` bootstraps its own UCXX communicator
and will select transports itself. Enabling them on both sides can produce inconsistent UCX
configuration across the cluster.
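
The flag rules in this list can be checked mechanically before a launch script is deployed. The following is a minimal sketch, not part of `cudf-polars` or dask-cuda: `conflicting_flags` is a hypothetical helper, and treating every `--rmm-` prefixed flag as a "similar RMM flag" is an assumption of this example.

```python
import shlex

# UCX transport flags that the list above says to leave to DaskEngine.
UCX_FLAGS = {
    "--enable-tcp-over-ucx",
    "--enable-infiniband",
    "--enable-nvlink",
    "--enable-rdmacm",
}


def conflicting_flags(command: str) -> list[str]:
    """Hypothetical check: return flags that overlap with what DaskEngine configures."""
    found = []
    for token in shlex.split(command):
        # Accept both "--flag value" and "--flag=value" spellings.
        flag = token.split("=", 1)[0]
        # Assumption: any "--rmm-*" flag counts as an RMM flag to avoid.
        if flag in UCX_FLAGS or flag.startswith("--rmm-"):
            found.append(flag)
    return found


# A plain launch has nothing to report.
print(conflicting_flags("dask-cuda-worker SCHEDULER_ADDRESS:8786"))
# RMM and UCX flags are reported in the order they appear.
print(
    conflicting_flags(
        "dask-cuda-worker SCHEDULER_ADDRESS:8786 --rmm-pool-size 24GB --enable-nvlink"
    )
)
```

A check like this only catches the flags named here; it does not verify how the workers end up configured at runtime.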

```bash
# On each node, GPU assignment + CPU affinity only (no RMM, no UCX flags):
dask-cuda-worker SCHEDULER_ADDRESS:8786
```

```python
import polars as pl
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.hardware_binding import (
    HardwareBindingPolicy,
)

with Client("SCHEDULER_ADDRESS:8786") as dc:
    with DaskEngine(
        dask_client=dc,
        engine_options={
            # dask-cuda-worker always pins CPU affinity; disable DaskEngine's
            # binding so the two don't conflict.
            "hardware_binding": HardwareBindingPolicy(enabled=False),
        },
    ) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
```

## Cluster diagnostics

{meth}`~cudf_polars.engine.dask.DaskEngine.gather_cluster_info` returns
placement information for every worker:

```python
from cudf_polars.engine.dask import DaskEngine

with DaskEngine() as engine:
    print(f"cluster has {engine.nranks} workers")
    # Each record describes where one worker is running.
    for info in engine.gather_cluster_info():
        print(
            f"hostname={info['hostname']}, pid={info['pid']}, "
            f"CUDA_VISIBLE_DEVICES={info['cuda_visible_devices']}"
        )
```
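
One use for this placement information is spotting two workers on the same host that report the same GPU. The sketch below runs on plain dictionaries shaped like the records printed above, so it needs no cluster. `find_gpu_collisions` and the sample data are hypothetical, and it assumes each worker's `cuda_visible_devices` value can be compared for equality.

```python
from collections import defaultdict


def find_gpu_collisions(infos):
    """Hypothetical helper: map hostname -> device values reported by more than one worker."""
    counts = defaultdict(lambda: defaultdict(int))
    for info in infos:
        counts[info["hostname"]][info["cuda_visible_devices"]] += 1
    return {
        host: sorted(dev for dev, n in devices.items() if n > 1)
        for host, devices in counts.items()
        if any(n > 1 for n in devices.values())
    }


# Made-up sample: node1 has two workers reporting the same device.
sample = [
    {"hostname": "node0", "pid": 101, "cuda_visible_devices": "0"},
    {"hostname": "node0", "pid": 102, "cuda_visible_devices": "1"},
    {"hostname": "node1", "pid": 201, "cuda_visible_devices": "0"},
    {"hostname": "node1", "pid": 202, "cuda_visible_devices": "0"},
]
print(find_gpu_collisions(sample))  # {'node1': ['0']}
```

In a real session the list would come from `engine.gather_cluster_info()` instead of `sample`.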

{class}`~cudf_polars.engine.dask.DaskEngine` raises `RuntimeError` if
created inside an `rrun` cluster.

[dask-distributed]: https://distributed.dask.org/
[dask-cli]: https://docs.dask.org/en/latest/deploying-cli.html
[dask-cuda]: https://docs.rapids.ai/api/dask-cuda/nightly/
[dask-cuda-worker]: https://docs.rapids.ai/api/dask-cuda/nightly/quickstart/#dask-cuda-worker