From 4f5a70718704351a7c40dcef200737f567cc53e6 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 10:36:02 -0400 Subject: [PATCH 01/15] deduplication --- .../unified_external_aero_recipe/README.md | 143 ++++++------------ 1 file changed, 45 insertions(+), 98 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md index e12b49ddf7..191dc73f37 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md @@ -16,10 +16,10 @@ we have unified the external aerodynamic recipes for our best models, including GLOBE (our newest model, designed for large 3D use cases). Here, you're able to train (and run inference with) the following models: +- [DoMINO](https://arxiv.org/abs/2501.13350) - [Transolver](https://arxiv.org/abs/2402.02366) - [GeoTransolver](https://arxiv.org/abs/2512.20399), optionally using the [FLARE](https://arxiv.org/abs/2508.12594) attention mechanism backend - [GLOBE](https://arxiv.org/abs/2511.15856) -- DoMINO is coming shortly We currently support the following datasets: - DrivaerML @@ -33,21 +33,15 @@ PhysicsNeMo-Curator: ## Dataset Handling -The data processing pipeline in this example explicitly performs non dimensionalization -of input data to unitless fields for model inputs. Check out the yaml configurations -in `datasets/` to see examples; the reference freestream conditions -(`U_inf`, `rho_inf`, `p_inf`, ...) are stored per-sample in each data -file's `global_data` and read directly from there by -`MeshReaderWithGlobalData`. Because datasets are non-dimensionalized, and are loaded -with the physicsnemo datapipes which support a MultiDataset abstraction, it's -possible to merge datasets on-the-fly during training to perform multi-dataset -training. We at PhysicsNeMo haven't extensively explored all of the parameters -of this multi-dataset training yet, but the infrastructure can support it and -we welcome you to try it if you're interested in it. - -Dataset non dimensionalization is handled in the `nondim.py` transformation, which -is part of the data transformation pipeline. See `src/nondim.py` in this example -for the source code. +The pipeline non-dimensionalizes raw fields to unitless model inputs +(see the YAML configs in `datasets/`). Per-sample reference freestream +conditions (`U_inf`, `rho_inf`, `p_inf`, ...) live in each file's +`global_data` and are read by `MeshReaderWithGlobalData`. Because the +datasets are non-dimensionalized and loaded through the PhysicsNeMo +datapipes' `MultiDataset` abstraction, you can merge datasets on the fly +for multi-dataset training; the infrastructure supports it, though we +haven't extensively tuned it. Non-dimensionalization itself is the +`NonDimensionalizeByMetadata` transform in `src/nondim.py`. ## Quick start @@ -78,18 +72,16 @@ GLOBE, multi-dataset Transolver, HiLift, DoMINO), see the ## Pipeline architecture -Each dataset gets its own `MeshDataset` or `DomainMeshDataset` with an -ordered chain of `MeshTransform` steps defined in YAML. Multiple -datasets are then merged via `MultiDataset`. The pipeline is +Each dataset gets its own `MeshDataset` / `DomainMeshDataset` with an +ordered chain of `MeshTransform` steps defined in YAML; multiple +datasets are merged via `MultiDataset`. The pipeline is **DomainMesh-native end-to-end**: every dataset YAML produces a -`DomainMesh` whose `interior` describes "where to predict" (a point -cloud at cell centroids for surface configs; the volume mesh for -volume configs) and whose `boundaries` describe "what the inputs are". -Each model YAML's `forward_kwargs:` block then declaratively maps -DomainMesh paths into the model's `forward()` kwargs, with the recipe -collate either passing those values through directly (for mesh-native -models like GLOBE) or batch-wrapping them into `(B, N, C)` tensors -(for transformer-style models). +`DomainMesh` (see the +[DomainMesh contract](#domainmesh-contract-and-the-data-to-model-mapping)), +and each model YAML's `forward_kwargs:` block declaratively maps +DomainMesh paths into the model's `forward()` kwargs. The recipe collate +either passes those values through (mesh-native models like GLOBE) or +batch-wraps them into `(B, N, C)` tensors (transformer-style models). ```mermaid flowchart LR @@ -115,16 +107,14 @@ flowchart LR - **Freestream conditions on `global_data`** — Each sample's freestream conditions (`U_inf`, `rho_inf`, `p_inf`, `nu`, `L_ref`, and `T_inf` - for compressible datasets) are embedded directly in the data files' - `global_data` at conversion time, at the **domain level** of each - `.pdmsh` / `.pmsh`. Downstream transforms like - `NonDimensionalizeByMetadata` read them straight off the loaded - sample. The surface configs read a boundary `Mesh` - directly out of the parent DomainMesh's on-disk tensordict tree; the - boundary's own `global_data` is typically empty, so those configs use - the recipe-local `MeshReaderWithGlobalData` to merge the domain-level - `global_data` onto each boundary at load time (`merge_global_data_from: - "../../global_data"`). + for compressible datasets) are embedded at the **domain level** of each + `.pdmsh` / `.pmsh` at conversion time, so transforms like + `NonDimensionalizeByMetadata` read them straight off the loaded sample. + Surface configs read a boundary `Mesh` whose own `global_data` is + typically empty, so they use the recipe-local + `MeshReaderWithGlobalData` to merge the domain-level `global_data` onto + each boundary at load time. See [Design decisions](#design-decisions) + for the rationale. - **DropMeshFields** — Removes fields that are not needed for training (e.g. `TimeValue` in DrivaerML) to reduce memory and avoid schema @@ -149,16 +139,12 @@ flowchart LR - Pressure → Cp: `(p - p_inf) / q_inf` where `q_inf = 0.5 * rho_inf * |U_inf|²` - Wall shear stress → Cf: `tau / q_inf` - Velocity → `U / |U_inf|` - - Also supports temperature, density, and identity (pass-through) field - types. Provides an `inverse()` method for re-dimensionalizing - predictions. - Note that for input points, we non-dimensionalize by a reference scalar `L_ref`. - In some recipes, the x/y/z axes are all scaled to unit-scale independently. - Here, we've made a conscious decision to maintain the aspect ratios of the input - positions and vectors deliberately use a scalar parameter for coordinate - non-dimensionalization. + Also supports temperature, density, and identity (pass-through) field + types, and provides an `inverse()` for re-dimensionalizing predictions. + Input points are non-dimensionalized by a single reference scalar + `L_ref` (rather than scaling x/y/z independently) so geometry aspect + ratios are preserved. - **ComputeSDFFromBoundary** — Volume pipelines only. Computes a signed distance field (and surface normals) from an auxiliary STL @@ -193,33 +179,12 @@ flowchart LR - **MeshToDomainMesh** — Terminal transform for surface dataset YAMLs (volume YAMLs already produce a `DomainMesh` natively via - `DomainMeshReader`). Converts the single surface `Mesh` into a - `DomainMesh(interior, boundaries={"vehicle": ...}, global_data)` - per the recipe's prediction-vs-input contract: `interior` is a - `Mesh[0, n_spatial_dims]` point cloud at the cell centroids carrying - the prediction targets in `point_data`; `boundaries["vehicle"]` is - the original triangulated surface with non-target cell features - (e.g. precomputed normals from `ComputeSurfaceNormals`) preserved - in `cell_data`. The dataset builder auto-injects - `cell_data_targets` from the YAML's `targets:` block, so users - don't list field names twice. - -## Non-dimensionalization and normalization - -The pipeline applies two layers of field conditioning: - -1. **Physics-based non-dimensionalization** (`NonDimensionalizeByMetadata`) - converts raw simulation outputs to standard aerodynamic coefficients - (Cp, Cf) or non-dimensional velocity. This is essential when - combining datasets that may use different freestream conditions, fluid - properties, or unit conventions. The freestream conditions (`U_inf`, - `rho_inf`, `p_inf`, optional `T_inf`, `L_ref`) are stored per-sample - in each data file's `global_data` and read directly from there. - -2. **Statistical normalization** (`NormalizeMeshFields`) applies z-score - scaling so that all field values fed to the model have roughly zero - mean and unit variance. Statistics are specified inline in the dataset - YAML config or loaded from a `.pt` file. + `DomainMeshReader`). Converts the surface `Mesh` into a + `DomainMesh(interior, boundaries={"vehicle": ...}, global_data)` per + the prediction-vs-input contract described under + [DomainMesh contract](#domainmesh-contract-and-the-data-to-model-mapping). + The dataset builder auto-injects `cell_data_targets` from the YAML's + `targets:` block, so users don't list field names twice. ## Model and training @@ -338,10 +303,6 @@ Pick one of each on the CLI: python src/train.py model= dataset= ``` -The full list of canonical CLI invocations for the previously-named -recipes (FA variants, GLOBE, multi-dataset Transolver, HiLift, DoMINO) -is in the [Recipe Gallery](#recipe-gallery) section below. - To add a new model, drop a new template under `conf/model/` declaring `input_type`, `output_type`, `forward_kwargs`, and the `model:` block. No registry edits needed. @@ -380,23 +341,11 @@ cd examples/cfd/external_aerodynamics/unified_external_aero_recipe ### Train -```bash -# Single GPU (default: GeoTransolver / DrivAerML volume) -python src/train.py - -# Pick a different model and/or dataset -python src/train.py model=transolver_surface dataset=drivaer_ml_surface - -# Multi-GPU -torchrun --nproc_per_node=N src/train.py - -# Override config values -python src/train.py precision=float32 training.num_epochs=100 -``` - -Supports checkpointing (auto-resume), TensorBoard + JSONL logging, -mixed precision (float16/bfloat16), -`torch.compile`, and NVIDIA profiling. +See [Quick start](#quick-start) for the common invocations and the +[Recipe Gallery](#recipe-gallery) for every named recipe. Training +supports checkpointing (auto-resume), TensorBoard + JSONL logging, +mixed precision (float16/bfloat16), `torch.compile`, and NVIDIA +profiling. ### Infer @@ -459,10 +408,8 @@ Measures per-sample load time and throughput without running the model. A single canonical `conf/train.yaml` drives every training run. It picks one entry from `conf/model/` and one dataset from `datasets/` via Hydra-style `model=...` and `dataset=...` overrides, applies the -centralized training schedule, and runs the loop. Every previously-named -recipe (FA variants, GLOBE, multi-dataset Transolver, HiLift, DoMINO) is -reproducible from CLI overrides — see the -[Recipe Gallery](#recipe-gallery) for the canonical invocations. +centralized training schedule, and runs the loop. Every named recipe is +reproducible from CLI overrides. ```text unified_external_aero_recipe/ From 66b01b18decbca803d6b8388649b4b53d1594550 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 10:36:20 -0400 Subject: [PATCH 02/15] cuts the warning --- .../unified_external_aero_recipe/README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md index 191dc73f37..39fb846f32 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md @@ -1,10 +1,6 @@ # Unified External Aerodynamics Recipe -> This unified recipe is still under some final polishing but nearly -> completed. Feel free to used it and experiment. In the meantime, -> be wary of sharp edges! - ## Introduction External Aerodynamic recipes in physicsnemo have proliferated: we have From ad3d58b3e5cd9f115dca6842414517a425a845a1 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 10:44:51 -0400 Subject: [PATCH 03/15] Metrics: removes total_channels and field_dim --- .../unified_external_aero_recipe/src/metrics.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/metrics.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/metrics.py index 00e937fc8b..347913b45b 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/metrics.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/metrics.py @@ -39,7 +39,7 @@ from jaxtyping import Float from omegaconf import DictConfig, OmegaConf from tensordict import TensorDict -from utils import FieldType, align_scalar_shapes, field_dim, validate_field_coverage +from utils import FieldType, align_scalar_shapes, validate_field_coverage ### Recipe-wide alias for the metric-name enum that the dataset YAMLs use. MetricName: TypeAlias = Literal["mae", "l1", "l2"] @@ -143,8 +143,6 @@ class MetricCalculator: Args: target_config: ``{name: scalar|vector}`` mapping. - process_group: Optional distributed process group for all-reduce. - When ``None`` (default), no reduction is performed. n_spatial_dims: Vector field dimensionality (used to label components when ``> len(VECTOR_COMPONENTS)`` falls back to integer indices). @@ -156,7 +154,6 @@ class MetricCalculator: def __init__( self, target_config: dict[str, FieldType], - process_group: dist.ProcessGroup | None = None, n_spatial_dims: int = 3, metrics: Sequence[MetricName] | None = None, prefix: str = "", @@ -165,7 +162,6 @@ def __init__( ### `FieldType` contract; copy verbatim so callers can mutate their ### original without affecting us. self.target_config: dict[str, FieldType] = dict(target_config) - self.process_group = process_group self.n_spatial_dims = n_spatial_dims self.metric_names = ( list(metrics) if metrics is not None else list(DEFAULT_METRICS) @@ -178,11 +174,6 @@ def __init__( f"Unknown metric {m!r}; available {list(METRIC_FUNCTIONS)!r}" ) - ### `field_dim` raises on unknown field types, validating the config. - self.total_channels = sum( - field_dim(t, n_spatial_dims) for t in self.target_config.values() - ) - def _make_key(self, *parts: str) -> str: """Build a flat metric key, ``"/__..."``. From 3291a11224a9e0fa72465de700254a6106f18d3f Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 10:45:32 -0400 Subject: [PATCH 04/15] train.py - omegaconf tidying --- .../unified_external_aero_recipe/src/train.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py index a5e755949d..f972319b00 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py @@ -40,7 +40,6 @@ from typing import Any, Literal, cast import hydra -import omegaconf import torch from datasets import build_dataloaders from loss import LossCalculator @@ -672,7 +671,7 @@ def main(cfg: DictConfig) -> None: ### where `cfg.out_dim` is auto-derived from the chosen dataset's ### `targets:` block; resolving earlier would fail on the model ### template's `out_dim: ${out_dim}` interpolation. - logger.info(f"Config:\n{omegaconf.OmegaConf.to_yaml(cfg, resolve=True)}") + logger.info(f"Config:\n{OmegaConf.to_yaml(cfg, resolve=True)}") logger.info(f"Train samples: {len(train_loader.sampler)}") logger.info(f"Val samples: {len(val_loader.sampler)}") @@ -761,7 +760,7 @@ def main(cfg: DictConfig) -> None: ) # Save the full resolved config - resolved_yaml = omegaconf.OmegaConf.to_yaml(cfg, resolve=True) + resolved_yaml = OmegaConf.to_yaml(cfg, resolve=True) config_artifact_path = os.path.join(run_dir, "resolved_config.yaml") with open(config_artifact_path, "w") as f: f.write(resolved_yaml) From 06cb44c8cea003fbc003a3342ec011ea6f72ad01 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 11:11:45 -0400 Subject: [PATCH 05/15] In nondim, removes `inverse_tensor`, which is already superseded by either `inverse()` or `inverse_td()`. And never used --- .../src/nondim.py | 68 +------------------ 1 file changed, 2 insertions(+), 66 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/nondim.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/nondim.py index 0e08119f68..15624e6a89 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/nondim.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/nondim.py @@ -79,16 +79,6 @@ def freestream_scales( {"pressure", "stress", "velocity", "temperature", "density", "identity"} ) -# Number of tensor channels each field type occupies. -_FIELD_CHANNELS: dict[NondimFieldType, int] = { - "pressure": 1, - "stress": 3, - "velocity": 3, - "temperature": 1, - "density": 1, - "identity": 1, -} - def _nondim_field( val: torch.Tensor, @@ -307,59 +297,6 @@ def inverse(self, mesh: Mesh) -> Mesh: """ return self._transform_mesh(mesh, _redim_field, inverse=True) - def inverse_tensor( - self, - tensor: Float[torch.Tensor, "*batch C"], - field_types: dict[str, NondimFieldType], - q_inf: Float[torch.Tensor, ""], - p_inf: Float[torch.Tensor, ""], - U_inf_mag: Float[torch.Tensor, ""], - *, - rho_inf: Float[torch.Tensor, ""] | None = None, - T_inf: Float[torch.Tensor, ""] | None = None, - ) -> Float[torch.Tensor, "*batch C"]: - """Re-dimensionalize a concatenated output tensor. - - Operates on model output tensors (shape ``(*, C)``) where channels - are ordered according to *field_types*. Useful at inference time - when you have a raw model prediction rather than a Mesh. - - Args: - tensor: Shape ``(*, C)`` with channels ordered by *field_types*. - field_types: Ordered mapping of ``{field_name: nondim_type}`` - where *nondim_type* is one of ``"pressure"``, ``"stress"``, - ``"velocity"``, ``"temperature"``, ``"density"``, or - ``"identity"``. Uses the model's output field names - (e.g. after renaming), not the original mesh field names. - q_inf: Reference dynamic pressure (scalar or broadcastable). - p_inf: Reference static pressure (scalar or broadcastable). - U_inf_mag: Reference freestream-velocity magnitude - (scalar or broadcastable). - rho_inf: Freestream density. Required when *field_types* - contains ``"density"``. - T_inf: Freestream temperature. Required when *field_types* - contains ``"temperature"``. - - Returns: - Same shape as *tensor*, with each field's channels - re-dimensionalized. - """ - out = tensor.clone() - idx = 0 - for ftype in field_types.values(): - n = _FIELD_CHANNELS[ftype] - out[..., idx : idx + n] = _redim_field( - out[..., idx : idx + n], - ftype, - q_inf, - p_inf, - U_inf_mag, - rho_inf=rho_inf, - T_inf=T_inf, - ) - idx += n - return out - def inverse_td( self, td: TensorDict, @@ -373,9 +310,8 @@ def inverse_td( ) -> TensorDict: """Re-dimensionalize a per-field :class:`~tensordict.TensorDict`. - Companion to :meth:`inverse_tensor` for the per-field - TensorDict-keyed I/O flow used by recipes that consume named - prediction fields directly. Each leaf is independently + Used by recipes that consume named prediction fields directly as a + per-field TensorDict. Each leaf is independently re-dimensionalized using the formula matching its ``field_types`` entry; leaves whose names are absent from ``field_types`` are passed through unchanged. From 193a48556854667270be3e1adf4fa04f449b969c Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 11:13:32 -0400 Subject: [PATCH 06/15] grammar --- .../unified_external_aero_recipe/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md index 39fb846f32..fe179bcd91 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md @@ -3,7 +3,7 @@ ## Introduction -External Aerodynamic recipes in physicsnemo have proliferated: we have +External aerodynamics recipes in PhysicsNeMo have proliferated: we have a number of recipes, across a range of models, all working on different models with unique data handling, pipelines, model architectures, metrics, training paradigms, etc. While there is nothing wrong with that, it does make comparison From c9a1942dcf6ed642e9d6dffce67caacfa04a5b4d Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 11:23:35 -0400 Subject: [PATCH 07/15] Adds caveat on DoMINO --- .../unified_external_aero_recipe/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md index fe179bcd91..ebdb26f06d 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md @@ -12,7 +12,7 @@ we have unified the external aerodynamic recipes for our best models, including GLOBE (our newest model, designed for large 3D use cases). Here, you're able to train (and run inference with) the following models: -- [DoMINO](https://arxiv.org/abs/2501.13350) +- [DoMINO](https://arxiv.org/abs/2501.13350) coming soon - [Transolver](https://arxiv.org/abs/2402.02366) - [GeoTransolver](https://arxiv.org/abs/2512.20399), optionally using the [FLARE](https://arxiv.org/abs/2508.12594) attention mechanism backend - [GLOBE](https://arxiv.org/abs/2511.15856) From ee2413885e4729ac87a73d03f77ec936d084cd84 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 12:00:55 -0400 Subject: [PATCH 08/15] Refactor metrics handling in training process - Removed the `_all_reduce` method from `MetricCalculator` as it was redundant. - Updated the `__call__` method to directly return the metrics without reduction. - Introduced `_reduce_and_average_epoch` function in `train.py` to handle averaging of epoch loss and metrics across distributed processes. - Added unit tests for the new `_reduce_and_average_epoch` function to ensure correctness in both single-process and distributed scenarios. --- .../src/metrics.py | 19 +--- .../unified_external_aero_recipe/src/train.py | 60 ++++++++++++- .../tests/test_train_helpers.py | 86 ++++++++++++++++++- 3 files changed, 143 insertions(+), 22 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/metrics.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/metrics.py index 347913b45b..69dc1e547a 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/metrics.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/metrics.py @@ -35,7 +35,6 @@ from typing import Literal, TypeAlias, cast import torch -import torch.distributed as dist from jaxtyping import Float from omegaconf import DictConfig, OmegaConf from tensordict import TensorDict @@ -218,22 +217,6 @@ def _metrics_for_tensor( for m in self.metric_names } - def _all_reduce(self, metrics: TensorDict) -> TensorDict: - if self.process_group is None: - return metrics - world_size = dist.get_world_size(self.process_group) - if world_size == 1: - return metrics - ### Single all_reduce over a stacked 1-D tensor (vs. one comm - ### per leaf) -- one collective beats N regardless of the - ### container type. Rebuild a TensorDict from the reduced - ### stack so callers see the same per-key access pattern. - keys = list(metrics.keys()) - stacked = torch.stack([metrics[k] for k in keys]) - dist.all_reduce(stacked, group=self.process_group) - stacked = stacked / world_size - return TensorDict({k: stacked[i] for i, k in enumerate(keys)}, batch_size=[]) - def __call__( self, pred: TensorDict, @@ -284,7 +267,7 @@ def __call__( t_mag = torch.linalg.vector_norm(t, dim=-1) out.update(self._metrics_for_tensor(p_mag, t_mag, (name,))) - return self._all_reduce(TensorDict(out, batch_size=[])) + return TensorDict(out, batch_size=[]) def __repr__(self) -> str: fields_str = ", ".join(f"{n}:{t}" for n, t in self.target_config.items()) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py index f972319b00..784dae10f1 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py @@ -41,6 +41,7 @@ import hydra import torch +import torch.distributed as dist from datasets import build_dataloaders from loss import LossCalculator from metrics import MetricCalculator, resolve_metrics @@ -120,6 +121,53 @@ def _to_float_dicts( ) +def _reduce_and_average_epoch( + total_loss: float, + losses_td: TensorDict | None, + metrics_td: TensorDict | None, + n_local: int, + *, + device: torch.device | str, +) -> tuple[float, dict[str, float], dict[str, float]]: + """Average epoch loss/metric *sums* over the GLOBAL sample count. + + The per-rank loop accumulates *sums* of per-step (per-sample, since + ``batch_size == 1``) losses and metrics. This packs ``total_loss``, + the local sample count, and every loss/metric leaf into one float32 + tensor, all-reduces it once (SUM) when running distributed, then + divides by the reduced count. One collective + one D2H. Correct for + uneven per-rank shards (``global_sum / global_count``) and + deadlock-free (invoked once after the per-rank loops finish, not per + step). The single-process path is identical to the previous + ``sum / n_local`` averaging. + """ + if losses_td is None or metrics_td is None: + return total_loss / max(n_local, 1), {}, {} + loss_keys = cast(list[str], list(losses_td.keys())) + metric_keys = cast(list[str], list(metrics_td.keys())) + leaves = cast( + list[torch.Tensor], list(losses_td.values()) + list(metrics_td.values()) + ) + ### [total_loss, n_local, *loss_sums, *metric_sums] -> one collective. + packed = torch.cat( + [ + torch.tensor([total_loss, float(n_local)], device=device), + torch.stack(leaves).float().to(device), + ] + ) + if dist.is_available() and dist.is_initialized() and dist.get_world_size() > 1: + dist.all_reduce(packed) + reduced_loss, reduced_n, *leaf_sums = packed.tolist() + n = max(reduced_n, 1.0) + n_loss = len(loss_keys) + averaged = [v / n for v in leaf_sums] + return ( + reduced_loss / n, + dict(zip(loss_keys, averaged[:n_loss])), + dict(zip(metric_keys, averaged[n_loss:])), + ) + + def _log_to_tensorboard( writer: SummaryWriter | None, values: Mapping[str, float | torch.Tensor], @@ -401,8 +449,16 @@ def _run_epoch( epoch_dt = time.perf_counter() - epoch_t0 n = max(n_batches, 1) - avg_loss = total_loss / n - avg_losses, avg_metrics = _to_float_dicts(total_losses_td, total_metrics_td, n=n) + ### Reduce the epoch sums + sample count across ranks once, so logged + ### loss/metrics are the GLOBAL averages (not rank-0's shard) under + ### DDP. `n` above is kept local for the per-rank step-rate line below. + avg_loss, avg_losses, avg_metrics = _reduce_and_average_epoch( + total_loss, + total_losses_td, + total_metrics_td, + n_batches, + device=dist_manager.device, + ) logger.info( f"Epoch {epoch} {mode} done in {epoch_dt:.1f}s " diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_train_helpers.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_train_helpers.py index c35024553c..46741efb4f 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_train_helpers.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_train_helpers.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Unit tests for `src/train.py`'s private TensorDict-aware walker and for `src/output_normalize.py`. +"""Unit tests for `src/train.py`'s private TensorDict-aware helpers and for `src/output_normalize.py`. ``TensorDict`` is not a ``dict`` subclass, so the bare ``isinstance(obj, dict)`` branches in the recipe's recursive helpers @@ -29,6 +29,9 @@ model output (``Mesh`` or ``(B, N, C)`` tensor) to a per-target TensorDict, with clear error messages on shape / channel-count mismatches. +- :func:`train._reduce_and_average_epoch`: averages epoch loss / metric + sums over the global sample count; its single-process path must equal + the previous ``total_loss / n`` + per-leaf ``sum / n`` averaging. (The analogous tests for the shared, tensorboard-free :func:`utils.recursive_to_device` live in ``test_utils.py``, outside @@ -50,7 +53,11 @@ pytest.importorskip("tensorboard") from output_normalize import normalize_output_to_tensordict # noqa: E402 -from train import _walk_batch_for_logging # noqa: E402 -- after the skip guard +from train import ( # noqa: E402 -- after the skip guard + _reduce_and_average_epoch, + _to_float_dicts, + _walk_batch_for_logging, +) from physicsnemo.mesh import Mesh # noqa: E402 -- after the importorskip guard @@ -172,3 +179,78 @@ def test_mesh_output_missing_target_raises(self): mesh = Mesh(points=torch.randn(7, 3), point_data={"other": torch.randn(7)}) with pytest.raises(KeyError, match="missing target fields"): normalize_output_to_tensordict(mesh, target_config, "mesh") + + +### --------------------------------------------------------------------------- +### _reduce_and_average_epoch +### --------------------------------------------------------------------------- + + +class TestReduceAndAverageEpoch: + """Tests for `_reduce_and_average_epoch` (single-process path). + + The distributed branch is gated on an initialized process group with + ``world_size > 1``; with no group initialized these tests exercise the + pure-local path, which must stay equivalent to the previous + ``total_loss / n`` + per-leaf ``sum / n`` averaging it replaced. The + collective branch mirrors the already-shipped ``infer._allreduce_sums`` + and is validated by inspection. + """ + + @staticmethod + def _epoch_sums() -> tuple[TensorDict, TensorDict]: + """A representative pair of 0-D (epoch-accumulated) sum TensorDicts.""" + losses_td = TensorDict( + {"pressure": torch.tensor(6.0), "wss": torch.tensor(9.0)}, + batch_size=[], + ) + metrics_td = TensorDict( + {"pressure_l2": torch.tensor(3.0), "wss_mae": torch.tensor(12.0)}, + batch_size=[], + ) + return losses_td, metrics_td + + def test_single_process_divides_sums_by_local_count(self): + """No process group: global average == local sum / n_local.""" + losses_td, metrics_td = self._epoch_sums() + avg_loss, avg_losses, avg_metrics = _reduce_and_average_epoch( + 15.0, losses_td, metrics_td, 3, device="cpu" + ) + assert avg_loss == pytest.approx(5.0) + assert avg_losses == pytest.approx({"pressure": 2.0, "wss": 3.0}) + assert avg_metrics == pytest.approx({"pressure_l2": 1.0, "wss_mae": 4.0}) + + def test_single_process_matches_to_float_dicts(self): + """Equivalent to the `total_loss / n` + `_to_float_dicts(n=...)` it replaced.""" + losses_td, metrics_td = self._epoch_sums() + n_local, total_loss = 4, 10.0 + old_losses, old_metrics = _to_float_dicts(losses_td, metrics_td, n=n_local) + new_loss, new_losses, new_metrics = _reduce_and_average_epoch( + total_loss, losses_td, metrics_td, n_local, device="cpu" + ) + assert new_loss == pytest.approx(total_loss / n_local) + assert new_losses == pytest.approx(old_losses) + assert new_metrics == pytest.approx(old_metrics) + + def test_none_sentinel_returns_loss_only(self): + """The "no steps seeded" sentinel (either TD ``None``) yields (loss / n, {}, {}).""" + assert _reduce_and_average_epoch(8.0, None, None, 2, device="cpu") == ( + 4.0, + {}, + {}, + ) + ### A single ``None`` is enough to trip the sentinel. + losses_td, _ = self._epoch_sums() + assert _reduce_and_average_epoch(8.0, losses_td, None, 2, device="cpu") == ( + 4.0, + {}, + {}, + ) + + def test_zero_local_count_avoids_zero_division(self): + """``n_local == 0`` (a step-less epoch) divides by 1, not 0.""" + assert _reduce_and_average_epoch(7.0, None, None, 0, device="cpu") == ( + 7.0, + {}, + {}, + ) From 6bb1c494e76c7fd27a714e0878cf038207175ccf Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 12:02:57 -0400 Subject: [PATCH 09/15] Implement dedicated un-augmented validation dataset for manifest mode - Added `_build_manifest_val_dataset` function to create a separate validation dataset when augmentations are enabled, ensuring validation does not inherit training augmentations. - Updated `build_dataloaders` to utilize the new validation dataset logic, allowing for consistent evaluation behavior across manifest and directory modes. - Introduced unit tests for `_build_manifest_val_dataset` to verify correct behavior for both augmented and non-augmented scenarios. --- .../src/datasets.py | 70 ++++++++++++-- .../tests/test_manifest.py | 95 ++++++++++++++++++- 2 files changed, 158 insertions(+), 7 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/datasets.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/datasets.py index 4530167e50..26be83a589 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/datasets.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/datasets.py @@ -581,6 +581,41 @@ def _resolve_manifest_indices_from_spec( return train_indices, val_indices +def _build_manifest_val_dataset( + ds_yaml: DictConfig, + *, + augment: bool, + device: str | torch.device | None, + num_workers: int, + pin_memory: bool, +) -> MeshDataset | None: + """Build a dedicated un-augmented validation dataset for manifest mode. + + Manifest mode shares a single reader across the train / val splits + (the :class:`ManifestSampler` pair carves out per-split indices). That + means validation would otherwise run through the *augmented* transform + chain whenever ``augment`` is enabled -- unlike directory mode, which + always builds its val dataset with ``augment=False``. + + To restore parity, when *augment* is ``True`` this returns a separate + dataset built with ``augment=False`` over the same ``train_datadir``. + Its reader globs the same sorted paths, so manifest indices resolved + against the train reader address the same samples here. When *augment* + is ``False`` the train and val transform chains are identical, so this + returns ``None`` and the caller lets validation share the train dataset + (avoiding a redundant second reader). + """ + if not augment: + return None + return build_dataset( + ds_yaml, + augment=False, + device=device, + num_workers=num_workers, + pin_memory=pin_memory, + ) + + def _build_collate( cfg: DictConfig, target_config: dict[str, FieldType] ) -> Callable[[list[tuple[Any, Any]]], dict[str, Any]]: @@ -710,7 +745,10 @@ def build_dataloaders( ``cfg.train_split`` / ``cfg.val_split`` keys select which subsets to use; one reader covers the full directory and :class:`ManifestSampler` restricts each loader to the matching - indices. + indices. Augmentations are training-only: when ``cfg.augment`` is set, + validation uses a separate un-augmented dataset over the same + directory (mirroring directory mode); otherwise it shares the train + dataset. NOTE (limitation): only ONE chosen dataset may carry a manifest today. If both ``cfg.dataset`` and an entry in ``cfg.extra_datasets`` @@ -764,6 +802,7 @@ def build_dataloaders( val_datasets: list = [] manifest_train_indices: list[int] | None = None manifest_val_indices: list[int] | None = None + manifest_val_dataset: MeshDataset | None = None using_manifests = False first_targets: dict[str, str] | None = None first_metrics: list[str] | None = None @@ -848,11 +887,24 @@ def build_dataloaders( pin_memory=pin_memory, ) train_datasets.append(dataset) - ### NOTE: this overwrites any prior dataset's indices; see the - ### docstring's multi-dataset limitation note. + ### NOTE: this overwrites any prior manifest dataset's indices + ### (and the val dataset below); see the docstring's + ### multi-dataset limitation note. manifest_train_indices, manifest_val_indices = ( _resolve_manifest_indices_from_spec(dataset.reader, manifest_spec) ) + ### Augmentations are training-only: when enabled, give + ### validation its own un-augmented dataset over the same + ### directory so eval is never augmented (matching directory + ### mode). Stays None when augment is off, so val shares the + ### train dataset. + manifest_val_dataset = _build_manifest_val_dataset( + ds_yaml, + augment=augment, + device=device, + num_workers=num_workers, + pin_memory=pin_memory, + ) continue ### Directory mode: separate readers / datasets per split. @@ -900,9 +952,15 @@ def build_dataloaders( train_dataset = _combine_datasets(train_datasets) if using_manifests: - ### Manifest mode: train and val share one underlying dataset; - ### the samplers carve out the per-split index sets. - val_dataset = train_dataset + ### Manifest mode: train and val share one underlying reader; the + ### samplers carve out the per-split index sets. When augmentations + ### are enabled, validation uses a dedicated un-augmented dataset + ### (built in the loop above) so eval is never augmented -- matching + ### directory mode; otherwise the chains are identical and val + ### shares the train dataset. + val_dataset = ( + manifest_val_dataset if manifest_val_dataset is not None else train_dataset + ) train_sampler, val_sampler = _build_manifest_samplers( manifest_train_indices, manifest_val_indices, diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_manifest.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_manifest.py index b2b0b5280d..b4ee4b2001 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_manifest.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_manifest.py @@ -31,10 +31,12 @@ from types import SimpleNamespace import pytest -from omegaconf import OmegaConf +from omegaconf import DictConfig, OmegaConf from datasets import ( ManifestSampler, + _build_manifest_val_dataset, + build_dataset, load_manifest, resolve_manifest_indices, resolve_manifest_spec, @@ -388,3 +390,94 @@ def test_directory_mode_unaffected_by_loud_failure(self, tmp_path: Path): ds_yaml = OmegaConf.create({"train_datadir": str(tmp_path)}) ds_block = OmegaConf.create({}) assert resolve_manifest_spec(ds_yaml, ds_block) is None + + +### --------------------------------------------------------------------------- +### _build_manifest_val_dataset +### --------------------------------------------------------------------------- + + +class TestManifestValDataset: + """Tests for :func:`datasets._build_manifest_val_dataset`. + + Manifest mode shares one reader across the train / val splits, so + validation must not inherit the train augmentations. This mirrors + directory mode, which always builds its val dataset with + ``augment=False`` -- the asymmetry these tests lock down. + """ + + @staticmethod + def _augmented_ds_yaml(datadir: Path) -> DictConfig: + """Minimal manifest-style volume dataset YAML carrying augmentations. + + Trimmed to what the dataset builder inspects: the reader globs + paths lazily (no file is opened at construction), so the directory + only needs placeholder files, and the transform chain just needs a + ``CenterMesh`` anchor plus the augmentations that get inserted + after it. + """ + return OmegaConf.create( + { + "pipeline": { + "reader": { + "_target_": "${dp:DomainMeshReader}", + "path": str(datadir), + "pattern": "run_*/domain_*.pdmsh", + }, + "augmentations": [ + {"_target_": "${dp:RandomRotateMesh}", "axes": ["z"]}, + {"_target_": "${dp:RandomTranslateMesh}"}, + ], + "transforms": [ + {"_target_": "${dp:CenterMesh}"}, + ], + }, + "targets": {"pressure": "scalar"}, + } + ) + + @staticmethod + def _make_datadir(tmp_path: Path) -> Path: + """Create placeholder runs the reader can glob (it never opens them).""" + for i in range(2): + run = tmp_path / f"run_{i}" + run.mkdir() + (run / f"domain_{i}.pdmsh").write_bytes(b"") + return tmp_path + + def test_augment_off_returns_none(self, tmp_path: Path): + """``augment=False`` -> val shares the train dataset (None sentinel).""" + ds_yaml = self._augmented_ds_yaml(self._make_datadir(tmp_path)) + assert ( + _build_manifest_val_dataset( + ds_yaml, + augment=False, + device=None, + num_workers=1, + pin_memory=False, + ) + is None + ) + + def test_augment_on_returns_unaugmented_dataset(self, tmp_path: Path): + """``augment=True`` -> a separate dataset whose chain has no augmentations.""" + ds_yaml = self._augmented_ds_yaml(self._make_datadir(tmp_path)) + + ### Guard against a vacuous assertion: the train dataset must + ### actually carry a stochastic augmentation for the val check to + ### mean anything. + train_ds = build_dataset( + ds_yaml, augment=True, device=None, num_workers=1, pin_memory=False + ) + assert any(getattr(t, "stochastic", False) for t in train_ds.transforms) + + val_ds = _build_manifest_val_dataset( + ds_yaml, augment=True, device=None, num_workers=1, pin_memory=False + ) + assert val_ds is not None + ### A distinct object (own reader), not the train dataset. + assert val_ds is not train_ds + ### No stochastic (augmentation) transforms survive on the val chain. + assert not any(getattr(t, "stochastic", False) for t in val_ds.transforms) + ### ...but the deterministic CenterMesh transform is still present. + assert any(type(t).__name__ == "CenterMesh" for t in val_ds.transforms) From b34736b99f49bbcd5be88fbfcc6f491e7ef4aebe Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 12:15:41 -0400 Subject: [PATCH 10/15] better docstring --- .../unified_external_aero_recipe/src/train.py | 67 ++++++++++++++++--- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py index 784dae10f1..b7b7f726e9 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py @@ -129,17 +129,62 @@ def _reduce_and_average_epoch( *, device: torch.device | str, ) -> tuple[float, dict[str, float], dict[str, float]]: - """Average epoch loss/metric *sums* over the GLOBAL sample count. - - The per-rank loop accumulates *sums* of per-step (per-sample, since - ``batch_size == 1``) losses and metrics. This packs ``total_loss``, - the local sample count, and every loss/metric leaf into one float32 - tensor, all-reduces it once (SUM) when running distributed, then - divides by the reduced count. One collective + one D2H. Correct for - uneven per-rank shards (``global_sum / global_count``) and - deadlock-free (invoked once after the per-rank loops finish, not per - step). The single-process path is identical to the previous - ``sum / n_local`` averaging. + """Collapse one epoch's rank-local loss/metric *sums* into global means. + + Under DDP each rank only sees its own shard of the epoch, so the + numbers we log are meaningless until reduced across ranks. This takes + the rank-local running *sums* the epoch loop accumulated (``total_loss`` + and the 0-D ``losses_td`` / ``metrics_td`` leaves) plus the rank's + sample count, reduces them across ranks once, and divides by the + *global* count to produce sample-weighted means. + + Reducing sums and a count (rather than per-rank means) is what makes the + result correct for uneven shards: ``global_sum / global_count`` weights + every sample equally no matter how the dataset split across ranks. + Doing this once at end-of-epoch, rather than per step, also keeps it + deadlock-free: every rank issues exactly one collective here even if + ranks ran different step counts. The values are packed into a single + ``float32`` buffer, so the whole epoch costs one ``all_reduce`` and one + device-to-host sync (the ``.tolist()``). It mirrors the inference-side + reducer ``infer._allreduce_sums``. + + Args: + total_loss: Rank-local sum of the per-step scalar losses over the + epoch (``sum(loss.item())``), not a mean. + losses_td: Rank-local epoch accumulator of per-field losses: a 0-D + (``batch_size=[]``) ``TensorDict`` whose leaves are the summed + scalar losses, one per loss term. ``None`` is the "epoch ran + zero steps" sentinel (see Notes). + metrics_td: The matching per-field metric-sum accumulator, with the + same ``None`` sentinel. Seeded in lock-step with ``losses_td``, + so the two are ``None`` together or populated together. + n_local: Number of samples this rank processed this epoch. Equals + the step count because the recipe runs ``batch_size == 1``. + device: Device on which to build the reduction buffer. Must be the + rank's collective/compute device (``dist_manager.device``) so the + NCCL ``all_reduce`` runs on the correct device. + + Returns: + A ``(avg_loss, avg_losses, avg_metrics)`` tuple where ``avg_loss`` is + the global mean loss, ``avg_losses`` is ``{loss_name: global_mean}``, + and ``avg_metrics`` is ``{metric_name: global_mean}``. The dict keys + and their order are taken from ``losses_td`` / ``metrics_td``. On the + ``None`` sentinel it returns ``(total_loss / max(n_local, 1), {}, + {})`` without entering the collective. + + Notes: + Single-process (or ``world_size == 1``) skips the reduction, so the + result is identical to the previous ``sum / n_local`` averaging and + single-GPU logs are unchanged. + + The one fused ``all_reduce`` is valid only because every rank packs + the same leaves in the same order, which holds since all ranks share + one ``target_config`` (identical loss/metric keys). The ``None`` + early return similarly assumes ranks are seeded together: under DDP + every rank gets at least one sample, so the accumulators are + non-``None`` on all ranks at once. A lone rank taking the early + return while its peers enter the collective is the only way this + would hang, and the samplers this recipe uses do not produce that. """ if losses_td is None or metrics_td is None: return total_loss / max(n_local, 1), {}, {} From 9224727c81b14bef8bbbe7b609c953b35dadde44 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 13:28:19 -0400 Subject: [PATCH 11/15] Synchronize nomenclature between train.py and infer.py --- .../unified_external_aero_recipe/src/train.py | 62 +++++++++++++++---- 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py index b7b7f726e9..b0a6096273 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py @@ -76,6 +76,11 @@ _PROFILE_MAX_STEPS = 10 +### --------------------------------------------------------------------------- +### Config +### --------------------------------------------------------------------------- + + def _flatten_config( d: dict[str, Any], parent: str = "", sep: str = "." ) -> dict[str, str]: @@ -90,6 +95,11 @@ def _flatten_config( return items +### --------------------------------------------------------------------------- +### Aggregation +### --------------------------------------------------------------------------- + + def _to_float_dicts( losses_td: TensorDict | None, metrics_td: TensorDict | None, @@ -213,6 +223,11 @@ def _reduce_and_average_epoch( ) +### --------------------------------------------------------------------------- +### Logging +### --------------------------------------------------------------------------- + + def _log_to_tensorboard( writer: SummaryWriter | None, values: Mapping[str, float | torch.Tensor], @@ -230,6 +245,11 @@ def _log_to_tensorboard( writer.add_scalar(f"{tag_prefix}/{k}", v, global_step=global_step) +### --------------------------------------------------------------------------- +### Forward pass +### --------------------------------------------------------------------------- + + def forward_pass( batch: dict[str, Any], model: torch.nn.Module, @@ -292,6 +312,11 @@ def forward_pass( return loss, loss_td.detach(), metric_td.detach() +### --------------------------------------------------------------------------- +### Epoch loops +### --------------------------------------------------------------------------- + + def _run_epoch( dataloader: DataLoader, model: torch.nn.Module, @@ -343,6 +368,7 @@ def _run_epoch( grad_ctx = nullcontext() if is_train else torch.no_grad() log_prefix = "Epoch" if is_train else "Val Epoch" + is_rank0 = dist_manager.rank == 0 ### `total_loss` is a Python float fed by the per-step print line's ### sync; `total_losses_td` / `total_metrics_td` are on-device @@ -425,7 +451,7 @@ def _run_epoch( ### emitted in both modes so downstream tooling can compute val ### step-time statistics directly instead of inferring them from ### ``val_ts - train_ts``. - if dist_manager.rank == 0: + if is_rank0: losses_floats, metrics_floats = _to_float_dicts(losses, metrics) if is_train: global_step = epoch * num_steps + i @@ -510,7 +536,7 @@ def _run_epoch( f"({n_batches} steps, {epoch_dt / n:.3f}s/step avg)" ) - if dist_manager.rank == 0: + if is_rank0: _log_to_tensorboard(writer, avg_losses, "epoch", epoch) _log_to_tensorboard(writer, avg_metrics, "epoch/metrics", epoch) if log_jsonl is not None: @@ -601,6 +627,11 @@ def val_epoch( ) +### --------------------------------------------------------------------------- +### I/O benchmarking +### --------------------------------------------------------------------------- + + def _walk_batch_for_logging( value: Any, prefix: str = "" ) -> Iterator[tuple[str, torch.Tensor]]: @@ -720,6 +751,11 @@ def benchmark_io_epoch( ) +### --------------------------------------------------------------------------- +### Driver +### --------------------------------------------------------------------------- + + @profile def main(cfg: DictConfig) -> None: """Run the full training loop, or I/O-only benchmark when ``benchmark_io=true``. @@ -742,6 +778,8 @@ def main(cfg: DictConfig) -> None: DistributedManager.initialize() dist_manager = DistributedManager() + device = dist_manager.device + is_rank0 = dist_manager.rank == 0 logger = RankZeroLoggingWrapper(PythonLogger(name="training"), dist_manager) seed = cfg.training.get("seed", None) @@ -755,7 +793,7 @@ def main(cfg: DictConfig) -> None: val_writer = None log_jsonl = None run_dir = os.path.join(cfg.output_dir, cfg.run_id) - if dist_manager.rank == 0: + if is_rank0: os.makedirs(run_dir, exist_ok=True) os.makedirs(checkpoint_dir, exist_ok=True) @@ -779,7 +817,7 @@ def main(cfg: DictConfig) -> None: logger.info(f"Targets (from dataset YAML): {target_config}") # -- Log dataset metadata (rank 0) -------------------------------------------- - if dist_manager.rank == 0 and log_jsonl is not None: + if is_rank0 and log_jsonl is not None: ### Use len(sampler) so manifest mode (where train and val share ### one underlying dataset) reports the actual per-split count, ### not the always-identical len(dataset). PyTorch always assigns @@ -810,7 +848,7 @@ def main(cfg: DictConfig) -> None: benchmark_io_epoch(train_loader, "train", logger, max_steps=max_steps) benchmark_io_epoch(val_loader, "val", logger, max_steps=max_steps) logger.info("benchmark_io complete!") - if dist_manager.rank == 0: + if is_rank0: if train_writer is not None: train_writer.close() if val_writer is not None: @@ -823,13 +861,13 @@ def main(cfg: DictConfig) -> None: num_params = sum(p.numel() for p in model.parameters()) logger.info(f"Parameters: {num_params:,}") - model.to(dist_manager.device) + model.to(device) if dist_manager.world_size > 1: model = torch.nn.parallel.DistributedDataParallel( model, device_ids=[dist_manager.local_rank], - output_device=dist_manager.device, + output_device=device, ) if normalizer is not None: @@ -846,7 +884,7 @@ def main(cfg: DictConfig) -> None: scaler = GradScaler() if precision == "float16" else None # -- Log full config + model params (rank 0) --------------------------------- - if dist_manager.rank == 0: + if is_rank0: flat_cfg = _flatten_config( OmegaConf.to_container(cfg, resolve=True, throw_on_missing=False) ) @@ -896,7 +934,7 @@ def main(cfg: DictConfig) -> None: "scheduler": scheduler, "models": model, } - loaded_epoch = load_checkpoint(device=dist_manager.device, **ckpt_args) + loaded_epoch = load_checkpoint(device=device, **ckpt_args) if cfg.compile: model = torch.compile(model) @@ -943,7 +981,7 @@ def main(cfg: DictConfig) -> None: log_jsonl=log_jsonl, ) - if dist_manager.rank == 0: + if is_rank0: all_keys = list(dict.fromkeys(list(train_metrics) + list(val_metrics))) rows = [ @@ -964,7 +1002,7 @@ def main(cfg: DictConfig) -> None: f"{table}\n" ) - if epoch % cfg.training.save_interval == 0 and dist_manager.rank == 0: + if epoch % cfg.training.save_interval == 0 and is_rank0: save_checkpoint(**ckpt_args, epoch=epoch + 1) if normalizer is not None: norm_path = os.path.join(ckpt_args["path"], "norm_stats.pt") @@ -973,7 +1011,7 @@ def main(cfg: DictConfig) -> None: if cfg.training.get("scheduler_update_mode", "epoch") == "epoch": scheduler.step() - if dist_manager.rank == 0: + if is_rank0: if train_writer is not None: train_writer.close() if val_writer is not None: From 4a213b02e05e3a75c8a7904dc92b5e87fd34ec3b Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 15:09:42 -0400 Subject: [PATCH 12/15] Standardizes nomenclature for JSONL logging around clean Phase definition. (Previously, was scattered across infer.py and train.py) --- .../unified_external_aero_recipe/README.md | 14 ++++++++----- .../unified_external_aero_recipe/src/infer.py | 8 +++----- .../unified_external_aero_recipe/src/train.py | 6 ++++-- .../unified_external_aero_recipe/src/utils.py | 20 +++++++++++++++++++ .../tests/test_utils.py | 8 +++++--- 5 files changed, 41 insertions(+), 15 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md index ebdb26f06d..3cdc35fe2a 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md @@ -366,11 +366,13 @@ the checkpoint named by `run_id` (under `checkpoint_dir`, default ${output_dir}/${run_id}/ predictions/.pdmsh # DomainMesh: interior carries # pred_ and true_ - metrics.jsonl # per-sample + summary records + metrics.jsonl # infer_step + infer_summary records ``` - **Metrics** are reported in training space (non-dim / normalized), so - they line up with the validation numbers logged during training. + they line up with the validation numbers logged during training. The + JSONL carries one `infer_step` row per sample plus an `infer_summary` + aggregate (and `infer_forces_summary` for surface cases). - **Physical units**: written fields are re-dimensionalized (`redimensionalize=true`, default) by inverting normalization then non-dimensionalization; `rescale_geometry=true` additionally restores @@ -735,9 +737,11 @@ Training and validation metrics are logged in two places per run: loss / per-field loss / per-field metrics / learning rate / step time / GPU memory go in the `train/` writer; per-epoch summaries (loss + metrics) go in both writers. -- **JSONL** at `${output_dir}/${run_id}/metrics.jsonl`. One line per - config snapshot, dataset summary, training step, and train / val epoch. - Easy to grep, easy to ship to an external store. +- **JSONL** at `${output_dir}/${run_id}/metrics.jsonl`. One record per + line, tagged by a `phase` field: `config` (resolved run config), + `dataset` (split sizes + targets), `train_step` / `val_step` (per-step + loss + metrics), and `train_summary` / `val_summary` (per-epoch reduced + means). Easy to grep, easy to ship to an external store. Rank-0 only; no external tracker required. diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py index fc074b9812..0ff8660e62 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py @@ -638,7 +638,7 @@ def main(cfg: DictConfig) -> None: ### One JSONL row per sample -- the documented metrics.jsonl ### contract. Console logging below is throttled by log_every. record: dict[str, Any] = { - "phase": "sample", + "phase": "infer_step", "step": i, "sample_id": sample_id, "metrics": sample_metrics, @@ -676,7 +676,7 @@ def main(cfg: DictConfig) -> None: ) log_jsonl( { - "phase": "summary", + "phase": "infer_summary", "space": "training", "num_samples": count, "metrics": averages, @@ -697,14 +697,12 @@ def main(cfg: DictConfig) -> None: ) log_jsonl( { - "phase": "forces_summary", + "phase": "infer_forces_summary", "num_samples": force_acc.count, "coefficients": coeff_summary, } ) - logger.info(f"Inference complete! Predictions written to {pred_dir}") - if __name__ == "__main__": main() diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py index b0a6096273..bc87d8ca59 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py @@ -53,6 +53,7 @@ from torch.utils.tensorboard import SummaryWriter from utils import ( FieldType, + Phase, Precision, build_muon_optimizer, get_autocast_context, @@ -484,7 +485,7 @@ def _run_epoch( if log_jsonl is not None: log_jsonl( { - "phase": "step", + "phase": "train_step", "global_step": global_step, "loss": this_loss, "mem_gb": mem_gb, @@ -540,9 +541,10 @@ def _run_epoch( _log_to_tensorboard(writer, avg_losses, "epoch", epoch) _log_to_tensorboard(writer, avg_metrics, "epoch/metrics", epoch) if log_jsonl is not None: + summary_phase: Phase = "train_summary" if is_train else "val_summary" log_jsonl( { - "phase": mode, + "phase": summary_phase, "epoch": epoch, "loss": avg_loss, **avg_losses, diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/utils.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/utils.py index aa8fe33e79..f9ce7983d0 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/utils.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/utils.py @@ -46,6 +46,26 @@ ### its error message for the padding rationale). Precision: TypeAlias = Literal["float32", "float16", "bfloat16"] +### Canonical ``phase`` tags for each ``metrics.jsonl`` record, shared by +### train.py and infer.py so both entry points emit one vocabulary. Values are +### ``{split}_{granularity}`` (or a one-shot metadata tag): ``config`` / +### ``dataset`` are run metadata; ``*_step`` rows are per-unit (one per step / +### sample, as the recipe runs ``batch_size == 1``); ``*_summary`` rows are the +### reduced per-pass aggregates (``infer_forces_summary`` is surface-only). Call +### sites write the bare strings; annotate a value ``: Phase`` to have the type +### checker enforce them (see the train/val summary branch in ``_run_epoch``). +Phase: TypeAlias = Literal[ + "config", + "dataset", + "train_step", + "val_step", + "infer_step", + "train_summary", + "val_summary", + "infer_summary", + "infer_forces_summary", +] + def set_seed(seed: int | None, rank: int = 0) -> None: """Pin all RNG states for reproducible training. diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_utils.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_utils.py index 71c43a4d31..140f325648 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_utils.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_utils.py @@ -99,12 +99,14 @@ def test_make_jsonl_logger_writes_timestamped_line(tmp_path): """Logger appends one JSON object per call, each stamped with a 'ts' field.""" path = tmp_path / "metrics.jsonl" log = make_jsonl_logger(path) - log({"phase": "summary", "value": 1.5}) - log({"phase": "sample", "step": 0}) + log({"phase": "infer_summary", "value": 1.5}) + log({"phase": "infer_step", "step": 0}) lines = path.read_text().strip().splitlines() assert len(lines) == 2 first = json.loads(lines[0]) - assert first["phase"] == "summary" and first["value"] == 1.5 and "ts" in first + assert first["phase"] == "infer_summary" + assert first["value"] == 1.5 + assert "ts" in first ### --------------------------------------------------------------------------- From d028742ad18ce1bed7faae8c3baeeb72c53260f7 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Thu, 25 Jun 2026 15:43:50 -0400 Subject: [PATCH 13/15] last name synchronizations between infer and train --- .../unified_external_aero_recipe/src/infer.py | 15 +++++++++++---- .../unified_external_aero_recipe/src/train.py | 14 +++++++++----- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py index 0ff8660e62..b8c8a3a934 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py @@ -325,7 +325,10 @@ def _allreduce_sums( """All-reduce a ``{key: running_sum}`` dict and the sample count. No-op (returns a plain copy) when not running distributed. Folds every - sum plus the count into a single collective. + sum plus the count into a single collective. The training loop's + ``train._reduce_and_average_epoch`` is the analogous reducer; it also + divides by the global count to return means, whereas this returns the + reduced sums for the caller to average. """ if not ( dist.is_available() and dist.is_initialized() and dist.get_world_size() > 1 @@ -502,7 +505,6 @@ def main(cfg: DictConfig) -> None: # -- Force / moment coefficient setup (surface cases) ----------------------- force_cfg = OmegaConf.select(cfg, "force_coefficients", default=None) force_ctx = ForceContext.from_config(force_cfg, field_types, device) - force_acc = ForceAccumulator() if force_ctx is not None: logger.info( f"Force coefficients: integrating Cp='{force_ctx.pressure_field}', " @@ -535,9 +537,12 @@ def main(cfg: DictConfig) -> None: } ) + dataset: Any = val_loader.dataset + sampler: Any = val_loader.sampler + + force_acc = ForceAccumulator() + # -- Inference loop --------------------------------------------------------- - dataset = val_loader.dataset - sampler = val_loader.sampler n_samples = len(sampler) log_every = max(1, int(cfg.get("logging", {}).get("log_every_n_steps", 10))) logger.info(f"Running inference over {n_samples} sample(s) -> {pred_dir}") @@ -703,6 +708,8 @@ def main(cfg: DictConfig) -> None: } ) + logger.info(f"Inference complete! Predictions written to {pred_dir}") + if __name__ == "__main__": main() diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py index bc87d8ca59..cfcd7b3d49 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py @@ -377,11 +377,15 @@ def _run_epoch( ### their D2H transfer to the single batched ``.tolist()`` at ### end-of-epoch. ``None`` here means "not yet seeded"; the first ### iteration clones the per-step TensorDict to break aliasing. + ### ``n_local`` below is this rank's step/sample count. The averaging + ### denominator is the GLOBAL count that ``_reduce_and_average_epoch`` + ### all-reduces from each rank's ``n_local`` at end-of-epoch; the local + ### value is reused directly only for the per-rank step-rate line. total_loss = 0.0 total_losses_td: TensorDict | None = None total_metrics_td: TensorDict | None = None precision = getattr(cfg, "precision", "float32") - n_batches = 0 + n_local = 0 num_steps = len(dataloader) epoch_t0 = time.perf_counter() @@ -424,7 +428,7 @@ def _run_epoch( else: total_losses_td.add_(losses) total_metrics_td.add_(metrics) - n_batches += 1 + n_local += 1 ### Per-step sync for the print line; lands after backward + ### optimizer.step so it overlaps with queued GPU work. @@ -520,7 +524,7 @@ def _run_epoch( step_t0 = time.perf_counter() epoch_dt = time.perf_counter() - epoch_t0 - n = max(n_batches, 1) + n = max(n_local, 1) ### Reduce the epoch sums + sample count across ranks once, so logged ### loss/metrics are the GLOBAL averages (not rank-0's shard) under ### DDP. `n` above is kept local for the per-rank step-rate line below. @@ -528,13 +532,13 @@ def _run_epoch( total_loss, total_losses_td, total_metrics_td, - n_batches, + n_local, device=dist_manager.device, ) logger.info( f"Epoch {epoch} {mode} done in {epoch_dt:.1f}s " - f"({n_batches} steps, {epoch_dt / n:.3f}s/step avg)" + f"({n_local} steps, {epoch_dt / n:.3f}s/step avg)" ) if is_rank0: From 4fe1bbedf0bc3d53389ff268cc61c17ce70636b9 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Fri, 26 Jun 2026 12:33:42 -0400 Subject: [PATCH 14/15] Refactors reduce_and_average_epoch into reduce_and_average, so that all_reduce logic is shared for iters and epochs. --- .../unified_external_aero_recipe/README.md | 9 +- .../unified_external_aero_recipe/src/infer.py | 2 +- .../unified_external_aero_recipe/src/train.py | 157 +++++++++--------- .../tests/test_train_helpers.py | 36 ++-- 4 files changed, 93 insertions(+), 111 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md index 3cdc35fe2a..b58ef7f318 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/README.md @@ -740,10 +740,13 @@ Training and validation metrics are logged in two places per run: - **JSONL** at `${output_dir}/${run_id}/metrics.jsonl`. One record per line, tagged by a `phase` field: `config` (resolved run config), `dataset` (split sizes + targets), `train_step` / `val_step` (per-step - loss + metrics), and `train_summary` / `val_summary` (per-epoch reduced - means). Easy to grep, easy to ship to an external store. + loss + metrics), and `train_summary` / `val_summary` (per-epoch loss + + metrics). Easy to grep, easy to ship to an external store. -Rank-0 only; no external tracker required. +Under multi-GPU (DDP), all logged loss and metric values -- per-step and +per-epoch alike -- are global all-rank means (reduced across ranks with a +single fused `all_reduce`), not rank-0's shard. Only rank 0 writes the +TensorBoard / JSONL files; no external tracker required. ## Source modules diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py index b8c8a3a934..67de040564 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/infer.py @@ -326,7 +326,7 @@ def _allreduce_sums( No-op (returns a plain copy) when not running distributed. Folds every sum plus the count into a single collective. The training loop's - ``train._reduce_and_average_epoch`` is the analogous reducer; it also + ``train._reduce_and_average`` is the analogous reducer; it also divides by the global count to return means, whereas this returns the reduced sums for the caller to average. """ diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py index cfcd7b3d49..7a48ea8957 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/train.py @@ -101,76 +101,49 @@ def _flatten_config( ### --------------------------------------------------------------------------- -def _to_float_dicts( +def _reduce_and_average( + loss_sum: float, losses_td: TensorDict | None, metrics_td: TensorDict | None, - *, - n: int = 1, -) -> tuple[dict[str, float], dict[str, float]]: - """Stack both TDs' 0-D leaves, divide by *n*, and ``.tolist()`` in one D2H sync. - - Used at both per-step (``n=1``) and per-epoch (``n=batch_count``) - boundaries: collapses ``2 * n_fields`` ``.item()`` calls into a single - ``.tolist()`` over a stacked 1-D tensor. Either TD being ``None`` - (the "not yet seeded" sentinel for zero-step epochs) returns - ``({}, {})``. - """ - if losses_td is None or metrics_td is None: - return {}, {} - ### Bridge TensorDict's wider key/value types to the runtime contract - ### this recipe enforces: every loss / metric leaf is a 0-D scalar - ### Tensor keyed by str. - loss_keys = cast(list[str], list(losses_td.keys())) - metric_keys = cast(list[str], list(metrics_td.keys())) - loss_tensors = cast(list[torch.Tensor], list(losses_td.values())) - metric_tensors = cast(list[torch.Tensor], list(metrics_td.values())) - flat = (torch.stack(loss_tensors + metric_tensors) / n).tolist() - n_loss = len(loss_keys) - return ( - dict(zip(loss_keys, flat[:n_loss])), - dict(zip(metric_keys, flat[n_loss:])), - ) - - -def _reduce_and_average_epoch( - total_loss: float, - losses_td: TensorDict | None, - metrics_td: TensorDict | None, - n_local: int, + n_samples: int, *, device: torch.device | str, ) -> tuple[float, dict[str, float], dict[str, float]]: - """Collapse one epoch's rank-local loss/metric *sums* into global means. + """Collapse rank-local loss/metric *sums* + a sample count into global means. + + Under DDP each rank only sees its own shard, so the numbers we log are + meaningless until reduced across ranks. This takes a rank-local *sum* + (``loss_sum`` plus the 0-D ``losses_td`` / ``metrics_td`` leaves) and the + matching sample count, reduces them across ranks once, and divides by the + *global* count to produce sample-weighted means. It is granularity-neutral + and called at two boundaries: - Under DDP each rank only sees its own shard of the epoch, so the - numbers we log are meaningless until reduced across ranks. This takes - the rank-local running *sums* the epoch loop accumulated (``total_loss`` - and the 0-D ``losses_td`` / ``metrics_td`` leaves) plus the rank's - sample count, reduces them across ranks once, and divides by the - *global* count to produce sample-weighted means. + - Per step, with ``n_samples == 1`` (one sample per batch), so the logged + iteration curves are global all-rank means rather than rank-0's shard. + - Per epoch, with ``n_samples == n_local`` and the running epoch sums, so + the summary is a global mean over the whole dataset. Reducing sums and a count (rather than per-rank means) is what makes the result correct for uneven shards: ``global_sum / global_count`` weights - every sample equally no matter how the dataset split across ranks. - Doing this once at end-of-epoch, rather than per step, also keeps it - deadlock-free: every rank issues exactly one collective here even if - ranks ran different step counts. The values are packed into a single - ``float32`` buffer, so the whole epoch costs one ``all_reduce`` and one - device-to-host sync (the ``.tolist()``). It mirrors the inference-side - reducer ``infer._allreduce_sums``. + every sample equally no matter how the dataset split across ranks. The + values are packed into a single ``float32`` buffer, so each call costs one + ``all_reduce`` and one device-to-host sync (the ``.tolist()``). It mirrors + the inference-side reducer ``infer._allreduce_sums``. Args: - total_loss: Rank-local sum of the per-step scalar losses over the - epoch (``sum(loss.item())``), not a mean. - losses_td: Rank-local epoch accumulator of per-field losses: a 0-D - (``batch_size=[]``) ``TensorDict`` whose leaves are the summed - scalar losses, one per loss term. ``None`` is the "epoch ran - zero steps" sentinel (see Notes). + loss_sum: Rank-local sum of scalar losses over the ``n_samples`` being + collapsed -- one step's ``loss.item()`` per step, or the epoch + running sum -- not a mean. + losses_td: Rank-local per-field loss sum: a 0-D (``batch_size=[]``) + ``TensorDict`` whose leaves are summed scalar losses, one per loss + term. ``None`` is the "zero samples" sentinel (see Notes); it does + not arise on the per-step path, where a batch is always present. metrics_td: The matching per-field metric-sum accumulator, with the - same ``None`` sentinel. Seeded in lock-step with ``losses_td``, - so the two are ``None`` together or populated together. - n_local: Number of samples this rank processed this epoch. Equals - the step count because the recipe runs ``batch_size == 1``. + same ``None`` sentinel. Seeded in lock-step with ``losses_td``, so + the two are ``None`` together or populated together. + n_samples: Number of samples this rank contributed to ``loss_sum`` and + the accumulators (``1`` per step, ``n_local`` per epoch; equal to + the step count because the recipe runs ``batch_size == 1``). device: Device on which to build the reduction buffer. Must be the rank's collective/compute device (``dist_manager.device``) so the NCCL ``all_reduce`` runs on the correct device. @@ -180,34 +153,38 @@ def _reduce_and_average_epoch( the global mean loss, ``avg_losses`` is ``{loss_name: global_mean}``, and ``avg_metrics`` is ``{metric_name: global_mean}``. The dict keys and their order are taken from ``losses_td`` / ``metrics_td``. On the - ``None`` sentinel it returns ``(total_loss / max(n_local, 1), {}, - {})`` without entering the collective. + ``None`` sentinel it returns ``(loss_sum / max(n_samples, 1), {}, {})`` + without entering the collective. Notes: Single-process (or ``world_size == 1``) skips the reduction, so the - result is identical to the previous ``sum / n_local`` averaging and + result is identical to plain ``sum / n_samples`` averaging and single-GPU logs are unchanged. + Calling this per step adds one collective per iteration, which is only + deadlock-free because every rank issues the same number of collectives + -- i.e. every rank runs the same step count. The recipe's samplers + guarantee that: train uses ``drop_last=True`` and val pads to even + shards, so no rank finishes early and skips a step's ``all_reduce``. + The one fused ``all_reduce`` is valid only because every rank packs the same leaves in the same order, which holds since all ranks share - one ``target_config`` (identical loss/metric keys). The ``None`` - early return similarly assumes ranks are seeded together: under DDP - every rank gets at least one sample, so the accumulators are - non-``None`` on all ranks at once. A lone rank taking the early - return while its peers enter the collective is the only way this - would hang, and the samplers this recipe uses do not produce that. + one ``target_config`` (identical loss/metric keys). The ``None`` early + return similarly assumes ranks are seeded together: under DDP every + rank gets at least one sample, so the accumulators are non-``None`` on + all ranks at once. """ if losses_td is None or metrics_td is None: - return total_loss / max(n_local, 1), {}, {} + return loss_sum / max(n_samples, 1), {}, {} loss_keys = cast(list[str], list(losses_td.keys())) metric_keys = cast(list[str], list(metrics_td.keys())) leaves = cast( list[torch.Tensor], list(losses_td.values()) + list(metrics_td.values()) ) - ### [total_loss, n_local, *loss_sums, *metric_sums] -> one collective. + ### [loss_sum, n_samples, *loss_sums, *metric_sums] -> one collective. packed = torch.cat( [ - torch.tensor([total_loss, float(n_local)], device=device), + torch.tensor([loss_sum, float(n_samples)], device=device), torch.stack(leaves).float().to(device), ] ) @@ -378,7 +355,7 @@ def _run_epoch( ### end-of-epoch. ``None`` here means "not yet seeded"; the first ### iteration clones the per-step TensorDict to break aliasing. ### ``n_local`` below is this rank's step/sample count. The averaging - ### denominator is the GLOBAL count that ``_reduce_and_average_epoch`` + ### denominator is the GLOBAL count that ``_reduce_and_average`` ### all-reduces from each rank's ``n_local`` at end-of-epoch; the local ### value is reused directly only for the per-rank step-rate line. total_loss = 0.0 @@ -441,12 +418,25 @@ def _run_epoch( if torch.cuda.is_available() else 0 ) + + ### Reduce this step's loss + metrics across ranks so the iteration + ### logs are global all-rank means, not rank-0's shard. This is a + ### collective: EVERY rank must call it, so it sits outside the + ### rank-0 logging gate below. ``n_samples=1`` is this step's local + ### sample count (one batch == one sample), matching the + ### ``n_local += 1`` accumulation above. Equal per-rank step counts + ### keep the per-step collective deadlock-free (see + ### ``_reduce_and_average``). + step_loss, step_losses, step_metrics = _reduce_and_average( + this_loss, losses, metrics, 1, device=dist_manager.device + ) + ### Train mode includes Mem in the per-step line; val drops it ### because the no_grad path is the lowest-noise place to look. mem_str = f" Mem: {mem_gb:.2f}GB" if is_train else "" logger.info( f"{log_prefix} {epoch} [{i + 1}/{num_steps}] " - f"Loss: {this_loss:.6f} " + f"Loss: {step_loss:.6f} " f"Step: {step_dt:.3f}s" f"{mem_str}" ) @@ -455,9 +445,10 @@ def _run_epoch( ### epoch-only to keep dashboards uncluttered). Per-step JSONL is ### emitted in both modes so downstream tooling can compute val ### step-time statistics directly instead of inferring them from - ### ``val_ts - train_ts``. + ### ``val_ts - train_ts``. The logged loss / metrics are the global + ### all-rank means from ``_reduce_and_average`` above (not rank-0's + ### shard); rank 0 is only the writer. if is_rank0: - losses_floats, metrics_floats = _to_float_dicts(losses, metrics) if is_train: global_step = epoch * num_steps + i if writer is not None: @@ -466,10 +457,10 @@ def _run_epoch( ### metric tags get an explicit `iteration/metrics/...` ### namespace so we never have to split by string prefix. _log_to_tensorboard( - writer, losses_floats, "iteration", global_step + writer, step_losses, "iteration", global_step ) _log_to_tensorboard( - writer, metrics_floats, "iteration/metrics", global_step + writer, step_metrics, "iteration/metrics", global_step ) writer.add_scalar( "iteration/lr", @@ -491,11 +482,11 @@ def _run_epoch( { "phase": "train_step", "global_step": global_step, - "loss": this_loss, + "loss": step_loss, "mem_gb": mem_gb, "step_time_s": step_dt, - **losses_floats, - **metrics_floats, + **step_losses, + **step_metrics, } ) elif log_jsonl is not None: @@ -512,10 +503,10 @@ def _run_epoch( "phase": "val_step", "epoch": epoch, "val_step": i, - "loss": this_loss, + "loss": step_loss, "step_time_s": step_dt, - **losses_floats, - **metrics_floats, + **step_losses, + **step_metrics, } ) @@ -528,7 +519,7 @@ def _run_epoch( ### Reduce the epoch sums + sample count across ranks once, so logged ### loss/metrics are the GLOBAL averages (not rank-0's shard) under ### DDP. `n` above is kept local for the per-rank step-rate line below. - avg_loss, avg_losses, avg_metrics = _reduce_and_average_epoch( + avg_loss, avg_losses, avg_metrics = _reduce_and_average( total_loss, total_losses_td, total_metrics_td, diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_train_helpers.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_train_helpers.py index 46741efb4f..71328499d1 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_train_helpers.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/tests/test_train_helpers.py @@ -29,9 +29,10 @@ model output (``Mesh`` or ``(B, N, C)`` tensor) to a per-target TensorDict, with clear error messages on shape / channel-count mismatches. -- :func:`train._reduce_and_average_epoch`: averages epoch loss / metric - sums over the global sample count; its single-process path must equal - the previous ``total_loss / n`` + per-leaf ``sum / n`` averaging. +- :func:`train._reduce_and_average`: averages rank-local loss / metric + sums over the global sample count (used per step and per epoch); its + single-process path must equal plain ``total_loss / n`` + per-leaf + ``sum / n`` averaging. (The analogous tests for the shared, tensorboard-free :func:`utils.recursive_to_device` live in ``test_utils.py``, outside @@ -54,8 +55,7 @@ from output_normalize import normalize_output_to_tensordict # noqa: E402 from train import ( # noqa: E402 -- after the skip guard - _reduce_and_average_epoch, - _to_float_dicts, + _reduce_and_average, _walk_batch_for_logging, ) @@ -182,12 +182,12 @@ def test_mesh_output_missing_target_raises(self): ### --------------------------------------------------------------------------- -### _reduce_and_average_epoch +### _reduce_and_average ### --------------------------------------------------------------------------- -class TestReduceAndAverageEpoch: - """Tests for `_reduce_and_average_epoch` (single-process path). +class TestReduceAndAverage: + """Tests for `_reduce_and_average` (single-process path). The distributed branch is gated on an initialized process group with ``world_size > 1``; with no group initialized these tests exercise the @@ -213,35 +213,23 @@ def _epoch_sums() -> tuple[TensorDict, TensorDict]: def test_single_process_divides_sums_by_local_count(self): """No process group: global average == local sum / n_local.""" losses_td, metrics_td = self._epoch_sums() - avg_loss, avg_losses, avg_metrics = _reduce_and_average_epoch( + avg_loss, avg_losses, avg_metrics = _reduce_and_average( 15.0, losses_td, metrics_td, 3, device="cpu" ) assert avg_loss == pytest.approx(5.0) assert avg_losses == pytest.approx({"pressure": 2.0, "wss": 3.0}) assert avg_metrics == pytest.approx({"pressure_l2": 1.0, "wss_mae": 4.0}) - def test_single_process_matches_to_float_dicts(self): - """Equivalent to the `total_loss / n` + `_to_float_dicts(n=...)` it replaced.""" - losses_td, metrics_td = self._epoch_sums() - n_local, total_loss = 4, 10.0 - old_losses, old_metrics = _to_float_dicts(losses_td, metrics_td, n=n_local) - new_loss, new_losses, new_metrics = _reduce_and_average_epoch( - total_loss, losses_td, metrics_td, n_local, device="cpu" - ) - assert new_loss == pytest.approx(total_loss / n_local) - assert new_losses == pytest.approx(old_losses) - assert new_metrics == pytest.approx(old_metrics) - def test_none_sentinel_returns_loss_only(self): """The "no steps seeded" sentinel (either TD ``None``) yields (loss / n, {}, {}).""" - assert _reduce_and_average_epoch(8.0, None, None, 2, device="cpu") == ( + assert _reduce_and_average(8.0, None, None, 2, device="cpu") == ( 4.0, {}, {}, ) ### A single ``None`` is enough to trip the sentinel. losses_td, _ = self._epoch_sums() - assert _reduce_and_average_epoch(8.0, losses_td, None, 2, device="cpu") == ( + assert _reduce_and_average(8.0, losses_td, None, 2, device="cpu") == ( 4.0, {}, {}, @@ -249,7 +237,7 @@ def test_none_sentinel_returns_loss_only(self): def test_zero_local_count_avoids_zero_division(self): """``n_local == 0`` (a step-less epoch) divides by 1, not 0.""" - assert _reduce_and_average_epoch(7.0, None, None, 0, device="cpu") == ( + assert _reduce_and_average(7.0, None, None, 0, device="cpu") == ( 7.0, {}, {}, From 1c1e8643d87f4f233aebe63bf981920431a334a7 Mon Sep 17 00:00:00 2001 From: Peter Sharpe Date: Fri, 26 Jun 2026 12:46:28 -0400 Subject: [PATCH 15/15] trim comment a hair --- .../unified_external_aero_recipe/src/utils.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/utils.py b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/utils.py index f9ce7983d0..85bd4fc26f 100644 --- a/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/utils.py +++ b/examples/cfd/external_aerodynamics/unified_external_aero_recipe/src/utils.py @@ -51,9 +51,7 @@ ### ``{split}_{granularity}`` (or a one-shot metadata tag): ``config`` / ### ``dataset`` are run metadata; ``*_step`` rows are per-unit (one per step / ### sample, as the recipe runs ``batch_size == 1``); ``*_summary`` rows are the -### reduced per-pass aggregates (``infer_forces_summary`` is surface-only). Call -### sites write the bare strings; annotate a value ``: Phase`` to have the type -### checker enforce them (see the train/val summary branch in ``_run_epoch``). +### reduced per-pass aggregates (``infer_forces_summary`` is surface-only). Phase: TypeAlias = Literal[ "config", "dataset",