2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -111,6 +111,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
datapipes implementation (`physicsnemo.datapipes.transforms._sdf_torch` /
`_sdf_triton`, including its bespoke Triton winding kernel) is superseded and
removed; the public datapipes SDF transform delegates here.
- Added an iterable-style dataset to physicsnemo datapipes for on-the-fly GPU simulations.

### Changed

@@ -154,6 +155,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
use `torch.no_grad()`, not `torch.inference_mode()`). Also expands CI test
coverage and adds an API documentation page for
`physicsnemo.diffusion.multi_diffusion`.
- Performance improvements in IO prefetching and GPU preprocessing in physicsnemo datapipes.

### Deprecated

60 changes: 60 additions & 0 deletions docs/api/datapipes/physicsnemo.datapipes.rst
@@ -139,6 +139,29 @@ of ``pin_memory`` from the ``DataLoader`` class to the ``Reader`` classes.
This is because of the much earlier GPU data transfer in the PhysicsNeMo
datapipe compared to PyTorch.

The ``DataLoader`` drives one of two mutually exclusive paths, selected by
dataset type:

- **Map-style preload path** (:class:`~physicsnemo.datapipes.DatasetBase`,
  e.g. ``Dataset``, ``MeshDataset``): a dedicated dispatcher thread keeps a
  *bounded* number of samples in flight by pulling the index stream
  **lazily** under backpressure and submitting host-only loads to a worker
  pool. The main thread consumes the resulting samples in order
  (host-to-device transfer plus transforms on a preprocessing stream) and
  reassembles batches from boundary markers, so the full epoch is never
  materialized up front and irregular batch sizes are supported.
- **Iterable generator path** (:class:`~physicsnemo.datapipes.IterableDatasetBase`):
  a generator dataset driven entirely on the main thread (no sampler, no
  worker pool); see `Iterable Datasets`_ below.

In both paths, **all device-kernel launches happen on the single main
thread**. This is the real constraint for Warp-based transforms: Warp may
launch on any CUDA stream as long as the launch comes from the main thread
and Warp's current stream is bound to the torch stream in use.
Preprocessing therefore runs on a side (preprocessing) stream and is
ordered against the compute stream with a CUDA event, so it overlaps
training without ever blocking the host.
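
The map-style preload path described above can be illustrated with a minimal, standard-library-only sketch. It is not the PhysicsNeMo implementation: ``bounded_prefetch``, ``load_fn``, ``max_in_flight`` and ``num_workers`` are hypothetical names, and batch boundary markers, device transfer, error handling and early shutdown are all left out. It only shows how a dispatcher thread can pull an index stream lazily while a semaphore caps the number of loads in flight.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Queue


def bounded_prefetch(indices, load_fn, max_in_flight=4, num_workers=2):
    """Yield load_fn(i) in index order with at most max_in_flight loads pending.

    Hypothetical helper for illustration only.
    """
    slots = threading.Semaphore(max_in_flight)  # backpressure on the dispatcher
    pending = Queue()  # futures, in submission order
    done = object()  # sentinel marking the end of the index stream

    def dispatch(pool):
        for i in indices:  # the index stream is pulled lazily, one at a time
            slots.acquire()  # blocks until the consumer frees a slot
            pending.put(pool.submit(load_fn, i))
        pending.put(done)

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        dispatcher = threading.Thread(target=dispatch, args=(pool,), daemon=True)
        dispatcher.start()
        while True:
            future = pending.get()
            if future is done:
                break
            sample = future.result()  # consumed in submission order
            slots.release()  # lets the dispatcher submit one more load
            yield sample
        dispatcher.join()
```

Because the dispatcher blocks on the semaphore, it never runs more than ``max_in_flight`` loads ahead of the consumer, so the index stream can be arbitrarily long without being materialized.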

.. autoclass:: physicsnemo.datapipes.dataloader.DataLoader
   :members:
   :show-inheritance:
@@ -179,6 +202,43 @@ consistent keys. Because the exact collation details differ by dataset, the
   :show-inheritance:


Iterable Datasets
^^^^^^^^^^^^^^^^^

Map-style datasets (``Dataset``, ``MeshDataset``) assume a fixed length and a
sampler that hands out indices. Some workloads have neither: an online
simulation, a procedural generator, or any source that produces samples on
the fly with no meaningful ``__len__``. For those, subclass
:class:`~physicsnemo.datapipes.IterableDatasetBase` and yield samples from
``__iter__``. The ``DataLoader`` detects an iterable dataset automatically
and switches to the main-thread-only generator path; ``shuffle`` and
``sampler`` are ignored (a warning is issued) and ``len(loader)`` raises,
since the length is unknown.

An iterable dataset chooses one of two emission modes via the
``yields_batches`` attribute:

- ``yields_batches = False`` (default): ``__iter__`` yields individual
  samples and the ``DataLoader`` collates them into batches of
  ``batch_size`` (honoring ``drop_last``).
- ``yields_batches = True``: ``__iter__`` yields fully formed batches and the
  ``DataLoader`` passes them through without further collation, which is the
  natural fit for a generator that already produces a batch per step.
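
The two emission modes can be sketched in plain Python. This is an illustration of the behavior described above, not the ``DataLoader`` code: ``iterate_batches``, ``collate``, ``Samples`` and ``Batches`` are hypothetical stand-ins, and only the ``yields_batches`` attribute name comes from the text.

```python
from itertools import islice


def iterate_batches(dataset, batch_size, drop_last=False, collate=list):
    """Hypothetical loader-side loop covering both emission modes."""
    it = iter(dataset)
    if getattr(dataset, "yields_batches", False):
        # Self-batching dataset: pass every item through unchanged.
        yield from it
        return
    while True:
        chunk = list(islice(it, batch_size))  # take up to batch_size samples
        if not chunk:
            return
        if len(chunk) < batch_size and drop_last:
            return  # discard the trailing partial batch
        yield collate(chunk)


class Samples:
    """Stand-in dataset that yields individual samples."""

    yields_batches = False

    def __iter__(self):
        return iter(range(5))


class Batches:
    """Stand-in dataset that yields ready-made batches."""

    yields_batches = True

    def __iter__(self):
        yield [0, 1]
        yield [2, 3]
```

With ``batch_size=2``, ``Samples`` produces two full batches plus a trailing partial batch unless ``drop_last`` is set, while ``Batches`` is forwarded as-is regardless of ``batch_size``.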

Reproducibility follows a per-``(epoch, position)`` scheme rather than the
map-style per-``(epoch, index)`` scheme: implement ``set_epoch`` and/or
``set_generator`` to seed deterministically from the iteration position.
Because the generator runs on the main thread, Warp kernels inside it are
safe on any preprocessing stream the ``DataLoader`` binds. See the online
simulation tutorial in the
`examples directory <https://github.com/NVIDIA/physicsnemo/tree/main/examples/minimal/datapipes>`_
for a runnable Warp ``Darcy2D`` generator wired through this path.
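
One way to realize the per-``(epoch, position)`` scheme is to derive a fresh seed from the epoch and the iteration position for every yielded item. The sketch below is an assumption about how a dataset author might do this, not the library's own logic: ``position_seed`` and ``RandomFieldStream`` are hypothetical, the class does not subclass ``IterableDatasetBase``, and the ``set_epoch(epoch)`` signature is assumed.

```python
import hashlib
import random


def position_seed(base_seed, epoch, position):
    """Derive a stable 64-bit seed from (base_seed, epoch, position)."""
    digest = hashlib.sha256(f"{base_seed}:{epoch}:{position}".encode()).digest()
    return int.from_bytes(digest[:8], "little")


class RandomFieldStream:
    """Hypothetical unbounded generator, reproducible per (epoch, position)."""

    def __init__(self, base_seed=0, n_values=3):
        self.base_seed = base_seed
        self.n_values = n_values
        self.epoch = 0

    def set_epoch(self, epoch):
        # Assumed signature; called once before iterating a new epoch.
        self.epoch = epoch

    def __iter__(self):
        position = 0
        while True:
            seed = position_seed(self.base_seed, self.epoch, position)
            rng = random.Random(seed)  # fresh RNG for each position
            yield [rng.random() for _ in range(self.n_values)]
            position += 1
```

Re-iterating with the same epoch reproduces the same stream, and calling ``set_epoch`` with a different value changes it, without any dependence on a sample index.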

.. autoclass:: physicsnemo.datapipes.IterableDatasetBase
   :members:
   :show-inheritance:


Readers
^^^^^^^

Original file line number Diff line number Diff line change
@@ -32,6 +32,16 @@ pipeline:
pattern: "run_*/domain_*.pdmsh"
subsample_n_points: ${sampling_resolution}
subsample_n_cells: ${sampling_resolution}
# The volume model consumes the interior as a point cloud (interior.points +
# interior.point_data); dropping interior tet topology makes the point
# subsample a cheap contiguous block read instead of a full slice_points
# remap (the dominant per-sample IO cost otherwise).
drop_interior_cells: true
# The in-file `vehicle` surface boundary is unused by the volume pipeline
# (SDF comes from the injected stl_geometry below; model/collate use only
# the interior). Drop it so we don't subsample (expensive, GIL-held) or pin
# it every sample.
drop_in_file_boundaries: true
extra_boundaries:
stl_geometry:
pattern: "*_single_solid.stl.pmsh"
Original file line number Diff line number Diff line change
@@ -34,6 +34,12 @@ pipeline:
pattern: "geo_LHC*/*.pdmsh"
subsample_n_points: ${sampling_resolution}
subsample_n_cells: ${sampling_resolution}
# Interior consumed as a point cloud; drop tet topology so the point
# subsample is a cheap contiguous block read (see drivaer_ml_volume.yaml).
drop_interior_cells: true
# In-file boundaries unused by the volume pipeline (SDF uses stl_geometry);
# drop them to skip per-sample subsample + pin.
drop_in_file_boundaries: true
extra_boundaries:
stl_geometry:
pattern: "*.stl.pmsh"
Original file line number Diff line number Diff line change
@@ -114,7 +114,7 @@ def _vector_loss(
target_sq = torch.mean(target**2, dim=tuple(range(pred.ndim - 1)))
return torch.sum(diff_sq / (target_sq + eps))

- total = torch.tensor(0.0, device=pred.device, dtype=pred.dtype)
+ total = torch.zeros((), device=pred.device, dtype=pred.dtype)
for i in range(n_components):
p, t = pred[..., i], target[..., i]
if loss_type == "huber":
Original file line number Diff line number Diff line change
@@ -238,19 +238,16 @@ def _transform_mesh(
T_inf=T_inf,
)

- ### `Mesh.copy` is a tensorclass-provided shallow copy: `points`,
- ### `cells`, the untouched associations, and the geometric `_cache`
- ### are all shared with `mesh`; only the cloned association is swapped.
+ # Shallow copy shares everything except the swapped association.
  new_mesh = mesh.copy()  # ty: ignore[unresolved-attribute]
  setattr(new_mesh, self._association, new_td)

- ### Scale geometry into nondim space (`x* = x / L_ref`) on the
- ### forward pass, and back to physical units (`x = x* * L_ref`)
- ### on the inverse. `Mesh.scale` propagates `_cache` through the
- ### linear transform.
+ # Scale geometry to/from nondim space (x* = x / L_ref).
+ # assume_invertible=True avoids a per-mesh sync from the det check.
  if L_ref is not None:
+     torch._assert_async(L_ref != 0)
      factor = L_ref if inverse else 1.0 / L_ref
-     new_mesh = new_mesh.scale(factor)
+     new_mesh = new_mesh.scale(factor, assume_invertible=True)

return new_mesh

Original file line number Diff line number Diff line change
@@ -365,6 +365,15 @@ def _run_epoch(
n_local = 0
num_steps = len(dataloader)
epoch_t0 = time.perf_counter()
### Single pinned scalar buffer reused every step so the loss D2H
### transfer is async (non_blocking=True from device to pinned host
### memory). The copy is issued right after forward_pass and read
### just before the logger line; by then backward + optimizer.step
### have run, giving the GPU time to complete the copy without
### blocking the host.
_loss_pinned = (
torch.zeros(1, pin_memory=True) if torch.cuda.is_available() else None
)

Collaborator

This is very funky, but makes sense! Cool implementation


with grad_ctx:
step_t0 = time.perf_counter()
@@ -381,6 +390,13 @@ def _run_epoch(
target_config=target_config,
)

### Kick off the async D2H copy of the scalar loss value into the
### pinned buffer. Backward + optimizer.step run while the copy is
### in flight, so by the time we call .item() below the transfer
### is already done and there is no host stall.
if _loss_pinned is not None:
_loss_pinned.copy_(loss.detach(), non_blocking=True)

if is_train:
optimizer.zero_grad()
if precision == "float16" and scaler is not None:
@@ -407,9 +423,13 @@ def _run_epoch(
total_metrics_td.add_(metrics)
n_local += 1

- ### Per-step sync for the print line; lands after backward +
- ### optimizer.step so it overlaps with queued GPU work.
- this_loss = loss.detach().item()
+ ### Read the loss scalar from the pinned buffer; the async copy
+ ### was issued before backward so it has had the full backward +
+ ### optimizer.step to complete without stalling the host.
+ if _loss_pinned is not None:
+     this_loss = _loss_pinned.item()
+ else:
+     this_loss = loss.detach().item()
Comment on lines 393 to +432

Contributor

P2 No explicit synchronisation between the non-blocking D2H copy and `.item()`

`_loss_pinned.copy_(loss.detach(), non_blocking=True)` schedules a DMA transfer on the current CUDA stream and returns immediately. `_loss_pinned.item()`, called on a CPU tensor, reads pinned host memory directly without any CUDA barrier. The GPU-side ordering guarantee is that the backward + optimizer kernels execute after the copy (they're on the same stream), so by the time the GPU finishes those kernels the DMA is long done. But the host doesn't wait for any of that: it issues all the async CUDA calls and reaches `.item()` very quickly. On a highly optimised loop with minimal Python overhead, or on systems where the Python thread is preempted between the copy and `.item()`, the pinned buffer may still hold its previous value (0.0 from initialisation or the previous step's loss).

The fix is to record a CUDA event immediately after the copy and call `event.synchronize()` just before `.item()`, or, since this is only for a log line, simply fall back to the original `loss.detach().item()`, which implicitly syncs. The async copy for a single float buys very little here regardless.
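
For reference, the event-based option could look roughly like the sketch below. It is only an illustration of the suggestion, not code from this PR: `AsyncLossReader` and its method names are hypothetical, and it assumes the copy and the event are issued on the same (current) CUDA stream. On a machine without CUDA it falls back to a plain `.item()`.

```python
import torch


class AsyncLossReader:
    """Hypothetical helper: async D2H copy of a scalar, guarded by a CUDA event."""

    def __init__(self):
        self._cuda = torch.cuda.is_available()
        self._pinned = torch.zeros(1, pin_memory=True) if self._cuda else None
        self._event = torch.cuda.Event() if self._cuda else None
        self._fallback = None  # holds the loss when no async copy was issued
        self._started = False

    def start(self, loss):
        """Issue the copy right after the forward pass."""
        loss = loss.detach()
        if self._cuda and loss.is_cuda:
            self._pinned.copy_(loss, non_blocking=True)
            # Recorded on the current stream, so it completes after the copy.
            self._event.record()
            self._fallback = None
        else:
            self._fallback = loss
        self._started = True

    def result(self):
        """Return the loss as a Python float; call just before logging."""
        if not self._started:
            raise RuntimeError("start() must be called before result()")
        if self._fallback is not None:
            return self._fallback.item()
        # Wait for the copy only, not for work queued after the event.
        self._event.synchronize()
        return self._pinned.item()
```

In the training loop this would amount to calling `start(loss)` after the forward pass and `result()` where `this_loss` is read. Because the event is recorded before backward and `optimizer.step` are queued, `synchronize()` does not wait for them.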

total_loss += this_loss

step_dt = time.perf_counter() - step_t0
38 changes: 38 additions & 0 deletions examples/minimal/datapipes/README.md
@@ -215,3 +215,41 @@ python tutorial_04_hydra_config.py --config-name tutorial_04_pointcloud
python tutorial_04_hydra_config.py --config-name tutorial_04_pointcloud \
subsample.n_points=5000
```

### Tutorial 5: Iterable Datasets for Online Simulation

**File:** `tutorial_5_iterable_online_simulation.py`

Not every dataset is map-style. When data is *generated* on the fly --
an online physics simulation, a procedural sampler, an unbounded stream --
there is no fixed length and no index to address. PhysicsNeMo models these
with `IterableDatasetBase`:

- Iterable datasets only support iteration: no `__len__`, no `__getitem__`,
  no sampler, and no prefetch worker pool.
- They run entirely on the **main thread**, so they may freely launch Warp
  kernels and use CUDA streams. Warp's only requirement is a single
  launching thread, which the main thread satisfies -- this is what makes
  an online GPU simulation safe on this path.
- The `DataLoader` still drives generation on a preprocessing stream (when
  `use_streams=True`) and hands each item to the compute stream via a CUDA
  event, so generating the next batch can overlap training on the current
  one.

This tutorial wraps the built-in Warp `Darcy2D` flow generator (a
multigrid Jacobi solver) as an iterable dataset and iterates it through the
`DataLoader`. Because `Darcy2D` produces a full batch per step, the wrapper
is *self-batching* (`yields_batches = True`) and the loader passes each
batch through unchanged.
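
The shape of such a self-batching wrapper can be sketched without Warp or PhysicsNeMo. The class below is a hypothetical CPU-only stand-in, not the tutorial's code: `ToyFieldGenerator`, its parameters and the `"field"` key are invented for illustration, and it does not subclass `IterableDatasetBase`. It shows an unbounded `__iter__` that yields one full batch per step, and how a caller bounds the stream by step count because there is no `__len__`.

```python
import itertools
import random


class ToyFieldGenerator:
    """Hypothetical stand-in for a self-batching online simulation."""

    yields_batches = True  # each yielded item is already a full batch

    def __init__(self, batch_size=4, resolution=8, seed=0):
        self.batch_size = batch_size
        self.resolution = resolution
        self.seed = seed

    def __iter__(self):
        rng = random.Random(self.seed)
        while True:  # unbounded: there is no length to report
            yield {
                "field": [
                    [rng.random() for _ in range(self.resolution)]
                    for _ in range(self.batch_size)
                ]
            }


# The stream never ends, so the training loop chooses how many steps to take.
for step, batch in enumerate(itertools.islice(ToyFieldGenerator(), 3)):
    print(step, len(batch["field"]), len(batch["field"][0]))
```

A real wrapper would replace the random rows with the solver's output tensors. The step-count bound stays the same.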

**When to use the iterable path vs the map/descriptor path:** use the
map-style `Dataset` when you have a fixed corpus on disk addressable by
index (storage-backed, benefits from threaded prefetch). Use an
`IterableDatasetBase` when samples are produced by a generator/simulation,
the length is unbounded or unknown, or the producer itself must launch
device kernels.

```bash
# Requires a CUDA device (the Darcy solver runs Warp kernels on the GPU)
python tutorial_5_iterable_online_simulation.py
```