Unverified commit 1f79fd47, authored by Yuang Liu, committed by GitHub

pp comm overlap use tensor fusion helper (#55540)

Parent 6216beb3
@@ -36,7 +36,11 @@ if _use_four_directions:
 else:
     from .pp_utils import p2p_communication as p2p
 
-from .pp_utils.utils import HOOK_ACTION, FusedCommBuffer, assign_group_by_size
+from paddle.distributed.fleet.utils.tensor_fusion_helper import (
+    assign_group_by_size,
+)
+
+from .pp_utils.utils import HOOK_ACTION, FusedCommBuffer
 
 __all__ = []
......
@@ -12,27 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import OrderedDict
-
-import numpy as np
-
 import paddle
 from paddle import _legacy_C_ops
-from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_storage import (
-    GradStorage,
+from paddle.distributed.fleet.utils.tensor_fusion_helper import (
+    flatten_dense_tensors,
 )
 from paddle.fluid import core
 from paddle.framework import base as imperative_base
 
-alignment = {
-    "gpu": 256,
-}
-
-align = {
-    paddle.float16.value: 2,
-    paddle.bfloat16.value: 2,
-    paddle.float32.value: 4,
-}
-
 __all__ = []
@@ -131,35 +118,6 @@ def _all_gather(tensor, group=None, use_calc_stream=True):
     )
 
 
-def flatten_dense_tensors(parameters, use_main_grad=False):
-    _buffer_size = 0
-    _param2align = {}
-    dtype = paddle.float32 if use_main_grad else parameters[0].dtype
-
-    for param in parameters:
-        assert param.trainable, "param must be trainable..."
-        size = np.prod(param.shape) * align[dtype]
-        remaining = size % alignment["gpu"]
-        ali = 0 if remaining == 0 else alignment["gpu"] - remaining
-        align_ = ali // align[dtype]
-        _buffer_size += np.prod(param.shape) + align_
-        _param2align[param.name] = align_
-
-    # process gradient
-    grad_storage = GradStorage(
-        size=_buffer_size,
-        dtype=dtype,
-        device="gpu",
-        destination="0",
-        parm2align=_param2align,
-    )
-
-    for param in parameters:
-        grad_storage.add_grad(param, _param2align[param.name])
-
-    return grad_storage.buffer
-
-
 class FusedCommBuffer:
     def __init__(self, id, params, comm_group, acc_steps=1, act=None, dst=-1):
         self._id = id
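Note: the helper removed above pads each parameter so the next one starts on a 256-byte boundary of the fused buffer (`alignment["gpu"]` bytes, with `align[dtype]` bytes per element). The same arithmetic lives on in `tensor_fusion_helper.flatten_dense_tensors`, which this file now imports. Below is a minimal standalone sketch of that padding math; the shape and dtype used are illustrative only and nothing here is Paddle API.

```python
import numpy as np

ALIGNMENT_BYTES = 256  # alignment["gpu"]
BYTES_PER_ELEM = {"float32": 4, "float16": 2, "bfloat16": 2}  # align table


def padded_numel(shape, dtype="float32"):
    """Return (numel, trailing padding in elements) for one parameter."""
    numel = int(np.prod(shape))
    size_bytes = numel * BYTES_PER_ELEM[dtype]
    remaining = size_bytes % ALIGNMENT_BYTES
    pad_bytes = 0 if remaining == 0 else ALIGNMENT_BYTES - remaining
    return numel, pad_bytes // BYTES_PER_ELEM[dtype]


# A [1000, 3] float32 parameter is 3000 elements = 12000 bytes;
# 12000 % 256 == 224, so 32 bytes (8 float32 elements) of padding follow it.
numel, pad = padded_numel([1000, 3], "float32")
assert (numel + pad) * BYTES_PER_ELEM["float32"] % ALIGNMENT_BYTES == 0
```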
@@ -188,8 +146,11 @@ class FusedCommBuffer:
         self._init_step_dict()
 
         self.grad_storage = flatten_dense_tensors(
-            self._params, self.use_main_grad
-        )
+            self._params,
+            use_main_grad=self.use_main_grad,
+            fuse_param=False,
+            warp_buffer=False,
+        ).buffer
 
         self._record_addr()
@@ -272,22 +233,3 @@ class FusedCommBuffer:
         self.grad_storage.scale_(scale_factor)
 
         self._reset_params_checked_in()
-
-
-def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
-    group_idx = 0
-    memory_counter = 0
-    var_groups = OrderedDict()
-    dtype = parameters[0].dtype
-
-    for var in parameters:
-        bytes = np.prod(var.shape) * core.size_of_dtype(var.dtype)
-        if memory_counter < group_size and dtype == var.dtype:
-            memory_counter += bytes
-        else:
-            memory_counter = bytes
-            dtype = var.dtype
-            group_idx += 1
-        var_groups.setdefault(group_idx, []).append(var)
-
-    return var_groups
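Note: this removes the pure-Python bucketing copy from pp_utils; callers now go through the tensor_fusion_helper version, which delegates to `core.eager_assign_group_by_size` (visible in the hunks below). The rule the removed code implemented: walk parameters in order and start a new bucket once the running byte count reaches `group_size` or the dtype changes. A self-contained sketch of that rule follows, using a hypothetical `FakeParam` stand-in rather than real Paddle parameters.

```python
from collections import OrderedDict, namedtuple

import numpy as np

FakeParam = namedtuple("FakeParam", ["name", "shape", "dtype"])
DTYPE_BYTES = {"float32": 4, "float16": 2, "bfloat16": 2}


def group_by_size(parameters, group_size=128 * 1024 * 1024):
    group_idx, memory_counter = 0, 0
    var_groups = OrderedDict()
    dtype = parameters[0].dtype
    for var in parameters:
        nbytes = int(np.prod(var.shape)) * DTYPE_BYTES[var.dtype]
        if memory_counter < group_size and dtype == var.dtype:
            memory_counter += nbytes
        else:  # bucket full or dtype changed: start a new bucket
            memory_counter = nbytes
            dtype = var.dtype
            group_idx += 1
        var_groups.setdefault(group_idx, []).append(var)
    return var_groups


# Eight 4 MiB float32 tensors with 16 MiB buckets -> two buckets of four.
params = [FakeParam(f"w{i}", [1024, 1024], "float32") for i in range(8)]
groups = group_by_size(params, group_size=16 * 1024 * 1024)
assert [len(g) for g in groups.values()] == [4, 4]
```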
@@ -30,8 +30,7 @@ align = {
 }
 
 
-def assign_group_by_size(parameters, group_size=256 * 1024 * 1024):
-    # TODO(Yuang Liu): make pp_utils/utils use this tensor fusion helper
+def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
     is_sparse_gradient = [False] * len(parameters)
 
     group_indices = core.eager_assign_group_by_size(
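Note: the shared default drops from 256 MiB to 128 MiB, matching the default of the pp_utils copy removed above, while `obtain_storage` further down keeps the old behaviour by passing `group_size=256 * 1024 * 1024` explicitly. For scale, a quick illustrative calculation of what one 128 MiB bucket holds, using the element sizes from the `align` table:

```python
GROUP_SIZE = 128 * 1024 * 1024  # new default, in bytes

print(GROUP_SIZE // 4)  # 33_554_432 float32 elements per bucket
print(GROUP_SIZE // 2)  # 67_108_864 float16 / bfloat16 elements per bucket
```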
@@ -45,7 +44,9 @@ def assign_group_by_size(parameters, group_size=256 * 1024 * 1024):
     return var_groups
 
 
-def flatten_dense_tensors(parameters, use_main_grad):
+def flatten_dense_tensors(
+    parameters, use_main_grad=False, fuse_param=True, warp_buffer=False
+):
     from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_storage import (
         GradStorage,
         ParamStorage,
@@ -64,9 +65,11 @@ def flatten_dense_tensors(parameters, use_main_grad):
         _buffer_size += np.prod(param.shape) + align_
         _param2align[param.name] = align_
 
-    param_storage = ParamStorage(size=_buffer_size, dtype=dtype, device="gpu")
-
-    param_storage.add_rank_params(parameters, _param2align)
+    if fuse_param:
+        param_storage = ParamStorage(
+            size=_buffer_size, dtype=dtype, device="gpu"
+        )
+        param_storage.add_rank_params(parameters, _param2align)
 
     # process gradient
     grad_dtype = paddle.float32 if use_main_grad else dtype
@@ -81,27 +84,35 @@ def flatten_dense_tensors(parameters, use_main_grad):
     for param in parameters:
         grad_storage.add_grad(param, _param2align[param.name])
 
-    param_storage.warp_buffer()
-    grad_storage.warp_buffer()
+    if warp_buffer:
+        if fuse_param:
+            param_storage.warp_buffer()
+        grad_storage.warp_buffer()
 
-    if not use_main_grad:
-        # param_storage --> grad_storage
-        param_storage.buffer._copy_gradient_from(grad_storage.buffer)
-    else:
-        param_storage.buffer.main_grad = grad_storage.buffer
-    param_storage.buffer.stop_gradient = False
-    return param_storage, grad_storage
+    if fuse_param:
+        if not use_main_grad:
+            # param_storage --> grad_storage
+            param_storage.buffer._copy_gradient_from(grad_storage.buffer)
+        else:
+            param_storage.buffer.main_grad = grad_storage.buffer
+        param_storage.buffer.stop_gradient = False
+        return param_storage, grad_storage
+
+    return grad_storage
 
 
 def obtain_storage(parameters, use_main_grad, clip, dist):
     if len(parameters) < 1:
         return []
 
-    var_groups = assign_group_by_size(parameters)
+    var_groups = assign_group_by_size(parameters, group_size=256 * 1024 * 1024)
     storage = []
     for group_idx, parameters in var_groups.items():
         param_storage, grad_storage = flatten_dense_tensors(
-            parameters, use_main_grad
+            parameters,
+            use_main_grad=use_main_grad,
+            fuse_param=True,
+            warp_buffer=True,
         )
         param_storage.buffer.need_clip = clip
         param_storage.buffer.is_distributed = dist
......
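Note: after this change `flatten_dense_tensors` serves both call sites shown in the diff. The sketch below illustrates the two modes; it is inferred from those call sites rather than from documented public API, and it assumes a GPU build of Paddle since the underlying storages allocate on "gpu".

```python
import paddle
from paddle.distributed.fleet.utils.tensor_fusion_helper import (
    flatten_dense_tensors,
)

layer_a = paddle.nn.Linear(1024, 1024)
layer_b = paddle.nn.Linear(1024, 1024)

# FusedCommBuffer mode: no parameter fusion, no warp_buffer(); only the
# flat gradient buffer tensor is kept.
grad_buffer = flatten_dense_tensors(
    list(layer_a.parameters()),
    use_main_grad=False,
    fuse_param=False,
    warp_buffer=False,
).buffer

# obtain_storage mode: fuse parameters and gradients and warp both buffers,
# getting back a (param_storage, grad_storage) pair.
param_storage, grad_storage = flatten_dense_tensors(
    list(layer_b.parameters()),
    use_main_grad=False,
    fuse_param=True,
    warp_buffer=True,
)
```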