Feature Examples

These examples will help you quickly understand how to use PyTorch features on Ascend NPU.

Note

Before running the examples below, you need to have the PyTorch-NPU environment installed. For environment setup, please refer to the Installation Guide.

1. Data Parallelism

PyTorch data parallelism mainly comes in the following forms: DP, DDP, and FSDP (along with its HSDP variant). The sections below briefly describe how to implement these forms of data parallelism on Ascend NPU.

1.1 DDP

# encoding: UTF-8

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

# import the torch_npu package
import torch_npu


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"

    # initialize the process group
    dist.init_process_group("hccl", rank=rank, world_size=world_size)


def example(rank, world_size):
    device = torch.device("npu:{}".format(rank))
    # create default process group
    setup(rank, world_size)
    # create local model
    model = ToyModel().to(device)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(device))
    # backward pass
    labels = torch.randn(20, 5).to(device)
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()


def main():
    # count NPUs via torch.npu (torch.cuda would report 0 on an NPU-only machine)
    n_npus = torch.npu.device_count()
    assert n_npus >= 2, f"Requires at least 2 NPUs to run, but got {n_npus}"
    world_size = n_npus
    mp.spawn(example, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()
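
The script above creates its worker processes itself via mp.spawn, so it is started as an ordinary Python script. As an alternative, the following is a minimal sketch (not part of the original example; the file name ddp_torchrun.py used below is hypothetical) that lets torchrun create the processes and reads the rank information it places in the environment.

# encoding: UTF-8
# Minimal sketch: the same toy DDP training, but launched by torchrun
# instead of mp.spawn. torchrun sets MASTER_ADDR, MASTER_PORT, RANK,
# WORLD_SIZE and LOCAL_RANK, so init_process_group can use its default
# env:// initialization.

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

# import the torch_npu package
import torch_npu


def main():
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.npu.set_device(local_rank)
    # rank and world size are read from the environment set by torchrun
    dist.init_process_group("hccl")

    device = torch.device("npu:{}".format(local_rank))
    # a toy model equivalent to ToyModel above
    model = nn.Sequential(nn.Linear(10, 10), nn.ReLU(), nn.Linear(10, 5)).to(device)
    ddp_model = DDP(model, device_ids=[local_rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    outputs = ddp_model(torch.randn(20, 10).to(device))
    labels = torch.randn(20, 5).to(device)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    dist.destroy_process_group()


if __name__ == "__main__":
    main()

It would then be launched with a command such as: torchrun --nproc_per_node=2 ddp_torchrun.py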

1.2 FSDP

# encoding: UTF-8

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

# import the torch_npu package
import torch_npu


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"

    # initialize the process group
    dist.init_process_group("hccl", rank=rank, world_size=world_size)


def example(rank, world_size):
    device = torch.device("npu:{}".format(rank))
    # create default process group
    setup(rank, world_size)
    # create local model
    model = ToyModel().to(device)
    # construct FSDP model (pass the NPU device explicitly)
    fsdp_model = FSDP(model, device_id=device)
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(fsdp_model.parameters(), lr=0.001)

    # forward pass
    outputs = fsdp_model(torch.randn(20, 10).to(device))
    # backward pass
    labels = torch.randn(20, 5).to(device)
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()


def main():
    # count NPUs via torch.npu (torch.cuda would report 0 on an NPU-only machine)
    n_npus = torch.npu.device_count()
    assert n_npus >= 2, f"Requires at least 2 NPUs to run, but got {n_npus}"
    world_size = n_npus
    mp.spawn(example, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()
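
The introduction also mentions HSDP, an FSDP variant that shards parameters within each node and replicates them across nodes, so it mainly matters for multi-node jobs. As a minimal sketch, assuming your PyTorch version provides ShardingStrategy.HYBRID_SHARD, only the wrapping call in the example above changes:

from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy

# Sketch: reuse `model` and `device` from the FSDP example above and select
# the hybrid (HSDP) strategy. If no process groups are passed, FSDP is
# expected to build the intra-node (shard) and inter-node (replicate) groups
# itself; alternatively, a (shard_group, replicate_group) tuple can be
# passed as process_group.
fsdp_model = FSDP(
    model,
    device_id=device,
    sharding_strategy=ShardingStrategy.HYBRID_SHARD,
)

Everything else in the example (process group setup, loss function, optimizer, training step) stays the same.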