Unverified commit c036c5c0, authored by sneaxiy, committed by GitHub

Add fused_allreduce_gradients_with_group for PPFleetX (#47447)

* add fused_allreduce_gradients_with_group

* add scale

* fix ci
Parent 17fb92b3
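For reference, below is a minimal usage sketch of the new helper added by this commit. Only the signature `fused_allreduce_gradients_with_group(parameter_list, group, bucket_size=128 * 1024 * 1024, scale=None)` comes from the diff; the import path, the toy model, the group construction, and the launch command are assumptions for illustration (the diff does not show the file name or a caller).

# Hypothetical sketch: manually all-reduce gradients over a chosen
# communication group instead of the default data-parallel group.
# Launch with multiple processes, e.g.:
#   python -m paddle.distributed.launch --nproc_per_node=2 demo.py
import paddle
import paddle.distributed as dist
# Import path assumed; the diff does not name the file.
from paddle.distributed.fleet.utils.hybrid_parallel_util import (
    fused_allreduce_gradients_with_group,
)

dist.init_parallel_env()
# Stand-in for whatever subgroup PPFleetX would use.
group = dist.new_group(ranks=list(range(dist.get_world_size())))

model = paddle.nn.Linear(8, 8)          # placeholder model
loss = model(paddle.rand([4, 8])).mean()
loss.backward()

# Fuse gradients into ~128 MB buckets, all-reduce them over `group`,
# and divide by the group size (the default when scale is None).
fused_allreduce_gradients_with_group(
    list(model.parameters()),
    group,
    bucket_size=128 * 1024 * 1024,
)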
@@ -26,7 +26,7 @@ from .log_util import logger
 __all__ = []


-def _apply_collective_grads(parameters, comm_group):
+def _apply_collective_grads(parameters, comm_group, bucket_size, scale=None):
     grad_var_set = set()
     grad_vars = []
     sparse_grad_vars = []
@@ -41,16 +41,21 @@ def _apply_collective_grads(parameters, comm_group):
             assert g_var not in grad_var_set
             grad_var_set.add(g_var)

-    coalesced_grads_and_vars = build_groups(grad_vars, 128 * 1024 * 1024)
+    coalesced_grads_and_vars = build_groups(grad_vars, bucket_size)

     nranks = (
         paddle.distributed.get_world_size()
         if comm_group is None
         else comm_group.nranks
     )
+
+    scale = nranks if scale is None else 1.0 / scale
+    scale = None if scale == 1.0 else scale
+
     for coalesced_grad, _, _ in coalesced_grads_and_vars:
         # need to div nranks
-        div_factor = paddle.to_tensor(nranks, dtype=coalesced_grad.dtype)
-        paddle.fluid.framework._dygraph_tracer().trace_op(
-            type="elementwise_div",
-            inputs={'X': coalesced_grad, 'Y': div_factor},
+        if scale is not None:
+            div_factor = paddle.to_tensor(scale, dtype=coalesced_grad.dtype)
+            paddle.fluid.framework._dygraph_tracer().trace_op(
+                type="elementwise_div",
+                inputs={'X': coalesced_grad, 'Y': div_factor},
@@ -62,7 +67,9 @@ def _apply_collective_grads(parameters, comm_group):
     _split_tensors(coalesced_grads_and_vars)


-def _apply_collective_grads_eager(parameters, comm_group):
+def _apply_collective_grads_eager(
+    parameters, comm_group, bucket_size, scale=None
+):
     grad_var_set = set()
     grad_vars = []
@@ -76,16 +83,21 @@ def _apply_collective_grads_eager(parameters, comm_group):
             assert g_var not in grad_var_set
             grad_var_set.add(g_var)

-    coalesced_grads_and_vars = build_groups(grad_vars, 128 * 1024 * 1024)
+    coalesced_grads_and_vars = build_groups(grad_vars, bucket_size)

     nranks = (
         paddle.distributed.get_world_size()
         if comm_group is None
         else comm_group.nranks
     )
+
+    scale = 1.0 / nranks if scale is None else scale
+    scale = None if scale == 1.0 else scale
+
     for coalesced_grad, _, _ in coalesced_grads_and_vars:
         # need to div nranks
-        coalesced_grad.scale_(1.0 / nranks)
+        if scale is not None:
+            coalesced_grad.scale_(scale)
         paddle.distributed.all_reduce(coalesced_grad, group=comm_group)

     _split_tensors(coalesced_grads_and_vars)
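The scale handling added in the two hunks above is the same logic expressed two ways: the legacy trace_op path keeps the factor as a divisor for `elementwise_div`, while the eager path keeps it as a multiplier for `scale_`, and both skip the op entirely when the effective factor is 1.0. The standalone sketch below (illustrative only, not part of the diff) reproduces that normalization with the tensor work stripped out and checks that the two formulations agree.

# Illustrative sketch of the scale normalization from this commit.
def _effective_multiplier(nranks, scale=None):
    # Eager path: default to averaging over nranks; an explicit scale is
    # applied as given; a factor of exactly 1.0 means scaling is skipped.
    multiplier = 1.0 / nranks if scale is None else scale
    return None if multiplier == 1.0 else multiplier


def _effective_divisor(nranks, scale=None):
    # Legacy trace_op path: the same factor, kept as a divisor.
    divisor = nranks if scale is None else 1.0 / scale
    return None if divisor == 1.0 else divisor


# The two paths yield the same effective scaling (None means "no scaling op").
for nranks, scale in [(4, None), (4, 0.5), (1, None), (2, 1.0)]:
    m = _effective_multiplier(nranks, scale)
    d = _effective_divisor(nranks, scale)
    assert (m is None and d is None) or abs(m - 1.0 / d) < 1e-12

assert _effective_multiplier(4) == 0.25            # default: average over 4 ranks
assert _effective_multiplier(4, scale=0.5) == 0.5  # explicit scale used as-is
assert _effective_multiplier(1) is None            # factor 1.0 => skip scaling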
@@ -172,16 +184,22 @@ def broadcast_dp_parameters(model, hcg):
     )


-def fused_allreduce_gradients(parameter_list, hcg):
-    data_parallel_group = None if hcg is None else hcg.get_data_parallel_group()
-    logger.debug("dp start fuse allreduce gradients")
+def fused_allreduce_gradients_with_group(
+    parameter_list, group, bucket_size=128 * 1024 * 1024, scale=None
+):
     apply_func = (
         _apply_collective_grads_eager
         if in_dygraph_mode()
         else _apply_collective_grads
     )
     with framework.no_grad():
-        apply_func(parameter_list, data_parallel_group)
+        apply_func(parameter_list, group, bucket_size)
+
+
+def fused_allreduce_gradients(parameter_list, hcg):
+    data_parallel_group = None if hcg is None else hcg.get_data_parallel_group()
+    logger.debug("dp start fuse allreduce gradients")
+    fused_allreduce_gradients_with_group(parameter_list, data_parallel_group)


 def sharding_reduce_gradients(parameter_list, hcg):
...
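Per the last hunk, the existing `fused_allreduce_gradients(parameter_list, hcg)` entry point keeps its previous behavior and simply forwards to the new helper with the old defaults (128 MB buckets, averaging by the group size). The sketch below spells that out; the fleet setup, strategy values, model, and import path are placeholder assumptions, not part of the commit.

# Hedged sketch: the old entry point versus the call it now forwards to.
# Launch with multiple processes, e.g. via python -m paddle.distributed.launch.
import paddle
from paddle.distributed import fleet
# Import path assumed; the diff does not name the file.
from paddle.distributed.fleet.utils.hybrid_parallel_util import (
    fused_allreduce_gradients,
    fused_allreduce_gradients_with_group,
)

# Placeholder hybrid-parallel setup (data-parallel only).
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {"dp_degree": 2, "mp_degree": 1, "pp_degree": 1}
fleet.init(is_collective=True, strategy=strategy)
hcg = fleet.get_hybrid_communicate_group()

model = paddle.nn.Linear(8, 8)
model(paddle.rand([4, 8])).mean().backward()
params = list(model.parameters())

# Old entry point, behavior unchanged: average gradients over the DP group.
fused_allreduce_gradients(params, hcg)

# What it forwards to internally after this commit, with defaults spelled out.
fused_allreduce_gradients_with_group(
    params,
    hcg.get_data_parallel_group(),
    bucket_size=128 * 1024 * 1024,  # default bucket size
    scale=None,                     # None => divide by the group size
)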