diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index a8a7f217bd700f10879b283029d38ebe8538220e..6e7a90e44e5f2669143b95529a0b91d1764f6bf9 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -4933,3 +4933,223 @@ class LookaheadOptimizer(object): with switch.default(): pass return mini_out + + +class GradientMergeOptimizer(object): + """ + Gradient Merge, also called as Gradient Accumulation, + is a training strategy for larger batches. With this strategy, + the parameter will not be updated until specific steps. + + For each step, the forward network and the backward network + will run to calculate the gradient of the parameters. + + For every k step, the optimization network will run, + applying a specific optimization method (such as SGD, Adam) + to the parameters. + + Args: + inner_optimizer (Optimizer): The specific optimization (such as SGD, Adam) + which update the parameters + k_steps (int): the update period of the parameters + avg (bool): whether to average the gradients of each mini-batch, + the default value is `True` + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + import numpy as np + + def gen_data(batch_size): + return {"x": np.random.random(size=(batch_size, 32)).astype('float32'), + "y": np.random.random(size=(batch_size, 1)).astype('int64')} + + def mlp(input_x, input_y, hid_dim=128, label_dim=2): + fc_1 = fluid.layers.fc(input=input_x, size=hid_dim) + prediction = fluid.layers.fc(input=[fc_1], size=label_dim, act='softmax') + cost = fluid.layers.cross_entropy(input=prediction, label=input_y) + sum_cost = fluid.layers.reduce_mean(cost) + return sum_cost, fc_1, prediction + + input_x = fluid.layers.data(name="x", shape=[32], dtype='float32') + input_y = fluid.layers.data(name="y", shape=[1], dtype='int64') + cost, fc_1, pred = mlp(input_x, input_y) + sgd = fluid.optimizer.Adam(learning_rate=0.01) + sgd = fluid.optimizer.GradientMergeOptimizer(sgd, k_steps=4, avg=True) + sgd.minimize(cost) + + place = fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(fluid.default_startup_program()) + + for i in range(10): + cost_val = exe.run(feed=gen_data(32), + program=fluid.default_main_program(), + fetch_list=[cost.name]) + print("step=%d, cost=%f" % (i, cost_val[0])) + """ + + def __init__(self, inner_optimizer, k_steps=1, avg=True): + if framework.in_dygraph_mode(): + raise Exception( + "In dygraph, we don't support GradientMergeOptimizer." + "You can do Gradient merge by yourself with k-times forward + backward, " + "and one-time optimizer.minimize()") + + assert (inner_optimizer is not None), "inner optimizer can not be None" + assert (isinstance(k_steps, int) and + k_steps > 0), "k_steps should be a positive integer" + + self.inner_optimizer = inner_optimizer + self.k_steps = k_steps + self.type = "gradient_merge" + self.avg = avg + + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + + assert isinstance(loss, Variable), "The loss should be an Variable." + assert ( + parameter_list is None + ), "The parameter_list should be None when using GradientMergeOptimizer" + assert ( + no_grad_set is None + ), "The no_grad_set should be None when using GradientMergeOptimizer" + + params_grads = self.inner_optimizer.backward( + loss, startup_program=startup_program) + + #TODO(mapingshuo) support sparse embedding + for k, v in params_grads: + assert ( + v.type != core.VarDesc.VarType.SELECTED_ROWS + ), "SELECTED_ROWS is not supported in GradientMergeOptimizer for now" + + param_to_grad = {k.name: v for (k, v) in params_grads} + + # Get startup_program and main_program + if startup_program is None: + startup_program = default_startup_program() + main_block = loss.block + + # add some vars to the main_program and startup_program + startup_block = startup_program.global_block() + param_names = param_to_grad.keys() + param_to_gradient_merge = {} + + for param_name in param_names: + param_var = main_block.var(param_name) + assert (param_var is not None) + gradient_merge_var = main_block.create_var( + name=param_name + "@GRAD@GradientMerge", + shape=param_var.shape, + dtype=param_var.dtype, + persistable=True) + param_to_gradient_merge[param_name] = gradient_merge_var + startup_gradient_merge_var = startup_block.create_var( + name=param_name + "@GRAD@GradientMerge", + shape=param_var.shape, + dtype=param_var.dtype, + persistable=True) + startup_block.append_op( + type="fill_constant", + outputs={"Out": startup_gradient_merge_var}, + attrs={ + "shape": param_var.shape, + "dtype": param_var.dtype, + "value": float(0), + }) + + with framework.program_guard(main_block.program, startup_program): + # Add Var k to main prog and startup prog + gradient_merge_k = layers.create_global_var( + name="gradient_merge_k", + shape=[1], + value=int(self.k_steps), + dtype='int32', + persistable=True) + + # Add Var step + gradient_merge_step = layers.create_global_var( + name="gradient_merge_step", + shape=[1], + value=int(0), + dtype='int32', + persistable=True) + layers.increment(x=gradient_merge_step, value=1.0, in_place=True) + + # gradient merge + zero_var = layers.fill_constant( + shape=[1], dtype='float32', value=0.0) + one_var = layers.fill_constant( + shape=[1], dtype='float32', value=1.0) + + mod = layers.elementwise_mod(gradient_merge_step, gradient_merge_k) + with layers.control_flow.Switch() as switch: + with switch.case(mod != zero_var): + # 1. update the gradient_merge_vars + # gradient_merge_vars += gradient_vars + cur_block = main_block.program.current_block() + for param_name in param_names: + grad = param_to_grad[param_name] + grad_merge = param_to_gradient_merge[param_name] + cur_block.append_op( + type="elementwise_add", + inputs={'X': grad, + 'Y': grad_merge}, + outputs={'Out': grad_merge}, + attrs={'axis': -1, + 'use_mkldnn': False}) + + with switch.default(): + # 1. update the graient_vars + # gradient_vars += gradient_merge_vars + cur_block_idx = main_block.program.current_block_idx + cur_block = main_block.program.current_block() + for param_name in param_names: + grad = param_to_grad[param_name] + grad_merge = param_to_gradient_merge[param_name] + if self.avg: + tmp_var = layers.elementwise_add(grad, grad_merge) + cur_block.append_op( + type='scale', + inputs={'X': tmp_var}, + outputs={'Out': grad}, + attrs={ + 'scale': 1.0 / self.k_steps, + 'bias': 0.0, + 'bias_after_scale': False + }) + else: + cur_block.append_op( + type="elementwise_add", + inputs={'X': grad, + 'Y': grad_merge}, + outputs={'Out': grad}, + attrs={'axis': -1, + 'use_mkldnn': False}) + + # 2. apply_optimize + target_grad_block = main_block.program._create_block( + parent_idx=cur_block.parent_idx) + target_grad_block._set_forward_block_idx(cur_block_idx) + main_block.program.current_block_idx = cur_block_idx + + optimize_ops = self.inner_optimizer.apply_optimize( + loss, + startup_program=startup_program, + params_grads=params_grads) + + # 3. clear gradient_merge_vars + for param_name in param_names: + grad_merge = param_to_gradient_merge[param_name] + layers.fill_constant( + shape=grad_merge.shape, + dtype=grad_merge.dtype, + value=0.0, + out=grad_merge) + return optimize_ops, params_grads diff --git a/python/paddle/fluid/tests/unittests/test_optimizer.py b/python/paddle/fluid/tests/unittests/test_optimizer.py index 7894fc018876cb7860b122ed7fa0319158fe05c2..3384c80499b63751826115bb55dec98a258badd4 100644 --- a/python/paddle/fluid/tests/unittests/test_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_optimizer.py @@ -948,5 +948,82 @@ class TestRecomputeOptimizerCUDA(unittest.TestCase): self.assertEqual(drop_vec[0].tolist(), drop_vec[1].tolist()) +class TestGradientMergeOptimizer(unittest.TestCase): + def net(self): + program = framework.Program() + block = program.global_block() + mul_x = block.create_parameter( + dtype="float32", shape=[5, 10], lod_level=0, name="mul.x") + mul_y = block.create_var( + dtype="float32", shape=[10, 8], lod_level=0, name="mul.y") + mul_out = block.create_var( + dtype="float32", shape=[5, 8], lod_level=0, name="mul.out") + b1 = block.create_parameter( + dtype="float32", shape=[5, 8], lod_level=0, name="b1") + b1_out = block.create_var( + dtype="float32", shape=[5, 8], lod_level=0, name="b1_out") + mean_out = block.create_var( + dtype="float32", shape=[1], lod_level=0, name="mean.out") + block.append_op( + type="mul", + inputs={"X": mul_x, + "Y": mul_y}, + outputs={"Out": mul_out}, + attrs={"x_num_col_dims": 1}) + block.append_op( + type="elementwise_add", + inputs={"X": mul_out, + "Y": b1}, + outputs={"Out": b1_out}) + block.append_op( + type="mean", inputs={"X": b1_out}, outputs={"Out": mean_out}) + return mean_out + + def test_program_desc(self, ): + cost = self.net() + main_program = cost.block.program + init_program = framework.Program() + self.assertEqual(main_program.num_blocks, 1) + self.assertEqual(len(cost.block.ops), 3) + self.assertEqual([op.type for op in cost.block.ops], + ["mul", "elementwise_add", "mean"]) + + opt = optimizer.SGD(learning_rate=1.0) + opt = optimizer.GradientMergeOptimizer(opt, k_steps=4) + with framework.program_guard(main_program, init_program): + ops, params_grads = opt.minimize(cost) + + self.assertEqual(main_program.num_blocks, 4) + + # main block + self.assertEqual(len(cost.block.ops), 17) + self.assertEqual([op.type for op in cost.block.ops], [ + 'mul', 'elementwise_add', 'mean', 'fill_constant', 'mean_grad', + 'elementwise_add_grad', 'mul_grad', 'increment', 'fill_constant', + 'fill_constant', 'elementwise_mod', 'cast', 'not_equal', + 'logical_not', 'conditional_block', 'conditional_block', + 'conditional_block_grad' + ]) + + # merge block + self.assertEqual(len(main_program.block(1).ops), 2) + self.assertEqual([op.type for op in main_program.block(1).ops], [ + 'elementwise_add', + 'elementwise_add', + ]) + + # reset block + self.assertEqual(len(main_program.block(2).ops), 6) + self.assertEqual([op.type for op in main_program.block(2).ops], [ + 'elementwise_add', 'scale', 'elementwise_add', 'scale', + 'fill_constant', 'fill_constant' + ]) + + # optimize block + self.assertEqual(len(main_program.block(3).ops), 2) + self.assertEqual([op.type for op in main_program.block(3).ops], + ['sgd', 'sgd']) + + if __name__ == '__main__': unittest.main()