feat(megatron): split-API train-step state machine on MegatronPolicyWorker#2683
Open
mehraakash wants to merge 5 commits into
Open
feat(megatron): split-API train-step state machine on MegatronPolicyWorker#2683mehraakash wants to merge 5 commits into
mehraakash wants to merge 5 commits into
Conversation
…orker Adds begin_train_step / train_microbatch / finish_train_step / abort_train_step on MegatronPolicyWorkerImpl, mirroring the DTensor v1/v2 implementations but adapted for mcore's contiguous grad bucket + pipeline-schedule reduce path. Mechanism: - begin_train_step: zero_grad_buffer + optimizer.zero_grad, store loss_fn / gbs / mbs / local_valid_seqs/toks accumulators on _train_step_state, and null model.config.grad_sync_func (saved for restore) so the PP scheduler's direct reduce dispatch cannot bypass no_sync. - train_microbatch(data): wrap one ``megatron_forward_backward`` invocation in ``with self.model.no_sync():`` so mcore DDP hooks accumulate ``param.main_grad`` locally without dispatching the cross-DP reduce. Pass ``global_valid_seqs/toks=tensor(1.0)`` so the loss returns un-normalized sums; backward deposits raw d(sum)/dθ. Accumulate local mask sums + per-mb metrics + the total pipeline-microbatch count (for finish-time MoE aux-loss scaling). - finish_train_step: all_reduce mask sums to get true N (toks for TOKEN_LEVEL loss, seqs for SEQUENCE_LEVEL), call self.model.scale_gradients(1/N), then the one true cross-DP reduce via start_grad_sync + finish_grad_sync, optimizer.step (clips internally), restore grad_sync_func, scheduler.step(increment=gbs). Rescale per-mb metrics by 1/N (linear-in-1/N math), aggregate, surface global counts. - abort_train_step: restore grad_sync_func, zero_grad_buffer + zero_grad, drop state. ``trainer_version`` unchanged. Sync ``train()`` is left untouched. Includes CPU unit tests at tests/unit/models/policy/test_megatron_split_state.py covering the lifecycle and call-order invariants (no_sync wrap, grad_sync_func save/restore, mask-sum accumulation, N selection by loss_type, abort idempotence, MoE scaling). Marked pytest.mark.mcore so they run only in mcore-enabled CI containers. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Signed-off-by: Akash Mehra <akamehra@nvidia.com>
|
Auto-sync is disabled for ready for review pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
Author
|
/ok to test |
3 tasks
Pre-existing zero-error file from NVIDIA-NeMo#2078 (Eagle3) that was never added to the project-includes whitelist. Carrying the fix forward in this PR to unblock the lint job. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Author
|
/ok to test 3d24224 |
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Author
|
/ok to test cb65400 |
The file is introduced by NVIDIA-NeMo#2692 (DTensor PR), not by this branch. Whitelisting it here causes pyrefly to fail with 'No Python files matched pattern' since the file does not exist on mcore. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Akash Mehra <akamehra@nvidia.com>
Author
|
/ok to test 108ec17 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds begin_train_step / train_microbatch / finish_train_step / abort_train_step on MegatronPolicyWorkerImpl, mirroring the DTensor v1/v2 implementations but adapted for mcore's contiguous grad bucket + pipeline-schedule reduce path.
Mechanism:
megatron_forward_backwardinvocation inwith self.model.no_sync():so mcore DDP hooks accumulateparam.main_gradlocally without dispatching the cross-DP reduce. Passglobal_valid_seqs/toks=tensor(1.0)so the loss returns un-normalized sums; backward deposits raw d(sum)/dθ. Accumulate local mask sums + per-mb metrics + the total pipeline-microbatch count (for finish-time MoE aux-loss scaling).trainer_versionunchanged.Sync
train()is left untouched.Includes CPU unit tests at tests/unit/models/policy/test_megatron_split_state.py covering the lifecycle and call-order invariants (no_sync wrap, grad_sync_func save/restore, mask-sum accumulation, N selection by loss_type, abort idempotence, MoE scaling). Marked pytest.mark.mcore so they run only in mcore-enabled CI containers.
What does this PR do ?
Add a one line overview of what this PR aims to accomplish.
Issues
List issues that this PR closes (syntax):
Usage
# Add a code snippet demonstrating how to use thisBefore your PR is "Ready for review"
Pre checks:
Additional Information