Unverified commit ed724065, authored by mapingshuo, committed by GitHub

add gradient Merge Optimizer (#25625)

Parent f45f8363
@@ -4933,3 +4933,223 @@ class LookaheadOptimizer(object):
with switch.default():
pass
return mini_out
class GradientMergeOptimizer(object):
"""
Gradient Merge, also known as Gradient Accumulation,
is a training strategy for simulating larger batches. With this strategy,
the parameters are not updated at every step.
At each step, the forward network and the backward network
run to compute the gradients of the parameters.
Every k steps, the optimization network runs,
applying a specific optimization method (such as SGD or Adam)
to the parameters.
Args:
inner_optimizer (Optimizer): the optimizer (such as SGD or Adam)
that updates the parameters
k_steps (int): the update period of the parameters
avg (bool): whether to average the accumulated gradients over the
k mini-batches; the default value is `True`
Examples:
.. code-block:: python
import paddle.fluid as fluid
import numpy as np
def gen_data(batch_size):
return {"x": np.random.random(size=(batch_size, 32)).astype('float32'),
"y": np.random.random(size=(batch_size, 1)).astype('int64')}
def mlp(input_x, input_y, hid_dim=128, label_dim=2):
fc_1 = fluid.layers.fc(input=input_x, size=hid_dim)
prediction = fluid.layers.fc(input=[fc_1], size=label_dim, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
sum_cost = fluid.layers.reduce_mean(cost)
return sum_cost, fc_1, prediction
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
cost, fc_1, pred = mlp(input_x, input_y)
sgd = fluid.optimizer.Adam(learning_rate=0.01)
sgd = fluid.optimizer.GradientMergeOptimizer(sgd, k_steps=4, avg=True)
sgd.minimize(cost)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for i in range(10):
cost_val = exe.run(feed=gen_data(32),
program=fluid.default_main_program(),
fetch_list=[cost.name])
print("step=%d, cost=%f" % (i, cost_val[0]))
"""
def __init__(self, inner_optimizer, k_steps=1, avg=True):
if framework.in_dygraph_mode():
raise Exception(
"GradientMergeOptimizer is not supported in dygraph mode. "
"You can do gradient merge yourself by running forward + backward "
"k times and then calling optimizer.minimize() once.")
assert (inner_optimizer is not None), "inner optimizer can not be None"
assert (isinstance(k_steps, int) and
k_steps > 0), "k_steps should be a positive integer"
self.inner_optimizer = inner_optimizer
self.k_steps = k_steps
self.type = "gradient_merge"
self.avg = avg
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
assert isinstance(loss, Variable), "The loss should be a Variable."
assert (
parameter_list is None
), "The parameter_list should be None when using GradientMergeOptimizer"
assert (
no_grad_set is None
), "The no_grad_set should be None when using GradientMergeOptimizer"
params_grads = self.inner_optimizer.backward(
loss, startup_program=startup_program)
#TODO(mapingshuo) support sparse embedding
for k, v in params_grads:
assert (
v.type != core.VarDesc.VarType.SELECTED_ROWS
), "SELECTED_ROWS is not supported in GradientMergeOptimizer for now"
param_to_grad = {k.name: v for (k, v) in params_grads}
# Get startup_program and main_program
if startup_program is None:
startup_program = default_startup_program()
main_block = loss.block
# add some vars to the main_program and startup_program
startup_block = startup_program.global_block()
param_names = param_to_grad.keys()
param_to_gradient_merge = {}
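# one persistable accumulation buffer ("<param>@GRAD@GradientMerge") is
# created per parameter; mini-batch gradients are summed into it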
for param_name in param_names:
param_var = main_block.var(param_name)
assert (param_var is not None)
gradient_merge_var = main_block.create_var(
name=param_name + "@GRAD@GradientMerge",
shape=param_var.shape,
dtype=param_var.dtype,
persistable=True)
param_to_gradient_merge[param_name] = gradient_merge_var
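# mirror the buffer in the startup program and zero-initialize it there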
startup_gradient_merge_var = startup_block.create_var(
name=param_name + "@GRAD@GradientMerge",
shape=param_var.shape,
dtype=param_var.dtype,
persistable=True)
startup_block.append_op(
type="fill_constant",
outputs={"Out": startup_gradient_merge_var},
attrs={
"shape": param_var.shape,
"dtype": param_var.dtype,
"value": float(0),
})
with framework.program_guard(main_block.program, startup_program):
# Add Var k to main prog and startup prog
gradient_merge_k = layers.create_global_var(
name="gradient_merge_k",
shape=[1],
value=int(self.k_steps),
dtype='int32',
persistable=True)
# Add Var step
gradient_merge_step = layers.create_global_var(
name="gradient_merge_step",
shape=[1],
value=int(0),
dtype='int32',
persistable=True)
layers.increment(x=gradient_merge_step, value=1.0, in_place=True)
# gradient merge
zero_var = layers.fill_constant(
shape=[1], dtype='float32', value=0.0)
one_var = layers.fill_constant(
shape=[1], dtype='float32', value=1.0)
mod = layers.elementwise_mod(gradient_merge_step, gradient_merge_k)
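# branch on (step % k): a non-zero remainder means "only accumulate this
# step's gradients"; a zero remainder means "fold the buffers into the
# gradients, run the inner optimizer, then reset the buffers"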
with layers.control_flow.Switch() as switch:
with switch.case(mod != zero_var):
# 1. update the gradient_merge_vars
# gradient_merge_vars += gradient_vars
cur_block = main_block.program.current_block()
for param_name in param_names:
grad = param_to_grad[param_name]
grad_merge = param_to_gradient_merge[param_name]
cur_block.append_op(
type="elementwise_add",
inputs={'X': grad,
'Y': grad_merge},
outputs={'Out': grad_merge},
attrs={'axis': -1,
'use_mkldnn': False})
with switch.default():
# 1. merge the accumulated gradients into the gradient vars
# grad = grad + grad_merge (divided by k if avg=True)
cur_block_idx = main_block.program.current_block_idx
cur_block = main_block.program.current_block()
for param_name in param_names:
grad = param_to_grad[param_name]
grad_merge = param_to_gradient_merge[param_name]
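# with avg=True the optimizer sees (g_1 + ... + g_k) / k;
# with avg=False it sees the plain sum of the k gradients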
if self.avg:
tmp_var = layers.elementwise_add(grad, grad_merge)
cur_block.append_op(
type='scale',
inputs={'X': tmp_var},
outputs={'Out': grad},
attrs={
'scale': 1.0 / self.k_steps,
'bias': 0.0,
'bias_after_scale': False
})
else:
cur_block.append_op(
type="elementwise_add",
inputs={'X': grad,
'Y': grad_merge},
outputs={'Out': grad},
attrs={'axis': -1,
'use_mkldnn': False})
# 2. apply_optimize
target_grad_block = main_block.program._create_block(
parent_idx=cur_block.parent_idx)
target_grad_block._set_forward_block_idx(cur_block_idx)
main_block.program.current_block_idx = cur_block_idx
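# the wrapped optimizer now emits its real parameter-update ops
# (e.g. sgd, adam), consuming the merged gradients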
optimize_ops = self.inner_optimizer.apply_optimize(
loss,
startup_program=startup_program,
params_grads=params_grads)
# 3. clear gradient_merge_vars
for param_name in param_names:
grad_merge = param_to_gradient_merge[param_name]
layers.fill_constant(
shape=grad_merge.shape,
dtype=grad_merge.dtype,
value=0.0,
out=grad_merge)
return optimize_ops, params_grads
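For reference, the constructor's error message points dygraph users to a manual workaround: run forward + backward k times, then call minimize() once. The sketch below is a minimal, hypothetical illustration of that pattern, assuming the fluid dygraph API of this release (fluid.dygraph.Linear, to_variable, clear_gradients); the model, batch size, and k value are illustrative only, not part of this commit.

import numpy as np
import paddle.fluid as fluid

k = 4  # illustrative accumulation period

with fluid.dygraph.guard():
    model = fluid.dygraph.Linear(32, 1)  # tiny illustrative model
    opt = fluid.optimizer.SGD(learning_rate=0.01,
                              parameter_list=model.parameters())
    for step in range(k):
        x = fluid.dygraph.to_variable(
            np.random.random((8, 32)).astype('float32'))
        loss = fluid.layers.reduce_mean(model(x))
        # backward() accumulates this mini-batch's gradients onto the
        # parameters; no update happens yet
        loss.backward()
    # one update using the accumulated (summed) gradients -- this matches
    # avg=False; scale each loss by 1/k to emulate avg=True
    opt.minimize(loss)
    model.clear_gradients()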
@@ -948,5 +948,82 @@ class TestRecomputeOptimizerCUDA(unittest.TestCase):
self.assertEqual(drop_vec[0].tolist(), drop_vec[1].tolist())
class TestGradientMergeOptimizer(unittest.TestCase):
def net(self):
program = framework.Program()
block = program.global_block()
mul_x = block.create_parameter(
dtype="float32", shape=[5, 10], lod_level=0, name="mul.x")
mul_y = block.create_var(
dtype="float32", shape=[10, 8], lod_level=0, name="mul.y")
mul_out = block.create_var(
dtype="float32", shape=[5, 8], lod_level=0, name="mul.out")
b1 = block.create_parameter(
dtype="float32", shape=[5, 8], lod_level=0, name="b1")
b1_out = block.create_var(
dtype="float32", shape=[5, 8], lod_level=0, name="b1_out")
mean_out = block.create_var(
dtype="float32", shape=[1], lod_level=0, name="mean.out")
block.append_op(
type="mul",
inputs={"X": mul_x,
"Y": mul_y},
outputs={"Out": mul_out},
attrs={"x_num_col_dims": 1})
block.append_op(
type="elementwise_add",
inputs={"X": mul_out,
"Y": b1},
outputs={"Out": b1_out})
block.append_op(
type="mean", inputs={"X": b1_out}, outputs={"Out": mean_out})
return mean_out
def test_program_desc(self):
cost = self.net()
main_program = cost.block.program
init_program = framework.Program()
self.assertEqual(main_program.num_blocks, 1)
self.assertEqual(len(cost.block.ops), 3)
self.assertEqual([op.type for op in cost.block.ops],
["mul", "elementwise_add", "mean"])
opt = optimizer.SGD(learning_rate=1.0)
opt = optimizer.GradientMergeOptimizer(opt, k_steps=4)
with framework.program_guard(main_program, init_program):
ops, params_grads = opt.minimize(cost)
self.assertEqual(main_program.num_blocks, 4)
# main block
self.assertEqual(len(cost.block.ops), 17)
self.assertEqual([op.type for op in cost.block.ops], [
'mul', 'elementwise_add', 'mean', 'fill_constant', 'mean_grad',
'elementwise_add_grad', 'mul_grad', 'increment', 'fill_constant',
'fill_constant', 'elementwise_mod', 'cast', 'not_equal',
'logical_not', 'conditional_block', 'conditional_block',
'conditional_block_grad'
])
# merge block
self.assertEqual(len(main_program.block(1).ops), 2)
self.assertEqual([op.type for op in main_program.block(1).ops], [
'elementwise_add',
'elementwise_add',
])
# reset block
self.assertEqual(len(main_program.block(2).ops), 6)
self.assertEqual([op.type for op in main_program.block(2).ops], [
'elementwise_add', 'scale', 'elementwise_add', 'scale',
'fill_constant', 'fill_constant'
])
# optimize block
self.assertEqual(len(main_program.block(3).ops), 2)
self.assertEqual([op.type for op in main_program.block(3).ops],
['sgd', 'sgd'])
if __name__ == '__main__':
unittest.main()