1 change: 1 addition & 0 deletions dependencies.yaml
@@ -161,6 +161,7 @@ files:
- depends_on_pylibcudf
- depends_on_libcudf
- depends_on_cudf_polars
- depends_on_ray
- docs
- py_version
py_build_cudf:
18 changes: 18 additions & 0 deletions docs/cudf/source/conf.py
@@ -625,12 +625,30 @@ def on_missing_reference(app, env, node, contnode):
("py:class", "Axis"),
("py:class", "ArrowLike"),
("py:class", "ExecutorType"),
# cudf-polars: bare rapidsmpf type names appear in autodoc'd signatures
# because they are imported under ``if TYPE_CHECKING:`` and rendered as
# unqualified strings in type annotations. The ``rapidsmpf.*`` regex below
# only matches fully qualified targets, so the bare leaf names are listed
# explicitly here.
("py:class", "Statistics"),
("py:class", "Communicator"),
("py:class", "Options"),
# polars aliases that don't match the public intersphinx targets.
("py:class", "pl.DataFrame"),
("py:class", "polars.LazyFrame"),
("py:class", "polars.DataFrame"),
("py:class", "polars.dataframe.frame.DataFrame"),
]
# Temporarily disable nitpick warnings for pandas: https://github.com/pandas-dev/pandas/issues/64584
nitpick_ignore_regex = [
("py:.*", "pandas.*"),
("py:.*", "pd.*"),
("ref.*", ".*pandas.*"),
# External libs without configured intersphinx inventories.
("py:.*", r"rapidsmpf(\..*)?"),
("py:.*", r"ray(\..*)?"),
("py:.*", r"distributed(\..*)?"),
("py:.*", r"dask_cuda(\..*)?"),
]
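
The comment in this hunk says the `rapidsmpf.*` pattern only covers fully qualified targets. The sketch below mimics that matching. It assumes Sphinx matches each `nitpick_ignore_regex` pattern against the whole string, as the Sphinx documentation describes. `is_ignored` is a hypothetical helper, not part of Sphinx or this PR, and `rapidsmpf.statistics.Statistics` is only an illustrative target.

```python
import re

# Two of the patterns added to nitpick_ignore_regex in this hunk.
IGNORE_REGEX = [
    ("py:.*", r"rapidsmpf(\..*)?"),
    ("py:.*", r"ray(\..*)?"),
]


def is_ignored(ref_type: str, target: str) -> bool:
    """Hypothetical helper: True if both type and target fully match a pattern."""
    return any(
        re.fullmatch(type_pat, ref_type) and re.fullmatch(target_pat, target)
        for type_pat, target_pat in IGNORE_REGEX
    )


print(is_ignored("py:class", "rapidsmpf.statistics.Statistics"))  # True
print(is_ignored("py:class", "rapidsmpf"))                        # True
print(is_ignored("py:class", "Statistics"))                       # False: bare leaf name
print(is_ignored("py:class", "rayon"))                            # False: not "ray" or "ray.<...>"
```

Under that assumption, a bare `Statistics` target is not covered by the regex, which is why the leaf names are also listed in `nitpick_ignore`.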


70 changes: 63 additions & 7 deletions docs/cudf/source/cudf_polars/api.md
@@ -1,17 +1,73 @@
(cudf-polars-api)=
# API
# API Reference

For the most part, the public API of `cudf-polars` is the polars API.
For the most part, the public API of `cudf-polars` is the Polars API itself. This page
documents the additional classes and functions that `cudf-polars` exposes for the streaming
multi-GPU engines.

## Streaming engines

```{eval-rst}
.. autoclass:: cudf_polars.engine.ray.RayEngine
   :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks
   :show-inheritance:

.. autoclass:: cudf_polars.engine.dask.DaskEngine
   :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks
   :show-inheritance:

.. autoclass:: cudf_polars.engine.spmd.SPMDEngine
   :members: from_options, gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks, rank, comm, context
   :show-inheritance:

.. autoclass:: cudf_polars.engine.default_singleton_engine.DefaultSingletonEngine
   :members: get_or_create, shutdown
   :show-inheritance:
```

The engine classes share a common base class:

```{eval-rst}
.. autoclass:: cudf_polars.engine.core.StreamingEngine
   :members: gather_cluster_info, gather_statistics, global_statistics, shutdown, nranks
   :show-inheritance:

.. autoclass:: cudf_polars.engine.core.ClusterInfo
   :members:
```

## Configuration

```{eval-rst}
.. autoclass:: cudf_polars.engine.options.StreamingOptions
   :members: from_dict, to_dict, to_rapidsmpf_options, to_executor_options, to_engine_options

.. autodata:: cudf_polars.engine.options.UNSPECIFIED

.. autoclass:: cudf_polars.engine.hardware_binding.HardwareBindingPolicy
   :members:

.. autofunction:: cudf_polars.engine.hardware_binding.bind_to_gpu
```

## SPMD helpers

```{eval-rst}
.. autofunction:: cudf_polars.engine.spmd.allgather_polars_dataframe

.. autofunction:: cudf_polars.streaming.collectives.common.reserve_op_id
```

## Internal configuration objects

These dataclasses back the `engine_options` surfaced by `pl.GPUEngine` and `StreamingOptions`.
Most users interact with them through `StreamingOptions` fields rather than directly.

```{eval-rst}
.. automodule:: cudf_polars.utils.config
   :members:
     Cluster,
     ConfigOptions,
     CUDAStreamPoolConfig,
     DynamicPlanningOptions,
     ExecutorType,
     InMemoryExecutor,
     MemoryResourceConfig,
     ParquetOptions,
     StreamingExecutor,
     StreamingFallbackMode,
198 changes: 198 additions & 0 deletions docs/cudf/source/cudf_polars/dask_engine.md
@@ -0,0 +1,198 @@
(cudf-polars-dask-engine)=
# Dask

{class}`~cudf_polars.engine.dask.DaskEngine` runs the streaming executor
on a [Dask distributed][dask-distributed] cluster: one Dask worker per GPU, coordinated by a
single client process. Partitions are streamed through the query plan and collective operations
(shuffles, allgathers, joins) run across workers over a shared UCXX communicator. On startup,
each worker is pinned to the CPU cores and NUMA node closest to its GPU (see
[Pre-configured GPU clusters](#pre-configured-gpu-clusters) below).

```python
import polars as pl
from cudf_polars.engine.dask import DaskEngine

with DaskEngine() as engine:
    result = (
        pl.scan_parquet("/data/dataset/*.parquet")
        .filter(pl.col("amount") > 100)
        .group_by("customer_id")
        .agg(pl.col("amount").sum())
        .collect(engine=engine)
    )
print(result)
```

With no arguments, {class}`~cudf_polars.engine.dask.DaskEngine` creates a
`distributed.LocalCluster` with one worker per visible GPU and a `distributed.Client`, then
bootstraps a UCXX communicator across all workers. On exit, everything it created is torn down.

```{note}
`.collect()` pulls the full result back to the client process. For large distributed outputs,
prefer `.sink_*()` or aggregate/sample inside the query before `.collect()`. See
[Result collection](engines.md#result-collection).
```

## Configuring `DaskEngine`

For custom configuration, build
{class}`~cudf_polars.engine.options.StreamingOptions` and use
`DaskEngine.from_options()`:

```python
import polars as pl
from cudf_polars.engine.options import StreamingOptions
from cudf_polars.engine.dask import DaskEngine

opts = StreamingOptions(num_streaming_threads=8, fallback_mode="silent")

with DaskEngine.from_options(opts) as engine:
    result = pl.scan_parquet("/data/dataset/*.parquet").collect(engine=engine)
```

See {doc}`options` for the available fields.

## Bring your own Dask client

Pass an existing `distributed.Client` via `dask_client=` to attach to an already-running
scheduler:

```python
from distributed import Client
import polars as pl
from cudf_polars.engine.dask import DaskEngine

with Client("scheduler-address:8786") as dc:
    with DaskEngine(dask_client=dc) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
```

When you supply the client, {class}`~cudf_polars.engine.dask.DaskEngine`
leaves it (and the cluster) alone on exit.
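
The teardown rule can be pictured as "close only what you created". The following is a toy illustration of that ownership pattern, not `DaskEngine`'s actual implementation; `FakeClient` and `ToyEngine` are hypothetical stand-ins that need neither Dask nor a GPU.

```python
class FakeClient:
    """Stand-in for a client object; only tracks whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class ToyEngine:
    """Hypothetical engine: closes its client on exit only if it created it."""

    def __init__(self, client=None):
        self._owns_client = client is None
        self.client = FakeClient() if client is None else client

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if self._owns_client:
            self.client.close()
        return False  # never swallow exceptions


# Case 1: the engine creates the client, so it also closes it.
with ToyEngine() as engine:
    created = engine.client
print(created.closed)  # True

# Case 2: the caller supplies the client, so it stays open afterwards.
mine = FakeClient()
with ToyEngine(client=mine):
    pass
print(mine.closed)  # False
```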

(pre-configured-gpu-clusters)=
### Pre-configured GPU clusters

Some Dask launchers, notably `dask_cuda.LocalCUDACluster`, already pin CPU affinity and set
`CUDA_VISIBLE_DEVICES` per worker. In that case, disable `DaskEngine`'s built-in hardware binding
via {class}`~cudf_polars.engine.hardware_binding.HardwareBindingPolicy`.
Otherwise both layers set each worker's affinity and whichever runs last wins, which makes the
resulting placement non-deterministic:

```python
from dask_cuda import LocalCUDACluster
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.hardware_binding import (
    HardwareBindingPolicy,
)

with Client(LocalCUDACluster()) as dc, DaskEngine(
    dask_client=dc,
    engine_options={
        "hardware_binding": HardwareBindingPolicy(enabled=False),
    },
) as engine:
    ...
```

### Manually launched workers

When launching workers yourself (for example on a multi-node HPC cluster), use the built-in nanny
preload to assign one GPU per worker. The preload sets `CUDA_VISIBLE_DEVICES` on each worker
before the process spawns:

```bash
# On each node, launch one worker per GPU with a single thread each:
dask worker SCHEDULER_ADDRESS:8786 --nworkers N --nthreads 1 \
--preload-nanny cudf_polars.engine.dask
```

Then connect from the client:

```python
import polars as pl
from distributed import Client
from cudf_polars.engine.dask import DaskEngine

with Client("SCHEDULER_ADDRESS:8786") as dc:
    with DaskEngine(dask_client=dc) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
```

Hardware binding (CPU affinity, NUMA, network) is handled automatically by
{class}`~cudf_polars.engine.dask.DaskEngine`; the nanny preload only
deals with GPU assignment.

See the [Dask CLI deployment guide][dask-cli] for more on `dask worker` options.
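
For scripted launches, the `dask worker` command from this section can be assembled programmatically. The sketch below uses only the flags shown in this section. `build_worker_command` is a hypothetical helper, and the GPU count per node is an input you must supply yourself.

```python
import shlex


def build_worker_command(scheduler_address: str, gpus_on_node: int) -> list[str]:
    """Assemble the `dask worker` argv from this section for one node."""
    if gpus_on_node < 1:
        raise ValueError("gpus_on_node must be at least 1")
    return [
        "dask", "worker", scheduler_address,
        "--nworkers", str(gpus_on_node),  # one worker per GPU
        "--nthreads", "1",                # a single thread per worker
        "--preload-nanny", "cudf_polars.engine.dask",
    ]


cmd = build_worker_command("SCHEDULER_ADDRESS:8786", gpus_on_node=4)
print(shlex.join(cmd))
# dask worker SCHEDULER_ADDRESS:8786 --nworkers 4 --nthreads 1 --preload-nanny cudf_polars.engine.dask
```

The returned list can be handed to `subprocess.Popen` as-is, which avoids shell quoting issues.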

#### Using `dask-cuda-worker`

As an alternative to the built-in nanny preload, you can launch workers with
[`dask-cuda-worker`][dask-cuda-worker] from the [dask-cuda][dask-cuda] project. It launches one
worker per visible GPU and installs a set of plugins on every worker: a `CPUAffinity` plugin
that pins the worker to the NUMA node of its GPU, an `RMMSetup` plugin, and a nanny preload that
configures UCX.

`DaskEngine` sets up the same things for its own streaming runtime, so the two must be
coordinated to avoid conflicts:

* **CPU affinity is unconditional in `dask-cuda-worker`.** The `CPUAffinity` plugin is always
installed and there is no CLI flag to turn it off. Pass `HardwareBindingPolicy(enabled=False)`
as the `hardware_binding` engine option to `DaskEngine` so it does not try to re-pin affinity on
top of dask-cuda's binding.
* **Do not pass `--rmm-pool-size`, `--rmm-managed-memory`, or similar RMM flags** to
`dask-cuda-worker`. Let `DaskEngine` own the memory resource via its `memory_resource_config`
(see {doc}`options`); otherwise two different memory resources will be installed on the same
worker.
* **Do not pass `--enable-tcp-over-ucx`, `--enable-infiniband`, `--enable-nvlink`, or
`--enable-rdmacm`** to `dask-cuda-worker`. `DaskEngine` bootstraps its own UCXX communicator
and will select transports itself; enabling them on both sides can produce inconsistent UCX
configuration across the cluster.
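
The flag rules above can be checked mechanically before launching workers. The sketch below is one way to do that. `find_conflicting_flags` is a hypothetical helper, and treating every `--rmm-*` option as an RMM flag is an assumption based on the "or similar RMM flags" wording.

```python
import shlex

# UCX transport flags that the list above says not to pass.
UCX_FLAGS = {
    "--enable-tcp-over-ucx",
    "--enable-infiniband",
    "--enable-nvlink",
    "--enable-rdmacm",
}


def find_conflicting_flags(command: str) -> list[str]:
    """Return the flags in `command` that would overlap with DaskEngine's setup."""
    conflicts = []
    for token in shlex.split(command):
        flag = token.split("=", 1)[0]  # handle both "--flag value" and "--flag=value"
        # Assumption: any option starting with "--rmm-" configures RMM.
        if flag.startswith("--rmm-") or flag in UCX_FLAGS:
            conflicts.append(flag)
    return conflicts


print(find_conflicting_flags("dask-cuda-worker SCHEDULER_ADDRESS:8786"))
# []
print(find_conflicting_flags(
    "dask-cuda-worker SCHEDULER_ADDRESS:8786 --rmm-pool-size 24GB --enable-nvlink"
))
# ['--rmm-pool-size', '--enable-nvlink']
```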

```bash
# On each node, GPU assignment + CPU affinity only (no RMM, no UCX flags):
dask-cuda-worker SCHEDULER_ADDRESS:8786
```

```python
import polars as pl
from distributed import Client
from cudf_polars.engine.dask import DaskEngine
from cudf_polars.engine.hardware_binding import (
    HardwareBindingPolicy,
)

with Client("SCHEDULER_ADDRESS:8786") as dc:
    with DaskEngine(
        dask_client=dc,
        engine_options={
            # dask-cuda-worker always pins CPU affinity; disable DaskEngine's
            # binding so the two don't conflict.
            "hardware_binding": HardwareBindingPolicy(enabled=False),
        },
    ) as engine:
        result = pl.scan_parquet("/data/*.parquet").collect(engine=engine)
```

## Cluster diagnostics

{meth}`~cudf_polars.engine.dask.DaskEngine.gather_cluster_info` returns
placement information for every worker:

```python
with DaskEngine() as engine:
    print(f"cluster has {engine.nranks} workers")
    for info in engine.gather_cluster_info():
        print(
            f"hostname={info['hostname']}, pid={info['pid']}, "
            f"CUDA_VISIBLE_DEVICES={info['cuda_visible_devices']}"
        )
```
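
This placement information can also be used for a sanity check, for example to confirm that no two workers on one host share a GPU. The sketch below runs on made-up records that carry the three fields printed above. It assumes `cuda_visible_devices` is a comma-separated string whose first entry is the worker's own device, and `find_shared_gpus` is a hypothetical helper.

```python
from collections import defaultdict

# Made-up records with the same keys as the diagnostics printed above.
sample_info = [
    {"hostname": "node-a", "pid": 1001, "cuda_visible_devices": "0"},
    {"hostname": "node-a", "pid": 1002, "cuda_visible_devices": "1"},
    {"hostname": "node-b", "pid": 2001, "cuda_visible_devices": "0"},
    {"hostname": "node-b", "pid": 2002, "cuda_visible_devices": "0"},  # shares GPU 0
]


def find_shared_gpus(records):
    """Return {(hostname, device): [pids]} for devices used by more than one worker."""
    by_device = defaultdict(list)
    for rec in records:
        # Assumption: the first listed device is the one the worker uses.
        first_device = rec["cuda_visible_devices"].split(",")[0]
        by_device[(rec["hostname"], first_device)].append(rec["pid"])
    return {key: pids for key, pids in by_device.items() if len(pids) > 1}


print(find_shared_gpus(sample_info))
# {('node-b', '0'): [2001, 2002]}
```

An empty result means every worker in the sample has its own (hostname, device) pair.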

{class}`~cudf_polars.engine.dask.DaskEngine` raises `RuntimeError` if
created inside an `rrun` cluster.

[dask-distributed]: https://distributed.dask.org/
[dask-cli]: https://docs.dask.org/en/latest/deploying-cli.html
[dask-cuda]: https://docs.rapids.ai/api/dask-cuda/nightly/
[dask-cuda-worker]: https://docs.rapids.ai/api/dask-cuda/nightly/quickstart/#dask-cuda-worker