Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion examples/advanced/multi-gpu/pt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ Set `--nproc_per_node` to the number of GPUs you want to use:
python3 -m torch.distributed.run --nproc_per_node=4 client.py
```

The client script passes the global distributed rank to the NVFlare Client API
(`flare.init`) and uses the local rank only for CUDA device placement:

```python
import os

global_rank = dist.get_rank()
local_rank = int(os.environ["LOCAL_RANK"])

torch.cuda.set_device(local_rank)
flare.init(rank=global_rank)
```

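`torchrun` sets both `RANK` and `LOCAL_RANK`. `RANK` is unique across the whole
distributed job, while `LOCAL_RANK` is only unique on the current node. On a
single node the two values coincide; in a multi-node job they differ, so using
the global rank as a CUDA device index can select a GPU that does not exist on
that node.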

### Multiple Clients on Same Machine
When running multiple clients on the same machine, use different master ports:
```python
Expand Down Expand Up @@ -118,4 +134,3 @@ python -c "import torch; print(torch.cuda.device_count())"
- [PyTorch DDP Tutorial](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)
- [torch.distributed.run Documentation](https://pytorch.org/docs/stable/distributed.html#launch-utility)
- [NVFlare Documentation](https://nvflare.readthedocs.io/)

42 changes: 28 additions & 14 deletions examples/advanced/multi-gpu/pt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
python -m torch.distributed.run --nnodes=1 --nproc_per_node=2 --master_port=7777 client.py
"""

import os

import torch
import torch.distributed as dist
import torch.nn as nn
Expand Down Expand Up @@ -61,10 +63,14 @@ def evaluate(input_weights, device, dataloader):
def main():
# Initialize distributed process group
dist.init_process_group("nccl")
rank = int(dist.get_rank())
device = f"cuda:{rank}"
torch.cuda.set_device(rank)
print(f"DDP rank {rank} initialized on {device}")
global_rank = int(dist.get_rank())
if "LOCAL_RANK" not in os.environ:
raise RuntimeError("LOCAL_RANK is required for torchrun/DDP GPU training. Launch with torchrun.")
local_rank = int(os.environ["LOCAL_RANK"])

device = f"cuda:{local_rank}"
torch.cuda.set_device(local_rank)
print(f"DDP global_rank={global_rank}, local_rank={local_rank} initialized on {device}")

# Data setup
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
Expand All @@ -83,24 +89,32 @@ def main():
criterion = nn.CrossEntropyLoss()

# (2) initializes NVFlare client API
flare.init(rank=f"{rank}")
flare.init(rank=global_rank)

print(f"flare init DDP global_rank={global_rank}, local_rank={local_rank} initialized on {device}")
# (3) gets FLModel from NVFlare.
# Only rank 0 talks to NVFlare, so flare.is_running() never turns False on
# nonzero ranks — rank 0 must broadcast the running state so all ranks exit
# the loop together at job end.
while True:
running = [flare.is_running() if global_rank == 0 else None]
dist.broadcast_object_list(running, src=0)
if not running[0]:
break

print(f"flare init DDP rank {rank} initialized on {device}")
# (3) gets FLModel from NVFlare
while flare.is_running():
input_model = flare.receive()
if rank == 0:
if global_rank == 0:
print(f"\n[Round={input_model.current_round}, Site={flare.get_site_name()}]")
# (4) loads model from NVFlare
net.load_state_dict(input_model.params)
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

# Wrap model with DDP
net.to(device)
ddp_model = DDP(net, device_ids=[rank])
ddp_model = DDP(net, device_ids=[local_rank])

# Sync model across ranks
if rank == 0:
if global_rank == 0:
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

dist.barrier()
Expand All @@ -120,14 +134,14 @@ def main():
optimizer.step()

running_loss += loss.item()
if rank == 0 and i % 2000 == 1999:
if global_rank == 0 and i % 2000 == 1999:
print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
running_loss = 0.0

print(f"Rank {rank}: Finished Training")
print(f"Rank {global_rank}: Finished Training")

# Only rank 0 sends model back
if rank == 0:
if global_rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
Expand Down
40 changes: 27 additions & 13 deletions examples/docker/jobs/pt-ddp-docker/app_site-1/custom/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
python -m torch.distributed.run --nnodes=1 --nproc_per_node=2 --master_port=7777 client.py
"""

import os

import torch
import torch.distributed as dist
import torch.nn as nn
Expand Down Expand Up @@ -61,10 +63,14 @@ def evaluate(input_weights, device, dataloader):
def main():
# Initialize distributed process group
dist.init_process_group("nccl")
rank = int(dist.get_rank())
device = f"cuda:{rank}"
torch.cuda.set_device(rank)
print(f"DDP rank {rank} initialized on {device}")
global_rank = int(dist.get_rank())
if "LOCAL_RANK" not in os.environ:
raise RuntimeError("LOCAL_RANK is required for torchrun/DDP GPU training. Launch with torchrun.")
local_rank = int(os.environ["LOCAL_RANK"])

device = f"cuda:{local_rank}"
torch.cuda.set_device(local_rank)
print(f"DDP global_rank={global_rank}, local_rank={local_rank} initialized on {device}")

# Data setup
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
Expand All @@ -83,13 +89,21 @@ def main():
criterion = nn.CrossEntropyLoss()

# (2) initializes NVFlare client API
flare.init(rank=f"{rank}")
flare.init(rank=global_rank)

print(f"flare init DDP global_rank={global_rank}, local_rank={local_rank} initialized on {device}")
# (3) gets FLModel from NVFlare.
# Only rank 0 talks to NVFlare, so flare.is_running() never turns False on
# nonzero ranks — rank 0 must broadcast the running state so all ranks exit
# the loop together at job end.
while True:
running = [flare.is_running() if global_rank == 0 else None]
dist.broadcast_object_list(running, src=0)
if not running[0]:
break

print(f"flare init DDP rank {rank} initialized on {device}")
# (3) gets FLModel from NVFlare
while flare.is_running():
input_model = flare.receive()
if rank == 0:
if global_rank == 0:
print(f"\n[Round={input_model.current_round}, Site={flare.get_site_name()}]")
# (4) loads model from NVFlare
net.load_state_dict(input_model.params)
Expand All @@ -105,7 +119,7 @@ def main():

# Wrap model with DDP — all ranks now have identical parameters
net.to(device)
ddp_model = DDP(net, device_ids=[rank])
ddp_model = DDP(net, device_ids=[local_rank])
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001, momentum=0.9)

# Training loop
Expand All @@ -122,14 +136,14 @@ def main():
optimizer.step()

running_loss += loss.item()
if rank == 0 and i % 2000 == 1999:
if global_rank == 0 and i % 2000 == 1999:
print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
running_loss = 0.0

print(f"Rank {rank}: Finished Training")
print(f"Rank {global_rank}: Finished Training")

# Only rank 0 sends model back
if rank == 0:
if global_rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
Expand Down
40 changes: 27 additions & 13 deletions examples/docker/jobs/pt-ddp-docker/app_site-2/custom/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
python -m torch.distributed.run --nnodes=1 --nproc_per_node=2 --master_port=7777 client.py
"""

import os

import torch
import torch.distributed as dist
import torch.nn as nn
Expand Down Expand Up @@ -61,10 +63,14 @@ def evaluate(input_weights, device, dataloader):
def main():
# Initialize distributed process group
dist.init_process_group("nccl")
rank = int(dist.get_rank())
device = f"cuda:{rank}"
torch.cuda.set_device(rank)
print(f"DDP rank {rank} initialized on {device}")
global_rank = int(dist.get_rank())
if "LOCAL_RANK" not in os.environ:
raise RuntimeError("LOCAL_RANK is required for torchrun/DDP GPU training. Launch with torchrun.")
local_rank = int(os.environ["LOCAL_RANK"])

device = f"cuda:{local_rank}"
torch.cuda.set_device(local_rank)
print(f"DDP global_rank={global_rank}, local_rank={local_rank} initialized on {device}")

# Data setup
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
Expand All @@ -83,13 +89,21 @@ def main():
criterion = nn.CrossEntropyLoss()

# (2) initializes NVFlare client API
flare.init(rank=f"{rank}")
flare.init(rank=global_rank)

print(f"flare init DDP global_rank={global_rank}, local_rank={local_rank} initialized on {device}")
# (3) gets FLModel from NVFlare.
# Only rank 0 talks to NVFlare, so flare.is_running() never turns False on
# nonzero ranks — rank 0 must broadcast the running state so all ranks exit
# the loop together at job end.
while True:
running = [flare.is_running() if global_rank == 0 else None]
dist.broadcast_object_list(running, src=0)
if not running[0]:
break

print(f"flare init DDP rank {rank} initialized on {device}")
# (3) gets FLModel from NVFlare
while flare.is_running():
input_model = flare.receive()
if rank == 0:
if global_rank == 0:
print(f"\n[Round={input_model.current_round}, Site={flare.get_site_name()}]")
# (4) loads model from NVFlare
net.load_state_dict(input_model.params)
Expand All @@ -105,7 +119,7 @@ def main():

# Wrap model with DDP — all ranks now have identical parameters
net.to(device)
ddp_model = DDP(net, device_ids=[rank])
ddp_model = DDP(net, device_ids=[local_rank])
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001, momentum=0.9)

# Training loop
Expand All @@ -122,14 +136,14 @@ def main():
optimizer.step()

running_loss += loss.item()
if rank == 0 and i % 2000 == 1999:
if global_rank == 0 and i % 2000 == 1999:
print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
running_loss = 0.0

print(f"Rank {rank}: Finished Training")
print(f"Rank {global_rank}: Finished Training")

# Only rank 0 sends model back
if rank == 0:
if global_rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
Expand Down
5 changes: 3 additions & 2 deletions nvflare/client/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ def init(rank: Optional[Union[str, int]] = None, config_file: Optional[str] = No
"""Initializes NVFlare Client API environment.
Args:
rank (str): local rank of the process.
It is only useful when the training script has multiple worker processes. (for example multi GPU)
rank (Union[str, int], optional): rank of the process for Client API control-path behavior.
In distributed training, use the global process rank (for example torchrun's RANK),
not the device-local rank used for GPU placement.
config_file (str): client api configuration.
Returns:
Expand Down
5 changes: 3 additions & 2 deletions nvflare/client/api_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ def init(self, rank: Optional[str] = None):
"""Initializes NVFlare Client API environment.

Args:
rank (str): local rank of the process.
It is only useful when the training script has multiple worker processes. (for example multi GPU)
rank (str): rank of the process for Client API control-path behavior.
In distributed training, use the global process rank (for example torchrun's RANK),
not the device-local rank used for GPU placement.

Returns:
None
Expand Down
5 changes: 3 additions & 2 deletions nvflare/client/ex_process/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ def init(self, rank: Optional[str] = None):
"""Initializes NVFlare Client API environment.

Args:
rank (str): local rank of the process.
It is only useful when the training script has multiple worker processes. (for example multi GPU)
rank (str): rank of the process for Client API control-path behavior.
In distributed training, use the global process rank (for example torchrun's RANK),
not the device-local rank used for GPU placement.
"""

if rank is None:
Expand Down
5 changes: 3 additions & 2 deletions nvflare/client/in_process/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ def init(self, rank: Optional[str] = None, config: Optional[Dict] = None):

Args:
config (Union[str, Dict]): config dictionary.
rank (str): local rank of the process.
It is only useful when the training script has multiple worker processes. (for example multi GPU)
rank (str): rank of the process for Client API control-path behavior.
In distributed training, use the global process rank (for example torchrun's RANK),
not the device-local rank used for GPU placement.
"""

self.rank = rank
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
# Client-side job configuration: routes the "train" task to a launcher-based
# PyTorch Client API executor, which starts the training script through
# torch.distributed.run (see the "launcher" component below).
format_version = 2
# Training script name; referenced as {app_script} in the launcher command below.
app_script = "torchrun_client.py"
# Extra command-line arguments for the script; referenced as {app_config} below.
# Empty here, so no extra arguments are appended.
app_config = ""
executors = [
{
# Task names handled by this executor.
tasks = [
"train"
]
executor {
path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor"
args {
# These ids must match the "id" fields of the entries in `components`.
launcher_id = "launcher"
pipe_id = "pipe"
# NOTE(review): presumably in seconds - confirm against the executor's documentation.
heartbeat_timeout = 60
params_exchange_format = "pytorch"
params_transfer_type = "FULL"
train_with_evaluation = false
}
}
}
]
# No task data/result filters are configured.
task_data_filters = []
task_result_filters = []
components = [
{
id = "launcher"
path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher"
args {
# Launches the script with torch.distributed.run in standalone mode with
# 2 worker processes on this node.
script = "python3 -m torch.distributed.run --standalone --nproc_per_node=2 custom/{app_script} {app_config}"
# NOTE(review): presumably false means the script is launched again for each
# task rather than kept alive across tasks - confirm.
launch_once = false
# NOTE(review): presumably in seconds - confirm.
shutdown_timeout = 30
}
}
{
id = "pipe"
path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe"
args {
mode = "PASSIVE"
# The {UPPER_CASE} placeholders below are not defined in this file;
# presumably they are filled in by NVFlare at runtime - confirm.
site_name = "{SITE_NAME}"
token = "{JOB_ID}"
root_url = "{ROOT_URL}"
secure_mode = "{SECURE_MODE}"
workspace_dir = "{WORKSPACE}"
}
}
]
}
Loading
Loading