60 changes: 60 additions & 0 deletions docs/api/datapipes/physicsnemo.datapipes.rst
@@ -139,6 +139,29 @@ of ``pin_memory`` from the ``DataLoader`` class to the ``Reader`` classes.
This is because of the much earlier GPU data transfer in the PhysicsNeMo
datapipe compared to PyTorch.

The ``DataLoader`` drives one of two mutually-exclusive paths, selected by
dataset type:

- **Map-style preload path** (:class:`~physicsnemo.datapipes.DatasetBase`,
  e.g. ``Dataset``, ``MeshDataset``): a dedicated dispatcher thread keeps a
  *bounded* number of samples in flight by pulling the index stream
  **lazily** under backpressure and submitting host-only loads to a worker
  pool. The main thread consumes the resulting samples in order
  (host-to-device transfer plus transforms on a preprocessing stream) and
  reassembles batches from boundary markers, so the full epoch is never
  materialized up front and irregular batch sizes are supported.
- **Iterable generator path** (:class:`~physicsnemo.datapipes.IterableDatasetBase`):
  a generator dataset driven entirely on the main thread (no sampler, no
  worker pool); see `Iterable Datasets`_ below.

In both paths, **all device-kernel launches happen on the single main
thread**. This is the real constraint for Warp-based transforms: Warp may
launch on any CUDA stream as long as the launch comes from the main thread
and Warp's current stream is bound to the torch stream in use.
Preprocessing therefore runs on a side (preprocessing) stream and is
ordered against the compute stream with a CUDA event, so it overlaps
training without ever blocking the host.

.. autoclass:: physicsnemo.datapipes.dataloader.DataLoader
   :members:
   :show-inheritance:
@@ -179,6 +202,43 @@ consistent keys. Because the exact collation details differ by dataset, the
:show-inheritance:


Iterable Datasets
^^^^^^^^^^^^^^^^^

Map-style datasets (``Dataset``, ``MeshDataset``) assume a fixed length and a
sampler that hands out indices. Some workloads have neither: an online
simulation, a procedural generator, or any source that produces samples on
the fly with no meaningful ``__len__``. For those, subclass
:class:`~physicsnemo.datapipes.IterableDatasetBase` and yield samples from
``__iter__``. The ``DataLoader`` detects an iterable dataset automatically
and switches to the main-thread-only generator path; ``shuffle`` and
``sampler`` are ignored (a warning is issued) and ``len(loader)`` raises
``TypeError``, since the length is unknown.

An iterable dataset chooses one of two emission modes via the
``yields_batches`` attribute:

- ``yields_batches = False`` (default): ``__iter__`` yields individual
  samples and the ``DataLoader`` collates them into batches of
  ``batch_size`` (honoring ``drop_last``).
- ``yields_batches = True``: ``__iter__`` yields fully-formed batches and the
  ``DataLoader`` passes them through without further collation, which is the
  natural fit for a generator that already produces a batch per step.

Reproducibility follows a per-``(epoch, position)`` scheme rather than the
map-style per-``(epoch, index)`` scheme: implement ``set_epoch`` and/or
``set_generator`` to seed deterministically from the iteration position.
Because the generator runs on the main thread, Warp kernels inside it are
safe on any preprocessing stream the ``DataLoader`` binds. See the online
simulation tutorial in the
`examples directory <https://github.com/NVIDIA/physicsnemo/tree/main/examples/minimal/datapipes>`_
for a runnable Warp ``Darcy2D`` generator wired through this path.

.. autoclass:: physicsnemo.datapipes.IterableDatasetBase
   :members:
   :show-inheritance:


Readers
^^^^^^^

38 changes: 38 additions & 0 deletions examples/minimal/datapipes/README.md
@@ -215,3 +215,41 @@ python tutorial_04_hydra_config.py --config-name tutorial_04_pointcloud
python tutorial_04_hydra_config.py --config-name tutorial_04_pointcloud \
subsample.n_points=5000
```

### Tutorial 5: Iterable Datasets for Online Simulation

**File:** `tutorial_5_iterable_online_simulation.py`

Not every dataset is map-style. When data is *generated* on the fly --
an online physics simulation, a procedural sampler, an unbounded stream --
there is no fixed length and no index to address. PhysicsNeMo models these
with `IterableDatasetBase`:

- Iterable datasets only support iteration: no `__len__`, no `__getitem__`,
no sampler, and no prefetch worker pool.
- They run entirely on the **main thread**, so they may freely launch Warp
kernels and use CUDA streams. Warp's only requirement is a single
launching thread, which the main thread satisfies -- this is what makes
an online GPU simulation safe on this path.
- The `DataLoader` still drives generation on a preprocessing stream (when
`use_streams=True`) and hands each item to the compute stream via a CUDA
event, so generating the next batch can overlap training on the current
one.
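The handoff described in the last bullet can be sketched with plain PyTorch stream primitives. This is a minimal illustration under stated assumptions, not the `DataLoader`'s actual implementation: `simulate_step` and `iterate_with_handoff` are hypothetical names, the one-step lookahead is just one simple way to get overlap, and the sketch falls back to plain CPU iteration when no CUDA device is present.

```python
import torch


def simulate_step(step: int, device: torch.device) -> torch.Tensor:
    """Hypothetical stand-in for one generator step (e.g. one solver call)."""
    return torch.full((4, 4), float(step), device=device)


def iterate_with_handoff(num_steps: int):
    """Yield items produced on a side stream, ordered by a CUDA event."""
    if not torch.cuda.is_available():
        # No GPU: there are no streams to order, so just iterate.
        for step in range(num_steps):
            yield simulate_step(step, torch.device("cpu"))
        return

    device = torch.device("cuda")
    side = torch.cuda.Stream(device=device)

    def launch(step: int):
        # All launches come from this (main) thread; only the stream differs.
        with torch.cuda.stream(side):
            item = simulate_step(step, device)
            ready = torch.cuda.Event()
            ready.record(side)
        return item, ready

    pending = launch(0) if num_steps > 0 else None
    for step in range(num_steps):
        item, ready = pending
        if step + 1 < num_steps:
            # Enqueue the next item before handing over the current one,
            # so its generation can overlap the consumer's work.
            pending = launch(step + 1)
        compute = torch.cuda.current_stream(device)
        # Device-side wait: the compute stream is ordered after the event,
        # while the host thread continues without blocking.
        compute.wait_event(ready)
        # Tell the caching allocator the tensor is also used on this stream.
        item.record_stream(compute)
        yield item
```

The event is what lets the consumer use `item` safely without a host-side `synchronize()`: only the compute stream waits, so the main thread stays free to enqueue the next step.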

This tutorial wraps the built-in Warp `Darcy2D` flow generator (a
multigrid Jacobi solver) as an iterable dataset and iterates it through the
`DataLoader`. Because `Darcy2D` produces a full batch per step, the wrapper
is *self-batching* (`yields_batches = True`) and the loader passes each
batch through unchanged.
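The difference between a self-batching dataset and a sample-yielding one can be sketched in plain Python. This is an illustrative model of the behavior described above, not the loader's code: `iterate_batches` is a hypothetical helper, and a Python list stands in for real collation (which would stack tensors).

```python
from itertools import islice
from typing import Any, Iterable, Iterator


def iterate_batches(
    source: Iterable[Any],
    *,
    yields_batches: bool,
    batch_size: int = 1,
    drop_last: bool = False,
) -> Iterator[Any]:
    """Model of the two emission modes of an iterable dataset."""
    if yields_batches:
        # Self-batching: every item is already a batch; pass it through.
        yield from source
        return

    it = iter(source)
    while True:
        # Group up to batch_size individual samples into one batch.
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        if len(chunk) < batch_size and drop_last:
            # Trailing partial batch is discarded when drop_last is set.
            return
        yield chunk
```

For example, `list(iterate_batches(range(5), yields_batches=False, batch_size=2))` groups five samples into two full batches and one partial batch, while `yields_batches=True` returns the source items untouched regardless of `batch_size`.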

**When to use the iterable path vs the map/descriptor path:** use the
map-style `Dataset` when you have a fixed corpus on disk addressable by
index (storage-backed, benefits from threaded prefetch). Use an
`IterableDatasetBase` when samples are produced by a generator/simulation,
the length is unbounded or unknown, or the producer itself must launch
device kernels.
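The two shapes differ mainly in the protocol they expose. The sketch below uses plain Python classes to show that contrast; `SquaresOnDisk` and `RandomWalk` are hypothetical stand-ins and deliberately do not subclass the PhysicsNeMo base classes, whose exact requirements are not shown here.

```python
import random
from typing import Iterator, List


class SquaresOnDisk:
    """Map-style stand-in: a fixed corpus addressable by index."""

    def __init__(self, values: List[int]) -> None:
        self._values = values

    def __len__(self) -> int:
        return len(self._values)

    def __getitem__(self, index: int) -> int:
        # Any index can be loaded independently, so a sampler can shuffle
        # indices and a worker pool can prefetch them.
        return self._values[index] ** 2


class RandomWalk:
    """Iterable stand-in: each sample exists only once it is generated."""

    def __init__(self, steps: int, seed: int = 0) -> None:
        self._steps = steps
        self._seed = seed

    def __iter__(self) -> Iterator[int]:
        # Each sample depends on the previous one, so there is no
        # meaningful index to address and no __len__ or __getitem__.
        rng = random.Random(self._seed)
        position = 0
        for _ in range(self._steps):
            position += rng.choice((-1, 1))
            yield position
```

A source like `RandomWalk`, where producing sample *n* requires having produced sample *n - 1*, is the kind of workload that does not fit the index-based path.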

```bash
# Requires a CUDA device (the Darcy solver runs Warp kernels on the GPU)
python tutorial_5_iterable_online_simulation.py
```
187 changes: 187 additions & 0 deletions examples/minimal/datapipes/tutorial_5_iterable_online_simulation.py
@@ -0,0 +1,187 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 - 2026 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Tutorial 5: Iterable datasets for online simulation.

Most datasets are *map-style*: a fixed number of samples addressed by
index, read from storage. Some workloads instead *generate* data on the
fly -- an online physics simulation, a procedural sampler, a streaming
source with no fixed length. These are *iterable* datasets.

PhysicsNeMo models this with :class:`IterableDatasetBase`. Unlike a
map-style dataset, an iterable dataset:

- has no length and no indexing -- it only supports iteration;
- is driven entirely on the **main thread** (no worker pool), so it may
freely launch Warp kernels and use CUDA streams. This is exactly the
property that makes an online GPU simulation safe here: Warp's
constraint is a single launching thread, which the main thread
satisfies.

This tutorial wraps the built-in Warp ``Darcy2D`` flow generator -- which
solves the 2D Darcy equation with a multigrid Jacobi solver and yields a
ready-made batch each step -- as an iterable dataset and drives it through
the PhysicsNeMo :class:`DataLoader`.

Run with::

python tutorial_5_iterable_online_simulation.py

Requires a CUDA device (the Darcy solver runs Warp kernels on the GPU).
"""

from __future__ import annotations

import time

import numpy as np
import torch

from physicsnemo.datapipes import DataLoader, IterableDatasetBase
from physicsnemo.datapipes.benchmarks.darcy import Darcy2D


class DarcyOnlineDataset(IterableDatasetBase):
    """Online 2D Darcy-flow simulation as an iterable dataset.

    Wraps :class:`~physicsnemo.datapipes.benchmarks.darcy.Darcy2D`, whose
    iterator runs the solver and yields a full ``{"permeability", "darcy"}``
    batch per step. The underlying generator is infinite, so this wrapper
    caps it at ``num_batches`` per epoch to give the loader a finite stream.

    Because ``Darcy2D`` already produces a complete batch, this is a
    *self-batching* dataset: we set :attr:`yields_batches` so the loader
    passes each batch through unchanged instead of re-collating.

    Parameters
    ----------
    num_batches : int
        Number of batches to emit per epoch.
    resolution : int, default=64
        Simulation grid resolution.
    batch_size : int, default=8
        Number of simulations per batch.
    device : str, default="cuda"
        Device the Warp solver runs on.
    base_seed : int, default=0
        Base seed for reproducible permeability sampling.
    """

    # Darcy2D emits a full batch per step; do not re-collate.
    yields_batches = True

    def __init__(
        self,
        num_batches: int,
        *,
        resolution: int = 64,
        batch_size: int = 8,
        device: str = "cuda",
        base_seed: int = 0,
    ) -> None:
        self._sim = Darcy2D(
            resolution=resolution,
            batch_size=batch_size,
            device=device,
            normaliser={"permeability": (1.25, 0.75), "darcy": (4.52e-2, 2.79e-2)},
        )
        self._num_batches = num_batches
        self._base_seed = base_seed
        self._epoch = 0

    def set_epoch(self, epoch: int) -> None:
        """Select the epoch so each epoch draws a distinct, reproducible stream."""
        self._epoch = epoch

    def __iter__(self):
        # One solver iterator drives the simulation; we pull a bounded
        # number of steps from the otherwise-infinite generator.
        sim_iter = iter(self._sim)
        for position in range(self._num_batches):
            # Per-(epoch, position) seeding: the stream is reproducible
            # across runs and distinct across epochs and positions. There is
            # no stable sample index for a generator, so we key on the
            # monotonic emission position instead.
            seed = np.random.SeedSequence(
                [self._base_seed, self._epoch, position]
            ).generate_state(1)[0]
            np.random.seed(int(seed))
            yield next(sim_iter)


def main() -> None:
    if not torch.cuda.is_available():
        print("This tutorial requires a CUDA device (Warp Darcy solver). Skipping.")
        return

    num_epochs = 5
    num_batches = 16
    dataset = DarcyOnlineDataset(num_batches=num_batches, resolution=64, batch_size=8)

    # use_streams=True runs each simulation step on a preprocessing stream
    # and hands the result to the compute stream via a CUDA event, so
    # generation of the next batch can overlap training on the current one.
    loader = DataLoader(dataset, use_streams=True, seed=0)

    # Iterable datasets have no length, so len(loader) raises and this
    # block takes the exception path.
    try:
        len(loader)
    except TypeError as exc:
        print(f"len(loader) is undefined for iterable datasets: {exc}")

    for epoch in range(num_epochs):
        loader.set_epoch(epoch)
        print(f"\nEpoch {epoch}")
        host_times = []
        cuda_events = []
        epoch_start = time.perf_counter()
        prev_host = epoch_start
        cuda_start = torch.cuda.Event(enable_timing=True)
        cuda_start.record(torch.cuda.current_stream())
        for i, batch in enumerate(loader):
            host_now = time.perf_counter()
            permeability = batch["permeability"]
            darcy = batch["darcy"]
            cuda_end = torch.cuda.Event(enable_timing=True)
            cuda_end.record(torch.cuda.current_stream())
            cuda_events.append((cuda_start, cuda_end))
            cuda_start = cuda_end

            host_times.append(host_now - prev_host)
            prev_host = host_now
            print(
                f" batch {i}: permeability {tuple(permeability.shape)} "
                f"on {permeability.device}, darcy {tuple(darcy.shape)}, "
                f"host_dt={host_times[-1]:.4f}s"
            )

        torch.cuda.synchronize()
        cuda_times_ms = [start.elapsed_time(end) for start, end in cuda_events]
        epoch_wall = time.perf_counter() - epoch_start
        mean_host = sum(host_times) / len(host_times)
        mean_cuda = sum(cuda_times_ms) / len(cuda_times_ms)
        print(
            f" epoch summary: batches={len(host_times)}, wall={epoch_wall:.3f}s, "
            f"host_mean={mean_host:.4f}s, cuda_mean={mean_cuda:.2f}ms, "
            f"cuda_min={min(cuda_times_ms):.2f}ms, cuda_max={max(cuda_times_ms):.2f}ms"
        )

    # Train as usual; the batches are ordinary device tensors.


if __name__ == "__main__":
    main()
44 changes: 42 additions & 2 deletions physicsnemo/core/function_spec.py
@@ -29,6 +29,44 @@

from physicsnemo.core.version_check import check_version_spec

# Cache of Warp stream wrappers keyed by the underlying CUDA stream handle.
#
# ``warp.stream_from_torch`` wraps a torch-owned (external) CUDA stream, and the
# resulting ``warp.Stream`` unregisters that handle from Warp on ``__del__``.
# Creating a fresh wrapper on every launch therefore churns register/unregister
# on a shared stream; unregistering while another wrapper -- or an in-flight
# kernel -- still uses the stream corrupts it (illegal memory access). Keeping
# one long-lived wrapper per handle registers each stream exactly once.
_WARP_STREAM_CACHE: Dict[int, Any] = {}


def warp_stream_from_torch(torch_stream: "torch.cuda.Stream") -> Any:
    """Return a cached Warp stream wrapping *torch_stream*.

    Wrapping a torch stream registers it with Warp; the wrapper unregisters it
    on garbage collection. Caching one wrapper per CUDA stream handle keeps the
    registration stable for the process lifetime, which is required when the
    same torch stream is bound by nested Warp scopes (e.g. an outer
    preprocessing scope and an inner functional launch).

    Parameters
    ----------
    torch_stream : torch.cuda.Stream
        Torch CUDA stream to wrap.

    Returns
    -------
    warp.Stream
        Cached Warp stream sharing ``torch_stream``'s underlying CUDA handle.
    """
    wp = importlib.import_module("warp")
    handle = torch_stream.cuda_stream
    cached = _WARP_STREAM_CACHE.get(handle)
    if cached is None:
        cached = wp.stream_from_torch(torch_stream)
        _WARP_STREAM_CACHE[handle] = cached
    return cached


@dataclass(frozen=True)
class Implementation:
@@ -687,11 +725,13 @@ def warp_launch_context(tensor: torch.Tensor):
Warp device and stream.
"""
     try:
-        wp = importlib.import_module("warp")
+        importlib.import_module("warp")
     except ImportError as exc:
         raise ImportError("warp is not available") from exc
     if tensor.device.type == "cuda":
-        stream = wp.stream_from_torch(torch.cuda.current_stream(tensor.device))
+        # Reuse a cached wrapper so binding the current torch stream does
+        # not churn Warp's stream registration (see warp_stream_from_torch).
+        stream = warp_stream_from_torch(torch.cuda.current_stream(tensor.device))
         device = None
     else:
         stream = None