快速开始

备注

在运行本示例之前,请确保已经安装了 DeepSpeed 环境。如果还未安装,可以执行 pip install deepspeed 完成安装。

1. 使用DeepSpeed多卡并行训练

以下代码(来自 DeepSpeed Examples)基于 CIFAR-10 数据集,使用 DeepSpeed 在多张 NPU 卡上并行训练模型。自 DeepSpeed v0.12.6 起,代码无需任何修改,即可自动检测 NPU 并进行训练。

  1import argparse
  2
  3import deepspeed
  4import torch
  5import torch.nn as nn
  6import torch.nn.functional as F
  7import torchvision
  8import torchvision.transforms as transforms
  9from deepspeed.accelerator import get_accelerator
 10from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer
 11
 12
 13def add_argument():
 14    parser = argparse.ArgumentParser(description="CIFAR")
 15
 16    # For train.
 17    parser.add_argument(
 18        "-e",
 19        "--epochs",
 20        default=30,
 21        type=int,
 22        help="number of total epochs (default: 30)",
 23    )
 24    parser.add_argument(
 25        "--local_rank",
 26        type=int,
 27        default=-1,
 28        help="local rank passed from distributed launcher",
 29    )
 30    parser.add_argument(
 31        "--log-interval",
 32        type=int,
 33        default=2000,
 34        help="output logging information at a given interval",
 35    )
 36
 37    # For mixed precision training.
 38    parser.add_argument(
 39        "--dtype",
 40        default="fp16",
 41        type=str,
 42        choices=["bf16", "fp16", "fp32"],
 43        help="Datatype used for training",
 44    )
 45
 46    # For ZeRO Optimization.
 47    parser.add_argument(
 48        "--stage",
 49        default=0,
 50        type=int,
 51        choices=[0, 1, 2, 3],
 52        help="Datatype used for training",
 53    )
 54
 55    # For MoE (Mixture of Experts).
 56    parser.add_argument(
 57        "--moe",
 58        default=False,
 59        action="store_true",
 60        help="use deepspeed mixture of experts (moe)",
 61    )
 62    parser.add_argument(
 63        "--ep-world-size", default=1, type=int, help="(moe) expert parallel world size"
 64    )
 65    parser.add_argument(
 66        "--num-experts",
 67        type=int,
 68        nargs="+",
 69        default=[
 70            1,
 71        ],
 72        help="number of experts list, MoE related.",
 73    )
 74    parser.add_argument(
 75        "--mlp-type",
 76        type=str,
 77        default="standard",
 78        help="Only applicable when num-experts > 1, accepts [standard, residual]",
 79    )
 80    parser.add_argument(
 81        "--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported"
 82    )
 83    parser.add_argument(
 84        "--min-capacity",
 85        default=0,
 86        type=int,
 87        help="(moe) minimum capacity of an expert regardless of the capacity_factor",
 88    )
 89    parser.add_argument(
 90        "--noisy-gate-policy",
 91        default=None,
 92        type=str,
 93        help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter",
 94    )
 95    parser.add_argument(
 96        "--moe-param-group",
 97        default=False,
 98        action="store_true",
 99        help="(moe) create separate moe param groups, required when using ZeRO w. MoE",
100    )
101
102    # Include DeepSpeed configuration arguments.
103    parser = deepspeed.add_config_arguments(parser)
104
105    args = parser.parse_args()
106
107    return args
108
109
def create_moe_param_groups(model):
    """Return optimizer parameter groups with the MoE parameters split out.

    Every parameter of ``model`` is first gathered into a single group named
    "parameters"; that group is then handed to DeepSpeed's
    ``split_params_into_different_moe_groups_for_optimizer``, whose result is
    returned unchanged.
    """
    all_params = list(model.parameters())
    base_group = {"params": all_params, "name": "parameters"}
    return split_params_into_different_moe_groups_for_optimizer(base_group)
114
115
def get_ds_config(args):
    """Assemble the DeepSpeed configuration dictionary for this example.

    Only two values depend on ``args``: ``args.dtype`` selects which of the
    ``bf16`` / ``fp16`` sections is enabled, and ``args.stage`` is used as the
    ZeRO optimization stage. Everything else is a fixed setting.

    Returns:
        dict: the configuration passed to ``deepspeed.initialize``.
    """
    adam = {
        "type": "Adam",
        "params": {
            "lr": 0.001,
            "betas": [0.8, 0.999],
            "eps": 1e-8,
            "weight_decay": 3e-7,
        },
    }

    warmup = {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": 0,
            "warmup_max_lr": 0.001,
            "warmup_num_steps": 1000,
        },
    }

    # Mixed-precision sections: at most one of them ends up enabled.
    bf16_section = {"enabled": args.dtype == "bf16"}
    fp16_section = {
        "enabled": args.dtype == "fp16",
        "fp16_master_weights_and_grads": False,
        "loss_scale": 0,
        "loss_scale_window": 500,
        "hysteresis": 2,
        "min_loss_scale": 1,
        "initial_scale_power": 15,
    }

    zero_section = {
        "stage": args.stage,
        "allgather_partitions": True,
        "reduce_scatter": True,
        "allgather_bucket_size": 50000000,
        "reduce_bucket_size": 50000000,
        "overlap_comm": True,
        "contiguous_gradients": True,
        "cpu_offload": False,
    }

    return {
        "train_batch_size": 16,
        "steps_per_print": 2000,
        "optimizer": adam,
        "scheduler": warmup,
        "gradient_clipping": 1.0,
        "prescale_gradients": False,
        "bf16": bf16_section,
        "fp16": fp16_section,
        "wall_clock_breakdown": False,
        "zero_optimization": zero_section,
    }
163
164
class Net(nn.Module):
    """Small convolutional classifier with an optional DeepSpeed MoE head.

    Two conv + max-pool stages feed two fully connected layers. The final
    stage depends on ``args.moe``:

    * disabled: a single ``fc3`` layer maps the 84 hidden features to the
      10 outputs;
    * enabled: one DeepSpeed ``MoE`` layer per entry of ``args.num_experts``
      is applied in sequence, followed by ``fc4`` (84 -> 10).
    """

    def __init__(self, args):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.moe = args.moe

        if not self.moe:
            # Dense head: straight from the hidden features to the outputs.
            self.fc3 = nn.Linear(84, 10)
            return

        # MoE head: the same 84 -> 84 linear layer is passed as the expert
        # to every MoE layer, one layer per requested expert count.
        expert = nn.Linear(84, 84)
        self.moe_layer_list = nn.ModuleList(
            [
                deepspeed.moe.layer.MoE(
                    hidden_size=84,
                    expert=expert,
                    num_experts=expert_count,
                    ep_size=args.ep_world_size,
                    use_residual=args.mlp_type == "residual",
                    k=args.top_k,
                    min_capacity=args.min_capacity,
                    noisy_gate_policy=args.noisy_gate_policy,
                )
                for expert_count in args.num_experts
            ]
        )
        self.fc4 = nn.Linear(84, 10)

    def forward(self, x):
        """Run the network on ``x`` and return the 10 output scores per row."""
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        # Flatten the conv feature maps to rows of 16 * 5 * 5 values.
        hidden = features.view(-1, 16 * 5 * 5)
        hidden = F.relu(self.fc1(hidden))
        hidden = F.relu(self.fc2(hidden))

        if not self.moe:
            return self.fc3(hidden)

        # Each MoE layer returns a 3-tuple; only the first item is carried on.
        for moe_layer in self.moe_layer_list:
            hidden, _, _ = moe_layer(hidden)
        return self.fc4(hidden)
209
210
def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
    """Test the network on the test data.

    Runs ``model_engine`` over ``testset`` without gradients, counting the
    overall and the per-class number of correct predictions. The summary is
    printed only when ``model_engine.local_rank == 0``.

    The per-class statistics are computed from the actual size of every
    batch, so a final batch smaller than ``test_batch_size`` (including a
    batch of one sample) is handled. Classes without any test sample, and an
    empty test set, are reported without dividing by zero.

    Args:
        model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.
        testset (torch.utils.data.Dataset): the test dataset.
        local_device (str): the local device name.
        target_dtype (torch.dtype): the target datatype for the test data,
            or None when no conversion is needed.
        test_batch_size (int): the test batch size.

    """
    # The 10 classes for CIFAR10.
    classes = (
        "plane",
        "car",
        "bird",
        "cat",
        "deer",
        "dog",
        "frog",
        "horse",
        "ship",
        "truck",
    )
    num_classes = len(classes)

    # Define the test dataloader.
    testloader = torch.utils.data.DataLoader(
        testset, batch_size=test_batch_size, shuffle=False, num_workers=0
    )

    # For total accuracy.
    correct, total = 0, 0
    # For accuracy per class.
    class_correct = [0] * num_classes
    class_total = [0] * num_classes

    # Start testing.
    model_engine.eval()
    with torch.no_grad():
        for images, labels in testloader:
            if target_dtype is not None:
                images = images.to(target_dtype)
            outputs = model_engine(images.to(local_device))
            _, predicted = torch.max(outputs, 1)

            labels = labels.to(local_device)
            # 1-D boolean tensor, one entry per sample of this batch.
            hits = predicted == labels

            # Count the total accuracy.
            total += labels.size(0)
            correct += hits.sum().item()

            # Count the accuracy per class. Iterating over the batch itself
            # (rather than range(test_batch_size)) keeps a short last batch
            # from indexing past its end.
            for label, hit in zip(labels.tolist(), hits.tolist()):
                class_correct[label] += int(hit)
                class_total[label] += 1

    if model_engine.local_rank == 0:
        overall = 100 * correct / total if total else 0.0
        print(
            f"Accuracy of the network on the {total} test images: {overall : .0f} %"
        )

        # For all classes, print the accuracy.
        for name, n_correct, n_seen in zip(classes, class_correct, class_total):
            if n_seen == 0:
                # No sample of this class was evaluated; avoid dividing by zero.
                print(f"Accuracy of {name : >5s} : n/a (no test samples)")
                continue
            print(f"Accuracy of {name : >5s} : {100 * n_correct / n_seen : 2.0f} %")


# The name starts with "test", so pytest would try to collect this helper as
# a test case in any module that imports it; opt out explicitly.
test.__test__ = False
277
278
def main(args):
    """Train the CIFAR-10 network with DeepSpeed, then evaluate it.

    Steps: initialize the distributed backend, load (or download) CIFAR-10,
    build ``Net`` and wrap it with ``deepspeed.initialize``, train for
    ``args.epochs`` epochs, and finally run ``test`` on the test split.

    Args:
        args (argparse.Namespace): the parsed command-line arguments, as
            returned by ``add_argument``.
    """
    # Initialize DeepSpeed distributed backend.
    deepspeed.init_distributed()

    ########################################################################
    # Step1. Data Preparation.
    #
    # The output of torchvision datasets are PILImage images of range [0, 1].
    # We transform them to Tensors of normalized range [-1, 1].
    #
    # Note:
    #     If running on Windows and you get a BrokenPipeError, try setting
    #     the num_worker of torch.utils.data.DataLoader() to 0.
    ########################################################################
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    if torch.distributed.get_rank() != 0:
        # Might be downloading cifar data, let rank 0 download first.
        torch.distributed.barrier()

    # Load or download cifar data.
    trainset = torchvision.datasets.CIFAR10(
        root="./data", train=True, download=True, transform=transform
    )
    testset = torchvision.datasets.CIFAR10(
        root="./data", train=False, download=True, transform=transform
    )

    if torch.distributed.get_rank() == 0:
        # Cifar data is downloaded, indicate other ranks can proceed.
        torch.distributed.barrier()

    ########################################################################
    # Step 2. Define the network with DeepSpeed.
    #
    # First, we define a Convolution Neural Network.
    # Then, we define the DeepSpeed configuration dictionary and use it to
    # initialize the DeepSpeed engine.
    ########################################################################
    net = Net(args)

    # Get list of parameters that require gradients.
    parameters = filter(lambda p: p.requires_grad, net.parameters())

    # If using MoE, create separate param groups for each expert.
    if args.moe_param_group:
        parameters = create_moe_param_groups(net)

    # Initialize DeepSpeed to use the following features.
    #   1) Distributed model.
    #   2) Distributed data loader.
    #   3) DeepSpeed optimizer.
    ds_config = get_ds_config(args)
    # Only the engine and the data loader are used below; the other two
    # return values of deepspeed.initialize are discarded.
    model_engine, _, trainloader, _ = deepspeed.initialize(
        args=args,
        model=net,
        model_parameters=parameters,
        training_data=trainset,
        config=ds_config,
    )

    # Get the local device name (str) and local rank (int).
    local_device = get_accelerator().device_name(model_engine.local_rank)
    local_rank = model_engine.local_rank

    # For float32, target_dtype will be None so no datatype conversion needed.
    target_dtype = None
    if model_engine.bfloat16_enabled():
        target_dtype = torch.bfloat16
    elif model_engine.fp16_enabled():
        target_dtype = torch.half

    # Define the Classification Cross-Entropy loss function.
    criterion = nn.CrossEntropyLoss()

    ########################################################################
    # Step 3. Train the network.
    #
    # This is when things start to get interesting.
    # We simply have to loop over our data iterator, and feed the inputs to the
    # network and optimize. (DeepSpeed handles the distributed details for us!)
    ########################################################################

    for epoch in range(args.epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(trainloader):
            # Get the inputs. ``data`` is a list of [inputs, labels].
            inputs, labels = data[0].to(local_device), data[1].to(local_device)

            # Try to convert to target_dtype if needed.
            if target_dtype is not None:
                inputs = inputs.to(target_dtype)

            outputs = model_engine(inputs)
            loss = criterion(outputs, labels)

            model_engine.backward(loss)
            model_engine.step()

            # Print statistics
            running_loss += loss.item()
            if local_rank == 0 and i % args.log_interval == (
                args.log_interval - 1
            ):  # Print every log_interval mini-batches.
                print(
                    f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}"
                )
                running_loss = 0.0
    print("Finished Training")

    ########################################################################
    # Step 4. Test the network on the test data.
    ########################################################################
    test(model_engine, testset, local_device, target_dtype)
395
396
if __name__ == "__main__":
    # Script entry point: parse the command line and hand the resulting
    # namespace straight to the training driver.
    main(add_argument())

2. 训练结果查看

训练完成后,会打印模型在测试集上的整体识别准确率以及各个类别的识别准确率。

Finished Training
Accuracy of the network on the 10000 test images:  57 %
Accuracy of plane :  65 %
Accuracy of   car :  67 %
Accuracy of  bird :  52 %
Accuracy of   cat :  34 %
Accuracy of  deer :  52 %
Accuracy of   dog :  49 %
Accuracy of  frog :  59 %
Accuracy of horse :  66 %
Accuracy of  ship :  66 %
Accuracy of truck :  56 %