diff --git a/paddle/fluid/framework/block_desc.cc b/paddle/fluid/framework/block_desc.cc index d72b64700f7cf680501fd3e355d20e694f1f097d..3693bc25d81a8309df1a6ddf3d9b08d484596ea9 100644 --- a/paddle/fluid/framework/block_desc.cc +++ b/paddle/fluid/framework/block_desc.cc @@ -135,6 +135,14 @@ OpDesc *BlockDesc::PrependOp() { return ops_.front().get(); } +OpDesc *BlockDesc::InsertOp(size_t index) { + need_update_ = true; + auto it = ops_.begin() + index; + std::unique_ptr new_op(new OpDesc(this)); + it = ops_.insert(it, std::move(new_op)); + return (*it).get(); +} + void BlockDesc::RemoveOp(size_t s, size_t e) { if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) { return; diff --git a/paddle/fluid/framework/block_desc.h b/paddle/fluid/framework/block_desc.h index 3bd90f38907c0a45ae0c9bb00706e5c127f08417..185f018ac1b5863e0ee86fdaa17df1ccbc6e030e 100644 --- a/paddle/fluid/framework/block_desc.h +++ b/paddle/fluid/framework/block_desc.h @@ -87,6 +87,8 @@ class BlockDesc { OpDesc *PrependOp(); + OpDesc *InsertOp(size_t index); + void RemoveOp(size_t s, size_t e); std::vector AllOps() const; diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index ea6c8cebd3ff16451c974bf3a0ded9d822a9caf8..17e38b1cf042657834b4d0d1c12cbbb92f19fa45 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -16,6 +16,7 @@ limitations under the License. */ #include // for unique_ptr #include // for call_once +#include #include "glog/logging.h" #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/string/printf.h" @@ -102,6 +103,18 @@ void Scope::DeleteScope(Scope* scope) { } } +void Scope::EraseVars(std::vector& var_names) { + std::set var_set(var_names.begin(), var_names.end()); + for (auto it = vars_.begin(); it != vars_.end();) { + if (var_set.find(it->first) != var_set.end()) { + delete it->second; + it = vars_.erase(it); + } else { + ++it; + } + } +} + void Scope::Rename(const std::string& origin_name, const std::string& new_name) const { auto origin_it = vars_.find(origin_name); diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h index d8fad162e59c8ea6465b2dc78c90709a758ace24..c1e1f49caaa5a60df0e97289aada465b45213971 100644 --- a/paddle/fluid/framework/scope.h +++ b/paddle/fluid/framework/scope.h @@ -51,6 +51,8 @@ class Scope { /// Create a variable with a scope-unique name. Variable* Var(std::string* name = nullptr); + void EraseVars(std::vector& var_names); + /// Find a variable in the scope or any of its ancestors. Returns /// nullptr if cannot find. Variable* FindVar(const std::string& name) const; diff --git a/paddle/fluid/operators/delete_var_op.cc b/paddle/fluid/operators/delete_var_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..1fe9404c00335edbe3594486f8c403e69f2ab08f --- /dev/null +++ b/paddle/fluid/operators/delete_var_op.cc @@ -0,0 +1,52 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/operator.h" + +namespace paddle { +namespace operators { +class DeleteVarOp : public framework::OperatorBase { + public: + DeleteVarOp(const std::string &type, const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + void RunImpl(const framework::Scope &scope, + const platform::Place &place) const override { + // get device context from pool + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &dev_ctx = *pool.Get(place); + dev_ctx.Wait(); + + auto delete_var_names = Inputs("X"); + const_cast(scope).EraseVars(delete_var_names); + } +}; + +class DeleteVarOpInfoMaker : public framework::OpProtoAndCheckerMaker { + public: + DeleteVarOpInfoMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput("X", "The input of delete op").AsDuplicable(); + AddComment(R"DOC( +Delete Operator. +It should not be configured by users directly. +)DOC"); + } +}; + +} // namespace operators +} // namespace paddle + +REGISTER_OPERATOR(delete_var, paddle::operators::DeleteVarOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::DeleteVarOpInfoMaker); diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index b0a2497d919b65afbe5eeaf4fe47c19baa1aba1c..45a64f43846e79c27295e52c59dca6bdfaa120a3 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -161,6 +161,8 @@ void BindBlockDesc(py::module &m) { py::return_value_policy::reference) .def("prepend_op", &BlockDesc::PrependOp, py::return_value_policy::reference) + .def("insert_op", &BlockDesc::InsertOp, + py::return_value_policy::reference) .def("remove_op", &BlockDesc::RemoveOp) .def("var", [](BlockDesc &self, py::bytes byte_name) { diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 0df3fd0343dbdaee88192a402d990fdfc2235811..84a57aff516ad2d7ba1efaf1d530e77747d3b254 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -37,7 +37,7 @@ from distribute_transpiler_simple import SimpleDistributeTranspiler from concurrency import (Go, make_channel, channel_send, channel_recv, channel_close) import clip -from memory_optimization_transpiler import memory_optimize +from memory_optimization_transpiler import memory_optimize, release_memory import profiler import unique_name @@ -63,6 +63,7 @@ __all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + [ 'SimpleDistributeTranspiler', 'DistributeTranspiler', 'memory_optimize', + 'release_memory', 'profiler', 'unique_name', ] diff --git a/python/paddle/fluid/backward.py b/python/paddle/fluid/backward.py index 09d137c9017bb4185308af94287fe0e8aa005505..b6f20daee3a585777a23255355f0a0e31328d23f 100644 --- a/python/paddle/fluid/backward.py +++ b/python/paddle/fluid/backward.py @@ -457,7 +457,8 @@ def append_backward(loss, parameter_list=None, no_grad_set=None, "Out": [_append_grad_suffix_(loss.name)] }, {"shape": [1], "value": 1.0, - "dtype": loss.dtype}) + "dtype": loss.dtype, + "force_cpu": False}) root_block.desc.append_op().copy_from(op_desc) block_no_grad_set = set(map(_strip_grad_suffix_, no_grad_dict[0])) diff --git a/python/paddle/fluid/memory_optimization_transpiler.py b/python/paddle/fluid/memory_optimization_transpiler.py index 4fa2d03ef563b98b2eec576bf87d4b2e54ca0a36..e6e98cbfecd78ef61a5e0b8e7972a0240904be50 100644 --- a/python/paddle/fluid/memory_optimization_transpiler.py +++ b/python/paddle/fluid/memory_optimization_transpiler.py @@ -29,7 +29,10 @@ dtype_to_size = { core.VarDesc.VarType.BOOL: 1 } -sub_block_ops = ["while", "while_grad", "parallel_do", "parallel_do_grad"] +sub_block_ops = [ + "while", "while_grad", "parallel_do", "parallel_do_grad", + "conditional_block", "conditional_block_grad" +] PRINT_LOG = False @@ -122,36 +125,82 @@ class ControlFlowGraph(object): else: return block_desc.find_var_recursive(str(var_name)) - def memory_optimize(self): - def check_var_validity(block_desc, x, is_forward): - if str(x) == "@EMPTY@": - return False - if not self._has_var(block_desc, x, is_forward): - return False - if self._find_var(block_desc, x, is_forward).persistable(): - return False - if self._find_var( - block_desc, x, - is_forward).type() != core.VarDesc.VarType.LOD_TENSOR: - return False - if x in self._skip_opt: - return False - if not self._find_var(block_desc, x, is_forward).shape(): - return False - return True + def _check_var_validity(self, block_desc, x, is_forward): + if str(x) == "@EMPTY@": + return False + if not self._has_var(block_desc, x, is_forward): + return False + if self._find_var(block_desc, x, is_forward).persistable(): + return False + if self._find_var(block_desc, x, + is_forward).type() != core.VarDesc.VarType.LOD_TENSOR: + return False + if x in self._skip_opt: + return False + if not self._find_var(block_desc, x, is_forward).shape(): + return False + return True + def _update_skip_opt_set(self): + for i in range(self.op_size): + op = self._ops[i] + if op.type() == "fill_constant" and op.attr("force_cpu") == True: + self._skip_opt.update(op.output_arg_names()) + + def release_memory(self): self._build_graph() self._dataflow_analyze() + self._update_skip_opt_set() + fwd_id = 0 + bwd_id = 0 + for i in range(self.op_size): + op = self._ops[i] + if op.type() in sub_block_ops: + continue + block_desc = op.block() + is_forward = i < self._forward_num + in_diff, out_diff = self._get_diff(self._live_in[i], + self._live_out[i]) + can_optimize = filter( + lambda x: self._check_var_validity(block_desc, x, is_forward), + in_diff) + if can_optimize: + index = i + fwd_id + 1 if is_forward else i - self._forward_num + bwd_id + 1 + delete_op = block_desc.insert_op(index) + delete_op.set_type("delete_var") + delete_op.set_input("X", can_optimize) + if is_forward: + fwd_id += 1 + else: + bwd_id += 1 + + def memory_optimize(self, level=0): + def compare_shape(x_shape, cache_shape, opt_level): + if opt_level == 0: + return x_shape == cache_shape + if opt_level == 1: + if (x_shape[0] == -1) ^ (cache_shape[0] == -1): + return False + x_size = abs(reduce(lambda x, y: x * y, x_shape)) + cache_size = abs(reduce(lambda x, y: x * y, cache_shape)) + if x_size <= cache_size: + return True + return False + + self._build_graph() + self._dataflow_analyze() + self._update_skip_opt_set() self.pool = [] for i in range(self.op_size): op = self._ops[i] if op.type() in sub_block_ops: continue block_desc = op.block() + self.current_block_desc = block_desc is_forward = i < self._forward_num if self.pool: defs_can_optimize = filter( - lambda x: check_var_validity(block_desc, x, is_forward), + lambda x: self._check_var_validity(block_desc, x, is_forward), self._defs[i]) out_pair = [ (x, self._find_var(block_desc, x, is_forward).shape()) @@ -164,7 +213,7 @@ class ControlFlowGraph(object): for index, cache_pair in enumerate(self.pool): cache_var = cache_pair[0] cache_shape = cache_pair[1] - if x_shape == cache_shape: + if compare_shape(x_shape, cache_shape, level): if self._has_var(block_desc, cache_var, is_forward): x_dtype = self._find_var(block_desc, x, is_forward).dtype() @@ -196,7 +245,7 @@ class ControlFlowGraph(object): in_diff, out_diff = self._get_diff(self._live_in[i], self._live_out[i]) can_optimize = filter( - lambda x: check_var_validity(block_desc, x, is_forward), + lambda x: self._check_var_validity(block_desc, x, is_forward), in_diff) if can_optimize: for var_name in can_optimize: @@ -270,7 +319,8 @@ def _get_cfgs(input_program): ([block_desc.op(i) for i in range(op_size)], op_size, set())) sub_block_pair = [("while", "while_grad"), ("parallel_do", - "parallel_do_grad")] + "parallel_do_grad"), + ("conditional_block", "conditional_block_grad")] ops_list.extend(_process_sub_block_pair(pdesc, sub_block_pair)) @@ -281,9 +331,15 @@ def _get_cfgs(input_program): return cfgs -def memory_optimize(input_program, print_log=False): +def memory_optimize(input_program, print_log=False, level=0): global PRINT_LOG PRINT_LOG = print_log cfgs = _get_cfgs(input_program) for cfg in cfgs: - cfg.memory_optimize() + cfg.memory_optimize(level) + + +def release_memory(input_program): + cfgs = _get_cfgs(input_program) + for cfg in cfgs: + cfg.release_memory() diff --git a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py index c9d2a5ecaab0669f308b5b9c5cf74d0212fa462a..ad79e96b958b36a06c8a3cc990dbe3608e32c9ac 100644 --- a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py +++ b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py @@ -50,6 +50,7 @@ sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.01) sgd_optimizer.minimize(avg_cost) fluid.memory_optimize(fluid.default_main_program(), print_log=True) +# fluid.release_memory(fluid.default_main_program()) BATCH_SIZE = 200 @@ -69,8 +70,6 @@ exe.run(fluid.default_startup_program()) PASS_NUM = 100 for pass_id in range(PASS_NUM): - fluid.io.save_persistables(exe, "./fit_a_line.model/") - fluid.io.load_persistables(exe, "./fit_a_line.model/") for data in train_reader(): avg_loss_value, = exe.run(fluid.default_main_program(), feed=feeder.feed(data), diff --git a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_image_classification_train.py b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_image_classification_train.py index 80ff11f8d78b0a22fc6aefd722c9e6a2c23fbd5c..204669d7e6176e9e8250e8aebc2d10441fa24b67 100644 --- a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_image_classification_train.py +++ b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_image_classification_train.py @@ -125,9 +125,10 @@ opts = optimizer.minimize(avg_cost) batch_size = fluid.layers.create_tensor(dtype='int64') batch_acc = fluid.layers.accuracy(input=predict, label=label, total=batch_size) -fluid.memory_optimize(fluid.default_main_program()) +# fluid.memory_optimize(fluid.default_main_program(), level=0) +fluid.release_memory(fluid.default_main_program()) -BATCH_SIZE = 128 +BATCH_SIZE = 16 PASS_NUM = 1 # fix the order of training data @@ -159,8 +160,7 @@ for pass_id in range(PASS_NUM): print("loss:" + str(loss) + " acc:" + str(acc) + " pass_acc:" + str( pass_acc)) # this model is slow, so if we can train two mini batch, we think it works properly. - - if i > 2: + if i > 0: exit(0) if math.isnan(float(loss)): sys.exit("got NaN loss, training failed.") diff --git a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_machine_translation.py b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_machine_translation.py index 689a75afc7ccdf84142f5531a438e1f9af7af4ca..a24834a6f0b19d1265f6c8d7089d31583af82d1f 100644 --- a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_machine_translation.py +++ b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_machine_translation.py @@ -105,7 +105,8 @@ def main(): optimizer = fluid.optimizer.Adagrad(learning_rate=1e-4) optimizer.minimize(avg_cost) - fluid.memory_optimize(fluid.default_main_program()) + # fluid.memory_optimize(fluid.default_main_program()) + fluid.release_memory(fluid.default_main_program()) # fix the order of training data train_data = paddle.batch( diff --git a/python/paddle/fluid/tests/unittests/test_learning_rate_scheduler.py b/python/paddle/fluid/tests/unittests/test_learning_rate_scheduler.py index 00a6f7c237d58458ea083abf47dd09585cd6f235..6382e290eb30c621da64d5c600be6d8a7c6254f1 100644 --- a/python/paddle/fluid/tests/unittests/test_learning_rate_scheduler.py +++ b/python/paddle/fluid/tests/unittests/test_learning_rate_scheduler.py @@ -98,6 +98,9 @@ class TestLearningRateDecay(unittest.TestCase): exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) + + fluid.memory_optimize(fluid.default_main_program()) + for step in range(10): lr_val, = exe.run(fluid.default_main_program(), feed={},