# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import tempfile import unittest import os import json import collections import math import numpy as np import paddle import paddle.nn as nn import paddle.fluid as fluid import paddle.nn.functional as F import paddle.tensor as tensor import paddle.utils as utils import paddle.static as static from paddle.fluid import core from paddle.fluid import layers from paddle.fluid.framework import _non_static_mode from paddle.nn.layer.transformer import _convert_param_attr_to_list from paddle.fluid.initializer import Normal, Constant, NumpyArrayInitializer from paddle.distributed import fleet from paddle.distributed.fleet import auto from paddle.distributed.auto_parallel.completion import Completer from paddle.distributed.auto_parallel.parallelizer import AutoParallelizer from paddle.distributed.auto_parallel.dist_context import DistributedContext from paddle.distributed.auto_parallel.partitioner import Partitioner from paddle.distributed.auto_parallel.reshard import Resharder from paddle.distributed.auto_parallel.process_group import get_all_process_groups from paddle.distributed.auto_parallel.process_group import new_process_group from paddle.distributed.auto_parallel.cluster import Cluster from paddle.distributed.auto_parallel.cluster import DeviceType from paddle.distributed.auto_parallel.cluster import LinkType from paddle.distributed.auto_parallel.utils import check_distributed_attr_for_program from paddle.distributed.auto_parallel.utils import print_program_with_dist_attr from paddle.distributed.auto_parallel.mapper import build_process_graph from paddle.distributed.auto_parallel.mapper import build_cluster_graph from paddle.distributed.auto_parallel.mapper import mapping from paddle.distributed.auto_parallel.mapper import get_dtype_bytes from paddle.distributed.auto_parallel.mapper import get_comm_volume if os.getenv("CUDA_VISIBLE_DEVICES") is not None: os.environ["CUDA_VISIBLE_DEVICES"] = "" paddle.enable_static() _global_parallel_strategy = None _global_process_mesh = None _global_num_stages = None cluster_json = """ { "machines": [ { "hostname": "machine0", "addr": "0.0.0.1", "port": "768", "devices": [ { "global_id": 0, "local_id": 0, "type": "GPU", "model": "A100-SXM4-40GB", "sp_gflops": 19500, "dp_gflops": 9700, "memory": 40 }, { "global_id": 1, "local_id": 1, "type": "GPU", "model": "A100-SXM4-40GB", "sp_gflops": 19500, "dp_gflops": 9700, "memory": 40 }, { "global_id": 2, "local_id": 2, "type": "GPU", "model": "A100-SXM4-40GB", "sp_gflops": 19500, "dp_gflops": 9700, "memory": 40 }, { "global_id": 3, "local_id": 3, "type": "GPU", "model": "A100-SXM4-40GB", "sp_gflops": 19500, "dp_gflops": 9700, "memory": 40 }, { "global_id": 4, "local_id": 0, "type": "NIC" } ], "links": [ { "source_global_id": 0, "target_global_id": 1, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 0, "target_global_id": 2, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 0, "target_global_id": 3, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 0, "target_global_id": 4, "type": "PHB", "bandwidth": 12 }, { "source_global_id": 1, "target_global_id": 0, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 1, "target_global_id": 2, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 1, "target_global_id": 3, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 1, "target_global_id": 4, "type": "PHB", "bandwidth": 12 }, { "source_global_id": 2, "target_global_id": 0, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 2, "target_global_id": 1, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 2, "target_global_id": 3, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 2, "target_global_id": 4, "type": "PHB", "bandwidth": 12 }, { "source_global_id": 3, "target_global_id": 0, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 3, "target_global_id": 1, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 3, "target_global_id": 2, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 3, "target_global_id": 4, "type": "PHB", "bandwidth": 12 }, { "source_global_id": 4, "target_global_id": 9, "type": "NET", "bandwidth": 1 } ] }, { "hostname": "machine1", "addr": "0.0.0.2", "port": "768", "devices": [ { "global_id": 5, "local_id": 0, "type": "GPU", "model": "Tesla V100-SXM2-32GB", "sp_gflops": 15700, "dp_gflops": 7800, "memory": 32 }, { "global_id": 6, "local_id": 1, "type": "GPU", "model": "Tesla V100-SXM2-32GB", "sp_gflops": 15700, "dp_gflops": 7800, "memory": 32 }, { "global_id": 7, "local_id": 2, "type": "GPU", "model": "Tesla V100-SXM2-32GB", "sp_gflops": 15700, "dp_gflops": 7800, "memory": 32 }, { "global_id": 8, "local_id": 3, "type": "GPU", "model": "Tesla V100-SXM2-32GB", "sp_gflops": 15700, "dp_gflops": 7800, "memory": 32 }, { "global_id": 9, "local_id": 0, "type": "NIC" } ], "links": [ { "source_global_id": 5, "target_global_id": 6, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 5, "target_global_id": 7, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 5, "target_global_id": 8, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 5, "target_global_id": 9, "type": "PHB", "bandwidth": 12 }, { "source_global_id": 6, "target_global_id": 5, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 6, "target_global_id": 7, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 6, "target_global_id": 8, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 6, "target_global_id": 9, "type": "PHB", "bandwidth": 12 }, { "source_global_id": 7, "target_global_id": 5, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 7, "target_global_id": 6, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 7, "target_global_id": 8, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 7, "target_global_id": 9, "type": "PHB", "bandwidth": 12 }, { "source_global_id": 8, "target_global_id": 5, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 8, "target_global_id": 6, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 8, "target_global_id": 7, "type": "NVL", "bandwidth": 42 }, { "source_global_id": 8, "target_global_id": 9, "type": "PHB", "bandwidth": 12 }, { "source_global_id": 9, "target_global_id": 4, "type": "NET", "bandwidth": 1 } ] } ] } """ class MLPLayer(nn.Layer): def __init__(self, hidden_size=64, intermediate_size=4 * 64, initializer_range=0.02): super(MLPLayer, self).__init__() d_model = hidden_size dim_feedforward = intermediate_size np.random.seed(2021) arr0 = np.random.normal(0, 0.02, size=(d_model, dim_feedforward)) arr1 = np.random.normal(0, 0.02, size=(dim_feedforward, d_model)) arr2 = np.random.normal(0, 0.02, size=(d_model, dim_feedforward)) arr3 = np.random.normal(0, 0.02, size=(dim_feedforward, d_model)) weight_attr0 = paddle.ParamAttr(initializer=NumpyArrayInitializer(arr0)) weight_attr1 = paddle.ParamAttr(initializer=NumpyArrayInitializer(arr1)) weight_attr2 = paddle.ParamAttr(initializer=NumpyArrayInitializer(arr2)) weight_attr3 = paddle.ParamAttr(initializer=NumpyArrayInitializer(arr3)) bias_attr = None self.linear0 = nn.Linear(d_model, dim_feedforward, weight_attr0, bias_attr=bias_attr) self.linear1 = nn.Linear(dim_feedforward, d_model, weight_attr1, bias_attr=bias_attr) self.norm = nn.LayerNorm(d_model, epsilon=1e-5) self.linear2 = nn.Linear(d_model, dim_feedforward, weight_attr2, bias_attr=bias_attr) self.linear3 = nn.Linear(dim_feedforward, d_model, weight_attr3, bias_attr=bias_attr) def forward(self, input): if _global_parallel_strategy == "dp_mp_pp": auto.shard_tensor(self.linear0.weight, _global_process_mesh[0], [None, "y"]) auto.shard_tensor(self.linear1.weight, _global_process_mesh[0], ["y", None]) auto.shard_tensor(self.linear2.weight, _global_process_mesh[1], [None, "y"]) auto.shard_tensor(self.linear3.weight, _global_process_mesh[1], ["y", None]) out = self.norm(input) out = self.linear0(out) out = F.gelu(out, approximate=True) out = self.linear1(out) auto.shard_tensor(out, _global_process_mesh[1], ["x", None]) out = self.linear2(out) out = F.gelu(out, approximate=True) out = self.linear3(out) return out def mlp_forward(train_program, start_program): with static.program_guard(train_program,start_program), \ utils.unique_name.guard(): batch_size = 4 hidden_size = 64 input = static.data(name="input", shape=[batch_size, hidden_size], dtype='float32') label = static.data(name="label", shape=[batch_size, 1], dtype='float32') if _global_parallel_strategy == "dp_mp_pp": auto.shard_tensor(input, _global_process_mesh[0], ["x", None]) mlp = MLPLayer(hidden_size=hidden_size, intermediate_size=4 * hidden_size, initializer_range=0.02) predict = mlp(input) error_cost = paddle.nn.functional.square_error_cost(predict, label) loss = paddle.mean(error_cost) return loss, train_program, start_program def get_dist_prog(train_program, startup_program, dist_context, rank_id): loss, train_program, startup_program = mlp_forward(train_program, startup_program) fleet._user_defined_strategy = fleet.DistributedStrategy() fleet.user_defined_optimizer = paddle.fluid.optimizer.AdamOptimizer() parallelizer = AutoParallelizer(fleet) parallelizer._dist_context = dist_context # auto completion completer = Completer(dist_context) complete_train_program = completer.complete_forward_annotation( train_program) dist_context.block_state.parse_forward_blocks(complete_train_program) params_grads = parallelizer._generate_backward(complete_train_program, startup_program, loss, parameter_list=None, no_grad_set=None, callbacks=None) partitioner = Partitioner(dist_context, rank_id) dist_train_program, dist_startup_prog, dist_params_grads = partitioner.partition( complete_train_program, startup_program, params_grads) partitioned_optimize_ops = parallelizer._apply_optimize( dist_train_program, dist_startup_prog, dist_params_grads) resharder = Resharder(dist_train_program, dist_startup_prog, rank_id, dist_context, dist_params_grads) resharder.reshard() return dist_train_program, dist_startup_prog def is_in_machine(device_local_id, machine): for device in machine.devices.values(): if device_local_id == device.local_id: return True return False def get_device_local_ids(machine): local_ids = [] for device in machine.devices.values(): local_ids.append[device.local_id] return local_ids class TestAutoParallelMapper(unittest.TestCase): def setUp(self): self.temp_dir = tempfile.TemporaryDirectory() def tearDown(self): self.temp_dir.cleanup() def test_mapper_dp_mp_pp(self): cluster_json_path = os.path.join(self.temp_dir.name, "auto_parallel_cluster.json") cluster_json_object = json.loads(cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) cluster = Cluster() cluster.build_from_file(cluster_json_path) global _global_parallel_strategy _global_parallel_strategy = "dp_mp_pp" global _global_num_stages _global_num_stages = 2 global _global_process_mesh _global_process_mesh = [ auto.ProcessMesh([[0, 1], [2, 3]], dim_names=["x", "y"]), auto.ProcessMesh([[4, 5], [6, 7]], dim_names=["x", "y"]) ] processes = [0, 1, 2, 3, 4, 5, 6, 7] dist_programs = {} for rank_id in processes: train_program = static.Program() startup_program = static.Program() dist_context = DistributedContext() dist_train_program, dist_startup_prog = get_dist_prog( train_program, startup_program, dist_context, rank_id) # if rank_id == 0: # print_program_with_dist_attr(dist_train_program, dist_context) dist_programs[rank_id] = [dist_train_program, None] rank_mapping = mapping(dist_programs, cluster) all_mapped_ranks = set() for machine_id, machine_mapping in rank_mapping.items(): machine = cluster.machines[machine_id] machine_mapped_ranks = set() machine_mapped_device_local_ids = set() for rank, device_ids in machine_mapping["ranks"].items(): # Only allow one process to one device mapping self.assertEqual(len(device_ids), 1) self.assertTrue(is_in_machine(device_ids[0], machine)) machine_mapped_ranks.add(rank) machine_mapped_device_local_ids.add(device_ids[0]) self.assertEqual(len(machine_mapped_ranks), len(machine_mapped_device_local_ids)) all_mapped_ranks.update(machine_mapped_ranks) self.assertEqual(set(processes), all_mapped_ranks) def test_mapper_misc(self): self.assertEqual(get_dtype_bytes(paddle.float64), 8) self.assertEqual(get_dtype_bytes(paddle.float32), 4) self.assertEqual(get_dtype_bytes(paddle.float16), 2) self.assertEqual(get_dtype_bytes(paddle.bfloat16), 2) self.assertEqual(get_dtype_bytes(paddle.int64), 8) self.assertEqual(get_dtype_bytes(paddle.int32), 4) self.assertEqual(get_dtype_bytes(paddle.int16), 2) self.assertEqual(get_dtype_bytes(paddle.int8), 1) self.assertEqual(get_dtype_bytes(paddle.uint8), 1) self.assertRaises(ValueError, get_dtype_bytes, "unknown type") train_program = static.Program() startup_program = static.Program() ring_id = 0 root_id = 0 nranks = 2 with fluid.program_guard(train_program, startup_program): input = layers.data(name="input", shape=[10, 10], dtype='float32') output = train_program.current_block().create_var( name="outofbroadcast", dtype='float32', type=core.VarDesc.VarType.LOD_TENSOR, persistable=False, stop_gradient=False) broadcast_op = train_program.global_block().append_op( type="c_broadcast", inputs={'X': input}, attrs={ 'ring_id': ring_id, 'root': root_id }, outputs={'Out': output}) self.assertEqual(get_comm_volume(broadcast_op, 0, 1), 400) self.assertEqual(get_comm_volume(broadcast_op, 1, 0), None) allgather_op = train_program.global_block().append_op( type="c_allgather", inputs={'X': input}, attrs={ 'ring_id': ring_id, 'nranks': nranks }, outputs={'Out': output}) self.assertEqual(get_comm_volume(allgather_op, 0, 1), 400) self.assertEqual(get_comm_volume(allgather_op, 0, 0), None) reduce_op = train_program.global_block().append_op( type="c_reduce_sum", inputs={'X': input}, attrs={ 'ring_id': ring_id, 'root_id': root_id }, outputs={'Out': output}) self.assertEqual(get_comm_volume(reduce_op, 0, 1), None) self.assertEqual(get_comm_volume(reduce_op, 1, 0), 400) cast_op = train_program.global_block().append_op( type="cast", inputs={"X": input}, outputs={"Out": output}, attrs={ "in_dtype": fluid.core.VarDesc.VarType.FP32, "out_dtype": fluid.core.VarDesc.VarType.FP32 }) self.assertRaises(ValueError, get_comm_volume, cast_op, 0, 1) if __name__ == '__main__': unittest.main()