本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
运行 SageMaker 具有 Tensor 并行性的分布式模型并行训练作业
在本节中,您将了解:
-
如何配置 SageMaker pyTorch 估算器和 SageMaker 使用张量并行度的分布式模型并行度选项。
-
如何使用扩展版调整训练脚本
smdistributed.modelparallel用于张量并行度的模块。
要了解有关的更多信息smdistributed.modelparallel模块,请参阅SageMaker 分布式模型 parallel API
单独张量并行度
以下是一个分布式训练选项的示例,该选项可以单独激活张量并行性,而不需要管道并行性。配置mpi_options和smp_options用于指定分布式训练选项的字典 SageMaker PyTorch估算器。
可以通过适用于 PyTorch 的 Deep Learning Containers 实现扩展内存节省功能,该容器实现 SageMaker 分布式模型 parallel 库 v1.6.0 或更高版本。
配置 SageMaker PyTorch 估算器
mpi_options = { "enabled" : True, "processes_per_host" : 8, # 8 processes "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none " } smp_options = { "enabled":True, "parameters": { "pipeline_parallel_degree": 1, # alias for "partitions" "placement_strategy": "cluster", "tensor_parallel_degree": 4, # tp over 4 devices "ddp": True } } smd_mp_estimator = PyTorch( entry_point='your_training_script.py', # Specify role=role, instance_type='ml.p3.16xlarge', sagemaker_session=sagemaker_session, framework_version='1.10.2', py_version='py36', instance_count=1, distribution={ "smdistributed": {"modelparallel": smp_options}, "mpi": mpi_options }, base_job_name="SMD-MP-demo", ) smd_mp_estimator.fit('s3://my_bucket/my_training_data/')
要查找的完整参数列表distribution,请参阅模型并行度的配置参数
适应您的 PyTorch 训练脚本
以下示例训练脚本显示了如何调整 SageMaker 分布式模型并行度库到训练脚本。在此示例中,假设脚本被命名为your_training_script.py.
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 32, 3, 1) self.conv2 = nn.Conv2d(32, 64, 3, 1) self.fc1 = nn.Linear(9216, 128) self.fc2 = nn.Linear(128, 10) def forward(self, x): x = self.conv1(x) x = F.relu(x) x = self.conv2(x) x = F.relu(x) x = F.max_pool2d(x, 2) x = torch.flatten(x, 1) x = self.fc1(x) x = F.relu(x) x = self.fc2(x) return F.log_softmax(x, 1) def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by # the current process, based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) loss = F.nll_loss(output, target, reduction="mean") loss.backward() optimizer.step() # smdistributed: Initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time if smp.local_rank() == 0: dataset = datasets.MNIST("../data", train=True, download=False) smp.barrier() # smdistributed: Shard the dataset based on data parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") train_loader = torch.utils.data.DataLoader(dataset, batch_size=64) # smdistributed: Enable tensor parallelism for all supported modules in the model # i.e., nn.Linear in this case. Alternatively, we can use # smp.set_tensor_parallelism(model.fc1, True) # to enable it only for model.fc1 with smp.tensor_parallelism(): model = Net() # smdistributed: Use the DistributedModel wrapper to distribute the # modules for which tensor parallelism is enabled model = smp.DistributedModel(model) optimizer = optim.AdaDelta(model.parameters(), lr=4.0) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)
Tensor 并行度与管道并行度相结合
以下是一个分布式训练选项的示例,该选项可以实现张量并行性与管道并行性相结合。 设置mpi_options和smp_options参 parallel,用于在配置 SageMaker PyTorch估算器。
可以通过适用于 PyTorch 的 Deep Learning Containers 实现扩展内存节省功能,该容器实现 SageMaker 分布式模型 parallel 库 v1.6.0 或更高版本。
配置 SageMaker PyTorch 估算器
mpi_options = { "enabled" : True, "processes_per_host" : 8, # 8 processes "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none " } smp_options = { "enabled":True, "parameters": { "microbatches": 4,"pipeline_parallel_degree": 2, # alias for "partitions" "placement_strategy": "cluster","tensor_parallel_degree": 2, # tp over 2 devices "ddp": True } } smd_mp_estimator = PyTorch( entry_point='your_training_script.py', # Specify role=role, instance_type='ml.p3.16xlarge', sagemaker_session=sagemaker_session, framework_version='1.10.2', py_version='py36', instance_count=1, distribution={ "smdistributed": {"modelparallel": smp_options}, "mpi": mpi_options }, base_job_name="SMD-MP-demo", ) smd_mp_estimator.fit('s3://my_bucket/my_training_data/')
适应您的 PyTorch 训练脚本
以下示例训练脚本显示了如何调整 SageMaker 分布式模型并行度库到训练脚本。请注意,训练脚本现在包含smp.step装饰器:
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 32, 3, 1) self.conv2 = nn.Conv2d(32, 64, 3, 1) self.fc1 = nn.Linear(9216, 128) self.fc2 = nn.Linear(128, 10) def forward(self, x): x = self.conv1(x) x = F.relu(x) x = self.conv2(x) x = F.relu(x) x = F.max_pool2d(x, 2) x = torch.flatten(x, 1) x = self.fc1(x) x = F.relu(x) x = self.fc2(x) return F.log_softmax(x, 1) # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by # the current process, based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: Initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time if smp.local_rank() == 0: dataset = datasets.MNIST("../data", train=True, download=False) smp.barrier() # smdistributed: Shard the dataset based on data parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = Net() # smdistributed: enable tensor parallelism only for model.fc1 smp.set_tensor_parallelism(model.fc1, True) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = optim.AdaDelta(model.parameters(), lr=4.0) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)