From c194b0c835d9e017576e5df46c023f78e9187741 Mon Sep 17 00:00:00 2001 From: Zeng Jinle <32832641+sneaxiy@users.noreply.github.com> Date: Sat, 10 Aug 2019 22:49:10 +0800 Subject: [PATCH] Try to deprecate unstable python memory optimize (#18983) * deprecate python memory optimize, test=develop * remove memory_optimize in unittests, test=develop * add unittests to deprecated interfaces, test=develop --- .../fluid/tests/unittests/dist_save_load.py | 2 - .../unittests/ir_memory_optimize_net_base.py | 12 +- .../unittests/parallel_executor_test_base.py | 21 +- ...t_deprecated_memory_optimize_interfaces.py | 68 +++ .../fluid/tests/unittests/test_dist_base.py | 14 - .../unittests/test_fuse_all_reduce_pass.py | 2 - .../test_fuse_elewise_add_act_pass.py | 2 - .../unittests/test_fuse_optimizer_pass.py | 3 - .../test_fuse_relu_depthwise_conv_pass.py | 2 - .../unittests/test_inference_model_io.py | 3 - .../tests/unittests/test_ir_inplace_pass.py | 6 +- .../test_ir_memory_optimize_ifelse_op.py | 14 +- .../unittests/test_ir_memory_optimize_pass.py | 12 +- .../test_ir_memory_optimize_transformer.py | 7 +- .../unittests/test_learning_rate_scheduler.py | 2 - .../test_memory_optimization_transpiler.py | 118 ---- .../test_parallel_executor_fetch_feed.py | 4 - .../fluid/tests/unittests/test_py_func_op.py | 4 - .../memory_optimization_transpiler.py | 538 +----------------- 19 files changed, 108 insertions(+), 726 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_deprecated_memory_optimize_interfaces.py delete mode 100644 python/paddle/fluid/tests/unittests/test_memory_optimization_transpiler.py diff --git a/python/paddle/fluid/tests/unittests/dist_save_load.py b/python/paddle/fluid/tests/unittests/dist_save_load.py index 2af65db8249..f3a6b19d819 100644 --- a/python/paddle/fluid/tests/unittests/dist_save_load.py +++ b/python/paddle/fluid/tests/unittests/dist_save_load.py @@ -102,8 +102,6 @@ class TestDistSaveLoad2x2(TestDistSimnetBow2x2): test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \ self.get_model(batch_size=2) - if args.mem_opt: - fluid.memory_optimize(fluid.default_main_program(), skip_grads=True) if args.update_method == "pserver": t = self.get_transpiler(args.trainer_id, fluid.default_main_program(), diff --git a/python/paddle/fluid/tests/unittests/ir_memory_optimize_net_base.py b/python/paddle/fluid/tests/unittests/ir_memory_optimize_net_base.py index 439a8e3ba33..0e4fd8f69dc 100644 --- a/python/paddle/fluid/tests/unittests/ir_memory_optimize_net_base.py +++ b/python/paddle/fluid/tests/unittests/ir_memory_optimize_net_base.py @@ -43,7 +43,6 @@ class BuildIrMemOptBase(unittest.TestCase): def check_network_convergence(self, network, use_cuda=True, - memory_opt=True, use_ir_memory_optimize=True, enable_inplace=True, iter=5): @@ -68,13 +67,8 @@ class BuildIrMemOptBase(unittest.TestCase): optimizer = fluid.optimizer.Adam(learning_rate=0.001) optimizer.minimize(cost) build_strategy = fluid.BuildStrategy() - build_strategy.enable_inplace = False - build_strategy.memory_optimize = False - if memory_opt: - fluid.memory_optimize(fluid.default_main_program()) - else: - build_strategy.enable_inplace = use_ir_memory_optimize - build_strategy.memory_optimize = enable_inplace + build_strategy.enable_inplace = enable_inplace + build_strategy.memory_optimize = use_ir_memory_optimize # execution place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() @@ -134,7 +128,7 @@ class TestIrMemOptBase(BuildIrMemOptBase): self.network) cur_first_loss, cur_last_loss = self.check_network_convergence( - self.network, memory_opt=False) + self.network) self.assertAlmostEquals( np.mean(baseline_last_loss), diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py index ae950f5912b..94e451ff266 100644 --- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py +++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py @@ -33,7 +33,6 @@ class TestParallelExecutorBase(unittest.TestCase): def check_network_convergence(cls, method, use_cuda=True, - memory_opt=False, iter=50, batch_size=None, feed_dict=None, @@ -59,8 +58,7 @@ class TestParallelExecutorBase(unittest.TestCase): main.random_seed = 1 with fluid.program_guard(main, startup): feed_dict, loss = cls.build_model(feed_dict, get_data_from_feeder, - main, memory_opt, method, - optimizer) + main, method, optimizer) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) @@ -112,7 +110,6 @@ class TestParallelExecutorBase(unittest.TestCase): def check_pass_conflict(cls, method, use_cuda=True, - memory_opt=False, feed_dict=None, get_data_from_feeder=None, use_reduce=False, @@ -130,8 +127,7 @@ class TestParallelExecutorBase(unittest.TestCase): startup = fluid.Program() with fluid.program_guard(main, startup): feed_dict, loss = cls.build_model(feed_dict, get_data_from_feeder, - main, memory_opt, method, - optimizer) + main, method, optimizer) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) @@ -175,16 +171,17 @@ class TestParallelExecutorBase(unittest.TestCase): return build_strategy, exec_strategy @classmethod - def build_model(cls, feed_dict, get_data_from_feeder, main, memory_opt, - method, optimizer): + def build_model(cls, feed_dict, get_data_from_feeder, main, method, + optimizer): loss = method(use_feed=feed_dict is not None) # NOTE(zjl): memory_optimize/inplace pass would not require - # that loss.persistable = True - loss.persistable = memory_opt + # that loss.persistable = True. + # We set loss.persistable = False here to verify our memory + # optimization strategies intentionally. + loss.persistable = False if optimizer: optimizer().minimize(loss) - if memory_opt: - fluid.memory_optimize(main) + if get_data_from_feeder is not None: assert feed_dict is None feed_dict = get_data_from_feeder() diff --git a/python/paddle/fluid/tests/unittests/test_deprecated_memory_optimize_interfaces.py b/python/paddle/fluid/tests/unittests/test_deprecated_memory_optimize_interfaces.py new file mode 100644 index 00000000000..c3a21ba0bcb --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_deprecated_memory_optimize_interfaces.py @@ -0,0 +1,68 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid as fluid +import unittest +from simple_nets import simple_fc_net + + +class DeprecatedMemoryOptimizationInterfaceTest(unittest.TestCase): + def setUp(self): + self.method = fluid.memory_optimize + + def build_network(self, call_interface): + startup_prog = fluid.Program() + main_prog = fluid.Program() + with fluid.program_guard(main_prog, startup_prog): + with fluid.unique_name.guard(): + loss = simple_fc_net() + opt = fluid.optimizer.Adam(learning_rate=1e-3) + opt.minimize(loss) + + if call_interface: + self.method(main_prog) + + return main_prog + + def assert_program_equal(self, prog1, prog2): + block_num = prog1.num_blocks + self.assertEquals(block_num, prog2.num_blocks) + + for block_id in range(block_num): + block1 = prog1.block(block_id) + block2 = prog2.block(block_id) + self.assertEquals(len(block1.ops), len(block2.ops)) + for op1, op2 in zip(block1.ops, block2.ops): + self.assertEquals(op1.input_arg_names, op2.input_arg_names) + self.assertEquals(op1.output_arg_names, op2.output_arg_names) + + self.assertEquals(len(block1.vars), len(block2.vars)) + for var1 in block1.vars.values(): + self.assertTrue(var1.name in block2.vars) + var2 = block2.vars.get(var1.name) + self.assertEquals(var1.name, var2.name) + + def test_main(self): + prog1 = self.build_network(False) + prog2 = self.build_network(True) + self.assert_program_equal(prog1, prog2) + + +class ReleaseMemoryTest(DeprecatedMemoryOptimizationInterfaceTest): + def setUp(self): + self.method = fluid.release_memory + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index b429b4f96bb..b42cea3114c 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -108,10 +108,6 @@ class TestDistRunnerBase(object): test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \ self.get_model(batch_size=args.batch_size) - if args.mem_opt: - my_print(type(self).__name__, "begin to run memory optimize") - fluid.memory_optimize(fluid.default_main_program(), skip_grads=True) - my_print(type(self).__name__, "trainer run memory optimize done.") if args.update_method == "pserver": my_print( type(self).__name__, @@ -327,7 +323,6 @@ def runtime_main(test_class): parser.add_argument( '--current_endpoint', type=str, required=False, default="") parser.add_argument('--sync_mode', action='store_true') - parser.add_argument('--mem_opt', action='store_true') parser.add_argument('--use_cuda', action='store_true') parser.add_argument('--use_dgc', action='store_true') parser.add_argument('--use_reduce', action='store_true') @@ -387,7 +382,6 @@ class TestDistBase(unittest.TestCase): self._python_interp = sys.executable self._sync_mode = True self._enforce_place = None - self._mem_opt = False self._use_reduce = False self._dc_asgd = False # must use with async mode self._use_reader_alloc = True @@ -435,9 +429,6 @@ class TestDistBase(unittest.TestCase): if self._sync_mode: ps0_cmd += " --sync_mode" ps1_cmd += " --sync_mode" - if self._mem_opt: - ps0_cmd += " --mem_opt" - ps1_cmd += " --mem_opt" print(ps0_cmd) print(ps1_cmd) @@ -530,9 +521,6 @@ class TestDistBase(unittest.TestCase): if self._sync_mode: tr0_cmd += " --sync_mode" tr1_cmd += " --sync_mode" - if self._mem_opt: - tr0_cmd += " --mem_opt" - tr1_cmd += " --mem_opt" if self._use_reduce: tr0_cmd += " --use_reduce" tr1_cmd += " --use_reduce" @@ -603,8 +591,6 @@ class TestDistBase(unittest.TestCase): (self._python_interp, model, self._ps_endpoints, trainer_id, ep, update_method, self._lr) - if self._mem_opt: - tr_cmd += " --mem_opt" if self._use_reduce: tr_cmd += " --use_reduce" if self._use_reader_alloc: diff --git a/python/paddle/fluid/tests/unittests/test_fuse_all_reduce_pass.py b/python/paddle/fluid/tests/unittests/test_fuse_all_reduce_pass.py index cd76b45b242..5ce82b267ac 100644 --- a/python/paddle/fluid/tests/unittests/test_fuse_all_reduce_pass.py +++ b/python/paddle/fluid/tests/unittests/test_fuse_all_reduce_pass.py @@ -49,7 +49,6 @@ class TestFuseAllReduceOpsBase(TestParallelExecutorBase): use_cuda=use_cuda, fuse_all_reduce_ops=False, fuse_all_optimizer_ops=fuse_all_optimizer_ops, - memory_opt=False, optimizer=optimizer) fuse_op_first_loss, fuse_op_last_loss = self.check_network_convergence( model, @@ -58,7 +57,6 @@ class TestFuseAllReduceOpsBase(TestParallelExecutorBase): use_cuda=use_cuda, fuse_all_reduce_ops=True, fuse_all_optimizer_ops=fuse_all_optimizer_ops, - memory_opt=False, optimizer=optimizer) for loss in zip(not_fuse_op_first_loss, fuse_op_first_loss): diff --git a/python/paddle/fluid/tests/unittests/test_fuse_elewise_add_act_pass.py b/python/paddle/fluid/tests/unittests/test_fuse_elewise_add_act_pass.py index 552f94e769e..617fecffe07 100644 --- a/python/paddle/fluid/tests/unittests/test_fuse_elewise_add_act_pass.py +++ b/python/paddle/fluid/tests/unittests/test_fuse_elewise_add_act_pass.py @@ -47,7 +47,6 @@ class TestMNIST(TestParallelExecutorBase): "label": label}, use_cuda=use_cuda, fuse_elewise_add_act_ops=False, - memory_opt=False, use_ir_memory_optimize=False, enable_inplace=False, optimizer=_optimizer) @@ -57,7 +56,6 @@ class TestMNIST(TestParallelExecutorBase): "label": label}, use_cuda=use_cuda, fuse_elewise_add_act_ops=True, - memory_opt=False, use_ir_memory_optimize=False, enable_inplace=False, optimizer=_optimizer) diff --git a/python/paddle/fluid/tests/unittests/test_fuse_optimizer_pass.py b/python/paddle/fluid/tests/unittests/test_fuse_optimizer_pass.py index 38af7b792d8..b47bcd2a032 100644 --- a/python/paddle/fluid/tests/unittests/test_fuse_optimizer_pass.py +++ b/python/paddle/fluid/tests/unittests/test_fuse_optimizer_pass.py @@ -46,7 +46,6 @@ class TestFuseOptimizationOps(TestParallelExecutorBase): get_data_from_feeder=get_data_from_feeder, use_cuda=use_cuda, fuse_all_optimizer_ops=False, - memory_opt=False, # avoid the gradient's name changed in Python side. optimizer=optimizer) fuse_op_first_loss, fuse_op_last_loss = self.check_network_convergence( model, @@ -54,7 +53,6 @@ class TestFuseOptimizationOps(TestParallelExecutorBase): get_data_from_feeder=get_data_from_feeder, use_cuda=use_cuda, fuse_all_optimizer_ops=True, - memory_opt=False, # avoid the gradient's name changed in Python side. optimizer=optimizer) for loss in zip(not_fuse_op_first_loss, fuse_op_first_loss): @@ -152,7 +150,6 @@ class TestPassConflictBase(TestFuseAdamOps): get_data_from_feeder=get_data_from_feeder, use_cuda=use_cuda, fuse_all_optimizer_ops=True, - memory_opt=False, # avoid the gradient's name changed in Python side. optimizer=optimizer, enable_sequential_execution=True) diff --git a/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py b/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py index 0c8531606b8..d4be4f84af4 100644 --- a/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py +++ b/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py @@ -120,7 +120,6 @@ class TestMNIST(TestParallelExecutorBase): use_cuda=use_cuda, fuse_relu_depthwise_conv=True, use_ir_memory_optimize=True, - memory_opt=False, optimizer=_optimizer) not_fuse_op_first_loss, not_fuse_op_last_loss = self.check_network_convergence( model, @@ -128,7 +127,6 @@ class TestMNIST(TestParallelExecutorBase): "label": label}, use_cuda=use_cuda, fuse_relu_depthwise_conv=False, - memory_opt=False, optimizer=_optimizer) for loss in zip(not_fuse_op_first_loss, fuse_op_first_loss): diff --git a/python/paddle/fluid/tests/unittests/test_inference_model_io.py b/python/paddle/fluid/tests/unittests/test_inference_model_io.py index 4ac9648e637..bdda62bc682 100644 --- a/python/paddle/fluid/tests/unittests/test_inference_model_io.py +++ b/python/paddle/fluid/tests/unittests/test_inference_model_io.py @@ -109,9 +109,6 @@ class TestSaveInferenceModel(unittest.TestCase): exe = executor.Executor(place) exe.run(init_program, feed={}, fetch_list=[]) - memory_optimize(program, print_log=True) - self.assertEqual(program._is_mem_optimized, True) - # will print warning message save_inference_model(MODEL_DIR, ["x", "y"], [avg_cost], exe, program) diff --git a/python/paddle/fluid/tests/unittests/test_ir_inplace_pass.py b/python/paddle/fluid/tests/unittests/test_ir_inplace_pass.py index 988b6773366..c1ef0f49afb 100644 --- a/python/paddle/fluid/tests/unittests/test_ir_inplace_pass.py +++ b/python/paddle/fluid/tests/unittests/test_ir_inplace_pass.py @@ -47,10 +47,7 @@ class TestIrInplace(TestParallelExecutorBase): def setUpClass(cls): os.environ['CPU_NUM'] = str(4) - def _fc_with_batchnorm(self, - ir_memory_optimize, - enable_inplace, - memory_opt=False): + def _fc_with_batchnorm(self, ir_memory_optimize, enable_inplace): if not core.is_compiled_with_cuda(): return @@ -62,7 +59,6 @@ class TestIrInplace(TestParallelExecutorBase): feed_dict={"image": img, "label": label}, use_cuda=True, - memory_opt=memory_opt, use_ir_memory_optimize=ir_memory_optimize, enable_inplace=enable_inplace) diff --git a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_ifelse_op.py b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_ifelse_op.py index b1fe2b40b92..c5228fcf122 100644 --- a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_ifelse_op.py +++ b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_ifelse_op.py @@ -33,7 +33,9 @@ from ir_memory_optimize_net_base import TestIrMemOptBase class TestIrMemoryOptimizeIfElseOp(unittest.TestCase): - def check_network_convergence(self, use_cuda=True, py_opt=False, + def check_network_convergence(self, + use_cuda=True, + use_mem_opt=False, iter_num=5): prog = Program() startup_prog = Program() @@ -75,11 +77,14 @@ class TestIrMemoryOptimizeIfElseOp(unittest.TestCase): exec_strategy = fluid.ExecutionStrategy() exec_strategy.use_cuda = use_cuda - if py_opt: - fluid.memory_optimize(fluid.default_main_program()) + build_strategy = fluid.BuildStrategy() + build_strategy.memory_optimize = use_mem_opt + train_cp = compiler.CompiledProgram(fluid.default_main_program()) train_cp = train_cp.with_data_parallel( - loss_name=avg_loss.name, exec_strategy=exec_strategy) + loss_name=avg_loss.name, + exec_strategy=exec_strategy, + build_strategy=build_strategy) fetch_list = [avg_loss.name] exe.run(startup_prog) @@ -116,7 +121,6 @@ class TestIrMemoryOptimizeIfElseOp(unittest.TestCase): ret2 = self.check_network_convergence(True, False) print(ret2) self.assertTrue(np.allclose(ret1, ret2)) - #self.assertEqual(ret1, ret2) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py index 6ca65c5d3b6..e224caee6e5 100644 --- a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py +++ b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py @@ -86,7 +86,7 @@ class TestMNIST(TestParallelExecutorBase): label = np.ones(shape=[32, 1], dtype='int64') return img, label - def _compare_ir_and_python_memory_optimize(self, model, use_cuda): + def _compare_ir_memory_optimize(self, model, use_cuda): if use_cuda and not core.is_compiled_with_cuda(): return @@ -96,14 +96,12 @@ class TestMNIST(TestParallelExecutorBase): feed_dict={"image": img, "label": label}, use_cuda=use_cuda, - memory_opt=False, use_ir_memory_optimize=False) first_loss1, last_loss1 = self.check_network_convergence( model, feed_dict={"image": img, "label": label}, use_cuda=use_cuda, - memory_opt=False, use_ir_memory_optimize=True) for loss in zip(first_loss0, first_loss1): self.assertAlmostEqual(loss[0], loss[1], delta=1e-6) @@ -111,12 +109,12 @@ class TestMNIST(TestParallelExecutorBase): self.assertAlmostEqual(loss[0], loss[1], delta=1e-6) def test_simple_fc_net(self): - self._compare_ir_and_python_memory_optimize(simple_fc_net, False) - self._compare_ir_and_python_memory_optimize(simple_fc_net, True) + self._compare_ir_memory_optimize(simple_fc_net, False) + self._compare_ir_memory_optimize(simple_fc_net, True) def test_fc_with_reshape_net(self): - self._compare_ir_and_python_memory_optimize(fc_with_inplace_net, False) - self._compare_ir_and_python_memory_optimize(fc_with_inplace_net, True) + self._compare_ir_memory_optimize(fc_with_inplace_net, False) + self._compare_ir_memory_optimize(fc_with_inplace_net, True) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py index 50d998990f9..642be33b6ed 100644 --- a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py @@ -57,16 +57,11 @@ class TestTransformerWithIR(TestParallelExecutorBase): self.check_network_convergence( transformer, use_cuda=True, - memory_opt=True, use_ir_memory_optimize=False, iter=2) # check IR memory optimize self.check_network_convergence( - transformer, - use_cuda=True, - memory_opt=False, - use_ir_memory_optimize=True, - iter=2) + transformer, use_cuda=True, use_ir_memory_optimize=True, iter=2) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_learning_rate_scheduler.py b/python/paddle/fluid/tests/unittests/test_learning_rate_scheduler.py index c5001145969..88d9919f596 100644 --- a/python/paddle/fluid/tests/unittests/test_learning_rate_scheduler.py +++ b/python/paddle/fluid/tests/unittests/test_learning_rate_scheduler.py @@ -111,8 +111,6 @@ class TestLearningRateDecay(unittest.TestCase): exe.run(startup_prog) - fluid.memory_optimize(main_prog) - for step in range(10): lr_val, = exe.run(main_prog, feed={}, fetch_list=[decayed_lr]) python_decayed_lr = python_decay_fn( diff --git a/python/paddle/fluid/tests/unittests/test_memory_optimization_transpiler.py b/python/paddle/fluid/tests/unittests/test_memory_optimization_transpiler.py deleted file mode 100644 index fa16f082880..00000000000 --- a/python/paddle/fluid/tests/unittests/test_memory_optimization_transpiler.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -import unittest - -import paddle.fluid as fluid -import paddle.fluid.layers as layers -import paddle.fluid.optimizer as optimizer -from paddle.fluid.framework import Program, program_guard -from paddle.fluid.transpiler import memory_optimize - - -def _get_vars(prog): - assert (isinstance(prog, Program)) - all_vars = set() - for op in prog.global_block().ops: - all_vars.update(op.input_arg_names) - all_vars.update(op.output_arg_names) - return all_vars - - -class TestControlFlowGraph(unittest.TestCase): - def setUp(self): - program = Program() - with program_guard(program, startup_program=Program()): - x = layers.data(name='x', shape=[13], dtype='float32') - y_predict = layers.fc(input=x, size=1, act=None) - y = layers.data(name='y', shape=[1], dtype='float32') - cost = layers.square_error_cost(input=y_predict, label=y) - avg_cost = layers.mean(cost) - opt = optimizer.SGD(learning_rate=0.001) - opt = opt.minimize(avg_cost) - - self.program = program - - def test_control_flow_graph(self): - result_program = self.program.clone() - memory_optimize(self.program) - old_vars = _get_vars(self.program) - new_vars = _get_vars(result_program) - self.assertTrue(old_vars != new_vars) - - -class TestMemoryTranspiler2(unittest.TestCase): - def setUp(self): - program = Program() - with program_guard(program, startup_program=Program()): - x = layers.data(name='x', shape=[13], dtype='float32') - fc = layers.fc(input=x, size=10, act=None) - reshape = layers.reshape(x=fc, shape=[-1, 2, 5]) - fc = layers.reshape(x=reshape, shape=[-1, 5, 2]) - y_predict = layers.fc(input=fc, size=1, act=None) - y = layers.data(name='y', shape=[1], dtype='float32') - cost = layers.square_error_cost(input=y_predict, label=y) - avg_cost = layers.mean(cost) - opt = optimizer.SGD(learning_rate=0.001) - opt.minimize(avg_cost) - self.skip_set = set([cost.name, fc.name]) - self.program = program - - def test_inplace_ops(self): - result_program = self.program.clone() - memory_optimize(self.program) - old_vars = _get_vars(self.program) - new_vars = _get_vars(result_program) - self.assertTrue(old_vars != new_vars) - - def test_skip_opt(self): - result_program = self.program.clone() - memory_optimize(self.program, skip_opt_set=self.skip_set) - old_vars = _get_vars(self.program) - new_vars = _get_vars(result_program) - self.assertTrue(old_vars != new_vars) - - -class TestMemoryTranspiler3(unittest.TestCase): - def setUp(self): - program = Program() - with program_guard(program, startup_program=Program()): - word = fluid.layers.data(name='word', shape=[1], dtype='int64') - emb = [ - fluid.layers.embedding( - word, size=[65536, 256], param_attr='emb') for _ in range(6) - ] - - left = emb.pop(0) - while len(emb) != 0: - right = emb.pop(0) - left = fluid.layers.concat([left, right]) - emb = fluid.layers.mean(left) - fluid.backward.append_backward(emb) - self.program = program - - def test_cascade_reuse(self): - block = self.program.block(0) - # variable reuse in programdesc - # TODO(dzhwinter): confirm cascade strategy. disable temporialy - self.assertTrue("concat_4.tmp_0@GRAD" in block.vars) - # self.assertTrue("concat_3.tmp_0@GRAD" not in block.vars) - # self.assertTrue("concat_2.tmp_0@GRAD" not in block.vars) - # self.assertTrue("concat_1.tmp_0@GRAD" not in block.vars) - # self.assertTrue("concat_0.tmp_0@GRAD" not in block.vars) - - -if __name__ == "__main__": - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py index 0457e9cefdb..052edac0ea7 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py @@ -93,10 +93,6 @@ class TestFetchAndFeed(unittest.TestCase): 10).astype(np.int64) yield img, l - # TODO(zcd): I found that onece the memory optimizer is open, - # parallel_exe doesn't fetch some variable, such as conv2d_0.b_0@GRAD, - # conv2d_1.b_0@GRAD. Those variables should not be pruned. - # fluid.memory_optimize(main) fetch_list = [] all_vars = compiled_program._program.global_block().vars diff --git a/python/paddle/fluid/tests/unittests/test_py_func_op.py b/python/paddle/fluid/tests/unittests/test_py_func_op.py index 05bef1a4762..18207373aca 100644 --- a/python/paddle/fluid/tests/unittests/test_py_func_op.py +++ b/python/paddle/fluid/tests/unittests/test_py_func_op.py @@ -142,10 +142,6 @@ def test_main(use_cuda, use_py_func_op, use_parallel_executor): exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) - #FIXME force use old memory optimzie strategy here to pass the unittest - #since open the new strategy will crash the unittest - fluid.memory_optimize(fluid.default_main_program()) - train_cp = compiler.CompiledProgram(fluid.default_main_program()) if use_parallel_executor: train_cp = train_cp.with_data_parallel(loss_name=loss.name) diff --git a/python/paddle/fluid/transpiler/memory_optimization_transpiler.py b/python/paddle/fluid/transpiler/memory_optimization_transpiler.py index 00a94fa829f..29812812af6 100755 --- a/python/paddle/fluid/transpiler/memory_optimization_transpiler.py +++ b/python/paddle/fluid/transpiler/memory_optimization_transpiler.py @@ -12,486 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import print_function - import logging -import six -import sys -from collections import defaultdict, MutableSet -from .. import core -from ... import compat as cpt -from ..framework import Program, default_main_program, Parameter, Variable, core -from ..backward import _rename_arg_ -from functools import reduce -from six.moves import range - -dtype_to_size = { - core.VarDesc.VarType.FP16: 2, - core.VarDesc.VarType.FP32: 4, - core.VarDesc.VarType.FP64: 8, - core.VarDesc.VarType.INT16: 2, - core.VarDesc.VarType.INT32: 4, - core.VarDesc.VarType.INT64: 8, - core.VarDesc.VarType.BOOL: 1, - core.VarDesc.VarType.UINT8: 1, -} - -SUB_BLOCK_OPS = [ - "while", "while_grad", "conditional_block", "conditional_block_grad" -] - -SUB_BLOCK_PAIR = [("while", "while_grad"), - ("conditional_block", "conditional_block_grad")] - -PRINT_LOG = False -FLAGS_memory_optimize = "" - - -class OrderedSet(MutableSet): - def __init__(self, iterable=None): - self.end = end = [] - end += [None, end, end] # sentinel node for doubly linked list - self.map = {} # key --> [key, prev, next] - if iterable is not None: - self |= iterable - - def __len__(self): - return len(self.map) - - def __contains__(self, key): - return key in self.map - - def add(self, key): - if key not in self.map: - end = self.end - curr = end[1] - curr[2] = end[1] = self.map[key] = [key, curr, end] - - def update(self, other): - for e in other: - self.add(e) - - def discard(self, key): - if key in self.map: - key, prev, next = self.map.pop(key) - prev[2] = next - next[1] = prev - - def remove(self, key): - self.discard(key) - - def __iter__(self): - end = self.end - curr = end[2] - while curr is not end: - yield curr[0] - curr = curr[2] - - def __reversed__(self): - end = self.end - curr = end[1] - while curr is not end: - yield curr[0] - curr = curr[1] - - def pop(self, last=True): - if not self: - raise KeyError('set is empty') - key = self.end[1][0] if last else self.end[2][0] - self.discard(key) - return key - - def __repr__(self): - if not self: - return '%s()' % (self.__class__.__name__, ) - return '%s(%r)' % (self.__class__.__name__, list(self)) - - def __eq__(self, other): - if isinstance(other, OrderedSet): - return len(self) == len(other) and list(self) == list(other) - return set(self) == set(other) - - -class ControlFlowGraph(object): - def __init__(self, program, ops, forward_num, skip_opt): - self._program = program - self._ops = ops - self._forward_num = forward_num - self._successors = defaultdict(OrderedSet) - self._presuccessors = defaultdict(OrderedSet) - self._uses = defaultdict(OrderedSet) - self._defs = defaultdict(OrderedSet) - self._live_in = defaultdict(OrderedSet) - self._live_out = defaultdict(OrderedSet) - - self._skip_opt = skip_opt - self.pool = [] - - def _add_connections(self, connections): - """Populates _successors and _presuccessors for two neighbor nodes.""" - for node1, node2 in connections: - self._add(node1, node2) - - def _add(self, node1, node2): - self._successors[node1].add(node2) - self._presuccessors[node2].add(node1) - - # TODO(panyx0718): We need to have a unified way of building intermediate - # representation. - def _build_graph(self): - """Build a graph based on op sequence. - """ - self.op_size = len(self._ops) - op_node_connections = [(i, i + 1) for i in range(self.op_size - 1)] - self._add_connections(op_node_connections) - for i in range(self.op_size): - self._uses[i].update(self._ops[i].input_arg_names()) - self._defs[i].update(self._ops[i].output_arg_names()) - - def _update_graph(self, old_name, new_name, begin_idx=0): - for i in range(begin_idx, self.op_size): - if old_name in self._uses[i]: - self._uses[i].remove(old_name) - self._uses[i].add(new_name) - if old_name in self._defs[i]: - self._defs[i].remove(old_name) - self._defs[i].add(new_name) - if old_name in self._live_in[i]: - self._live_in[i].remove(old_name) - self._live_in[i].add(new_name) - if old_name in self._live_out[i]: - self._live_out[i].remove(old_name) - self._live_out[i].add(new_name) - - def _dataflow_analyze(self): - self._build_graph() - live_in = defaultdict(set) - worklist = list(range(len(self._ops) - 1, -1, -1)) - while worklist: - i = worklist.pop(0) - live_in[i] = set(self._live_in[i]) - for s in self._successors[i]: - self._live_out[i] |= self._live_in[s] - self._live_in[i] = self._uses[i] | ( - self._live_out[i] - self._defs[i]) - if live_in[i] != set(self._live_in[i]): - for d in self._presuccessors[i]: - worklist.append(d) - - def _fill_pool(self, i, is_forward): - def comparator(x, cache): - x_shape = x[1] - cache_shape = cache[1] - x_size = abs(reduce(lambda x, y: x * y, x_shape)) - cache_size = abs(reduce(lambda x, y: x * y, cache_shape)) - if (x_shape[0] == -1 and cache_shape[0] == -1) or \ - (x_shape[0] != -1 and cache_shape[0] != -1) : - return x_size <= cache_size - else: - return False - - def find_var_in_block(x): - known_vars = set() - for op in self._ops: - known_vars.update(op.output_arg_names()) - return x in known_vars - - block_desc = self._ops[i].block() - in_diff, _ = self._get_diff(self._live_in[i], self._live_out[i]) - # NOTE: must sort the in_diff set for cases that get different cache var. - # FIXME(typhoonzero): maybe use a "sorted set" is better than this. - can_optimize = [ - x for x in sorted(in_diff) - if self._check_var_validity(block_desc, x, is_forward) - ] - if can_optimize: - for var_name in can_optimize: - cache = (var_name, self._find_var(block_desc, var_name, - is_forward).shape()) - if cache not in self.pool and find_var_in_block(var_name): - i = 0 - while i < len(self.pool): - mycache = self.pool[i] - mysize = mycache[1][0] - cache_size = cache[1][0] - if (mysize == -1 and cache_size == -1) or \ - (mysize != -1 and cache_size != -1): - if comparator(mycache, cache): - i += 1 - else: - break - elif mysize == -1 and cache_size != -1: - i += 1 - elif mysize != -1 and cache_size == -1: - break - self.pool.insert(i, cache) - - def _get_diff(self, a, b): - u = a & b - return a - u, b - u - - def _has_var(self, block_desc, var_name, is_forward): - if is_forward: - return block_desc.has_var(cpt.to_bytes(var_name)) - else: - return block_desc.has_var_recursive(cpt.to_bytes(var_name)) - - def _find_var(self, block_desc, var_name, is_forward): - if is_forward: - return block_desc.find_var(cpt.to_bytes(var_name)) - else: - return block_desc.find_var_recursive(cpt.to_bytes(var_name)) - - def _check_var_validity(self, block_desc, x, is_forward): - if str(x) == "@EMPTY@": - return False - if not self._has_var(block_desc, x, is_forward): - return False - if self._find_var(block_desc, x, is_forward).persistable(): - return False - if self._find_var(block_desc, x, - is_forward).type() != core.VarDesc.VarType.LOD_TENSOR: - return False - if x in self._skip_opt: - return False - if not self._find_var(block_desc, x, is_forward).shape(): - return False - return True - - # TODO(panyx0718): This needs to be less hacky. It seems memory optimization - # doesn't consider vars copied between cpu and gpu. - def _update_skip_opt_set(self): - for i in range(self.op_size): - op = self._ops[i] - if op.has_attr("force_cpu") and op.attr("force_cpu") == True: - self._skip_opt.update(op.output_arg_names()) - - def release_memory(self, skip_opt_set=None): - self._dataflow_analyze() - self._update_skip_opt_set() - if skip_opt_set: - self._skip_opt.update(skip_opt_set) - fwd_id = 0 - bwd_id = 0 - for i in range(self.op_size): - op = self._ops[i] - if op.type() in SUB_BLOCK_OPS: - continue - block_desc = op.block() - is_forward = i < self._forward_num - in_diff, out_diff = self._get_diff(self._live_in[i], - self._live_out[i]) - can_optimize = [ - x for x in in_diff - if self._check_var_validity(block_desc, x, is_forward) - ] - if can_optimize: - index = i + fwd_id + 1 if is_forward else i - self._forward_num + bwd_id + 1 - delete_op = block_desc._insert_op(index) - delete_op.set_type("delete_var") - delete_op.set_input("X", can_optimize) - if is_forward: - fwd_id += 1 - else: - bwd_id += 1 - - def memory_optimize(self, skip_opt_set=None, level=0): - def compare_shape(x_shape, cache_shape, opt_level): - if opt_level == 0: - return x_shape == cache_shape - elif opt_level == 1: - if (x_shape[0] == -1) ^ (cache_shape[0] == -1): - return False - x_size = abs(reduce(lambda x, y: x * y, x_shape)) - cache_size = abs(reduce(lambda x, y: x * y, cache_shape)) - if x_size <= cache_size: - return True - else: - raise ValueError("only support opt_level 0 or 1.") - return False - - self._dataflow_analyze() - self._update_skip_opt_set() - # update skip set to meet users' demand - if skip_opt_set: - self._skip_opt.update(skip_opt_set) - counter = 0 - for i in range(self.op_size): - op = self._ops[i] - if op.type() in SUB_BLOCK_OPS: - continue - block_desc = op.block() - is_forward = i < self._forward_num - if self.pool: - # NOTE: must sort the in_diff set for cases that get different cache var. - defs_can_optimize = [ - x for x in self._defs[i] - if self._check_var_validity(block_desc, x, is_forward) - ] - out_pair = [ - (x, self._find_var(block_desc, x, is_forward).shape()) - for x in defs_can_optimize - ] - for x, x_shape in out_pair: - # If x is both in uses and defs, it can not be optimized! - if x in self._uses[i]: - continue - if x == FLAGS_memory_optimize: - print("start match var ", x, " of op ", op.type()) - print(self.pool) - for index, cache_pair in enumerate(self.pool): - cache_var = cache_pair[0] - cache_shape = cache_pair[1] - if not self._has_var(block_desc, cache_var, is_forward): - if PRINT_LOG: - print("cache %s not exists!" % - (cpt.to_text(cache_var))) - continue - if x == cache_var: - if PRINT_LOG: - print("x : ", cpt.to_text(x), " cache : ", - cpt.to_text(cache_var), " is same var!") - break - - x_dtype = self._find_var(block_desc, x, - is_forward).dtype() - cache_dtype = self._find_var(block_desc, cache_var, - is_forward).dtype() - if x_dtype != cache_dtype: - if PRINT_LOG: - print("x_dtype and cache_dtype are different") - continue - - if not compare_shape(x_shape, cache_shape, level): - continue - # TODO(qijun): dtype_to_size[x_dtype] and dtype_to_size[cache_dtype] - if PRINT_LOG: - print( - ("!!! %d, %s => %s, cache idx %d, pool size %d" - % (counter, x + str(x_shape), - cache_var + str(cache_shape), index, - len(self.pool)))) - counter += 1 - self.pool.pop(index) - # Rename the var to the cache var already with - # memory allocated in order to reuse the memory. - _rename_arg_(self._ops, x, cache_var, begin_idx=i) - self._program.block(block_desc.id).var(cpt.to_text( - x)).desc = self._find_var(block_desc, cache_var, - is_forward) - self._program.block(block_desc.id).vars[cpt.to_text(x)] = \ - Variable(self._program.block(block_desc.id), name=cpt.to_text(x)) - self._update_graph(x, cache_var, begin_idx=i) - break - self._fill_pool(i, is_forward) - - -def _process_sub_block_pair(pdesc, sub_block_pair): - """Creates a list of tuple each of which tracks info of a subblock. - - Note: this function doesn't handle nested subblocks yet. - TODO(panyx0718): assert if case nested subblocks happen. - - :param pdesc: ProgramDesc. - :param sub_block_pair: A list op pairs. Each op pair is the forward - op and backward op. The ops in the list are special that they contain - a subblock of ops. - :return: A list of tuples, each tuple is (all ops in a subblock pair - including forward and backward, number of forward ops, - all output args names of the ops in the subblock pairs). - """ - ops_list = [] - block_desc = pdesc.block(0) - op_size = block_desc.op_size() - for fwd_op, bwd_op in sub_block_pair: - sub_block_ids = [] - grad_sub_block_ids = [] - sub_block_id_pair = [] - sub_op_dict = {} - for i in range(op_size): - op = block_desc.op(i) - if op.type() == fwd_op: - sub_block_ids.append(op.attr("sub_block").id) - sub_op_dict[op.attr("sub_block").id] = op - elif op.type() == bwd_op: - grad_sub_block_ids.append(op.attr("sub_block").id) - sub_op_dict[op.attr("sub_block").id] = op - - # Find fwd_op/bwd_op block pair - for grad_id in grad_sub_block_ids: - fwd_id = pdesc.block(grad_id).get_forward_block_idx() - if fwd_id in sub_block_ids: - sub_block_id_pair.append((fwd_id, grad_id)) - sub_block_ids.remove(fwd_id) - - # Get fwd_op/bwd_op block ops - for fwd_id, grad_id in sub_block_id_pair: - sub_block_ops = [] - sub_block = pdesc.block(fwd_id) - block_op_size = sub_block.op_size() - for i in range(block_op_size): - sub_block_ops.append(sub_block.op(i)) - - grad_sub_block = pdesc.block(grad_id) - grad_sub_block_op_size = grad_sub_block.op_size() - for i in range(grad_sub_block_op_size): - sub_block_ops.append(grad_sub_block.op(i)) - - sub_op_output = set() - sub_op_output.update(sub_op_dict[fwd_id].output_arg_names()) - sub_op_output.update(sub_op_dict[grad_id].output_arg_names()) - sub_op_output.update(sub_op_dict[fwd_id].input_arg_names()) - sub_op_output.update(sub_op_dict[grad_id].input_arg_names()) - ops_list.append((sub_block_ops, block_op_size, sub_op_output)) - - # Process rest fwd_op block ops - for fwd_id in sub_block_ids: - sub_block_ops = [] - sub_block = pdesc.block(fwd_id) - sub_block_op_size = sub_block.op_size() - for i in range(sub_block_op_size): - sub_block_ops.append(sub_block.op(i)) - sub_op_output = set() - sub_op_output.update(sub_op_dict[fwd_id].output_arg_names()) - sub_op_output.update(sub_op_dict[fwd_id].input_arg_names()) - ops_list.append((sub_block_ops, sub_block_op_size, sub_op_output)) - return ops_list - - -def _get_cfgs(input_program): - """Process each block and create ControlFlowGraph for each of them. - - :param input_program: Program object. - :return: A list of ControlFlowGraph, each corresponds to a block. - """ - ops_list = [] - pdesc = input_program._get_desc() - block_desc = pdesc.block(0) - op_size = block_desc.op_size() - - # Only process one level of nested subblock. - ops_list.extend(_process_sub_block_pair(pdesc, SUB_BLOCK_PAIR)) - - skip_opt_set = set() - for _, _, skip_opt in ops_list: - skip_opt_set.update(skip_opt) - - # Get global block ops - ops_list.insert( - 0, ([block_desc.op(i) for i in range(op_size)], op_size, skip_opt_set)) - cfgs = [ - ControlFlowGraph(input_program, ops, forward_num, skip_opt) - for ops, forward_num, skip_opt in ops_list - ] - return cfgs - - -def _is_opt_role_op(op): - op_maker = core.op_proto_and_checker_maker - optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize - if op_maker.kOpRoleAttrName() in op.attr_names and \ - int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role): - return True def memory_optimize(input_program, @@ -554,49 +75,16 @@ def memory_optimize(input_program, logging.warn( 'Caution! paddle.fluid.memory_optimize() is deprecated ' 'and not maintained any more, since it is not stable!\n' - 'Please use the newest and stable memory optimization strategies!\n' - ' 1. Enable garbage collection strategy by exporting environment ' - 'variable FLAGS_eager_delete_tensor_gb=0\n' - ' 2. Set build_strategy.enable_inplace=True (True is the default ' - 'value) when using CompiledProgram or ParallelExecutor.\n') - - def to_name_str(var): - if isinstance(var, Variable): - return var.desc.name() - elif isinstance(var, str): - return var - elif isinstance(var, six.string_types): - return str(var) - else: - raise TypeError(str(var) + " should be Variable or str") - - if level != 0 and level != 1: - raise ValueError("only support opt_level 0 or 1.") - if skip_opt_set is not None: - if isinstance(skip_opt_set, set) or isinstance(skip_opt_set, list): - skip_opt_set = set(skip_opt_set) - else: - raise ValueError("only support skip_opt_set as set.") - global PRINT_LOG - PRINT_LOG = print_log - if skip_grads: - grad_set = set() - OP_ROLE_VAR = core.op_proto_and_checker_maker.kOpRoleVarAttrName() - for op in input_program.global_block().ops: - if _is_opt_role_op(op): - if op.attr(OP_ROLE_VAR): - grad_name = op.attr(OP_ROLE_VAR)[1] - grad_set.add(grad_name) - if not skip_opt_set: - skip_opt_set = grad_set - else: - skip_opt_set.update(grad_set) - if skip_opt_set is not None: - skip_opt_set = set(map(to_name_str, skip_opt_set)) - cfgs = _get_cfgs(input_program) - input_program._is_mem_optimized = True - for cfg in cfgs: - cfg.memory_optimize(skip_opt_set=skip_opt_set, level=level) + 'This API would not take any memory optimizations on your Program ' + 'now, since we have provided default strategies for you.\n' + 'The newest and stable memory optimization strategies (they are all ' + 'enabled by default) are as follows:\n' + ' 1. Garbage collection strategy, which is enabled by exporting ' + 'environment variable FLAGS_eager_delete_tensor_gb=0 (0 is the ' + 'default value).\n' + ' 2. Inplace strategy, which is enabled by setting ' + 'build_strategy.enable_inplace=True (True is the default value) ' + 'when using CompiledProgram or ParallelExecutor.\n') def release_memory(input_program, skip_opt_set=None): @@ -625,7 +113,5 @@ def release_memory(input_program, skip_opt_set=None): fluid.release_memory(fluid.default_main_program()) """ - cfgs = _get_cfgs(input_program) - input_program._is_mem_optimized = True - for cfg in cfgs: - cfg.release_memory(skip_opt_set=skip_opt_set) + logging.warn('paddle.fluid.release_memory() is deprecated, it would not' + ' take any memory release on your program') -- GitLab