Unverified commit 2e82b6c8, authored by ShenLiang, committed by GitHub

[Hybrid Parallel] Add Topology for hybrid communicate (#32011)

* support hyparallel, add topology

* fix utest
Parent a3b08bad
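The diff below introduces a CommunicateTopology abstraction plus a HybridCommunicateGroup that derives data-parallel and model-parallel communication groups from it. As a minimal usage sketch (not part of the commit), assuming a paddle.distributed.launch run with four GPUs and an illustrative [2, 2, 1] split:

import paddle
from paddle.distributed import fleet

paddle.distributed.init_parallel_env()

# One axis per parallelism type; the product of the degrees must equal
# the number of launched ranks (2 * 2 * 1 = 4 here).
topo = fleet.CommunicateTopology(["data", "model", "pipe"], [2, 2, 1])
hcg = fleet.HybridCommunicateGroup(topo)

# Each rank can then query its position and communication group per axis.
print(hcg.get_data_parallel_rank(), hcg.get_model_parallel_rank())
dp_group = hcg.get_data_parallel_group()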
@@ -20,16 +20,13 @@ from .base.util_factory import UtilBase
 from .dataset import *
 from .data_generator import MultiSlotDataGenerator, MultiSlotStringDataGenerator
 from . import metrics
+from .base.topology import CommunicateTopology, HybridCommunicateGroup

 __all__ = [
-    "DistributedStrategy",
-    "UtilBase",
-    "UserDefinedRoleMaker",
-    "PaddleCloudRoleMaker",
-    "Fleet",
-    "MultiSlotDataGenerator",
-    "MultiSlotStringDataGenerator",
-    "Role",
+    "DistributedStrategy", "UtilBase", "UserDefinedRoleMaker",
+    "PaddleCloudRoleMaker", "Fleet", "MultiSlotDataGenerator",
+    "MultiSlotStringDataGenerator", "Role", "CommunicateTopology",
+    "HybridCommunicateGroup"
 ]

 fleet = Fleet()
...
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import collections
import numpy as np
from itertools import product
from functools import reduce
__all__ = ['CommunicateTopology', 'HybridCommunicateGroup']
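# CommunicateTopology maps between flat global ranks and named N-dimensional
# coordinates (one axis per parallelism type), and can enumerate the groups of
# ranks that communicate along any single axis.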
class CommunicateTopology(object):
def __init__(self, hybrid_group_names, dims):
self._parallel_names = hybrid_group_names
self._dims = dims
self.coordinate = collections.namedtuple('Coordinate',
self._parallel_names)
self._world_size = reduce(lambda x, y: x * y, self._dims)
ranges = [range(d) for d in self._dims]
all_coordinate = [self.coordinate(*x) for x in product(*ranges)]
self._coord2rank = dict(zip(all_coordinate, range(len(all_coordinate))))
self._rank2coord = dict(
zip(self._coord2rank.values(), self._coord2rank.keys()))
def get_hybrid_group_names(self):
return self._parallel_names
def get_dim(self, axis_name):
return self._dims[self._parallel_names.index(axis_name)]
def world_size(self):
return self._world_size
def get_rank(self, **args):
assert len(args) == len(self._dims)
key = self.coordinate(**args)
assert key in self._coord2rank.keys()
return self._coord2rank[key]
def get_coord(self, rank):
assert rank < self._world_size
assert rank in self._rank2coord.keys()
return self._rank2coord[rank]
def get_axis_list(self, axis_name, index):
axis = self._parallel_names.index(axis_name)
ranks = [
self._coord2rank[coord] for coord in self._coord2rank.keys()
if coord[axis] == index
]
ranks.sort()
return ranks
def get_dim_size(self, axis_name):
assert axis_name in self._parallel_names
return self._dims[self._parallel_names.index(axis_name)]
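    # For each fixed combination of the other axes' indices, return the list of
    # ranks that vary only along `axis_name`; each inner list is one comm group.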
def get_comm_list(self, axis_name):
assert axis_name in self._parallel_names
other_axis_names = [
name for name in self._parallel_names if name != axis_name
]
ranges = []
for name in other_axis_names:
dim_num = self.get_dim_size(name)
ranges.append(range(dim_num))
all_result = []
for x in product(*ranges):
key_coord = {}
for other_name in other_axis_names:
key_coord[other_name] = x[other_axis_names.index(other_name)]
result = []
for i in range(0, self.get_dim_size(axis_name)):
key_coord[axis_name] = i
result.append(self._coord2rank[self.coordinate(**key_coord)])
all_result.append(result)
return all_result
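# HybridCommunicateGroup consumes a CommunicateTopology whose axes are named
# "data", "model" and "pipe", checks that their degrees multiply to the world
# size, and creates the data-parallel and model-parallel communication groups
# containing the current rank.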
class HybridCommunicateGroup(object):
def __init__(self, topology):
self.nranks = paddle.distributed.get_world_size()
self.global_rank = paddle.distributed.get_rank()
self._topo = topology
self._num_data_parallel = self._topo.get_dim('data')
self._num_model_parallel = self._topo.get_dim('model')
self._num_pipe_parallel = self._topo.get_dim('pipe')
self._data_parallel_id = self._get_data_parallel_id()
self._model_parallel_id = self._get_model_parallel_id()
        assert self._check_valid_topo(
        ), "Invalid topology: data * model * pipe parallel degrees must equal the world size"
# create comm group for data parallel
self._dp_group, self._dp_comm_group = self._set_comm_group("data")
print("data parallel group", self._dp_group)
# create comm group for model parallel
self._mp_group, self._mp_comm_group = self._set_comm_group("model")
print("model parallel group", self._mp_group)
    def _check_valid_topo(self):
return self._num_data_parallel * self._num_model_parallel * self._num_pipe_parallel == self.nranks
def _set_comm_group(self, parallel_method="data"):
parallel_group = []
parallel_comm_group = None
parallel_groups = self._topo.get_comm_list(parallel_method)
for group in parallel_groups:
comm_group = paddle.distributed.new_group(ranks=group)
if self.global_rank in group:
parallel_group = group
parallel_comm_group = comm_group
assert len(parallel_group) > 0
assert parallel_comm_group is not None
return parallel_group, parallel_comm_group
def topology(self):
return self._topo
def get_global_rank(self):
return self.global_rank
# data parallel message:
def _get_data_parallel_id(self):
return self._topo.get_coord(self.global_rank).data
def get_data_parallel_rank(self):
return self._data_parallel_id
def get_data_parallel_world_size(self):
return self._num_data_parallel
def get_data_parallel_group(self):
return self._dp_comm_group
def get_data_parallel_group_src_rank(self):
return self._dp_comm_group.ranks[0]
# model parallel message:
def _get_model_parallel_id(self):
return self._topo.get_coord(self.global_rank).model
def get_model_parallel_rank(self):
return self._model_parallel_id
def get_model_parallel_world_size(self):
return self._num_model_parallel
def get_model_parallel_group(self):
return self._mp_comm_group
def get_model_parallel_group_src_rank(self):
return self._mp_comm_group.ranks[0]
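For reference, the rank layout built by CommunicateTopology follows itertools.product, so the last axis varies fastest. A standalone sketch (not from the commit) reproducing the coordinate-to-rank mapping for an assumed [2, 2, 2] topology:

from itertools import product

dims = [2, 2, 2]                                   # e.g. dp, mp, pp degrees
coords = list(product(*[range(d) for d in dims]))  # last axis varies fastest
coord2rank = {c: r for r, c in enumerate(coords)}

assert coord2rank[(0, 0, 1)] == 1   # next pp stage
assert coord2rank[(0, 1, 0)] == 2   # next mp shard
assert coord2rank[(1, 0, 0)] == 4   # next dp replica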
@@ -524,7 +524,6 @@ if(WITH_DISTRIBUTE)
 if(WITH_GPU OR WITH_ROCM)
 bash_test_modules(test_c_comm_init_op START_BASH test_c_comm_init_op.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
 py_test_modules(test_launch_coverage MODULES test_launch_coverage)
-bash_test_modules(test_new_group START_BASH test_new_group.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
 endif()
 bash_test_modules(test_fleetrun START_BASH test_fleetrun.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
@@ -543,6 +542,7 @@ if(WITH_DISTRIBUTE)
 endif()
 endforeach(TEST_OP)
 bash_test_modules(test_fleet_launch_ps START_BASH test_fleet_launch_ps.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}" PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
+bash_test_modules(test_new_group START_BASH test_new_group.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}+20" PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
 endif(NOT APPLE)
 endif()
...
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import os
import paddle
from paddle.distributed import fleet
class TestNewGroupAPI(object):
def __init__(self):
paddle.distributed.init_parallel_env()
topo = fleet.CommunicateTopology(["data", "model", "pipe"], [2, 1, 1])
self.hcg = fleet.HybridCommunicateGroup(topo)
d1 = np.array([1, 2, 3])
d2 = np.array([2, 3, 4])
self.tensor1 = paddle.to_tensor(d1)
self.tensor2 = paddle.to_tensor(d2)
def test_all(self):
topo = self.hcg.topology()
        global_rank = self.hcg.get_global_rank()
dp_rank = self.hcg.get_data_parallel_rank()
dp_gp = self.hcg.get_data_parallel_group()
dp_world_size = self.hcg.get_data_parallel_world_size()
dp_src_rank = self.hcg.get_data_parallel_group_src_rank()
np.testing.assert_array_equal(dp_world_size, 2)
np.testing.assert_array_equal(dp_src_rank, 0)
mp_rank = self.hcg.get_model_parallel_rank()
mp_gp = self.hcg.get_model_parallel_group()
mp_world_size = self.hcg.get_model_parallel_world_size()
mp_src_rank = self.hcg.get_model_parallel_group_src_rank()
np.testing.assert_array_equal(mp_world_size, 1)
tmp = np.array([0, 0, 0])
result = paddle.to_tensor(tmp)
paddle.distributed.scatter(
result, [self.tensor2, self.tensor1],
src=dp_src_rank,
group=dp_gp,
use_calc_stream=True)
if dp_rank == 0:
assert np.array_equal(result, self.tensor2)
elif dp_rank == 1:
assert np.array_equal(result, self.tensor1)
print("test scatter api ok")
paddle.distributed.broadcast(
result, src=1, group=dp_gp, use_calc_stream=True)
assert np.array_equal(result, self.tensor1)
print("test broadcast api ok")
paddle.distributed.reduce(
result, dst=dp_src_rank, group=dp_gp, use_calc_stream=True)
if dp_rank == 0:
assert np.array_equal(result,
paddle.add(self.tensor1, self.tensor1))
elif dp_rank == 1:
assert np.array_equal(result, self.tensor1)
print("test reduce api ok")
paddle.distributed.all_reduce(result, use_calc_stream=True)
assert np.array_equal(
result,
paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1))
print("test all_reduce api ok")
paddle.distributed.wait(result, dp_gp, use_calc_stream=True)
paddle.distributed.wait(result, dp_gp, use_calc_stream=False)
print("test wait api ok")
result = []
paddle.distributed.all_gather(
result, self.tensor1, group=dp_gp, use_calc_stream=True)
assert np.array_equal(result[0], self.tensor1)
assert np.array_equal(result[1], self.tensor1)
print("test all_gather api ok")
paddle.distributed.barrier(group=dp_gp)
print("test barrier api ok")
return
if __name__ == "__main__":
gpt = TestNewGroupAPI()
gpt.test_all()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import paddle
import paddle.nn as nn
import unittest
from paddle.distributed import fleet
import numpy as np
class TestCommunicateTopology(unittest.TestCase):
def test_topology(self):
topo = fleet.CommunicateTopology(["dp", "mp", "pp"], [2, 2, 2])
# test get_comm_list
dp_comm_list = [[0, 4], [1, 5], [2, 6], [3, 7]]
mp_comm_list = [[0, 2], [1, 3], [4, 6], [5, 7]]
pp_comm_list = [[0, 1], [2, 3], [4, 5], [6, 7]]
np.testing.assert_array_equal(dp_comm_list, topo.get_comm_list("dp"))
np.testing.assert_array_equal(mp_comm_list, topo.get_comm_list("mp"))
np.testing.assert_array_equal(pp_comm_list, topo.get_comm_list("pp"))
# test get_hybrid_group_names
parallel_names = ["dp", "mp", "pp"]
np.testing.assert_array_equal(parallel_names,
topo.get_hybrid_group_names())
# test get_dims
np.testing.assert_array_equal(2, topo.get_dim("dp"))
np.testing.assert_array_equal(2, topo.get_dim("mp"))
np.testing.assert_array_equal(2, topo.get_dim("pp"))
# test world size
self.assertEqual(topo.world_size(), 8)
# test get_rank
self.assertEqual(topo.get_rank(dp=0, mp=0, pp=0), 0)
self.assertEqual(topo.get_rank(dp=0, mp=0, pp=1), 1)
self.assertEqual(topo.get_rank(dp=0, mp=1, pp=0), 2)
self.assertEqual(topo.get_rank(dp=0, mp=1, pp=1), 3)
self.assertEqual(topo.get_rank(dp=1, mp=0, pp=0), 4)
self.assertEqual(topo.get_rank(dp=1, mp=0, pp=1), 5)
self.assertEqual(topo.get_rank(dp=1, mp=1, pp=0), 6)
self.assertEqual(topo.get_rank(dp=1, mp=1, pp=1), 7)
# test get_coord
self.assertEqual(topo.get_coord(0), topo.coordinate(0, 0, 0))
self.assertEqual(topo.get_coord(1), topo.coordinate(0, 0, 1))
self.assertEqual(topo.get_coord(2), topo.coordinate(0, 1, 0))
self.assertEqual(topo.get_coord(3), topo.coordinate(0, 1, 1))
self.assertEqual(topo.get_coord(4), topo.coordinate(1, 0, 0))
self.assertEqual(topo.get_coord(5), topo.coordinate(1, 0, 1))
self.assertEqual(topo.get_coord(6), topo.coordinate(1, 1, 0))
self.assertEqual(topo.get_coord(7), topo.coordinate(1, 1, 1))
# test get_axis_list
self.assertEqual(topo.get_axis_list("dp", 0), [0, 1, 2, 3])
self.assertEqual(topo.get_axis_list("dp", 1), [4, 5, 6, 7])
self.assertEqual(topo.get_axis_list("mp", 0), [0, 1, 4, 5])
self.assertEqual(topo.get_axis_list("mp", 1), [2, 3, 6, 7])
self.assertEqual(topo.get_axis_list("pp", 0), [0, 2, 4, 6])
self.assertEqual(topo.get_axis_list("pp", 1), [1, 3, 5, 7])
# test get_dim_size
self.assertEqual(topo.get_dim_size("dp"), 2)
self.assertEqual(topo.get_dim_size("mp"), 2)
self.assertEqual(topo.get_dim_size("pp"), 2)
if __name__ == '__main__':
unittest.main()
@@ -17,3 +17,4 @@
 set -e
 CUDA_VISIBLE_DEVICES=0,1 python -m paddle.distributed.launch --gpus=0,1 new_group.py
+CUDA_VISIBLE_DEVICES=0,1 python -m paddle.distributed.launch --gpus=0,1 hybrid_communicate_group.py