Feature Examples
These examples will help you quickly understand how to use the relevant PyTorch features on Ascend NPU.
Note
Before running the examples below, make sure the PyTorch-NPU environment is installed. For environment setup, see the Installation Guide.
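Once the environment is set up, a quick sanity check like the following (a minimal sketch; it only assumes that torch and torch_npu are importable) confirms that PyTorch can see the NPUs:

import torch
# importing torch_npu registers the NPU backend with PyTorch
import torch_npu

# True if at least one Ascend NPU is usable
print(torch.npu.is_available())
# number of NPUs visible on this machine
print(torch.npu.device_count())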
1. Data Parallelism
Data parallelism in PyTorch mainly comes in the following forms: DP, DDP, and FSDP (together with its HSDP variant). The sections below briefly describe how to implement these forms of data parallelism on Ascend NPU.
1.1 DDP
# encoding: UTF-8

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

# import the torch_npu package
import torch_npu


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"

    # initialize the process group with the HCCL backend
    dist.init_process_group("hccl", rank=rank, world_size=world_size)


def example(rank, world_size):
    device = torch.device("npu:{}".format(rank))
    # create default process group
    setup(rank, world_size)
    # create local model
    model = ToyModel().to(device)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(device))
    # backward pass
    labels = torch.randn(20, 5).to(device)
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

    # tear down the process group
    dist.destroy_process_group()


def main():
    # number of NPUs visible to this process
    n_npus = torch.npu.device_count()
    assert n_npus >= 2, f"Requires at least 2 NPUs to run, but got {n_npus}"
    world_size = n_npus
    mp.spawn(example, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()
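The example above creates its worker processes with mp.spawn. As an alternative sketch (not part of the original example), the same training code can be launched with torchrun, in which case each worker reads its rank from the environment variables torchrun sets; the snippet below assumes a hypothetical script name ddp_example.py:

# launch with, e.g.: torchrun --nproc_per_node=2 ddp_example.py   (hypothetical file name)
import os
import torch
import torch.distributed as dist
import torch_npu

local_rank = int(os.environ["LOCAL_RANK"])  # set by torchrun for every worker
torch.npu.set_device(local_rank)            # bind this worker to one NPU
dist.init_process_group("hccl")             # rank / world size are read from the environment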
1.2 FSDP
# encoding: UTF-8

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

# import the torch_npu package
import torch_npu


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"

    # initialize the process group with the HCCL backend
    dist.init_process_group("hccl", rank=rank, world_size=world_size)


def example(rank, world_size):
    device = torch.device("npu:{}".format(rank))
    # create default process group
    setup(rank, world_size)
    # create local model
    model = ToyModel().to(device)
    # construct FSDP model
    fsdp_model = FSDP(model, device_id=rank)
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(fsdp_model.parameters(), lr=0.001)

    # forward pass
    outputs = fsdp_model(torch.randn(20, 10).to(device))
    # backward pass
    labels = torch.randn(20, 5).to(device)
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

    # tear down the process group
    dist.destroy_process_group()


def main():
    # number of NPUs visible to this process
    n_npus = torch.npu.device_count()
    assert n_npus >= 2, f"Requires at least 2 NPUs to run, but got {n_npus}"
    world_size = n_npus
    mp.spawn(example, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()
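Section 1 also mentions HSDP, the hybrid-sharding variant of FSDP. A minimal sketch of how the FSDP construction in the example above could be switched to hybrid sharding is shown below (model and rank are the same variables as in the FSDP example; whether hybrid sharding is appropriate depends on your node topology):

from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy

# HYBRID_SHARD shards parameters within a node and replicates them across nodes
fsdp_model = FSDP(
    model,
    device_id=rank,
    sharding_strategy=ShardingStrategy.HYBRID_SHARD,
)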