From 003b4616f61b0dc8818ed74808dc07c83a338116 Mon Sep 17 00:00:00 2001 From: Baibaifan <39549453+Baibaifan@users.noreply.github.com> Date: Thu, 10 Jun 2021 15:54:44 +0800 Subject: [PATCH] dp c_allreduce_sum_fusion op (#33169) --- .../framework/distributed_strategy.proto | 1 + paddle/fluid/operators/coalesce_tensor_op.cc | 27 +- .../fluid/platform/device_memory_aligment.cc | 4 +- .../fluid/platform/device_memory_aligment.h | 2 + .../paddle/distributed/fleet/ascend_utils.py | 5 +- .../fleet/base/distributed_strategy.py | 21 ++ .../meta_optimizers/raw_program_optimizer.py | 271 +++++++++++++++++- .../contrib/mixed_precision/decorator.py | 15 +- .../npu/test_coalesce_tensor_op_npu.py | 110 +++++++ 9 files changed, 447 insertions(+), 9 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 181e3b68853..be05941efb5 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -176,6 +176,7 @@ message DistributedStrategy { optional bool find_unused_parameters = 28 [ default = false ]; optional bool tensor_parallel = 29 [ default = false ]; optional bool without_graph_optimization = 30 [ default = false ]; + optional int32 fuse_grad_size_in_num = 31 [ default = 1 ]; optional RecomputeConfig recompute_configs = 101; optional AMPConfig amp_configs = 102; diff --git a/paddle/fluid/operators/coalesce_tensor_op.cc b/paddle/fluid/operators/coalesce_tensor_op.cc index 153fa529f96..c1c4f14582e 100644 --- a/paddle/fluid/operators/coalesce_tensor_op.cc +++ b/paddle/fluid/operators/coalesce_tensor_op.cc @@ -120,6 +120,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel { : len; } } else if (context.Attr("set_constant")) { + // TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION. math::SetConstant set_constant; set_constant(dev_ctx, fused_tensor, static_cast(context.Attr("constant"))); @@ -145,6 +146,14 @@ class CoalesceTensorOpKernel : public framework::OpKernel { offset = 0; std::stringstream ss; ss << "alloc_space_for_vars: "; +#if defined(PADDLE_WITH_ASCEND_CL) + auto stream = + context.template device_context() + .stream(); + platform::NPUMemsetAsync( + static_cast(fused_tensor->mutable_data(dev_ctx.GetPlace())), + 0.0, fused_tensor->numel() * sizeof(T), stream); +#endif for (size_t i = 0; i < out_tensors.size(); ++i) { size_t len = static_cast(out_tensors[i]->numel()); auto dim = out_tensors[i]->dims(); @@ -160,6 +169,12 @@ class CoalesceTensorOpKernel : public framework::OpKernel { ss << "output(" << out_var_names[i] << ") dim:(" << dim << ")" << " address: " << out_tensors[i]->data() << ", "; } + PADDLE_ENFORCE_EQ( + (int64_t)offset, fused_tensor->numel(), + platform::errors::InvalidArgument( + "The alloc_space_for_vars's offset: %s is unequal with " + "fused_tensor's numel: %s.", + offset, fused_tensor->numel())); VLOG(10) << ss.str(); } @@ -191,13 +206,13 @@ class CoalesceTensorOpKernel : public framework::OpKernel { ss << "input(" << var_names[i] << ") dim:(" << lod_tensors[i]->dims() << ") " << " addres:" << lod_tensors[i]->data() << ", "; + *numel += use_align ? 
platform::Alignment( static_cast(size) * size_of_dtype, place) / size_of_dtype : static_cast(size); } - VLOG(10) << ss.str(); } }; @@ -309,6 +324,16 @@ REGISTER_OP_XPU_KERNEL( ops::CoalesceTensorOpKernel); #endif +#if defined(PADDLE_WITH_ASCEND_CL) +REGISTER_OP_NPU_KERNEL( + coalesce_tensor, + ops::CoalesceTensorOpKernel, + ops::CoalesceTensorOpKernel, + ops::CoalesceTensorOpKernel, + ops::CoalesceTensorOpKernel); +#endif + REGISTER_OP_VERSION(coalesce_tensor) .AddCheckpoint( R"ROC( diff --git a/paddle/fluid/platform/device_memory_aligment.cc b/paddle/fluid/platform/device_memory_aligment.cc index f8e03110441..185646e7327 100644 --- a/paddle/fluid/platform/device_memory_aligment.cc +++ b/paddle/fluid/platform/device_memory_aligment.cc @@ -26,9 +26,11 @@ size_t Alignment(size_t size, const platform::Place &place) { #elif defined(PADDLE_WITH_XPU) // TODO(wangxi): add XpuMinChunkSize alignment = alignment; +#elif defined(PADDLE_WITH_ASCEND_CL) + alignment = NPUMinChunkSize(); #else PADDLE_THROW(platform::errors::PreconditionNotMet( - "Fluid is not compiled with CUDA.")); + "Fluid is not compiled with CUDA or NPU.")); #endif } size_t remaining = size % alignment; diff --git a/paddle/fluid/platform/device_memory_aligment.h b/paddle/fluid/platform/device_memory_aligment.h index a151e434833..e0f2f0f11c9 100644 --- a/paddle/fluid/platform/device_memory_aligment.h +++ b/paddle/fluid/platform/device_memory_aligment.h @@ -19,6 +19,8 @@ limitations under the License. */ #include "paddle/fluid/platform/place.h" #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) #include "paddle/fluid/platform/gpu_info.h" +#elif defined(PADDLE_WITH_ASCEND_CL) +#include "paddle/fluid/platform/npu_info.h" #endif namespace paddle { diff --git a/python/paddle/distributed/fleet/ascend_utils.py b/python/paddle/distributed/fleet/ascend_utils.py index 27437c50fad..2f6c210165e 100644 --- a/python/paddle/distributed/fleet/ascend_utils.py +++ b/python/paddle/distributed/fleet/ascend_utils.py @@ -80,8 +80,9 @@ def _get_ascend_rankfile(rank_table_file_path): nodes = os.getenv("DLS_TASK_NUMBER", None) assert nodes is not None, "DLS_TASK_NUMBER didn't set!" for node in range(int(nodes)): - node_ip = os.getenv(f"VC_CUSTOM{node}_HOSTS", None) - assert node_ip is not None, f"VC_CUSTOM{node}_HOSTS didn't set!" + node_ip = os.getenv("VC_CUSTOM{}_HOSTS".format(node), None) + assert node_ip is not None, "VC_CUSTOM{}_HOSTS didn't set!".format( + node) node_ips.append(node_ip) return node_ips, device_count node_ips.append(server['server_id']) diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index 508d2986869..e44a0e0459d 100644 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -853,6 +853,27 @@ class DistributedStrategy(object): "WARNING: without_graph_optimization should have value of bool type" ) + @property + def fuse_grad_size_in_num(self): + """ + This based on raw_program_optimizer program and allreduce the num of the fused op + Examples: + .. 
code-block:: python
+                import paddle.distributed.fleet as fleet
+                strategy = fleet.DistributedStrategy()
+                strategy.fuse_grad_size_in_num = 2
+        """
+        return self.strategy.fuse_grad_size_in_num
+
+    @fuse_grad_size_in_num.setter
+    @is_strict_auto
+    def fuse_grad_size_in_num(self, num):
+        if isinstance(num, int):
+            self.strategy.fuse_grad_size_in_num = num
+        else:
+            print(
+                "WARNING: fuse_grad_size_in_num should have value of int32 type")
+
     @property
     def pipeline(self):
         """
diff --git a/python/paddle/distributed/fleet/meta_optimizers/raw_program_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/raw_program_optimizer.py
index b232d8c9c49..1333f794cc9 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/raw_program_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/raw_program_optimizer.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,9 +14,12 @@ from __future__ import print_function
 from __future__ import division
 import os
+import collections
+import numpy as np
 import paddle.fluid as fluid
 from paddle.fluid import core, unique_name
+from paddle.fluid.dygraph import Layer, LayerList
 from ..base.private_helper_function import wait_server_ready
 from .meta_optimizer_base import MetaOptimizerBase
 from .common import OpRole, OP_ROLE_KEY, OP_ROLE_VAR_KEY, CollectiveHelper, is_loss_grad_op, is_backward_op, is_optimizer_op
@@ -38,6 +41,9 @@ class RawProgramOptimizer(MetaOptimizerBase):
         super(RawProgramOptimizer, self)._set_basic_info(
             loss, role_maker, user_defined_optimizer, user_defined_strategy)
         self.without_graph_optimization = user_defined_strategy.without_graph_optimization
+        self.fuse_all_reduce_ops = user_defined_strategy.fuse_all_reduce_ops
+        if self.fuse_all_reduce_ops:
+            self.fuse_grad_size_in_num = user_defined_strategy.fuse_grad_size_in_num

     def _can_apply(self):
         if not self.role_maker._is_collective:
@@ -124,7 +130,11 @@ class RawProgramOptimizer(MetaOptimizerBase):
     def _transpile_main_program(self, loss):
         self._insert_loss_grad_ops(loss)
-        self._insert_allreduce_ops()
+        if self.fuse_all_reduce_ops and core.is_compiled_with_npu():
+            self._calc_stream = True
+            self._allreduce_fusion_program()
+        else:
+            self._insert_allreduce_ops()

     def _insert_loss_grad_ops(self, loss):
         """
@@ -195,3 +205,260 @@ class RawProgramOptimizer(MetaOptimizerBase):
                         attrs={'ring_id': ring_id,
                                OP_ROLE_KEY: OpRole.Backward})
                 break
+
+    # TODO(Liu yuang): ADD CUDA allreduce_fusion function.
+    # Fusing gradients reduces the number of allreduce calls and
+    # saves communication time.
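+    # For example, with fuse_grad_size_in_num = 2, four fp32 gradients that
+    # would otherwise each get their own c_allreduce_sum are packed into two
+    # fused buffers and allreduced with just two c_allreduce_sum ops
+    # (each group is further split by dtype in _split_fuction below).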
+ def _allreduce_fusion_program(self): + block = self.main_program.global_block() + ring_id = self.global_ring_id + record_idx, allreduce_input_vars, allreduce_output_vars = [], [], [] + block_ops = len(list(enumerate(block.ops))) + + for idx, op in reversed(list(enumerate(block.ops))): + if is_backward_op(op) and \ + OP_ROLE_VAR_KEY in op.attr_names: + op_role_var = op.attr(OP_ROLE_VAR_KEY) + if len(op_role_var) == 0: + continue + assert len(op_role_var) % 2 == 0 + for i in range(0, len(op_role_var), 2): + param_name = op_role_var[i] + param = block.var(param_name) + grad_name = op_role_var[i + 1] + grad = block.var(grad_name) + if param.is_distributed: + continue + if ".cast_fp16@GRAD" in grad_name: + param_name = param_name + ".cast_fp16" + if not block.has_var(param_name): + raise ValueError("op cast name error {}".format( + op.type)) + else: + param = block.var(param_name) + + if len(allreduce_output_vars) == 0: + allreduce_output_vars.append([grad]) + allreduce_input_vars.append([param]) + if self.fuse_grad_size_in_num == 1: + record_idx.append([idx, idx]) + continue + record_idx.append([-2, idx]) + elif len(allreduce_output_vars[ + -1]) == self.fuse_grad_size_in_num: + allreduce_output_vars.append([grad]) + allreduce_input_vars.append([param]) + if self.fuse_grad_size_in_num == 1: + record_idx.append([idx, idx]) + continue + if idx != block_ops - 1: + record_idx.append([-2, idx]) + else: + allreduce_output_vars[-1].append(grad) + allreduce_input_vars[-1].append(param) + record_idx[-1][0] = idx + + if record_idx[-1][0] == -2: + record_idx[-1][0] = record_idx[-1][1] + + assert len(allreduce_output_vars) == len( + record_idx + ), "It has different lens between the allreduce_output_vars and record_idx." + + if not allreduce_output_vars or not allreduce_input_vars: + return + + self.vars = collections.OrderedDict() + index, offset_pos, pos, offset = 0, 0, 0, 0 + start, end = record_idx[index] + men_list = [end, start] + + # Here we need to explain the flag. When integrating OP, we will encounter different groups of the same Op. + # Because we insert coalesce tensor in reverse ops, + # we need to use flag to record whether the current OP has been inserted into coalesce tensor。 + # For example: + # [(3, 2), (2, 2), (1, 0)], (3, 2), (2, 2) using same op, but in different groups. 
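+        # The rewrite below turns each recorded gradient group into roughly
+        # the following op pattern, one pair per dtype group returned by
+        # _split_fuction:
+        #
+        #   coalesce_tensor(Input=params, Output=grads,
+        #                   FusedOutput=FusedOutput_<start>_<id>)
+        #   c_allreduce_sum(X=FusedOutput_<start>_<id>,
+        #                   Out=FusedOutput_<start>_<id>,
+        #                   ring_id=ring_id, use_calc_stream=self._calc_stream)
+        #
+        # When self._calc_stream is False, a c_sync_comm_stream op is also
+        # inserted before the first optimizer op.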
+ + for idx, op in reversed(list(enumerate(block.ops))): + if idx == start: + pos = 0 + flag = True if end == men_list[-1] else False + offset = offset_pos if flag else 0 + done_output_vars, done_input_vars = self._split_fuction( + allreduce_output_vars[index], allreduce_input_vars[index]) + for id_, done_output_var in enumerate(done_output_vars): + if flag: + tmp_var = block.create_var( + name=unique_name.generate( + 'FusedOutput_{}_{}'.format(start, id_ + + offset)), + dtype=done_output_var[0].dtype, + persistable=False, + stop_gradient=True) + self.vars['FusedOutput_{}_{}'.format(start, id_ + + offset)] = tmp_var + + block._insert_op( + idx + id_ + offset, + type="coalesce_tensor", + inputs={"Input": done_input_vars[id_]}, + outputs={ + "Output": done_output_var, + "FusedOutput": tmp_var + }, + attrs={ + "copy_data": False, + "use_align": True, + "dtype": done_output_var[0].dtype + }) + pos += 1 + else: + tmp_var = block.create_var( + name=unique_name.generate( + 'FusedOutput_{}_{}'.format(start, id_)), + dtype=done_output_var[0].dtype, + persistable=False, + stop_gradient=True) + self.vars['FusedOutput_{}_{}'.format(start, + id_)] = tmp_var + + block._insert_op( + idx + id_, + type="coalesce_tensor", + inputs={"Input": done_input_vars[id_]}, + outputs={ + "Output": done_output_var, + "FusedOutput": tmp_var + }, + attrs={ + "copy_data": False, + "use_align": True, + "dtype": done_output_var[0].dtype + }) + pos += 1 + offset_pos = pos + + # TODO(Liu yuang): ADD CUDA and NPU's EVENT and c_allreduce_sum. + for id_ in range(len(done_output_vars)): + if flag: + block._insert_op( + end + id_ + pos + 1, + type='c_allreduce_sum', + inputs={ + 'X': self.vars['FusedOutput_{}_{}'.format( + start, id_ + offset)] + }, + outputs={ + 'Out': self.vars['FusedOutput_{}_{}'.format( + start, id_ + offset)] + }, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': True + if self._calc_stream else False, + OP_ROLE_KEY: OpRole.Backward + }) + else: + block._insert_op( + end + id_ + pos + 1, + type='c_allreduce_sum', + inputs={ + 'X': self.vars['FusedOutput_{}_{}'.format(start, + id_)] + }, + outputs={ + 'Out': self.vars['FusedOutput_{}_{}'.format( + start, id_)] + }, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': True + if self._calc_stream else False, + OP_ROLE_KEY: OpRole.Backward + }) + index += 1 + men_list.append(end) + men_list.append(start) + if len(record_idx) == index: + start = end = -1 + continue + start, end = record_idx[index] + + if not self._calc_stream: + for idx, op in enumerate(block.ops): + if is_optimizer_op(op): + block._insert_op( + idx, + type='c_sync_comm_stream', + inputs={'X': block.create_var()}, + outputs={'Out': block.create_var()}, + attrs={ + 'ring_id': ring_id, + OP_ROLE_KEY: OpRole.Backward + }) + break + + # Integrate grads of the same type to form a combination. If skip_comb is selected, will return grads of the same group. 
+ # For example:[(fp16, fp16), (fp32), (fp16)] -> [(fp16, fp16, fp16), (fp32)] + def _split_fuction(self, + allreduce_output_vars, + allreduce_input_vars, + skip_comb=True): + input_vars, final_input_vars, output_vars, final_output_vars = [], [], [], [] + if len(allreduce_output_vars) - 1 == 0: + final_output_vars.append(allreduce_output_vars) + final_input_vars.append(allreduce_input_vars) + return final_output_vars, final_input_vars + + for idx in range(len(allreduce_input_vars) - 1): + if allreduce_input_vars[idx].dtype == allreduce_input_vars[idx + + 1].dtype: + input_vars.append(allreduce_input_vars[idx]) + if idx == len(allreduce_input_vars) - 2: + input_vars.append(allreduce_input_vars[idx + 1]) + final_input_vars.append(input_vars) + else: + input_vars.append(allreduce_input_vars[idx]) + final_input_vars.append(input_vars) + input_vars = [] + if idx == len(allreduce_input_vars) - 2: + input_vars.append(allreduce_input_vars[idx + 1]) + final_input_vars.append(input_vars) + + for idx in range(len(allreduce_output_vars) - 1): + if allreduce_output_vars[idx].dtype == allreduce_output_vars[ + idx + 1].dtype: + output_vars.append(allreduce_output_vars[idx]) + if idx == len(allreduce_output_vars) - 2: + output_vars.append(allreduce_output_vars[idx + 1]) + final_output_vars.append(output_vars) + else: + output_vars.append(allreduce_output_vars[idx]) + final_output_vars.append(output_vars) + output_vars = [] + if idx == len(allreduce_output_vars) - 2: + output_vars.append(allreduce_output_vars[idx + 1]) + final_output_vars.append(output_vars) + if skip_comb: + input_fp16_vars, input_fp32_vars, output_fp16_vars, output_fp32_vars = [], [], [], [] + for final_input_var in final_input_vars: + if final_input_var[0].dtype == core.VarDesc.VarType.FP16: + input_fp16_vars.extend(final_input_var) + else: + input_fp32_vars.extend(final_input_var) + + for final_output_var in final_output_vars: + if final_output_var[0].dtype == core.VarDesc.VarType.FP16: + output_fp16_vars.extend(final_output_var) + else: + output_fp32_vars.extend(final_output_var) + final_output_vars, final_input_vars = [], [] + if output_fp16_vars: + final_output_vars.append(output_fp16_vars) + if output_fp32_vars: + final_output_vars.append(output_fp32_vars) + if input_fp16_vars: + final_input_vars.append(input_fp16_vars) + if input_fp32_vars: + final_input_vars.append(input_fp32_vars) + + return final_output_vars, final_input_vars diff --git a/python/paddle/fluid/contrib/mixed_precision/decorator.py b/python/paddle/fluid/contrib/mixed_precision/decorator.py index 3cb9fe75559..d5d2e7a0d96 100644 --- a/python/paddle/fluid/contrib/mixed_precision/decorator.py +++ b/python/paddle/fluid/contrib/mixed_precision/decorator.py @@ -303,14 +303,23 @@ class OptimizerWithMixedPrecision(object): if self._is_distributed: # if distributed, split check_finite_and_unscale to overlap # unscale with communication - for p, g in params_grads: - with self._train_program._optimized_guard([p, g]): + if core.is_compiled_with_npu(): + with self._train_program._optimized_guard(grads): _, found_inf = check_finite_and_unscale( - [g, ], + grads, self._loss_scaling, name="find_infinite_scale", float_status=self._float_status) found_infs.append(found_inf) + else: + for p, g in params_grads: + with self._train_program._optimized_guard([p, g]): + _, found_inf = check_finite_and_unscale( + [g, ], + self._loss_scaling, + name="find_infinite_scale", + float_status=self._float_status) + found_infs.append(found_inf) elif self._use_pure_fp16: if fp32_grads: with 
self._train_program._optimized_guard(fp32_grads): diff --git a/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py b/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py new file mode 100644 index 00000000000..37fa5f8cad2 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py @@ -0,0 +1,110 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import numpy as np +import unittest +import sys +sys.path.append("..") +from op_test import OpTest +import paddle +import paddle.fluid as fluid +from paddle.fluid import core + +paddle.enable_static() +SEED = 2021 +alignment = 512 + + +@unittest.skipIf(not paddle.is_compiled_with_npu(), + "core is not compiled with NPU") +class TestAllocContinuousSpace(OpTest): + def setUp(self): + self.__class__.use_npu = True + self.op_type = "coalesce_tensor" + self.dtype, self.fluid_dtype = self.init_dtype() + attrs = self.init_attr() + self.copy_data = attrs["copy_data"] + self.constant = attrs["constant"] + self.set_constant = attrs["set_constant"] + self.Inputs = self.init_input() + self.Outputs, self.FusedOutput = self.init_output( + self.Inputs, self.set_constant, self.constant) + self.inputs = {'Input': self.Inputs} + self.attrs = attrs + self.outputs = {'Output': self.Outputs, 'FusedOutput': self.FusedOutput} + + def init_dtype(self): + return np.float32, int(core.VarDesc.VarType.FP32) + + def init_input(self): + inputs = [] + inputs.append(("x1", np.zeros([20, 3]).astype(self.dtype))) + inputs.append(("x2", np.zeros([20, 3]).astype(self.dtype))) + return inputs + + def init_attr(self): + return { + "copy_data": False, + "set_constant": False, + "constant": 0.0, + "use_align": True, + "dtype": self.fluid_dtype + } + + def init_output(self, input_list, set_constant, constant): + inputs = [] + outputs = input_list + + for input in input_list: + length = len(input[1].flatten()) + aligned_len = (length + alignment) / alignment * alignment + out = np.zeros(int(aligned_len), dtype=self.dtype) + out[0:length] = input[1].flatten() + inputs.append(out) + + coalesce_tensor_var = np.concatenate([input for input in inputs]) + return outputs, coalesce_tensor_var + + def test_check_output(self): + self.check_output_with_place( + place=paddle.NPUPlace(0), + no_check_set=["FusedOutput"], + atol=1e-5, + check_dygraph=False) + + +@unittest.skipIf(not paddle.is_compiled_with_npu(), + "core is not compiled with NPU") +class TestAllocContinuousSpace2(TestAllocContinuousSpace): + def init_attr(self): + return { + "copy_data": True, + "set_constant": False, + "constant": 0.5, + "use_align": True, + "dtype": self.fluid_dtype + } + + def test_check_output(self): + self.check_output_with_place( + place=paddle.NPUPlace(0), + no_check_set=["FusedOutput"], + atol=1e-5, + check_dygraph=False) + + +if __name__ == '__main__': + unittest.main() -- GitLab
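For context, a minimal usage sketch of the fused NPU data-parallel path added by this patch. This is an illustration rather than part of the patch: the SGD optimizer and the loss are placeholders, and the fused path only takes effect when Paddle is compiled with NPU support (otherwise RawProgramOptimizer falls back to the per-gradient _insert_allreduce_ops path).

    import paddle
    import paddle.distributed.fleet as fleet

    paddle.enable_static()
    fleet.init(is_collective=True)

    strategy = fleet.DistributedStrategy()
    strategy.without_graph_optimization = True  # select RawProgramOptimizer
    strategy.fuse_all_reduce_ops = True         # enable gradient fusion
    strategy.fuse_grad_size_in_num = 8          # pack up to 8 gradients per fused c_allreduce_sum

    sgd = paddle.optimizer.SGD(learning_rate=0.01)         # placeholder optimizer
    dist_opt = fleet.distributed_optimizer(sgd, strategy)
    # dist_opt.minimize(loss)  # loss comes from the user's network definition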