未验证 提交 8df46229 编写于 作者: L lilong12 提交者: GitHub

wrapper the usage of distributed functions (#39720)

上级 b3270adf
......@@ -158,16 +158,17 @@ class ProcessGroupMapFromGid {
}
void insert(int gid, ProcessGroup* pg) {
PADDLE_ENFORCE_EQ(has(gid), false,
platform::errors::PreconditionNotMet(
"The process group with id %d doesnot exist.", gid));
// PADDLE_ENFORCE_EQ(has(gid), false,
// platform::errors::PreconditionNotMet(
// "The process group with id %d does exist.", gid));
map_[gid] = pg;
}
ProcessGroup* get(int gid) {
PADDLE_ENFORCE_EQ(has(gid), false,
platform::errors::PreconditionNotMet(
"The process group with id %d doesnot exist.", gid));
// PADDLE_ENFORCE_EQ(has(gid), true,
// platform::errors::PreconditionNotMet(
// "The process group with id %d doesnot exist.",
// gid));
return map_.find(gid)->second;
}
......
......@@ -16,7 +16,9 @@ import numpy as np
import os
from datetime import timedelta
from ..fluid.layer_helper import LayerHelper
import paddle.fluid.framework as framework
from ..fluid.framework import Variable
from ..fluid.framework import in_dygraph_mode
from ..fluid.framework import OpProtoHolder
from ..fluid.framework import _non_static_mode
from ..fluid.framework import convert_np_dtype_to_dtype_
......@@ -174,10 +176,6 @@ def _new_ring_id():
return len(_get_group_map()) + max(_get_global_env().nrings, 9)
def _new_group_name_id():
return len(_get_group_map_by_name()) + max(_get_global_env().nrings, 9)
def get_group(id=0):
"""
......@@ -202,194 +200,24 @@ def get_group(id=0):
return gm[id] if id in gm else None
def _new_process_group_impl(backend, store, rank, world_size, group_name,
pg_options):
if backend == "gloo":
gloo_store = core.GlooStore(store)
def _new_process_group_impl(backend,
store,
rank,
world_size,
group_name,
pg_options,
group_id=0):
pg = None
if backend == "gloo":
pg = core.ProcessGroupGloo(gloo_store, rank, world_size)
pg = core.ProcessGroupGloo(store, rank, world_size, group_id)
elif backend == "nccl":
pg = core.ProcessGroupNCCL(store, rank, world_size)
pg = core.ProcessGroupNCCL(store, rank, world_size, group_id)
elif backend == "hccl":
pg = core.ProcessGroupHCCL(store, rank, world_size)
pg = core.ProcessGroupHCCL(store, rank, world_size, group_id)
return pg
def _init_parallel_env(rank=None,
world_size=None,
backend="nccl",
timeout=timedelta(0),
pg_options=None):
"""
Initializes the default distributed environment.
Args:
rank (int, optional): the rank of the current process or device from 0 to world_size (exclusive).
If you launch your training with paddle.distributed.run or
paddle.distributed.launch module, None can be given. Default: None.
world_size (int, optional): total number of processes or devices.
If you launch your training with paddle.distributed.run or
paddle.distributed.launch module, None can be given. Default: None.
backend (str, optional): the name of the backend used to initialize
the distributed environment. The value can be one of 'nccl' for
GPU, 'gloo' for CPU or 'hccl' for NPU. Default: 'nccl'.
timeout (datetime.timedelta, optional): timeout used for operations of
the group. Default: datetime.timedelta(0) which means no timeout.
pg_options (dict, optional): options for the group. Default: None.
Returns:
Group: a group.
Examples:
.. code-block:: python
# filename: train.py
import paddle
paddle.distributed.init_parallel_env(0, 1)
# how to start
# python paddle.distributed.run --gpus="0,1" train.py
"""
global _group_map_by_name
global _default_group_name
assert _default_group_name not in _group_map_by_name, (
"The default distributed environment has been initialized.")
assert backend in _valid_backend_list, (
"Backend must be one of {}, but the given one is: {}".format(
_valid_backend_list, backend))
_default_backend = backend
assert isinstance(timeout, timedelta), (
"timeout must be of the type datetime.timedelta.")
if rank is None or world_size is None:
assert rank is None and world_size is None, (
"rank and world_size should be unset at the same time.")
trainer_id = os.getenv("PADDLE_TRAINER_ID", None)
trainer_num = os.getenv("PADDLE_TRAINERS_NUM", None)
if trainer_id is None or trainer_num is None:
warnings.warn("If rank and world_size are both None, please start "
"your training with paddle.distributed.run or "
"paddle.distributed.launch module. Otherwise, "
"init_parallel_env will do nothing.")
return None
rank = int(trainer_id)
world_size = int(trainer_num)
assert rank >= 0 and world_size > rank and world_size > 1, (
"rank must be non-negative and world_size must be the "
"maximum rank plus one. Moreover, at least two processes are "
"required to create a process group.")
master_addr = os.getenv("MASTER_ADDR", None)
master_port = os.getenv("MASTER_PORT", None)
if not master_addr or not master_port:
endpoints = os.getenv("PADDLE_MASTER", None)
if endpoints is None:
endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", None)
if not endpoints:
raise ValueError(
"The environment variable 'MASTER_ADDR' and 'MASTER_PORT' "
"must be specified, for example 'export MASTER_ADDR=127.0.0.1' "
"and 'export MASTER_ADDR=54612'. Or you can start your training"
"with paddle.distributed.run or "
"paddle.distributed.luanch module.")
if ',' in endpoints:
endpoints = endpoints.split(',')[0]
master_addr, master_port = endpoints.split(":")
master_port = int(master_port)
is_master = rank == 0
global _default_store
_default_store = core.TCPStore(master_addr, master_port, is_master,
world_size, timeout)
pg = _new_process_group_impl(backend, _default_store, rank, world_size,
_default_group_name, pg_options)
ranks = list(range(world_size))
group = Group(
rank, world_size, id=0, ranks=ranks, pg=pg, name=_default_group_name)
paddle.fluid.dygraph.parallel_helper._set_parallel_ctx(True)
_group_map_by_name[_default_group_name] = group
return group
def _new_group(ranks=None,
backend=None,
group_name=None,
timeout=timedelta(0),
pg_options=None):
"""
Create a new process group.
Args:
ranks (list, optional): list of ranks for the new group. If None is given,
all processes is used. Default: None.
backend (str, optional): the name of the backend used to initialize
the distributed environment. Default: the one for init_parallel_env.
timeout (datetime.timedelta, optional): timeout used for operations of
the group. Default: datetime.timedelta(0).
pg_options (dict, optional): options for the group. Default: None.
Examples:
.. code-block:: python
import paddle
paddle.distributed.init_parallel_env(0, 1)
paddle.distributed.new_group([0, 1])
# how to start
# python paddle.distributed.run --gpus="0,1" train.py
"""
global _default_group_name
if group_name is None:
group_name = _default_group_name + str(_new_group_name_id())
if group_name == _default_group_name:
raise ValueError("group_name must be specified and it cannot be '{}' "
"which is used for the default process group created "
"by init_parallel_env.".format(_default_group_name))
global_group = _get_default_group()
global_rank = global_group.rank
global_ranks = global_group.ranks
if ranks is None:
ranks = global_ranks
assert len(ranks) <= len(global_ranks), (
"Size of new group must be less than or "
"equal to that of the default global group.")
size = len(ranks)
assert size > 1, "A group must have at least two memebers."
ranks = sorted(ranks)
if global_rank in ranks:
rank = ranks.index(global_rank)
pg = _new_process_group_impl(backend, _default_store, rank, size,
group_name, pg_options)
else:
rank = -1
pg = None
group = Group(
rank,
size,
id=_new_group_name_id(),
ranks=ranks,
pg=pg,
name=group_name)
_group_map_by_name[group_name] = group
return group
def barrier(group=None):
"""
......@@ -414,6 +242,12 @@ def barrier(group=None):
if group is not None and not group.is_member():
return
if framework._in_eager_mode_ and in_dygraph_mode():
group = _get_default_group() if group is None else group
task = group.process_group.barrier()
task.wait()
return
ring_id = 0 if group is None else group.id
temp = fill_constant([1], dtype="int32", value="1")
......@@ -455,6 +289,40 @@ def new_group(ranks=None, backend=None):
paddle.distributed.all_reduce(tindata, group=gp, use_calc_stream=False)
"""
global _group_map
if framework._in_eager_mode_:
global _default_group_name
gid = _new_ring_id()
group_name = _default_group_name + str(gid)
global_group = _get_default_group()
global_rank = global_group.rank
global_ranks = global_group.ranks
if ranks is None:
ranks = global_ranks
assert len(ranks) <= len(global_ranks), (
"Size of new group must be less than or "
"equal to that of the default global group.")
size = len(ranks)
assert size > 1, "A group must have at least two memebers."
ranks = sorted(ranks)
if global_rank in ranks:
rank = ranks.index(global_rank)
pg = _new_process_group_impl(
backend,
_default_store,
rank,
size,
group_name,
pg_options=None,
group_id=gid)
else:
rank = -1
pg = None
group = Group(rank, size, id=gid, ranks=ranks, pg=pg, name=group_name)
_group_map_by_name[group_name] = group
_group_map[gid] = group
return group
if not backend:
backend = 'nccl'
......@@ -465,7 +333,6 @@ def new_group(ranks=None, backend=None):
ring_id = _new_ring_id()
global _group_map
if global_rank not in ranks:
gp = Group(-1, -1, ring_id, ranks)
_group_map[ring_id] = gp
......@@ -628,7 +495,18 @@ def broadcast(tensor, src, group=None, use_calc_stream=True):
if not isinstance(src, int):
raise ValueError("src should be int.")
ring_id = 0 if group is None else group.id
if framework._in_eager_mode_ and in_dygraph_mode():
group = _get_default_group() if group is None else group
gsrc = group.get_group_rank(src)
assert gsrc >= 0, ("src rank out of group, need global rank")
task = group.process_group.broadcast(tensor, gsrc)
if use_calc_stream:
task.wait()
return None
else:
return task
ring_id = ring_id = 0 if group is None else group.id
gsrc = src if group is None else group.get_group_rank(src)
assert gsrc >= 0, ("src rank out of group, need global rank")
......@@ -701,6 +579,23 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
if group is not None and not group.is_member():
return
if framework._in_eager_mode_ and in_dygraph_mode():
if op == ReduceOp.SUM:
op_type = core.ReduceOp.SUM
elif op == ReduceOp.MAX:
op_type = core.ReduceOp.MAX
elif op == ReduceOp.MIN:
op_type = core.ReduceOp.MIN
else:
raise ValueError("Unknown reduce_op type for allreduce.")
group = _get_default_group() if group is None else group
task = group.process_group.allreduce(tensor, op_type)
if use_calc_stream:
task.wait()
return None
else:
return task
ring_id = 0 if group is None else group.id
if _non_static_mode():
if op == ReduceOp.SUM:
......@@ -721,9 +616,6 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
check_variable_and_dtype(
tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'],
'all_reduce')
if not op in [ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN, ReduceOp.PROD]:
raise ValueError("The op for all_reduce must be one of educeOp.PROD, "
"ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN.")
if op == ReduceOp.SUM:
op_type = 'c_allreduce_sum'
elif op == ReduceOp.MAX:
......@@ -789,8 +681,24 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
if group is not None and not group.is_member():
return
if not isinstance(dst, int):
raise ValueError("dst should be int.")
if framework._in_eager_mode_ and in_dygraph_mode():
if op == ReduceOp.SUM:
op_type = core.ReduceOp.SUM
elif op == ReduceOp.MAX:
op_type = core.ReduceOp.MAX
elif op == ReduceOp.MIN:
op_type = core.ReduceOp.MIN
else:
raise ValueError("Unknown reduce_op type for reduce.")
group = _get_default_group() if group is None else group
gdst = group.get_group_rank(dst)
assert gdst >= 0, ("dst rank out of group, need global rank")
task = group.process_group.reduce(tensor, gdst, op_type)
if use_calc_stream:
task.wait()
return None
else:
return task
ring_id = 0 if group is None else group.id
gdst = dst if group is None else group.get_group_rank(dst)
......@@ -820,9 +728,6 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
check_variable_and_dtype(
tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'],
'all_reduce')
if not op in [ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN, ReduceOp.PROD]:
raise ValueError("The op for reduce must be one of educeOp.PROD, "
"ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN.")
if op == ReduceOp.SUM:
op_type = 'c_reduce_sum'
......@@ -897,6 +802,15 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
if group is not None and not group.is_member():
return
if framework._in_eager_mode_ and in_dygraph_mode():
group = _get_default_group() if group is None else group
out = paddle.concat(tensor_list)
task = group.process_group.all_gather(tensor, out)
task.wait()
tensor_list.clear()
tensor_list.extend(paddle.split(out, group.nranks, 0))
return
ring_id = 0 if group is None else group.id
nranks = _get_global_group().nranks if group is None else group.nranks
......@@ -985,18 +899,32 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True):
if not isinstance(src, int):
raise ValueError("src should be int.")
ring_id = 0 if group is None else group.id
gsrc = src if group is None else group.get_group_rank(src)
if framework._in_eager_mode_ and in_dygraph_mode():
group = _get_default_group() if group is None else group
gsrc = group.get_group_rank(src)
rank = group.rank
nranks = group.nranks
else:
ring_id = 0 if group is None else group.id
gsrc = src if group is None else group.get_group_rank(src)
rank = _get_global_group().rank if group is None else group.rank
nranks = _get_global_group().nranks if group is None else group.nranks
assert gsrc >= 0, ("src rank out of group, need global rank")
rank = _get_global_group().rank if group is None else group.rank
nranks = _get_global_group().nranks if group is None else group.nranks
if rank != gsrc:
tensor_list = []
for _ in range(nranks):
tensor_list.append(tensor)
temp = paddle.concat(tensor_list, axis=0)
if _non_static_mode():
if framework._in_eager_mode_ and in_dygraph_mode():
task = group.process_group.scatter(temp, tensor, gsrc)
if use_calc_stream:
task.wait()
return None
else:
return task
if in_dygraph_mode():
return _C_ops.c_scatter(temp, tensor, 'use_calc_stream',
use_calc_stream, 'ring_id', ring_id, 'nranks',
nranks, 'root', gsrc)
......@@ -1070,11 +998,12 @@ def _c_concat(tensor, group=None):
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
group = _get_default_group() if group is None else group
ring_id = group.id
global_rank = _get_global_env().rank
rank = global_rank if group is None else group.get_group_rank(global_rank)
nranks = _get_global_env().world_size if group is None else group.nranks
rank = group.rank
nranks = group.nranks
if _non_static_mode():
return _C_ops.c_concat(tensor, 'ring_id', ring_id, 'use_calc_stream',
......@@ -1765,9 +1694,21 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True):
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if framework._in_eager_mode_ and in_dygraph_mode():
group = _get_default_group() if group is None else group
else:
ring_id = 0 if group is None else group.id
temp = paddle.concat(in_tensor_list, axis=0)
nranks = len(in_tensor_list)
if framework._in_eager_mode_ and in_dygraph_mode():
out = paddle.concat(out_tensor_list, axis=0)
task = group.process_group.alltoall(temp, out)
task.wait()
out_tensor_list.clear()
out_tensor_list.extend(paddle.split(out, nranks, 0))
return
if _non_static_mode():
out = _C_ops.alltoall(temp, 'use_calc_stream', use_calc_stream,
'ring_id', ring_id)
......@@ -1834,6 +1775,16 @@ def send(tensor, dst=0, group=None, use_calc_stream=True):
"""
if group is not None and not group.is_member():
return
if framework._in_eager_mode_ and in_dygraph_mode():
group = _get_default_group() if group is None else group
task = group.process_group.send(tensor, dst)
if use_calc_stream:
task.wait()
return None
else:
return task
ring_id = 0 if group is None else group.id
if _non_static_mode():
......@@ -1887,6 +1838,16 @@ def recv(tensor, src=0, group=None, use_calc_stream=True):
"""
if group is not None and not group.is_member():
return
if framework._in_eager_mode_ and in_dygraph_mode():
group = _get_default_group() if group is None else group
task = group.process_group.recv(tensor, src)
if use_calc_stream:
task.wait()
return None
else:
return task
ring_id = 0 if group is None else group.id
if _non_static_mode():
......
......@@ -24,11 +24,21 @@ from paddle import compat as cpt
# deprecated module import
from paddle.fluid import core
import paddle.fluid.framework as framework
from paddle.fluid.framework import _set_expected_place
from paddle.fluid.dygraph import parallel_helper
from paddle.distributed.fleet.launch_utils import check_backend
from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.distributed.fleet.base.private_helper_function import wait_server_ready # noqa: F401
import paddle.distributed.collective as collective
from paddle.distributed.collective import _group_map_by_name
from paddle.distributed.collective import _group_map
from paddle.distributed.collective import _default_group_name
from paddle.distributed.collective import _valid_backend_list
from paddle.distributed.collective import _default_backend
from paddle.distributed.collective import _default_store
from paddle.distributed.collective import _new_process_group_impl
from paddle.distributed.collective import Group
__all__ = []
......@@ -159,18 +169,88 @@ def init_parallel_env():
if not is_cpu_only and core.is_compiled_with_cuda():
_check_var_exists("FLAGS_selected_gpus")
backend = "nccl" if backend == "auto" else backend
elif not is_cpu_only and core.is_compiled_with_xpu():
_check_var_exists('FLAGS_selected_xpus')
backend = "bkcl" if backend == "auto" else backend
elif not is_cpu_only and core.is_compiled_with_npu():
_check_var_exists('FLAGS_selected_npus')
backend = "hccl" if backend == "auto" else backend
elif not is_cpu_only and core.is_compiled_with_mlu():
_check_var_exists('FLAGS_selected_mlus')
backend = "cncl" if backend == "auto" else backend
_check_var_exists("PADDLE_TRAINER_ID")
_check_var_exists("PADDLE_CURRENT_ENDPOINT")
_check_var_exists("PADDLE_TRAINERS_NUM")
_check_var_exists("PADDLE_TRAINER_ENDPOINTS")
# NOTE(chenweihang): [ why config global place here? ]
# the dygraph mode will be set to default mode,
# users will not call `dygraph.guard` or `enable_dygraph`
# directly, if they want to switch default place,
# they need to call a function to change default place,
# here just set correctly place to users
if is_cpu_only:
place = core.CPUPlace()
elif core.is_compiled_with_cuda():
place = core.CUDAPlace(parallel_env.device_id)
elif core.is_compiled_with_xpu():
place = core.XPUPlace(parallel_env.device_id)
elif core.is_compiled_with_npu():
place = core.NPUPlace(parallel_env.device_id)
elif core.is_compiled_with_mlu():
place = core.MLUPlace(parallel_env.device_id)
_set_expected_place(place)
group = None
if backend in _valid_backend_list and framework._in_eager_mode_:
if _default_group_name in collective._group_map_by_name:
return collective._group_map_by_name[_default_group_name]
_default_backend = backend
rank = int(os.getenv("PADDLE_TRAINER_ID"))
world_size = int(os.getenv("PADDLE_TRAINERS_NUM"))
assert rank >= 0 and world_size > rank and world_size > 1, (
"rank must be non-negative and world_size must be the "
"maximum rank plus one. Moreover, at least two processes are "
"required to create a process group.")
master_addr = os.getenv("MASTER_ADDR", None)
master_port = os.getenv("MASTER_PORT", None)
if not master_addr or not master_port:
endpoints = os.getenv("PADDLE_MASTER", None)
if endpoints is None:
endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS").split(',')[0]
assert endpoints, (
"The environment variable 'MASTER_ADDR' and 'MASTER_PORT' "
"must be specified, for example 'export MASTER_ADDR=127.0.0.1' "
"and 'export MASTER_ADDR=54612'. Or you can start your training"
"with paddle.distributed.run module.")
master_addr, master_port = endpoints.split(":")
master_port = int(master_port)
is_master = rank == 0
_default_store = core.TCPStore(master_addr, master_port, is_master,
world_size)
pg = _new_process_group_impl(
backend,
_default_store,
rank,
world_size,
_default_group_name,
pg_options=None)
ranks = list(range(world_size))
group = Group(
rank,
world_size,
id=0,
ranks=ranks,
pg=pg,
name=_default_group_name)
collective._group_map_by_name[_default_group_name] = group
_group_map[0] = group
parallel_helper._set_parallel_ctx(True)
return group
node_num = set([i.split(":")[0] for i in parallel_env.trainer_endpoints])
# 3: init gloo context (step 1: httpsever start)
init_gloo = int(os.getenv("PADDLE_WITH_GLOO", "0"))
......@@ -202,24 +282,6 @@ def init_parallel_env():
strategy.current_endpoint = parallel_env.current_endpoint
strategy.nrings = parallel_env.nrings
# NOTE(chenweihang): [ why config global place here? ]
# the dygraph mode will be set to default mode,
# users will not call `dygraph.guard` or `enable_dygraph`
# directly, if they want to switch default place,
# they need to call a function to change default place,
# here just set correctly place to users
if is_cpu_only:
place = core.CPUPlace()
elif core.is_compiled_with_cuda():
place = core.CUDAPlace(parallel_env.device_id)
elif core.is_compiled_with_xpu():
place = core.XPUPlace(parallel_env.device_id)
elif core.is_compiled_with_npu():
place = core.NPUPlace(parallel_env.device_id)
elif core.is_compiled_with_mlu():
place = core.MLUPlace(parallel_env.device_id)
_set_expected_place(place)
# init nccl or hccl or bkcl or heter context
if is_cpu_only:
parallel_helper._set_parallel_ctx(
......@@ -274,6 +336,7 @@ def init_parallel_env():
if parallel_env.rank == 0:
http_server_d["running"] = False
http_server.join()
return group
def get_rank():
......
......@@ -360,9 +360,10 @@ def sync_params_buffers(model,
is_model_parallel=False):
model_vars = []
for _, param in model._obtain_parameters_buffers().items():
if not isinstance(param, core.VarBase):
raise TypeError("The data type of '%s' must be Varbase" %
param.name)
if not isinstance(param, (core.VarBase, core.eager.Tensor)):
raise TypeError(
"The data type of '%s' must be Varbase or eager.Tensor" %
param.name)
# is_distributed param not need to sync when in mp mode
if isinstance(param, ParamBase):
......
......@@ -60,6 +60,7 @@ list(APPEND DIST_TEST_OPS test_auto_parallel_data_unshard)
list(APPEND DIST_TEST_OPS test_auto_parallel_save_load)
list(APPEND DIST_TEST_OPS test_auto_parallel_autoconvert)
list(APPEND DIST_TEST_OPS test_collective_process_group)
list(APPEND DIST_TEST_OPS test_eager_dist_api)
set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS})
#remove distribute unittests.
list(APPEND MIXED_DIST_TEST_OPS test_dgc_op)
......@@ -311,6 +312,7 @@ if ((NOT WITH_GPU) AND (NOT WITH_ROCM))
LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_save_load)
LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_autoconvert)
LIST(REMOVE_ITEM TEST_OPS test_collective_process_group)
LIST(REMOVE_ITEM TEST_OPS test_eager_dist_api)
elseif(WITH_GPU)
if (${CUDNN_VERSION} VERSION_LESS 7100)
LIST(REMOVE_ITEM TEST_OPS test_conv2d_fusion_op)
......@@ -1147,6 +1149,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)
set_tests_properties(test_auto_parallel_save_load PROPERTIES TIMEOUT 120)
set_tests_properties(test_auto_parallel_autoconvert PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_process_group PROPERTIES TIMEOUT 120)
set_tests_properties(test_eager_dist_api PROPERTIES TIMEOUT 300)
if(${NCCL_VERSION} VERSION_GREATER_EQUAL 2212)
set_tests_properties(test_parallel_dygraph_sparse_embedding PROPERTIES TIMEOUT 120)
......
......@@ -37,11 +37,15 @@ class TestProcessGroupFp32(unittest.TestCase):
pass
def test_init_process_group(self):
paddle.distributed.collective._init_parallel_env()
paddle.distributed.collective._new_group()
with self.assertRaises(ValueError):
paddle.distributed.collective._new_group(
backend="gloo", group_name="_default_pg")
with _test_eager_guard():
paddle.distributed.init_parallel_env()
paddle.distributed.new_group()
group = paddle.distributed.new_group([-1, -2])
assert group.process_group == None
group = paddle.distributed.collective.Group(-1, 2, 0, [-1, -2])
ret = paddle.distributed.barrier(group)
assert ret == None
print("test ok\n")
......
......@@ -26,16 +26,16 @@ from datetime import timedelta
import paddle.fluid.core as core
from paddle.fluid.framework import _test_eager_guard
from paddle.fluid.dygraph.parallel import ParallelEnv
import paddle.distributed as dist
def init_process_group(strategy=None):
nranks = ParallelEnv().nranks
rank = ParallelEnv().local_rank
is_master = True if rank == 0 else False
store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks)
pg_group = core.ProcessGroupNCCL(store, rank, nranks)
pg_group = dist.init_parallel_env()
return pg_group
return pg_group.process_group
class TestProcessGroupFp32(unittest.TestCase):
......@@ -68,12 +68,10 @@ class TestProcessGroupFp32(unittest.TestCase):
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.allreduce(tensor_x)
task.wait()
task = dist.all_reduce(tensor_x)
assert np.array_equal(tensor_x, sum_result)
else:
task = pg.allreduce(tensor_y)
task.wait()
task = dist.all_reduce(tensor_y)
assert np.array_equal(tensor_y, sum_result)
print("test allreduce sum api ok")
......@@ -89,16 +87,41 @@ class TestProcessGroupFp32(unittest.TestCase):
max_result = paddle.maximum(tensor_x, tensor_y)
if pg.rank() == 0:
task = pg.allreduce(tensor_x, core.ReduceOp.MAX)
task = dist.all_reduce(
tensor_x, dist.ReduceOp.MAX, use_calc_stream=False)
task.wait()
assert np.array_equal(tensor_x, max_result)
else:
task = pg.allreduce(tensor_y, core.ReduceOp.MAX)
task = dist.all_reduce(
tensor_y, dist.ReduceOp.MAX, use_calc_stream=False)
task.wait()
assert np.array_equal(tensor_y, max_result)
print("test allreduce max api ok")
# test allreduce min
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
min_result = paddle.minimum(tensor_x, tensor_y)
if pg.rank() == 0:
task = dist.all_reduce(
tensor_x, dist.ReduceOp.MIN, use_calc_stream=False)
task.wait()
assert np.array_equal(tensor_x, min_result)
else:
task = dist.all_reduce(
tensor_y, dist.ReduceOp.MIN, use_calc_stream=False)
task.wait()
assert np.array_equal(tensor_y, min_result)
print("test allreduce min api ok")
# test broadcast
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
......@@ -109,16 +132,14 @@ class TestProcessGroupFp32(unittest.TestCase):
broadcast_result = paddle.assign(tensor_x)
if pg.rank() == 0:
task = pg.broadcast(tensor_x, 0)
task = dist.broadcast(tensor_x, 0, use_calc_stream=False)
task.synchronize()
paddle.device.cuda.synchronize()
assert task.is_completed()
assert np.array_equal(broadcast_result, tensor_x)
else:
task = pg.broadcast(tensor_y, 0)
task.synchronize()
task = dist.broadcast(tensor_y, 0)
paddle.device.cuda.synchronize()
assert task.is_completed()
assert np.array_equal(broadcast_result, tensor_y)
print("test broadcast api ok")
......@@ -126,8 +147,7 @@ class TestProcessGroupFp32(unittest.TestCase):
# test barrier
# rank 0
if pg.rank() == 0:
task = pg.barrier()
task.wait()
dist.barrier()
# rank 1
else:
task = pg.barrier()
......@@ -151,9 +171,13 @@ class TestProcessGroupFp32(unittest.TestCase):
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.all_gather(tensor_y, tensor_out)
task.wait()
tensor_out_list = [
paddle.empty_like(tensor_x), paddle.empty_like(tensor_x)
]
task = dist.all_gather(
tensor_out_list, tensor_y, use_calc_stream=False)
paddle.device.cuda.synchronize()
tensor_out = paddle.concat(tensor_out_list)
out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
out_2 = paddle.slice(tensor_out, [0], [out_shape[0] // 2],
[out_shape[0]])
......@@ -178,12 +202,14 @@ class TestProcessGroupFp32(unittest.TestCase):
if pg.rank() == 0:
task = pg.alltoall(tensor_x, tensor_out1)
task.wait()
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.alltoall(tensor_y, tensor_out2)
task.wait()
in_1, in_2 = paddle.split(tensor_y, 2)
out_1, out_2 = paddle.split(tensor_out2, 2)
out_tensor_list = [out_1, out_2]
task = dist.alltoall([in_1, in_2], out_tensor_list)
paddle.device.cuda.synchronize()
tensor_out2 = paddle.concat(out_tensor_list)
out1_2 = paddle.slice(tensor_out1, [0], [self.shape[0] // 2],
[self.shape[0]])
out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2])
......@@ -201,18 +227,61 @@ class TestProcessGroupFp32(unittest.TestCase):
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.reduce(tensor_x, 0)
task.wait()
task = dist.reduce(tensor_x, 0, use_calc_stream=True)
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.reduce(tensor_y, 0)
task = dist.reduce(tensor_y, 0, use_calc_stream=False)
task.wait()
paddle.device.cuda.synchronize()
if pg.rank() == 0:
assert np.array_equal(tensor_x, sum_result)
print("test reduce sum api ok\n")
# test reduce max
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
max_result = paddle.maximum(tensor_x, tensor_y)
if pg.rank() == 0:
task = dist.reduce(
tensor_x, 0, dist.ReduceOp.MAX, use_calc_stream=False)
task.wait()
assert np.array_equal(tensor_x, max_result)
else:
task = dist.reduce(
tensor_y, 0, dist.ReduceOp.MAX, use_calc_stream=False)
task.wait()
print("test reduce max api ok")
# test reduce min
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
min_result = paddle.minimum(tensor_x, tensor_y)
if pg.rank() == 0:
task = dist.reduce(
tensor_x, 0, dist.ReduceOp.MIN, use_calc_stream=False)
task.wait()
assert np.array_equal(tensor_x, min_result)
else:
task = dist.reduce(
tensor_y, 0, dist.ReduceOp.MIN, use_calc_stream=False)
task.wait()
print("test reduce min api ok")
# test Scatter
# rank 0
in_shape = list(self.shape)
......@@ -222,12 +291,14 @@ class TestProcessGroupFp32(unittest.TestCase):
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
if pg.rank() == 0:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
in_1, in_2 = paddle.split(tensor_x, 2)
task = dist.scatter(
tensor_y, [in_1, in_2], 0, use_calc_stream=True)
#task.wait()
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.scatter(tensor_x, tensor_y, 0)
task = dist.scatter(tensor_y, [], 0, use_calc_stream=False)
task.wait()
paddle.device.cuda.synchronize()
out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
......@@ -239,6 +310,40 @@ class TestProcessGroupFp32(unittest.TestCase):
assert np.array_equal(tensor_y, out2)
print("test scatter api ok\n")
# test send min
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
if pg.rank() == 0:
task = dist.send(tensor_x, 1, use_calc_stream=False)
task.wait()
else:
task = dist.recv(tensor_y, 0, use_calc_stream=False)
task.wait()
assert np.array_equal(tensor_y, tensor_x)
print("test send api ok")
# test send min
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
if pg.rank() == 0:
task = dist.send(tensor_x, 1, use_calc_stream=True)
else:
task = dist.recv(tensor_y, 0, use_calc_stream=True)
assert np.array_equal(tensor_y, tensor_x)
print("test send api ok")
class TestProcessGroupFp16(TestProcessGroupFp32):
def setUp(self):
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
from test_parallel_dygraph_dataparallel import TestMultipleGpus
class TestProcessGroup(TestMultipleGpus):
def test_process_group_nccl(self):
self.run_mnist_2gpu('process_group_nccl.py')
def test_process_group_gloo(self):
self.run_mnist_2gpu('process_group_gloo.py')
def test_init_process_group(self):
self.run_mnist_2gpu('init_process_group.py')
if __name__ == "__main__":
unittest.main()
......@@ -46,7 +46,7 @@ class TestFleetDygraphSingle(unittest.TestCase):
def test_dygraph_single(self):
paddle.disable_static()
fleet.init(is_collective=True)
paddle.distributed.init_parallel_env()
layer = LinearNet()
loss_fn = nn.MSELoss()
......
......@@ -70,6 +70,9 @@ def start_local_trainers(cluster,
"PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
"MASTER_ADDR": "127.0.0.1",
"MASTER_PORT": "6170",
"NCCL_DEBUG": "INFO",
"PADDLE_DISTRI_BACKEND":
"gloo", # make init_parallel_env get 'gloo' argument.
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册