快速开始
备注
在运行本示例之前,请确保已经安装了 DeepSpeed 环境。如果还未安装,可以执行 pip install deepspeed 完成安装。
1. 使用DeepSpeed多卡并行训练
以下代码(来自 DeepSpeed Examples)使用 CIFAR-10 数据集,通过 DeepSpeed 在多张 NPU 卡上并行训练模型。自 DeepSpeed v0.12.6 起,代码无需任何修改,即可自动检测 NPU 并进行训练。
1import argparse
2
3import deepspeed
4import torch
5import torch.nn as nn
6import torch.nn.functional as F
7import torchvision
8import torchvision.transforms as transforms
9from deepspeed.accelerator import get_accelerator
10from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer
11
12
def add_argument():
    """Build the command-line parser for this example and parse ``sys.argv``.

    The options cover training (epochs, local rank, log interval), the
    training datatype, the ZeRO stage and the MoE settings. The parser is
    passed through ``deepspeed.add_config_arguments`` before parsing.

    Returns:
        argparse.Namespace: the parsed command-line arguments.
    """
    parser = argparse.ArgumentParser(description="CIFAR")

    # For train.
    parser.add_argument(
        "-e",
        "--epochs",
        default=30,
        type=int,
        help="number of total epochs (default: 30)",
    )
    parser.add_argument(
        "--local_rank",
        type=int,
        default=-1,
        help="local rank passed from distributed launcher",
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=2000,
        help="output logging information at a given interval",
    )

    # For mixed precision training.
    parser.add_argument(
        "--dtype",
        default="fp16",
        type=str,
        choices=["bf16", "fp16", "fp32"],
        help="Datatype used for training",
    )

    # For ZeRO Optimization.
    parser.add_argument(
        "--stage",
        default=0,
        type=int,
        choices=[0, 1, 2, 3],
        # The original help text was a copy of the --dtype help.
        help="ZeRO optimization stage (default: 0)",
    )

    # For MoE (Mixture of Experts).
    parser.add_argument(
        "--moe",
        default=False,
        action="store_true",
        help="use deepspeed mixture of experts (moe)",
    )
    parser.add_argument(
        "--ep-world-size", default=1, type=int, help="(moe) expert parallel world size"
    )
    parser.add_argument(
        "--num-experts",
        type=int,
        nargs="+",
        default=[
            1,
        ],
        help="number of experts list, MoE related.",
    )
    parser.add_argument(
        "--mlp-type",
        type=str,
        default="standard",
        help="Only applicable when num-experts > 1, accepts [standard, residual]",
    )
    parser.add_argument(
        "--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported"
    )
    parser.add_argument(
        "--min-capacity",
        default=0,
        type=int,
        help="(moe) minimum capacity of an expert regardless of the capacity_factor",
    )
    parser.add_argument(
        "--noisy-gate-policy",
        default=None,
        type=str,
        help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter",
    )
    parser.add_argument(
        "--moe-param-group",
        default=False,
        action="store_true",
        help="(moe) create separate moe param groups, required when using ZeRO w. MoE",
    )

    # Include DeepSpeed configuration arguments.
    parser = deepspeed.add_config_arguments(parser)

    return parser.parse_args()
108
109
def create_moe_param_groups(model):
    """Return optimizer parameter groups with the MoE parameters split out.

    Every parameter of ``model`` is collected into a single group named
    ``"parameters"``; that group is handed to
    ``split_params_into_different_moe_groups_for_optimizer`` and its result
    is returned as-is.
    """
    base_group = {"params": list(model.parameters()), "name": "parameters"}
    return split_params_into_different_moe_groups_for_optimizer(base_group)
114
115
def get_ds_config(args):
    """Assemble the DeepSpeed configuration dictionary for this example.

    Args:
        args: parsed options. ``args.dtype`` decides which of the ``bf16`` /
            ``fp16`` sections is enabled (neither for any other value) and
            ``args.stage`` is written into ``zero_optimization.stage``.

    Returns:
        dict: the configuration; every other entry is a fixed constant.
    """
    optimizer_section = {
        "type": "Adam",
        "params": {
            "lr": 0.001,
            "betas": [0.8, 0.999],
            "eps": 1e-8,
            "weight_decay": 3e-7,
        },
    }
    scheduler_section = {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": 0,
            "warmup_max_lr": 0.001,
            "warmup_num_steps": 1000,
        },
    }
    bf16_section = {"enabled": args.dtype == "bf16"}
    fp16_section = {
        "enabled": args.dtype == "fp16",
        "fp16_master_weights_and_grads": False,
        "loss_scale": 0,
        "loss_scale_window": 500,
        "hysteresis": 2,
        "min_loss_scale": 1,
        "initial_scale_power": 15,
    }
    zero_section = {
        "stage": args.stage,
        "allgather_partitions": True,
        "reduce_scatter": True,
        "allgather_bucket_size": 50000000,
        "reduce_bucket_size": 50000000,
        "overlap_comm": True,
        "contiguous_gradients": True,
        "cpu_offload": False,
    }

    # Top-level keys are inserted in the same order as before.
    return {
        "train_batch_size": 16,
        "steps_per_print": 2000,
        "optimizer": optimizer_section,
        "scheduler": scheduler_section,
        "gradient_clipping": 1.0,
        "prescale_gradients": False,
        "bf16": bf16_section,
        "fp16": fp16_section,
        "wall_clock_breakdown": False,
        "zero_optimization": zero_section,
    }
163
164
class Net(nn.Module):
    """Small convolutional classifier with an optional DeepSpeed MoE stage.

    Two conv + max-pool stages feed two fully connected layers. When
    ``args.moe`` is false, ``fc3`` maps the 84 hidden features to 10 outputs.
    Otherwise one MoE layer per entry of ``args.num_experts`` is applied and
    ``fc4`` produces the 10 outputs.
    """

    def __init__(self, args):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.moe = args.moe

        if not self.moe:
            # Dense head: map the hidden features straight to the 10 outputs.
            self.fc3 = nn.Linear(84, 10)
            return

        # The same expert module is passed to every MoE layer.
        expert = nn.Linear(84, 84)
        # One MoE layer per entry in the list of expert counts.
        moe_layers = [
            deepspeed.moe.layer.MoE(
                hidden_size=84,
                expert=expert,
                num_experts=expert_count,
                ep_size=args.ep_world_size,
                use_residual=args.mlp_type == "residual",
                k=args.top_k,
                min_capacity=args.min_capacity,
                noisy_gate_policy=args.noisy_gate_policy,
            )
            for expert_count in args.num_experts
        ]
        self.moe_layer_list = nn.ModuleList(moe_layers)
        self.fc4 = nn.Linear(84, 10)

    def forward(self, x):
        """Return the 10-way output for the input batch ``x``."""
        feats = self.pool(F.relu(self.conv1(x)))
        feats = self.pool(F.relu(self.conv2(feats)))
        # Flatten to 16 * 5 * 5 features per row for the linear layers.
        flat = feats.view(-1, 16 * 5 * 5)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))

        if not self.moe:
            return self.fc3(hidden)

        for moe_layer in self.moe_layer_list:
            # Each MoE layer returns a 3-tuple; only the first item is used.
            hidden, _, _ = moe_layer(hidden)
        return self.fc4(hidden)
209
210
def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
    """Test the network on the test data and print the accuracy.

    Each caller iterates over the whole ``testset``, because the loader is
    built without a sampler. Only the process whose
    ``model_engine.local_rank`` is 0 prints the results.

    Args:
        model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.
        testset (torch.utils.data.Dataset): the test dataset.
        local_device (str): the local device name.
        target_dtype (torch.dtype): the target datatype for the test data,
            or ``None`` to leave the images unconverted.
        test_batch_size (int): the test batch size.

    """
    # The 10 classes for CIFAR10.
    classes = (
        "plane",
        "car",
        "bird",
        "cat",
        "deer",
        "dog",
        "frog",
        "horse",
        "ship",
        "truck",
    )

    # Define the test dataloader.
    testloader = torch.utils.data.DataLoader(
        testset, batch_size=test_batch_size, shuffle=False, num_workers=0
    )

    # For total accuracy.
    correct, total = 0, 0
    # For accuracy per class.
    class_correct = [0.0] * len(classes)
    class_total = [0.0] * len(classes)

    # Start testing.
    model_engine.eval()
    with torch.no_grad():
        for images, labels in testloader:
            if target_dtype is not None:
                images = images.to(target_dtype)
            outputs = model_engine(images.to(local_device))
            _, predicted = torch.max(outputs, 1)

            # Move the labels once and compare per sample.
            hits = predicted == labels.to(local_device)

            # Count the total accuracy.
            total += labels.size(0)
            correct += hits.sum().item()

            # Count the accuracy per class. Iterate over the samples that are
            # actually in this batch: the last batch may hold fewer than
            # ``test_batch_size`` items.
            for label, hit in zip(labels.tolist(), hits.tolist()):
                class_correct[label] += hit
                class_total[label] += 1

    if model_engine.local_rank == 0:
        # Report NaN instead of dividing by zero when nothing was evaluated.
        overall = 100 * correct / total if total else float("nan")
        print(
            f"Accuracy of the network on the {total} test images: {overall : .0f} %"
        )

        # For all classes, print the accuracy.
        for name, hit_count, seen in zip(classes, class_correct, class_total):
            accuracy = 100 * hit_count / seen if seen else float("nan")
            print(f"Accuracy of {name : >5s} : {accuracy : 2.0f} %")
277
278
def main(args):
    """Run the example end to end: prepare data, train, then test.

    Args:
        args (argparse.Namespace): parsed options as returned by
            ``add_argument()``; forwarded to the network, the DeepSpeed
            configuration and ``deepspeed.initialize``.
    """
    # Initialize DeepSpeed distributed backend.
    deepspeed.init_distributed()

    ########################################################################
    # Step1. Data Preparation.
    #
    # The output of torchvision datasets are PILImage images of range [0, 1].
    # We transform them to Tensors of normalized range [-1, 1].
    #
    # Note:
    #     If running on Windows and you get a BrokenPipeError, try setting
    #     the num_worker of torch.utils.data.DataLoader() to 0.
    ########################################################################
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    if torch.distributed.get_rank() != 0:
        # Might be downloading cifar data, let rank 0 download first.
        torch.distributed.barrier()

    # Load or download cifar data.
    trainset = torchvision.datasets.CIFAR10(
        root="./data", train=True, download=True, transform=transform
    )
    testset = torchvision.datasets.CIFAR10(
        root="./data", train=False, download=True, transform=transform
    )

    if torch.distributed.get_rank() == 0:
        # Cifar data is downloaded, indicate other ranks can proceed.
        torch.distributed.barrier()

    ########################################################################
    # Step 2. Define the network with DeepSpeed.
    #
    # First, we define a Convolution Neural Network.
    # Then, we define the DeepSpeed configuration dictionary and use it to
    # initialize the DeepSpeed engine.
    ########################################################################
    net = Net(args)

    # Get list of parameters that require gradients.
    parameters = filter(lambda p: p.requires_grad, net.parameters())

    # If using MoE, create separate param groups for each expert.
    if args.moe_param_group:
        parameters = create_moe_param_groups(net)

    # Initialize DeepSpeed to use the following features.
    #   1) Distributed model.
    #   2) Distributed data loader.
    #   3) DeepSpeed optimizer.
    ds_config = get_ds_config(args)
    # The second and fourth return values are not used in this example.
    model_engine, _, trainloader, _ = deepspeed.initialize(
        args=args,
        model=net,
        model_parameters=parameters,
        training_data=trainset,
        config=ds_config,
    )

    # Get the local device name (str) and local rank (int).
    local_device = get_accelerator().device_name(model_engine.local_rank)
    local_rank = model_engine.local_rank

    # For float32, target_dtype will be None so no datatype conversion needed.
    target_dtype = None
    if model_engine.bfloat16_enabled():
        target_dtype = torch.bfloat16
    elif model_engine.fp16_enabled():
        target_dtype = torch.half

    # Define the Classification Cross-Entropy loss function.
    criterion = nn.CrossEntropyLoss()

    ########################################################################
    # Step 3. Train the network.
    #
    # This is when things start to get interesting.
    # We simply have to loop over our data iterator, and feed the inputs to the
    # network and optimize. (DeepSpeed handles the distributed details for us!)
    ########################################################################

    for epoch in range(args.epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(trainloader):
            # Get the inputs. ``data`` is a list of [inputs, labels].
            inputs, labels = data[0].to(local_device), data[1].to(local_device)

            # Try to convert to target_dtype if needed.
            if target_dtype is not None:
                inputs = inputs.to(target_dtype)

            outputs = model_engine(inputs)
            loss = criterion(outputs, labels)

            model_engine.backward(loss)
            model_engine.step()

            # Print statistics
            running_loss += loss.item()
            if local_rank == 0 and i % args.log_interval == (
                args.log_interval - 1
            ):  # Print every log_interval mini-batches.
                print(
                    f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}"
                )
                running_loss = 0.0
    print("Finished Training")

    ########################################################################
    # Step 4. Test the network on the test data.
    ########################################################################
    test(model_engine, testset, local_device, target_dtype)
395
396
if __name__ == "__main__":
    # Script entry point: parse the command line and start the run.
    main(add_argument())
2. 训练结果查看
训练完成后,会打印模型在测试集上的整体准确率以及各个类别的识别准确率。
Finished Training
Accuracy of the network on the 10000 test images: 57 %
Accuracy of plane : 65 %
Accuracy of car : 67 %
Accuracy of bird : 52 %
Accuracy of cat : 34 %
Accuracy of deer : 52 %
Accuracy of dog : 49 %
Accuracy of frog : 59 %
Accuracy of horse : 66 %
Accuracy of ship : 66 %
Accuracy of truck : 56 %