未验证 提交 f7e9fe57 编写于 作者: Q QI JUN 提交者: GitHub

[Memory]More memory optimization policy (#8690)

* add memopt level

* add opt level for image classification demo

* clean code

* add delete op

* clean code

* test machine translation demo

* clean code

* clean code

* skip fill constant with force cpu

* clean code

* clean code

* refine code

* clean code

* fix bug
上级 607eec30
......@@ -135,6 +135,14 @@ OpDesc *BlockDesc::PrependOp() {
return ops_.front().get();
}
OpDesc *BlockDesc::InsertOp(size_t index) {
need_update_ = true;
auto it = ops_.begin() + index;
std::unique_ptr<OpDesc> new_op(new OpDesc(this));
it = ops_.insert(it, std::move(new_op));
return (*it).get();
}
void BlockDesc::RemoveOp(size_t s, size_t e) {
if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) {
return;
......
......@@ -87,6 +87,8 @@ class BlockDesc {
OpDesc *PrependOp();
OpDesc *InsertOp(size_t index);
void RemoveOp(size_t s, size_t e);
std::vector<OpDesc *> AllOps() const;
......
......@@ -16,6 +16,7 @@ limitations under the License. */
#include <memory> // for unique_ptr
#include <mutex> // for call_once
#include <set>
#include "glog/logging.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/string/printf.h"
......@@ -102,6 +103,18 @@ void Scope::DeleteScope(Scope* scope) {
}
}
void Scope::EraseVars(std::vector<std::string>& var_names) {
std::set<std::string> var_set(var_names.begin(), var_names.end());
for (auto it = vars_.begin(); it != vars_.end();) {
if (var_set.find(it->first) != var_set.end()) {
delete it->second;
it = vars_.erase(it);
} else {
++it;
}
}
}
void Scope::Rename(const std::string& origin_name,
const std::string& new_name) const {
auto origin_it = vars_.find(origin_name);
......
......@@ -51,6 +51,8 @@ class Scope {
/// Create a variable with a scope-unique name.
Variable* Var(std::string* name = nullptr);
void EraseVars(std::vector<std::string>& var_names);
/// Find a variable in the scope or any of its ancestors. Returns
/// nullptr if cannot find.
Variable* FindVar(const std::string& name) const;
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
namespace paddle {
namespace operators {
class DeleteVarOp : public framework::OperatorBase {
public:
DeleteVarOp(const std::string &type, const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void RunImpl(const framework::Scope &scope,
const platform::Place &place) const override {
// get device context from pool
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(place);
dev_ctx.Wait();
auto delete_var_names = Inputs("X");
const_cast<framework::Scope &>(scope).EraseVars(delete_var_names);
}
};
class DeleteVarOpInfoMaker : public framework::OpProtoAndCheckerMaker {
public:
DeleteVarOpInfoMaker(OpProto *proto, OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "The input of delete op").AsDuplicable();
AddComment(R"DOC(
Delete Operator.
It should not be configured by users directly.
)DOC");
}
};
} // namespace operators
} // namespace paddle
REGISTER_OPERATOR(delete_var, paddle::operators::DeleteVarOp,
paddle::framework::EmptyGradOpMaker,
paddle::operators::DeleteVarOpInfoMaker);
......@@ -161,6 +161,8 @@ void BindBlockDesc(py::module &m) {
py::return_value_policy::reference)
.def("prepend_op", &BlockDesc::PrependOp,
py::return_value_policy::reference)
.def("insert_op", &BlockDesc::InsertOp,
py::return_value_policy::reference)
.def("remove_op", &BlockDesc::RemoveOp)
.def("var",
[](BlockDesc &self, py::bytes byte_name) {
......
......@@ -37,7 +37,7 @@ from distribute_transpiler_simple import SimpleDistributeTranspiler
from concurrency import (Go, make_channel, channel_send, channel_recv,
channel_close)
import clip
from memory_optimization_transpiler import memory_optimize
from memory_optimization_transpiler import memory_optimize, release_memory
import profiler
import unique_name
......@@ -63,6 +63,7 @@ __all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + [
'SimpleDistributeTranspiler',
'DistributeTranspiler',
'memory_optimize',
'release_memory',
'profiler',
'unique_name',
]
......
......@@ -457,7 +457,8 @@ def append_backward(loss, parameter_list=None, no_grad_set=None,
"Out": [_append_grad_suffix_(loss.name)]
}, {"shape": [1],
"value": 1.0,
"dtype": loss.dtype})
"dtype": loss.dtype,
"force_cpu": False})
root_block.desc.append_op().copy_from(op_desc)
block_no_grad_set = set(map(_strip_grad_suffix_, no_grad_dict[0]))
......
......@@ -29,7 +29,10 @@ dtype_to_size = {
core.VarDesc.VarType.BOOL: 1
}
sub_block_ops = ["while", "while_grad", "parallel_do", "parallel_do_grad"]
sub_block_ops = [
"while", "while_grad", "parallel_do", "parallel_do_grad",
"conditional_block", "conditional_block_grad"
]
PRINT_LOG = False
......@@ -122,16 +125,14 @@ class ControlFlowGraph(object):
else:
return block_desc.find_var_recursive(str(var_name))
def memory_optimize(self):
def check_var_validity(block_desc, x, is_forward):
def _check_var_validity(self, block_desc, x, is_forward):
if str(x) == "@EMPTY@":
return False
if not self._has_var(block_desc, x, is_forward):
return False
if self._find_var(block_desc, x, is_forward).persistable():
return False
if self._find_var(
block_desc, x,
if self._find_var(block_desc, x,
is_forward).type() != core.VarDesc.VarType.LOD_TENSOR:
return False
if x in self._skip_opt:
......@@ -140,18 +141,66 @@ class ControlFlowGraph(object):
return False
return True
def _update_skip_opt_set(self):
for i in range(self.op_size):
op = self._ops[i]
if op.type() == "fill_constant" and op.attr("force_cpu") == True:
self._skip_opt.update(op.output_arg_names())
def release_memory(self):
self._build_graph()
self._dataflow_analyze()
self._update_skip_opt_set()
fwd_id = 0
bwd_id = 0
for i in range(self.op_size):
op = self._ops[i]
if op.type() in sub_block_ops:
continue
block_desc = op.block()
is_forward = i < self._forward_num
in_diff, out_diff = self._get_diff(self._live_in[i],
self._live_out[i])
can_optimize = filter(
lambda x: self._check_var_validity(block_desc, x, is_forward),
in_diff)
if can_optimize:
index = i + fwd_id + 1 if is_forward else i - self._forward_num + bwd_id + 1
delete_op = block_desc.insert_op(index)
delete_op.set_type("delete_var")
delete_op.set_input("X", can_optimize)
if is_forward:
fwd_id += 1
else:
bwd_id += 1
def memory_optimize(self, level=0):
def compare_shape(x_shape, cache_shape, opt_level):
if opt_level == 0:
return x_shape == cache_shape
if opt_level == 1:
if (x_shape[0] == -1) ^ (cache_shape[0] == -1):
return False
x_size = abs(reduce(lambda x, y: x * y, x_shape))
cache_size = abs(reduce(lambda x, y: x * y, cache_shape))
if x_size <= cache_size:
return True
return False
self._build_graph()
self._dataflow_analyze()
self._update_skip_opt_set()
self.pool = []
for i in range(self.op_size):
op = self._ops[i]
if op.type() in sub_block_ops:
continue
block_desc = op.block()
self.current_block_desc = block_desc
is_forward = i < self._forward_num
if self.pool:
defs_can_optimize = filter(
lambda x: check_var_validity(block_desc, x, is_forward),
lambda x: self._check_var_validity(block_desc, x, is_forward),
self._defs[i])
out_pair = [
(x, self._find_var(block_desc, x, is_forward).shape())
......@@ -164,7 +213,7 @@ class ControlFlowGraph(object):
for index, cache_pair in enumerate(self.pool):
cache_var = cache_pair[0]
cache_shape = cache_pair[1]
if x_shape == cache_shape:
if compare_shape(x_shape, cache_shape, level):
if self._has_var(block_desc, cache_var, is_forward):
x_dtype = self._find_var(block_desc, x,
is_forward).dtype()
......@@ -196,7 +245,7 @@ class ControlFlowGraph(object):
in_diff, out_diff = self._get_diff(self._live_in[i],
self._live_out[i])
can_optimize = filter(
lambda x: check_var_validity(block_desc, x, is_forward),
lambda x: self._check_var_validity(block_desc, x, is_forward),
in_diff)
if can_optimize:
for var_name in can_optimize:
......@@ -270,7 +319,8 @@ def _get_cfgs(input_program):
([block_desc.op(i) for i in range(op_size)], op_size, set()))
sub_block_pair = [("while", "while_grad"), ("parallel_do",
"parallel_do_grad")]
"parallel_do_grad"),
("conditional_block", "conditional_block_grad")]
ops_list.extend(_process_sub_block_pair(pdesc, sub_block_pair))
......@@ -281,9 +331,15 @@ def _get_cfgs(input_program):
return cfgs
def memory_optimize(input_program, print_log=False):
def memory_optimize(input_program, print_log=False, level=0):
global PRINT_LOG
PRINT_LOG = print_log
cfgs = _get_cfgs(input_program)
for cfg in cfgs:
cfg.memory_optimize()
cfg.memory_optimize(level)
def release_memory(input_program):
cfgs = _get_cfgs(input_program)
for cfg in cfgs:
cfg.release_memory()
......@@ -50,6 +50,7 @@ sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.01)
sgd_optimizer.minimize(avg_cost)
fluid.memory_optimize(fluid.default_main_program(), print_log=True)
# fluid.release_memory(fluid.default_main_program())
BATCH_SIZE = 200
......@@ -69,8 +70,6 @@ exe.run(fluid.default_startup_program())
PASS_NUM = 100
for pass_id in range(PASS_NUM):
fluid.io.save_persistables(exe, "./fit_a_line.model/")
fluid.io.load_persistables(exe, "./fit_a_line.model/")
for data in train_reader():
avg_loss_value, = exe.run(fluid.default_main_program(),
feed=feeder.feed(data),
......
......@@ -125,9 +125,10 @@ opts = optimizer.minimize(avg_cost)
batch_size = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(input=predict, label=label, total=batch_size)
fluid.memory_optimize(fluid.default_main_program())
# fluid.memory_optimize(fluid.default_main_program(), level=0)
fluid.release_memory(fluid.default_main_program())
BATCH_SIZE = 128
BATCH_SIZE = 16
PASS_NUM = 1
# fix the order of training data
......@@ -159,8 +160,7 @@ for pass_id in range(PASS_NUM):
print("loss:" + str(loss) + " acc:" + str(acc) + " pass_acc:" + str(
pass_acc))
# this model is slow, so if we can train two mini batch, we think it works properly.
if i > 2:
if i > 0:
exit(0)
if math.isnan(float(loss)):
sys.exit("got NaN loss, training failed.")
......
......@@ -105,7 +105,8 @@ def main():
optimizer = fluid.optimizer.Adagrad(learning_rate=1e-4)
optimizer.minimize(avg_cost)
fluid.memory_optimize(fluid.default_main_program())
# fluid.memory_optimize(fluid.default_main_program())
fluid.release_memory(fluid.default_main_program())
# fix the order of training data
train_data = paddle.batch(
......
......@@ -98,6 +98,9 @@ class TestLearningRateDecay(unittest.TestCase):
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
fluid.memory_optimize(fluid.default_main_program())
for step in range(10):
lr_val, = exe.run(fluid.default_main_program(),
feed={},
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册