Unverified commit 0c2a51d2, authored by WangXi, committed by GitHub

optimizer amp, all use fp16 communication, overlap last comm and compute (#28957)

Parent 0b032fae
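The change below touches both the CUDA AMP kernels and the Python mixed-precision optimizer. The core idea is to run check_finite_and_unscale per gradient instead of once over all gradients, so that in multi-card training the unscale/check of gradients whose communication has already finished can overlap with the communication of the remaining ones. A minimal sketch of that scheduling idea, using hypothetical allreduce_async/wait helpers in place of the framework's collective ops (not Paddle API):

```python
import numpy as np

# Hypothetical helpers standing in for the framework's async allreduce.
def allreduce_async(grad):   # would launch a collective and return a handle
    return grad

def wait(handle):            # would block until that collective finishes
    return handle

def unscale_with_overlap(grads, loss_scaling):
    """Per-gradient unscale/check, so each check can run while later
    gradients are still being communicated (a sketch, not Paddle's code)."""
    inv_scale = 1.0 / loss_scaling
    handles = [allreduce_async(g) for g in grads]  # launch all comms up front
    found_inf = False
    outs = []
    for h in handles:
        g = wait(h)                  # wait only for this gradient
        out = g * inv_scale          # unscale while later comms are in flight
        found_inf |= not np.all(np.isfinite(out))
        outs.append(out)
    return outs, found_inf
```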
@@ -20,8 +20,9 @@ namespace paddle {
 namespace operators {

 template <typename T>
-__global__ void GpuInverse(const T* s, T* o) {
+__global__ void InverseAndMemset(const T* s, T* o, bool* found_inf) {
   *o = Inverse<T>(*s);
+  *found_inf = false;
 }

 template <typename T>
@@ -30,10 +31,11 @@ __global__ void CheckFiniteAndUnscale(const T* in, const T* scale, int num,
   const int idx = threadIdx.x + blockIdx.x * blockDim.x;
   if (idx < num) {
-    if (!isfinite(in[idx])) {
+    T val = in[idx] * (*scale);
+    out[idx] = val;
+    if (!isfinite(val)) {
       *found_inf = true;
     }
-    out[idx] = *found_inf ? in[idx] : in[idx] * (*scale);
   }
 }
@@ -49,13 +51,13 @@ class CheckFiniteAndUnscaleGpuKernel : public framework::OpKernel<T> {
     const T* scale_data = scale->data<T>();
     bool* found_inf_data = found_inf->mutable_data<bool>(dev_ctx.GetPlace());
-    cudaMemset(found_inf_data, false, found_inf->numel() * sizeof(bool));

     framework::Tensor inverse_scale =
         ctx.AllocateTmpTensor<T, platform::CUDADeviceContext>({1}, dev_ctx);
     T* inverse_scale_v = inverse_scale.template data<T>();
-    GpuInverse<T><<<1, 1, 0, dev_ctx.stream()>>>(scale_data, inverse_scale_v);
+    InverseAndMemset<T><<<1, 1, 0, dev_ctx.stream()>>>(
+        scale_data, inverse_scale_v, found_inf_data);

     for (size_t i = 0; i < xs.size(); ++i) {
       const auto* x = xs[i];
......
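As a reference for the kernel change above, here is a rough NumPy sketch of what the rewritten CheckFiniteAndUnscale computes per tensor (the scale argument is already the inverse loss scale produced by InverseAndMemset, and found_inf mimics the device-side flag). This is only an illustration, not the CUDA code:

```python
import numpy as np

def check_finite_and_unscale_ref(x, inverse_scale, found_inf):
    """NumPy reference for the new kernel: always write the scaled value,
    and raise found_inf if any result is non-finite."""
    val = x * inverse_scale           # unconditionally unscale ...
    out = val                         # ... and write the output
    if not np.all(np.isfinite(val)):  # ... then flag non-finite values
        found_inf[0] = True
    return out

found_inf = np.array([False])         # InverseAndMemset also resets this flag
out = check_finite_and_unscale_ref(np.array([1.0, np.inf]), 1.0 / 128.0, found_inf)
```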
@@ -61,13 +61,14 @@ class LazyZeroInputs<platform::CUDADeviceContext, T> {
     bool has_inf{false};
     memory::Copy(platform::CPUPlace(), &has_inf, gpu_place, found_inf_data,
                  sizeof(bool), dev_ctx.stream());
+    dev_ctx.Wait();  // wait async copy
     if (has_inf) {
       VLOG(1) << "-- UpdateLossScaling: Infinite values are found in grads. --";
       for (size_t i = 0; i < xs.size(); ++i) {
         auto* out = outs[i];
         T* out_data = out->mutable_data<T>(dev_ctx.GetPlace());
         int num = out->numel();
-        cudaMemset(out_data, 0, num * sizeof(T));
+        cudaMemsetAsync(out_data, 0, num * sizeof(T), dev_ctx.stream());
       }
     }
   }
......
@@ -53,6 +53,15 @@ class AMPOptimizer(MetaOptimizerBase):
             config['incr_ratio'], config['decr_ratio'],
             config['use_dynamic_loss_scaling'])

+        # if worker_num > 1, all cards will communicate with each other;
+        # set is_distributed so amp can overlap communication and
+        # computation by splitting the check_finite_and_unscale op.
+        is_distributed = self.role_maker._worker_num() > 1
+        if self.user_defined_strategy.sharding:
+            # FIXME(wangxi): sharding fails when check_finite_and_unscale is split
+            is_distributed = False
+        self.wrapped_opt._set_distributed(is_distributed)
+
     def _can_apply(self):
         if not self.role_maker._is_collective:
             return False
......
@@ -61,6 +61,7 @@ class OptimizerWithMixedPrecision(object):
         self._param_grads = None
         self._train_program = None
+        self._is_distributed = False
         self._scaled_loss = None
         self._loss_scaling = None
         self._init_loss_scaling = init_loss_scaling
@@ -73,6 +74,12 @@ class OptimizerWithMixedPrecision(object):
         self._num_good_steps = None
         self._num_bad_steps = None

+    def _set_distributed(self, flag):
+        # if distributed, all cards will communicate with each other;
+        # overlap communication and computation by splitting the
+        # check_finite_and_unscale op.
+        self._is_distributed = flag
+
     def get_loss_scaling(self):
         """Return the real-time loss scaling factor.
         """
@@ -168,13 +175,28 @@ class OptimizerWithMixedPrecision(object):
         """
         grads = [g for _, g in params_grads]
-        with self._train_program._optimized_guard(grads):
-            grads, found_inf = check_finite_and_unscale(
-                grads, self._loss_scaling, name="find_infinite_scale")
+        if not self._is_distributed:
+            with self._train_program._optimized_guard(grads):
+                grads, found_inf = check_finite_and_unscale(
+                    grads, self._loss_scaling, name="find_infinite_scale")
+        else:
+            # if distributed, split check_finite_and_unscale to overlap
+            # unscale with communication
+            found_infs = []
+            for p, g in params_grads:
+                with self._train_program._optimized_guard([p, g]):
+                    _, found_inf = check_finite_and_unscale(
+                        [g, ], self._loss_scaling, name="find_infinite_scale")
+                found_infs.append(found_inf)

         if self._use_dynamic_loss_scaling:
-            with self._train_program._optimized_guard(grads):
-                grads = update_loss_scaling(
+            if self._is_distributed:
+                with self._train_program._optimized_guard([]):
+                    all_infs = layers.concat(found_infs)
+                    found_inf = layers.reduce_any(all_infs)
+
+            with self._train_program._optimized_guard([]):
+                update_loss_scaling(
                     grads,
                     found_inf,
                     self._loss_scaling,
@@ -186,13 +208,7 @@ class OptimizerWithMixedPrecision(object):
                     self._decr_ratio,
                     name="update_loss_scaling")

-        params_unscaled_grads = []
-        for pg, new_g in zip(params_grads, grads):
-            params_unscaled_grads.append((pg[0], new_g))
-        # apply_gradient append all ops in global block, thus we shouldn't
-        # apply gradient in the switch branch.
-        optimize_ops = self._optimizer.apply_gradients(params_unscaled_grads)
+        optimize_ops = self._optimizer.apply_gradients(params_grads)
         return optimize_ops

     def apply_optimize(self, loss, startup_program, params_grads):
......
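To make the Python-side change above concrete: when the op is split, one found_inf flag is produced per gradient, and they are combined into the single flag that update_loss_scaling consumes. A small NumPy sketch of that aggregation (numpy stands in for layers.concat / layers.reduce_any, so this illustrates the logic, not the static-graph code):

```python
import numpy as np

def aggregate_found_inf(found_infs):
    """Combine the per-gradient flags into one flag for update_loss_scaling
    (mirrors the concat + reduce_any pattern in the diff)."""
    all_infs = np.concatenate([np.atleast_1d(f) for f in found_infs])
    return np.any(all_infs)

# e.g. one boolean flag per parameter gradient
print(aggregate_found_inf([np.array([False]), np.array([True]), np.array([False])]))  # True
```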
@@ -19,6 +19,7 @@ import paddle.distributed.fleet as fleet
 from paddle.distributed.fleet.meta_optimizers import AMPOptimizer
 import os
 from fleet_meta_optimizer_base import TestFleetMetaOptimizer
+import paddle.distributed.fleet.base.role_maker as role_maker

 paddle.enable_static()
@@ -32,7 +33,10 @@ class TestFleetAMPOptimizer(TestFleetMetaOptimizer):
         opt = fluid.optimizer.MomentumOptimizer(
             learning_rate=0.001, momentum=0.9)
         opt = AMPOptimizer(opt)
-        opt.user_defined_strategy = strategy
+
+        self.set_strategy(strategy, 'amp')
+        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+        opt._set_basic_info(avg_cost, role, opt, strategy)
         params_grads = opt.backward(avg_cost, startup_prog)

         ops = [op.type for op in avg_cost.block.ops]
@@ -47,7 +51,10 @@ class TestFleetAMPOptimizer(TestFleetMetaOptimizer):
         opt = fluid.optimizer.MomentumOptimizer(
             learning_rate=0.001, momentum=0.9)
         opt = AMPOptimizer(opt)
-        opt.user_defined_strategy = strategy
+
+        self.set_strategy(strategy, 'amp')
+        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+        opt._set_basic_info(avg_cost, role, opt, strategy)
         params_grads = opt.backward(avg_cost, startup_prog)
         with fluid.program_guard(train_prog, startup_prog):
             opt.apply_gradients(params_grads)
@@ -64,7 +71,10 @@ class TestFleetAMPOptimizer(TestFleetMetaOptimizer):
         opt = fluid.optimizer.MomentumOptimizer(
             learning_rate=0.001, momentum=0.9)
         opt = AMPOptimizer(opt)
-        opt.user_defined_strategy = strategy
+
+        self.set_strategy(strategy, 'amp')
+        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+        opt._set_basic_info(avg_cost, role, opt, strategy)
         params_grads = opt.backward(avg_cost, startup_prog)
         opt.apply_optimize(avg_cost, startup_prog, params_grads)
@@ -83,6 +93,22 @@ class TestFleetAMPOptimizer(TestFleetMetaOptimizer):
         self.assertIn('cast', ops)
         self.assertIn('check_finite_and_unscale', ops)

+    def test_amp_distributed_optimizer(self):
+        """ test amp when distributed """
+        train_prog, startup_prog = fluid.Program(), fluid.Program()
+        avg_cost, strategy = self.net(train_prog, startup_prog)
+        self.set_strategy(strategy, 'amp')
+        self.optimizer(avg_cost, strategy, train_prog, startup_prog)
+        ops = [op.type for op in avg_cost.block.ops]
+        self.assertIn('cast', ops)
+        self.assertIn('check_finite_and_unscale', ops)
+
+        check_count = 0
+        for name in ops:
+            if name == 'check_finite_and_unscale':
+                check_count += 1
+        self.assertEqual(check_count, len(train_prog.all_parameters()))
+
     def test_amp_recompute_optimizer(self):
         """ test amp + recompute """
         train_prog, startup_prog = fluid.Program(), fluid.Program()
......