Unverified commit 003b4616, authored by Baibaifan, committed by GitHub

dp c_allreduce_sum_fusion op (#33169)

Parent 1410d722
......@@ -176,6 +176,7 @@ message DistributedStrategy {
optional bool find_unused_parameters = 28 [ default = false ];
optional bool tensor_parallel = 29 [ default = false ];
optional bool without_graph_optimization = 30 [ default = false ];
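  // Number of gradients fused into one allreduce; used by the raw program
  // optimizer when fuse_all_reduce_ops is enabled.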
optional int32 fuse_grad_size_in_num = 31 [ default = 1 ];
optional RecomputeConfig recompute_configs = 101;
optional AMPConfig amp_configs = 102;
......
......@@ -120,6 +120,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
: len;
}
} else if (context.Attr<bool>("set_constant")) {
      // TODO(Liu yuang): add NPU support for set_constant.
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx, fused_tensor,
static_cast<T>(context.Attr<float>("constant")));
......@@ -145,6 +146,14 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
offset = 0;
std::stringstream ss;
ss << "alloc_space_for_vars: ";
#if defined(PADDLE_WITH_ASCEND_CL)
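    // Zero the fused NPU buffer up front so its contents, including the
    // alignment padding between the packed tensors, start from a known value.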
auto stream =
context.template device_context<paddle::platform::NPUDeviceContext>()
.stream();
platform::NPUMemsetAsync(
static_cast<void *>(fused_tensor->mutable_data<T>(dev_ctx.GetPlace())),
0.0, fused_tensor->numel() * sizeof(T), stream);
#endif
for (size_t i = 0; i < out_tensors.size(); ++i) {
size_t len = static_cast<size_t>(out_tensors[i]->numel());
auto dim = out_tensors[i]->dims();
......@@ -160,6 +169,12 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
ss << "output(" << out_var_names[i] << ") dim:(" << dim << ")"
<< " address: " << out_tensors[i]->data<void>() << ", ";
}
    PADDLE_ENFORCE_EQ(
        static_cast<int64_t>(offset), fused_tensor->numel(),
        platform::errors::InvalidArgument(
            "The offset of alloc_space_for_vars: %s is not equal to "
            "fused_tensor's numel: %s.",
            offset, fused_tensor->numel()));
VLOG(10) << ss.str();
}
......@@ -191,13 +206,13 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
ss << "input(" << var_names[i] << ") dim:(" << lod_tensors[i]->dims()
<< ") "
<< " addres:" << lod_tensors[i]->data<void>() << ", ";
*numel += use_align
? platform::Alignment(
static_cast<size_t>(size) * size_of_dtype, place) /
size_of_dtype
: static_cast<size_t>(size);
}
VLOG(10) << ss.str();
}
};
......@@ -309,6 +324,16 @@ REGISTER_OP_XPU_KERNEL(
ops::CoalesceTensorOpKernel<paddle::platform::XPUDeviceContext, double>);
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
REGISTER_OP_NPU_KERNEL(
coalesce_tensor,
ops::CoalesceTensorOpKernel<paddle::platform::CPUDeviceContext, int>,
ops::CoalesceTensorOpKernel<paddle::platform::CPUDeviceContext, float>,
ops::CoalesceTensorOpKernel<paddle::platform::CPUDeviceContext,
plat::float16>,
ops::CoalesceTensorOpKernel<paddle::platform::CPUDeviceContext, double>);
#endif
REGISTER_OP_VERSION(coalesce_tensor)
.AddCheckpoint(
R"ROC(
......
......@@ -26,9 +26,11 @@ size_t Alignment(size_t size, const platform::Place &place) {
#elif defined(PADDLE_WITH_XPU)
// TODO(wangxi): add XpuMinChunkSize
alignment = alignment;
#elif defined(PADDLE_WITH_ASCEND_CL)
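    // On Ascend NPUs, align allocations to the NPU minimum chunk size.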
alignment = NPUMinChunkSize();
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"Fluid is not compiled with CUDA."));
"Fluid is not compiled with CUDA or NPU."));
#endif
}
size_t remaining = size % alignment;
......
......@@ -19,6 +19,8 @@ limitations under the License. */
#include "paddle/fluid/platform/place.h"
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
#include "paddle/fluid/platform/gpu_info.h"
#elif defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/npu_info.h"
#endif
namespace paddle {
......
......@@ -80,8 +80,9 @@ def _get_ascend_rankfile(rank_table_file_path):
nodes = os.getenv("DLS_TASK_NUMBER", None)
    assert nodes is not None, "DLS_TASK_NUMBER is not set!"
for node in range(int(nodes)):
node_ip = os.getenv(f"VC_CUSTOM{node}_HOSTS", None)
assert node_ip is not None, f"VC_CUSTOM{node}_HOSTS didn't set!"
node_ip = os.getenv("VC_CUSTOM{}_HOSTS".format(node), None)
assert node_ip is not None, "VC_CUSTOM{}_HOSTS didn't set!".format(
node)
node_ips.append(node_ip)
return node_ips, device_count
node_ips.append(server['server_id'])
......
......@@ -853,6 +853,27 @@ class DistributedStrategy(object):
"WARNING: without_graph_optimization should have value of bool type"
)
@property
def fuse_grad_size_in_num(self):
"""
        This option works together with the raw_program_optimizer; it sets the
        number of gradients that are fused into each allreduce op.
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.fuse_grad_size_in_num = 2
"""
return self.strategy.fuse_grad_size_in_num
@fuse_grad_size_in_num.setter
@is_strict_auto
def fuse_grad_size_in_num(self, num):
if isinstance(num, int):
self.strategy.fuse_grad_size_in_num = num
else:
print(
"WARNING: fuse_grad_size_in_num should have value of int32 type")
@property
def pipeline(self):
"""
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -14,9 +14,12 @@
from __future__ import print_function
from __future__ import division
import os
import collections
import numpy as np
import paddle.fluid as fluid
from paddle.fluid import core, unique_name
from paddle.fluid.dygraph import Layer, LayerList
from ..base.private_helper_function import wait_server_ready
from .meta_optimizer_base import MetaOptimizerBase
from .common import OpRole, OP_ROLE_KEY, OP_ROLE_VAR_KEY, CollectiveHelper, is_loss_grad_op, is_backward_op, is_optimizer_op
......@@ -38,6 +41,9 @@ class RawProgramOptimizer(MetaOptimizerBase):
super(RawProgramOptimizer, self)._set_basic_info(
loss, role_maker, user_defined_optimizer, user_defined_strategy)
self.without_graph_optimization = user_defined_strategy.without_graph_optimization
self.fuse_all_reduce_ops = user_defined_strategy.fuse_all_reduce_ops
if self.fuse_all_reduce_ops:
self.fuse_grad_size_in_num = user_defined_strategy.fuse_grad_size_in_num
def _can_apply(self):
if not self.role_maker._is_collective:
......@@ -124,7 +130,11 @@ class RawProgramOptimizer(MetaOptimizerBase):
def _transpile_main_program(self, loss):
self._insert_loss_grad_ops(loss)
self._insert_allreduce_ops()
if self.fuse_all_reduce_ops and core.is_compiled_with_npu():
self._calc_stream = True
self._allreduce_fusion_program()
else:
self._insert_allreduce_ops()
def _insert_loss_grad_ops(self, loss):
"""
......@@ -195,3 +205,260 @@ class RawProgramOptimizer(MetaOptimizerBase):
attrs={'ring_id': ring_id,
OP_ROLE_KEY: OpRole.Backward})
break
    # TODO(Liu yuang): add a CUDA allreduce_fusion implementation.
    # This pass fuses gradients into larger buffers before allreduce, which
    # reduces the number of allreduce calls and saves communication time.
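    # The rewritten backward section of the program roughly follows this
    # pattern (a sketch, not the literal generated program):
    #   coalesce_tensor(Input=[g0, g1, ...], Output=[g0, g1, ...],
    #                   FusedOutput=FusedOutput_i_j)
    #   c_allreduce_sum(X=FusedOutput_i_j, Out=FusedOutput_i_j,
    #                   ring_id=..., use_calc_stream=...)
    #   ...
    #   c_sync_comm_stream(...)  # only inserted when use_calc_stream is False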
def _allreduce_fusion_program(self):
block = self.main_program.global_block()
ring_id = self.global_ring_id
record_idx, allreduce_input_vars, allreduce_output_vars = [], [], []
        block_ops = len(block.ops)
for idx, op in reversed(list(enumerate(block.ops))):
if is_backward_op(op) and \
OP_ROLE_VAR_KEY in op.attr_names:
op_role_var = op.attr(OP_ROLE_VAR_KEY)
if len(op_role_var) == 0:
continue
assert len(op_role_var) % 2 == 0
for i in range(0, len(op_role_var), 2):
param_name = op_role_var[i]
param = block.var(param_name)
grad_name = op_role_var[i + 1]
grad = block.var(grad_name)
if param.is_distributed:
continue
if ".cast_fp16@GRAD" in grad_name:
param_name = param_name + ".cast_fp16"
if not block.has_var(param_name):
raise ValueError("op cast name error {}".format(
op.type))
else:
param = block.var(param_name)
if len(allreduce_output_vars) == 0:
allreduce_output_vars.append([grad])
allreduce_input_vars.append([param])
if self.fuse_grad_size_in_num == 1:
record_idx.append([idx, idx])
continue
record_idx.append([-2, idx])
elif len(allreduce_output_vars[
-1]) == self.fuse_grad_size_in_num:
allreduce_output_vars.append([grad])
allreduce_input_vars.append([param])
if self.fuse_grad_size_in_num == 1:
record_idx.append([idx, idx])
continue
if idx != block_ops - 1:
record_idx.append([-2, idx])
else:
allreduce_output_vars[-1].append(grad)
allreduce_input_vars[-1].append(param)
record_idx[-1][0] = idx
if record_idx[-1][0] == -2:
record_idx[-1][0] = record_idx[-1][1]
        assert len(allreduce_output_vars) == len(
            record_idx
        ), "allreduce_output_vars and record_idx must have the same length."
if not allreduce_output_vars or not allreduce_input_vars:
return
self.vars = collections.OrderedDict()
index, offset_pos, pos, offset = 0, 0, 0, 0
start, end = record_idx[index]
men_list = [end, start]
        # A note on `flag`: coalesce_tensor ops are inserted while walking the
        # ops in reverse, and adjacent groups may touch the same op. `flag`
        # records whether coalesce_tensor ops have already been inserted at the
        # current op, so that the next group's insertions are shifted by
        # `offset`.
        # For example, in [(3, 2), (2, 2), (1, 0)] the groups (3, 2) and (2, 2)
        # touch the same op but belong to different groups.
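        # Worked example (a sketch, assuming fuse_grad_size_in_num = 2 and one
        # gradient per backward op, produced at op indices 3, 2, 1, 0 and
        # visited in reverse):
        #   idx 3 -> open a new group   record_idx = [[-2, 3]]
        #   idx 2 -> fill the group     record_idx = [[2, 3]]
        #   idx 1 -> open a new group   record_idx = [[2, 3], [-2, 1]]
        #   idx 0 -> fill the group     record_idx = [[2, 3], [0, 1]]
        # Each entry delimits the ops whose gradients share one fused buffer; a
        # leftover -2 placeholder (a single-grad last group) is replaced by its
        # only index right after the collecting loop.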
for idx, op in reversed(list(enumerate(block.ops))):
if idx == start:
pos = 0
                flag = (end == men_list[-1])
offset = offset_pos if flag else 0
done_output_vars, done_input_vars = self._split_fuction(
allreduce_output_vars[index], allreduce_input_vars[index])
for id_, done_output_var in enumerate(done_output_vars):
if flag:
tmp_var = block.create_var(
name=unique_name.generate(
'FusedOutput_{}_{}'.format(start, id_ +
offset)),
dtype=done_output_var[0].dtype,
persistable=False,
stop_gradient=True)
self.vars['FusedOutput_{}_{}'.format(start, id_ +
offset)] = tmp_var
block._insert_op(
idx + id_ + offset,
type="coalesce_tensor",
inputs={"Input": done_input_vars[id_]},
outputs={
"Output": done_output_var,
"FusedOutput": tmp_var
},
attrs={
"copy_data": False,
"use_align": True,
"dtype": done_output_var[0].dtype
})
pos += 1
else:
tmp_var = block.create_var(
name=unique_name.generate(
'FusedOutput_{}_{}'.format(start, id_)),
dtype=done_output_var[0].dtype,
persistable=False,
stop_gradient=True)
self.vars['FusedOutput_{}_{}'.format(start,
id_)] = tmp_var
block._insert_op(
idx + id_,
type="coalesce_tensor",
inputs={"Input": done_input_vars[id_]},
outputs={
"Output": done_output_var,
"FusedOutput": tmp_var
},
attrs={
"copy_data": False,
"use_align": True,
"dtype": done_output_var[0].dtype
})
pos += 1
offset_pos = pos
                # TODO(Liu yuang): add CUDA and NPU events for c_allreduce_sum.
for id_ in range(len(done_output_vars)):
if flag:
block._insert_op(
end + id_ + pos + 1,
type='c_allreduce_sum',
inputs={
'X': self.vars['FusedOutput_{}_{}'.format(
start, id_ + offset)]
},
outputs={
'Out': self.vars['FusedOutput_{}_{}'.format(
start, id_ + offset)]
},
attrs={
'ring_id': ring_id,
'use_calc_stream': True
if self._calc_stream else False,
OP_ROLE_KEY: OpRole.Backward
})
else:
block._insert_op(
end + id_ + pos + 1,
type='c_allreduce_sum',
inputs={
'X': self.vars['FusedOutput_{}_{}'.format(start,
id_)]
},
outputs={
'Out': self.vars['FusedOutput_{}_{}'.format(
start, id_)]
},
attrs={
'ring_id': ring_id,
'use_calc_stream': True
if self._calc_stream else False,
OP_ROLE_KEY: OpRole.Backward
})
index += 1
men_list.append(end)
men_list.append(start)
if len(record_idx) == index:
start = end = -1
continue
start, end = record_idx[index]
if not self._calc_stream:
for idx, op in enumerate(block.ops):
if is_optimizer_op(op):
block._insert_op(
idx,
type='c_sync_comm_stream',
inputs={'X': block.create_var()},
outputs={'Out': block.create_var()},
attrs={
'ring_id': ring_id,
OP_ROLE_KEY: OpRole.Backward
})
break
    # Group gradients of the same dtype together. With skip_comb=True, all
    # grads of the same dtype across the whole list are merged into one group.
    # For example: [(fp16, fp16), (fp32), (fp16)] -> [(fp16, fp16, fp16), (fp32)]
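    # With skip_comb=False only adjacent grads of the same dtype are combined,
    # so the example above stays as three groups:
    # [(fp16, fp16), (fp32), (fp16)].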
def _split_fuction(self,
allreduce_output_vars,
allreduce_input_vars,
skip_comb=True):
input_vars, final_input_vars, output_vars, final_output_vars = [], [], [], []
        if len(allreduce_output_vars) == 1:
final_output_vars.append(allreduce_output_vars)
final_input_vars.append(allreduce_input_vars)
return final_output_vars, final_input_vars
for idx in range(len(allreduce_input_vars) - 1):
if allreduce_input_vars[idx].dtype == allreduce_input_vars[idx +
1].dtype:
input_vars.append(allreduce_input_vars[idx])
if idx == len(allreduce_input_vars) - 2:
input_vars.append(allreduce_input_vars[idx + 1])
final_input_vars.append(input_vars)
else:
input_vars.append(allreduce_input_vars[idx])
final_input_vars.append(input_vars)
input_vars = []
if idx == len(allreduce_input_vars) - 2:
input_vars.append(allreduce_input_vars[idx + 1])
final_input_vars.append(input_vars)
for idx in range(len(allreduce_output_vars) - 1):
if allreduce_output_vars[idx].dtype == allreduce_output_vars[
idx + 1].dtype:
output_vars.append(allreduce_output_vars[idx])
if idx == len(allreduce_output_vars) - 2:
output_vars.append(allreduce_output_vars[idx + 1])
final_output_vars.append(output_vars)
else:
output_vars.append(allreduce_output_vars[idx])
final_output_vars.append(output_vars)
output_vars = []
if idx == len(allreduce_output_vars) - 2:
output_vars.append(allreduce_output_vars[idx + 1])
final_output_vars.append(output_vars)
if skip_comb:
input_fp16_vars, input_fp32_vars, output_fp16_vars, output_fp32_vars = [], [], [], []
for final_input_var in final_input_vars:
if final_input_var[0].dtype == core.VarDesc.VarType.FP16:
input_fp16_vars.extend(final_input_var)
else:
input_fp32_vars.extend(final_input_var)
for final_output_var in final_output_vars:
if final_output_var[0].dtype == core.VarDesc.VarType.FP16:
output_fp16_vars.extend(final_output_var)
else:
output_fp32_vars.extend(final_output_var)
final_output_vars, final_input_vars = [], []
if output_fp16_vars:
final_output_vars.append(output_fp16_vars)
if output_fp32_vars:
final_output_vars.append(output_fp32_vars)
if input_fp16_vars:
final_input_vars.append(input_fp16_vars)
if input_fp32_vars:
final_input_vars.append(input_fp32_vars)
return final_output_vars, final_input_vars
......@@ -303,14 +303,23 @@ class OptimizerWithMixedPrecision(object):
if self._is_distributed:
# if distributed, split check_finite_and_unscale to overlap
# unscale with communication
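            # On NPU, all grads are checked by a single check_finite_and_unscale
            # op driven by the device float status; on other devices each grad
            # keeps its own op so that unscaling overlaps with communication.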
for p, g in params_grads:
with self._train_program._optimized_guard([p, g]):
if core.is_compiled_with_npu():
with self._train_program._optimized_guard(grads):
_, found_inf = check_finite_and_unscale(
[g, ],
grads,
self._loss_scaling,
name="find_infinite_scale",
float_status=self._float_status)
found_infs.append(found_inf)
else:
for p, g in params_grads:
with self._train_program._optimized_guard([p, g]):
_, found_inf = check_finite_and_unscale(
[g, ],
self._loss_scaling,
name="find_infinite_scale",
float_status=self._float_status)
found_infs.append(found_inf)
elif self._use_pure_fp16:
if fp32_grads:
with self._train_program._optimized_guard(fp32_grads):
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import unittest
import sys
sys.path.append("..")
from op_test import OpTest
import paddle
import paddle.fluid as fluid
from paddle.fluid import core
paddle.enable_static()
SEED = 2021
alignment = 512
@unittest.skipIf(not paddle.is_compiled_with_npu(),
"core is not compiled with NPU")
class TestAllocContinuousSpace(OpTest):
def setUp(self):
self.__class__.use_npu = True
self.op_type = "coalesce_tensor"
self.dtype, self.fluid_dtype = self.init_dtype()
attrs = self.init_attr()
self.copy_data = attrs["copy_data"]
self.constant = attrs["constant"]
self.set_constant = attrs["set_constant"]
self.Inputs = self.init_input()
self.Outputs, self.FusedOutput = self.init_output(
self.Inputs, self.set_constant, self.constant)
self.inputs = {'Input': self.Inputs}
self.attrs = attrs
self.outputs = {'Output': self.Outputs, 'FusedOutput': self.FusedOutput}
def init_dtype(self):
return np.float32, int(core.VarDesc.VarType.FP32)
def init_input(self):
inputs = []
inputs.append(("x1", np.zeros([20, 3]).astype(self.dtype)))
inputs.append(("x2", np.zeros([20, 3]).astype(self.dtype)))
return inputs
def init_attr(self):
return {
"copy_data": False,
"set_constant": False,
"constant": 0.0,
"use_align": True,
"dtype": self.fluid_dtype
}
def init_output(self, input_list, set_constant, constant):
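        # Pad every input up to an alignment boundary and concatenate the
        # padded chunks to build the expected FusedOutput buffer; its values
        # are not checked (see no_check_set in test_check_output).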
inputs = []
outputs = input_list
for input in input_list:
length = len(input[1].flatten())
aligned_len = (length + alignment) / alignment * alignment
out = np.zeros(int(aligned_len), dtype=self.dtype)
out[0:length] = input[1].flatten()
inputs.append(out)
        coalesce_tensor_var = np.concatenate(inputs)
return outputs, coalesce_tensor_var
def test_check_output(self):
self.check_output_with_place(
place=paddle.NPUPlace(0),
no_check_set=["FusedOutput"],
atol=1e-5,
check_dygraph=False)
@unittest.skipIf(not paddle.is_compiled_with_npu(),
"core is not compiled with NPU")
class TestAllocContinuousSpace2(TestAllocContinuousSpace):
def init_attr(self):
return {
"copy_data": True,
"set_constant": False,
"constant": 0.5,
"use_align": True,
"dtype": self.fluid_dtype
}
def test_check_output(self):
self.check_output_with_place(
place=paddle.NPUPlace(0),
no_check_set=["FusedOutput"],
atol=1e-5,
check_dygraph=False)
if __name__ == '__main__':
unittest.main()