Unverified commit c194b0c8, authored by Zeng Jinle, committed by GitHub

Try to deprecate unstable python memory optimize (#18983)

* deprecate python memory optimize, test=develop

* remove memory_optimize in unittests, test=develop

* add unittests to deprecated interfaces, test=develop
Parent 5a80cc84
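For readers skimming the diff, here is a minimal sketch of the migration this commit pushes toward: dropping the deprecated Python-side pass in favor of the IR-level BuildStrategy flags. This is not code from the commit itself; the network helper (simple_fc_net from the test utilities) and the CPU_NUM setting are assumptions based on the hunks below.

# Sketch only: recommended replacement for the deprecated Python-side pass,
# using the fluid 1.x API that appears throughout this commit.
import os
os.environ.setdefault('CPU_NUM', '1')  # data-parallel CPU runs read this (the tests set it explicitly)

import paddle.fluid as fluid
from simple_nets import simple_fc_net  # test helper; assumed importable as in the new unittest below

main_prog, startup_prog = fluid.Program(), fluid.Program()
with fluid.program_guard(main_prog, startup_prog):
    loss = simple_fc_net()
    fluid.optimizer.Adam(learning_rate=1e-3).minimize(loss)

# Deprecated: after this commit the call below only logs a warning and changes nothing.
# fluid.memory_optimize(main_prog)

# Recommended: rely on the IR-level passes configured through BuildStrategy.
build_strategy = fluid.BuildStrategy()
build_strategy.memory_optimize = True   # cross-op variable memory reuse
build_strategy.enable_inplace = True    # reuse buffers of in-place ops

place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
compiled = fluid.compiler.CompiledProgram(main_prog).with_data_parallel(
    loss_name=loss.name, build_strategy=build_strategy)
# `compiled` can then be passed to exe.run(...) wherever main_prog was used.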
......@@ -102,8 +102,6 @@ class TestDistSaveLoad2x2(TestDistSimnetBow2x2):
test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
self.get_model(batch_size=2)
if args.mem_opt:
fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
if args.update_method == "pserver":
t = self.get_transpiler(args.trainer_id,
fluid.default_main_program(),
......
......@@ -43,7 +43,6 @@ class BuildIrMemOptBase(unittest.TestCase):
def check_network_convergence(self,
network,
use_cuda=True,
memory_opt=True,
use_ir_memory_optimize=True,
enable_inplace=True,
iter=5):
......@@ -68,13 +67,8 @@ class BuildIrMemOptBase(unittest.TestCase):
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
optimizer.minimize(cost)
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.memory_optimize = False
if memory_opt:
fluid.memory_optimize(fluid.default_main_program())
else:
build_strategy.enable_inplace = use_ir_memory_optimize
build_strategy.memory_optimize = enable_inplace
build_strategy.enable_inplace = enable_inplace
build_strategy.memory_optimize = use_ir_memory_optimize
# execution
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
......@@ -134,7 +128,7 @@ class TestIrMemOptBase(BuildIrMemOptBase):
self.network)
cur_first_loss, cur_last_loss = self.check_network_convergence(
self.network, memory_opt=False)
self.network)
self.assertAlmostEquals(
np.mean(baseline_last_loss),
......
......@@ -33,7 +33,6 @@ class TestParallelExecutorBase(unittest.TestCase):
def check_network_convergence(cls,
method,
use_cuda=True,
memory_opt=False,
iter=50,
batch_size=None,
feed_dict=None,
......@@ -59,8 +58,7 @@ class TestParallelExecutorBase(unittest.TestCase):
main.random_seed = 1
with fluid.program_guard(main, startup):
feed_dict, loss = cls.build_model(feed_dict, get_data_from_feeder,
main, memory_opt, method,
optimizer)
main, method, optimizer)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
......@@ -112,7 +110,6 @@ class TestParallelExecutorBase(unittest.TestCase):
def check_pass_conflict(cls,
method,
use_cuda=True,
memory_opt=False,
feed_dict=None,
get_data_from_feeder=None,
use_reduce=False,
......@@ -130,8 +127,7 @@ class TestParallelExecutorBase(unittest.TestCase):
startup = fluid.Program()
with fluid.program_guard(main, startup):
feed_dict, loss = cls.build_model(feed_dict, get_data_from_feeder,
main, memory_opt, method,
optimizer)
main, method, optimizer)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
......@@ -175,16 +171,17 @@ class TestParallelExecutorBase(unittest.TestCase):
return build_strategy, exec_strategy
@classmethod
def build_model(cls, feed_dict, get_data_from_feeder, main, memory_opt,
method, optimizer):
def build_model(cls, feed_dict, get_data_from_feeder, main, method,
optimizer):
loss = method(use_feed=feed_dict is not None)
# NOTE(zjl): memory_optimize/inplace pass would not require
# that loss.persistable = True
loss.persistable = memory_opt
# that loss.persistable = True.
# We set loss.persistable = False here to verify our memory
# optimization strategies intentionally.
loss.persistable = False
if optimizer:
optimizer().minimize(loss)
if memory_opt:
fluid.memory_optimize(main)
if get_data_from_feeder is not None:
assert feed_dict is None
feed_dict = get_data_from_feeder()
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import unittest
from simple_nets import simple_fc_net
class DeprecatedMemoryOptimizationInterfaceTest(unittest.TestCase):
def setUp(self):
self.method = fluid.memory_optimize
def build_network(self, call_interface):
startup_prog = fluid.Program()
main_prog = fluid.Program()
with fluid.program_guard(main_prog, startup_prog):
with fluid.unique_name.guard():
loss = simple_fc_net()
opt = fluid.optimizer.Adam(learning_rate=1e-3)
opt.minimize(loss)
if call_interface:
self.method(main_prog)
return main_prog
def assert_program_equal(self, prog1, prog2):
block_num = prog1.num_blocks
self.assertEquals(block_num, prog2.num_blocks)
for block_id in range(block_num):
block1 = prog1.block(block_id)
block2 = prog2.block(block_id)
self.assertEquals(len(block1.ops), len(block2.ops))
for op1, op2 in zip(block1.ops, block2.ops):
self.assertEquals(op1.input_arg_names, op2.input_arg_names)
self.assertEquals(op1.output_arg_names, op2.output_arg_names)
self.assertEquals(len(block1.vars), len(block2.vars))
for var1 in block1.vars.values():
self.assertTrue(var1.name in block2.vars)
var2 = block2.vars.get(var1.name)
self.assertEquals(var1.name, var2.name)
def test_main(self):
prog1 = self.build_network(False)
prog2 = self.build_network(True)
self.assert_program_equal(prog1, prog2)
class ReleaseMemoryTest(DeprecatedMemoryOptimizationInterfaceTest):
def setUp(self):
self.method = fluid.release_memory
if __name__ == '__main__':
unittest.main()
......@@ -108,10 +108,6 @@ class TestDistRunnerBase(object):
test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
self.get_model(batch_size=args.batch_size)
if args.mem_opt:
my_print(type(self).__name__, "begin to run memory optimize")
fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
my_print(type(self).__name__, "trainer run memory optimize done.")
if args.update_method == "pserver":
my_print(
type(self).__name__,
......@@ -327,7 +323,6 @@ def runtime_main(test_class):
parser.add_argument(
'--current_endpoint', type=str, required=False, default="")
parser.add_argument('--sync_mode', action='store_true')
parser.add_argument('--mem_opt', action='store_true')
parser.add_argument('--use_cuda', action='store_true')
parser.add_argument('--use_dgc', action='store_true')
parser.add_argument('--use_reduce', action='store_true')
......@@ -387,7 +382,6 @@ class TestDistBase(unittest.TestCase):
self._python_interp = sys.executable
self._sync_mode = True
self._enforce_place = None
self._mem_opt = False
self._use_reduce = False
self._dc_asgd = False # must use with async mode
self._use_reader_alloc = True
......@@ -435,9 +429,6 @@ class TestDistBase(unittest.TestCase):
if self._sync_mode:
ps0_cmd += " --sync_mode"
ps1_cmd += " --sync_mode"
if self._mem_opt:
ps0_cmd += " --mem_opt"
ps1_cmd += " --mem_opt"
print(ps0_cmd)
print(ps1_cmd)
......@@ -530,9 +521,6 @@ class TestDistBase(unittest.TestCase):
if self._sync_mode:
tr0_cmd += " --sync_mode"
tr1_cmd += " --sync_mode"
if self._mem_opt:
tr0_cmd += " --mem_opt"
tr1_cmd += " --mem_opt"
if self._use_reduce:
tr0_cmd += " --use_reduce"
tr1_cmd += " --use_reduce"
......@@ -603,8 +591,6 @@ class TestDistBase(unittest.TestCase):
(self._python_interp, model, self._ps_endpoints,
trainer_id, ep, update_method, self._lr)
if self._mem_opt:
tr_cmd += " --mem_opt"
if self._use_reduce:
tr_cmd += " --use_reduce"
if self._use_reader_alloc:
......
......@@ -49,7 +49,6 @@ class TestFuseAllReduceOpsBase(TestParallelExecutorBase):
use_cuda=use_cuda,
fuse_all_reduce_ops=False,
fuse_all_optimizer_ops=fuse_all_optimizer_ops,
memory_opt=False,
optimizer=optimizer)
fuse_op_first_loss, fuse_op_last_loss = self.check_network_convergence(
model,
......@@ -58,7 +57,6 @@ class TestFuseAllReduceOpsBase(TestParallelExecutorBase):
use_cuda=use_cuda,
fuse_all_reduce_ops=True,
fuse_all_optimizer_ops=fuse_all_optimizer_ops,
memory_opt=False,
optimizer=optimizer)
for loss in zip(not_fuse_op_first_loss, fuse_op_first_loss):
......
......@@ -47,7 +47,6 @@ class TestMNIST(TestParallelExecutorBase):
"label": label},
use_cuda=use_cuda,
fuse_elewise_add_act_ops=False,
memory_opt=False,
use_ir_memory_optimize=False,
enable_inplace=False,
optimizer=_optimizer)
......@@ -57,7 +56,6 @@ class TestMNIST(TestParallelExecutorBase):
"label": label},
use_cuda=use_cuda,
fuse_elewise_add_act_ops=True,
memory_opt=False,
use_ir_memory_optimize=False,
enable_inplace=False,
optimizer=_optimizer)
......
......@@ -46,7 +46,6 @@ class TestFuseOptimizationOps(TestParallelExecutorBase):
get_data_from_feeder=get_data_from_feeder,
use_cuda=use_cuda,
fuse_all_optimizer_ops=False,
memory_opt=False, # avoid the gradient's name changed in Python side.
optimizer=optimizer)
fuse_op_first_loss, fuse_op_last_loss = self.check_network_convergence(
model,
......@@ -54,7 +53,6 @@ class TestFuseOptimizationOps(TestParallelExecutorBase):
get_data_from_feeder=get_data_from_feeder,
use_cuda=use_cuda,
fuse_all_optimizer_ops=True,
memory_opt=False, # avoid the gradient's name changed in Python side.
optimizer=optimizer)
for loss in zip(not_fuse_op_first_loss, fuse_op_first_loss):
......@@ -152,7 +150,6 @@ class TestPassConflictBase(TestFuseAdamOps):
get_data_from_feeder=get_data_from_feeder,
use_cuda=use_cuda,
fuse_all_optimizer_ops=True,
memory_opt=False, # avoid the gradient's name changed in Python side.
optimizer=optimizer,
enable_sequential_execution=True)
......
......@@ -120,7 +120,6 @@ class TestMNIST(TestParallelExecutorBase):
use_cuda=use_cuda,
fuse_relu_depthwise_conv=True,
use_ir_memory_optimize=True,
memory_opt=False,
optimizer=_optimizer)
not_fuse_op_first_loss, not_fuse_op_last_loss = self.check_network_convergence(
model,
......@@ -128,7 +127,6 @@ class TestMNIST(TestParallelExecutorBase):
"label": label},
use_cuda=use_cuda,
fuse_relu_depthwise_conv=False,
memory_opt=False,
optimizer=_optimizer)
for loss in zip(not_fuse_op_first_loss, fuse_op_first_loss):
......
......@@ -109,9 +109,6 @@ class TestSaveInferenceModel(unittest.TestCase):
exe = executor.Executor(place)
exe.run(init_program, feed={}, fetch_list=[])
memory_optimize(program, print_log=True)
self.assertEqual(program._is_mem_optimized, True)
# will print warning message
save_inference_model(MODEL_DIR, ["x", "y"], [avg_cost], exe, program)
......
......@@ -47,10 +47,7 @@ class TestIrInplace(TestParallelExecutorBase):
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
def _fc_with_batchnorm(self,
ir_memory_optimize,
enable_inplace,
memory_opt=False):
def _fc_with_batchnorm(self, ir_memory_optimize, enable_inplace):
if not core.is_compiled_with_cuda():
return
......@@ -62,7 +59,6 @@ class TestIrInplace(TestParallelExecutorBase):
feed_dict={"image": img,
"label": label},
use_cuda=True,
memory_opt=memory_opt,
use_ir_memory_optimize=ir_memory_optimize,
enable_inplace=enable_inplace)
......
......@@ -33,7 +33,9 @@ from ir_memory_optimize_net_base import TestIrMemOptBase
class TestIrMemoryOptimizeIfElseOp(unittest.TestCase):
def check_network_convergence(self, use_cuda=True, py_opt=False,
def check_network_convergence(self,
use_cuda=True,
use_mem_opt=False,
iter_num=5):
prog = Program()
startup_prog = Program()
......@@ -75,11 +77,14 @@ class TestIrMemoryOptimizeIfElseOp(unittest.TestCase):
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.use_cuda = use_cuda
if py_opt:
fluid.memory_optimize(fluid.default_main_program())
build_strategy = fluid.BuildStrategy()
build_strategy.memory_optimize = use_mem_opt
train_cp = compiler.CompiledProgram(fluid.default_main_program())
train_cp = train_cp.with_data_parallel(
loss_name=avg_loss.name, exec_strategy=exec_strategy)
loss_name=avg_loss.name,
exec_strategy=exec_strategy,
build_strategy=build_strategy)
fetch_list = [avg_loss.name]
exe.run(startup_prog)
......@@ -116,7 +121,6 @@ class TestIrMemoryOptimizeIfElseOp(unittest.TestCase):
ret2 = self.check_network_convergence(True, False)
print(ret2)
self.assertTrue(np.allclose(ret1, ret2))
#self.assertEqual(ret1, ret2)
if __name__ == "__main__":
......
......@@ -86,7 +86,7 @@ class TestMNIST(TestParallelExecutorBase):
label = np.ones(shape=[32, 1], dtype='int64')
return img, label
def _compare_ir_and_python_memory_optimize(self, model, use_cuda):
def _compare_ir_memory_optimize(self, model, use_cuda):
if use_cuda and not core.is_compiled_with_cuda():
return
......@@ -96,14 +96,12 @@ class TestMNIST(TestParallelExecutorBase):
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
memory_opt=False,
use_ir_memory_optimize=False)
first_loss1, last_loss1 = self.check_network_convergence(
model,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
memory_opt=False,
use_ir_memory_optimize=True)
for loss in zip(first_loss0, first_loss1):
self.assertAlmostEqual(loss[0], loss[1], delta=1e-6)
......@@ -111,12 +109,12 @@ class TestMNIST(TestParallelExecutorBase):
self.assertAlmostEqual(loss[0], loss[1], delta=1e-6)
def test_simple_fc_net(self):
self._compare_ir_and_python_memory_optimize(simple_fc_net, False)
self._compare_ir_and_python_memory_optimize(simple_fc_net, True)
self._compare_ir_memory_optimize(simple_fc_net, False)
self._compare_ir_memory_optimize(simple_fc_net, True)
def test_fc_with_reshape_net(self):
self._compare_ir_and_python_memory_optimize(fc_with_inplace_net, False)
self._compare_ir_and_python_memory_optimize(fc_with_inplace_net, True)
self._compare_ir_memory_optimize(fc_with_inplace_net, False)
self._compare_ir_memory_optimize(fc_with_inplace_net, True)
if __name__ == '__main__':
......
......@@ -57,16 +57,11 @@ class TestTransformerWithIR(TestParallelExecutorBase):
self.check_network_convergence(
transformer,
use_cuda=True,
memory_opt=True,
use_ir_memory_optimize=False,
iter=2)
# check IR memory optimize
self.check_network_convergence(
transformer,
use_cuda=True,
memory_opt=False,
use_ir_memory_optimize=True,
iter=2)
transformer, use_cuda=True, use_ir_memory_optimize=True, iter=2)
if __name__ == '__main__':
......
......@@ -111,8 +111,6 @@ class TestLearningRateDecay(unittest.TestCase):
exe.run(startup_prog)
fluid.memory_optimize(main_prog)
for step in range(10):
lr_val, = exe.run(main_prog, feed={}, fetch_list=[decayed_lr])
python_decayed_lr = python_decay_fn(
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import paddle.fluid as fluid
import paddle.fluid.layers as layers
import paddle.fluid.optimizer as optimizer
from paddle.fluid.framework import Program, program_guard
from paddle.fluid.transpiler import memory_optimize
def _get_vars(prog):
assert (isinstance(prog, Program))
all_vars = set()
for op in prog.global_block().ops:
all_vars.update(op.input_arg_names)
all_vars.update(op.output_arg_names)
return all_vars
class TestControlFlowGraph(unittest.TestCase):
def setUp(self):
program = Program()
with program_guard(program, startup_program=Program()):
x = layers.data(name='x', shape=[13], dtype='float32')
y_predict = layers.fc(input=x, size=1, act=None)
y = layers.data(name='y', shape=[1], dtype='float32')
cost = layers.square_error_cost(input=y_predict, label=y)
avg_cost = layers.mean(cost)
opt = optimizer.SGD(learning_rate=0.001)
opt = opt.minimize(avg_cost)
self.program = program
def test_control_flow_graph(self):
result_program = self.program.clone()
memory_optimize(self.program)
old_vars = _get_vars(self.program)
new_vars = _get_vars(result_program)
self.assertTrue(old_vars != new_vars)
class TestMemoryTranspiler2(unittest.TestCase):
def setUp(self):
program = Program()
with program_guard(program, startup_program=Program()):
x = layers.data(name='x', shape=[13], dtype='float32')
fc = layers.fc(input=x, size=10, act=None)
reshape = layers.reshape(x=fc, shape=[-1, 2, 5])
fc = layers.reshape(x=reshape, shape=[-1, 5, 2])
y_predict = layers.fc(input=fc, size=1, act=None)
y = layers.data(name='y', shape=[1], dtype='float32')
cost = layers.square_error_cost(input=y_predict, label=y)
avg_cost = layers.mean(cost)
opt = optimizer.SGD(learning_rate=0.001)
opt.minimize(avg_cost)
self.skip_set = set([cost.name, fc.name])
self.program = program
def test_inplace_ops(self):
result_program = self.program.clone()
memory_optimize(self.program)
old_vars = _get_vars(self.program)
new_vars = _get_vars(result_program)
self.assertTrue(old_vars != new_vars)
def test_skip_opt(self):
result_program = self.program.clone()
memory_optimize(self.program, skip_opt_set=self.skip_set)
old_vars = _get_vars(self.program)
new_vars = _get_vars(result_program)
self.assertTrue(old_vars != new_vars)
class TestMemoryTranspiler3(unittest.TestCase):
def setUp(self):
program = Program()
with program_guard(program, startup_program=Program()):
word = fluid.layers.data(name='word', shape=[1], dtype='int64')
emb = [
fluid.layers.embedding(
word, size=[65536, 256], param_attr='emb') for _ in range(6)
]
left = emb.pop(0)
while len(emb) != 0:
right = emb.pop(0)
left = fluid.layers.concat([left, right])
emb = fluid.layers.mean(left)
fluid.backward.append_backward(emb)
self.program = program
def test_cascade_reuse(self):
block = self.program.block(0)
# variable reuse in programdesc
# TODO(dzhwinter): confirm cascade strategy. disabled temporarily
self.assertTrue("concat_4.tmp_0@GRAD" in block.vars)
# self.assertTrue("concat_3.tmp_0@GRAD" not in block.vars)
# self.assertTrue("concat_2.tmp_0@GRAD" not in block.vars)
# self.assertTrue("concat_1.tmp_0@GRAD" not in block.vars)
# self.assertTrue("concat_0.tmp_0@GRAD" not in block.vars)
if __name__ == "__main__":
unittest.main()
......@@ -93,10 +93,6 @@ class TestFetchAndFeed(unittest.TestCase):
10).astype(np.int64)
yield img, l
# TODO(zcd): I found that once the memory optimizer is enabled,
# parallel_exe doesn't fetch some variable, such as conv2d_0.b_0@GRAD,
# conv2d_1.b_0@GRAD. Those variables should not be pruned.
# fluid.memory_optimize(main)
fetch_list = []
all_vars = compiled_program._program.global_block().vars
......
......@@ -142,10 +142,6 @@ def test_main(use_cuda, use_py_func_op, use_parallel_executor):
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
#FIXME force use of the old memory optimize strategy here to pass the unittest
#since enabling the new strategy will crash the unittest
fluid.memory_optimize(fluid.default_main_program())
train_cp = compiler.CompiledProgram(fluid.default_main_program())
if use_parallel_executor:
train_cp = train_cp.with_data_parallel(loss_name=loss.name)
......
......@@ -12,486 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import logging
import six
import sys
from collections import defaultdict, MutableSet
from .. import core
from ... import compat as cpt
from ..framework import Program, default_main_program, Parameter, Variable, core
from ..backward import _rename_arg_
from functools import reduce
from six.moves import range
dtype_to_size = {
core.VarDesc.VarType.FP16: 2,
core.VarDesc.VarType.FP32: 4,
core.VarDesc.VarType.FP64: 8,
core.VarDesc.VarType.INT16: 2,
core.VarDesc.VarType.INT32: 4,
core.VarDesc.VarType.INT64: 8,
core.VarDesc.VarType.BOOL: 1,
core.VarDesc.VarType.UINT8: 1,
}
SUB_BLOCK_OPS = [
"while", "while_grad", "conditional_block", "conditional_block_grad"
]
SUB_BLOCK_PAIR = [("while", "while_grad"),
("conditional_block", "conditional_block_grad")]
PRINT_LOG = False
FLAGS_memory_optimize = ""
class OrderedSet(MutableSet):
def __init__(self, iterable=None):
self.end = end = []
end += [None, end, end] # sentinel node for doubly linked list
self.map = {} # key --> [key, prev, next]
if iterable is not None:
self |= iterable
def __len__(self):
return len(self.map)
def __contains__(self, key):
return key in self.map
def add(self, key):
if key not in self.map:
end = self.end
curr = end[1]
curr[2] = end[1] = self.map[key] = [key, curr, end]
def update(self, other):
for e in other:
self.add(e)
def discard(self, key):
if key in self.map:
key, prev, next = self.map.pop(key)
prev[2] = next
next[1] = prev
def remove(self, key):
self.discard(key)
def __iter__(self):
end = self.end
curr = end[2]
while curr is not end:
yield curr[0]
curr = curr[2]
def __reversed__(self):
end = self.end
curr = end[1]
while curr is not end:
yield curr[0]
curr = curr[1]
def pop(self, last=True):
if not self:
raise KeyError('set is empty')
key = self.end[1][0] if last else self.end[2][0]
self.discard(key)
return key
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__, )
return '%s(%r)' % (self.__class__.__name__, list(self))
def __eq__(self, other):
if isinstance(other, OrderedSet):
return len(self) == len(other) and list(self) == list(other)
return set(self) == set(other)
class ControlFlowGraph(object):
def __init__(self, program, ops, forward_num, skip_opt):
self._program = program
self._ops = ops
self._forward_num = forward_num
self._successors = defaultdict(OrderedSet)
self._presuccessors = defaultdict(OrderedSet)
self._uses = defaultdict(OrderedSet)
self._defs = defaultdict(OrderedSet)
self._live_in = defaultdict(OrderedSet)
self._live_out = defaultdict(OrderedSet)
self._skip_opt = skip_opt
self.pool = []
def _add_connections(self, connections):
"""Populates _successors and _presuccessors for two neighbor nodes."""
for node1, node2 in connections:
self._add(node1, node2)
def _add(self, node1, node2):
self._successors[node1].add(node2)
self._presuccessors[node2].add(node1)
# TODO(panyx0718): We need to have a unified way of building intermediate
# representation.
def _build_graph(self):
"""Build a graph based on op sequence.
"""
self.op_size = len(self._ops)
op_node_connections = [(i, i + 1) for i in range(self.op_size - 1)]
self._add_connections(op_node_connections)
for i in range(self.op_size):
self._uses[i].update(self._ops[i].input_arg_names())
self._defs[i].update(self._ops[i].output_arg_names())
def _update_graph(self, old_name, new_name, begin_idx=0):
for i in range(begin_idx, self.op_size):
if old_name in self._uses[i]:
self._uses[i].remove(old_name)
self._uses[i].add(new_name)
if old_name in self._defs[i]:
self._defs[i].remove(old_name)
self._defs[i].add(new_name)
if old_name in self._live_in[i]:
self._live_in[i].remove(old_name)
self._live_in[i].add(new_name)
if old_name in self._live_out[i]:
self._live_out[i].remove(old_name)
self._live_out[i].add(new_name)
def _dataflow_analyze(self):
self._build_graph()
live_in = defaultdict(set)
worklist = list(range(len(self._ops) - 1, -1, -1))
while worklist:
i = worklist.pop(0)
live_in[i] = set(self._live_in[i])
for s in self._successors[i]:
self._live_out[i] |= self._live_in[s]
self._live_in[i] = self._uses[i] | (
self._live_out[i] - self._defs[i])
if live_in[i] != set(self._live_in[i]):
for d in self._presuccessors[i]:
worklist.append(d)
def _fill_pool(self, i, is_forward):
def comparator(x, cache):
x_shape = x[1]
cache_shape = cache[1]
x_size = abs(reduce(lambda x, y: x * y, x_shape))
cache_size = abs(reduce(lambda x, y: x * y, cache_shape))
if (x_shape[0] == -1 and cache_shape[0] == -1) or \
(x_shape[0] != -1 and cache_shape[0] != -1) :
return x_size <= cache_size
else:
return False
def find_var_in_block(x):
known_vars = set()
for op in self._ops:
known_vars.update(op.output_arg_names())
return x in known_vars
block_desc = self._ops[i].block()
in_diff, _ = self._get_diff(self._live_in[i], self._live_out[i])
# NOTE: must sort the in_diff set for cases that get different cache var.
# FIXME(typhoonzero): maybe use a "sorted set" is better than this.
can_optimize = [
x for x in sorted(in_diff)
if self._check_var_validity(block_desc, x, is_forward)
]
if can_optimize:
for var_name in can_optimize:
cache = (var_name, self._find_var(block_desc, var_name,
is_forward).shape())
if cache not in self.pool and find_var_in_block(var_name):
i = 0
while i < len(self.pool):
mycache = self.pool[i]
mysize = mycache[1][0]
cache_size = cache[1][0]
if (mysize == -1 and cache_size == -1) or \
(mysize != -1 and cache_size != -1):
if comparator(mycache, cache):
i += 1
else:
break
elif mysize == -1 and cache_size != -1:
i += 1
elif mysize != -1 and cache_size == -1:
break
self.pool.insert(i, cache)
def _get_diff(self, a, b):
u = a & b
return a - u, b - u
def _has_var(self, block_desc, var_name, is_forward):
if is_forward:
return block_desc.has_var(cpt.to_bytes(var_name))
else:
return block_desc.has_var_recursive(cpt.to_bytes(var_name))
def _find_var(self, block_desc, var_name, is_forward):
if is_forward:
return block_desc.find_var(cpt.to_bytes(var_name))
else:
return block_desc.find_var_recursive(cpt.to_bytes(var_name))
def _check_var_validity(self, block_desc, x, is_forward):
if str(x) == "@EMPTY@":
return False
if not self._has_var(block_desc, x, is_forward):
return False
if self._find_var(block_desc, x, is_forward).persistable():
return False
if self._find_var(block_desc, x,
is_forward).type() != core.VarDesc.VarType.LOD_TENSOR:
return False
if x in self._skip_opt:
return False
if not self._find_var(block_desc, x, is_forward).shape():
return False
return True
# TODO(panyx0718): This needs to be less hacky. It seems memory optimization
# doesn't consider vars copied between cpu and gpu.
def _update_skip_opt_set(self):
for i in range(self.op_size):
op = self._ops[i]
if op.has_attr("force_cpu") and op.attr("force_cpu") == True:
self._skip_opt.update(op.output_arg_names())
def release_memory(self, skip_opt_set=None):
self._dataflow_analyze()
self._update_skip_opt_set()
if skip_opt_set:
self._skip_opt.update(skip_opt_set)
fwd_id = 0
bwd_id = 0
for i in range(self.op_size):
op = self._ops[i]
if op.type() in SUB_BLOCK_OPS:
continue
block_desc = op.block()
is_forward = i < self._forward_num
in_diff, out_diff = self._get_diff(self._live_in[i],
self._live_out[i])
can_optimize = [
x for x in in_diff
if self._check_var_validity(block_desc, x, is_forward)
]
if can_optimize:
index = i + fwd_id + 1 if is_forward else i - self._forward_num + bwd_id + 1
delete_op = block_desc._insert_op(index)
delete_op.set_type("delete_var")
delete_op.set_input("X", can_optimize)
if is_forward:
fwd_id += 1
else:
bwd_id += 1
def memory_optimize(self, skip_opt_set=None, level=0):
def compare_shape(x_shape, cache_shape, opt_level):
if opt_level == 0:
return x_shape == cache_shape
elif opt_level == 1:
if (x_shape[0] == -1) ^ (cache_shape[0] == -1):
return False
x_size = abs(reduce(lambda x, y: x * y, x_shape))
cache_size = abs(reduce(lambda x, y: x * y, cache_shape))
if x_size <= cache_size:
return True
else:
raise ValueError("only support opt_level 0 or 1.")
return False
self._dataflow_analyze()
self._update_skip_opt_set()
# update skip set to meet users' demand
if skip_opt_set:
self._skip_opt.update(skip_opt_set)
counter = 0
for i in range(self.op_size):
op = self._ops[i]
if op.type() in SUB_BLOCK_OPS:
continue
block_desc = op.block()
is_forward = i < self._forward_num
if self.pool:
# NOTE: must sort the in_diff set for cases that get different cache var.
defs_can_optimize = [
x for x in self._defs[i]
if self._check_var_validity(block_desc, x, is_forward)
]
out_pair = [
(x, self._find_var(block_desc, x, is_forward).shape())
for x in defs_can_optimize
]
for x, x_shape in out_pair:
# If x is both in uses and defs, it can not be optimized!
if x in self._uses[i]:
continue
if x == FLAGS_memory_optimize:
print("start match var ", x, " of op ", op.type())
print(self.pool)
for index, cache_pair in enumerate(self.pool):
cache_var = cache_pair[0]
cache_shape = cache_pair[1]
if not self._has_var(block_desc, cache_var, is_forward):
if PRINT_LOG:
print("cache %s not exists!" %
(cpt.to_text(cache_var)))
continue
if x == cache_var:
if PRINT_LOG:
print("x : ", cpt.to_text(x), " cache : ",
cpt.to_text(cache_var), " is same var!")
break
x_dtype = self._find_var(block_desc, x,
is_forward).dtype()
cache_dtype = self._find_var(block_desc, cache_var,
is_forward).dtype()
if x_dtype != cache_dtype:
if PRINT_LOG:
print("x_dtype and cache_dtype are different")
continue
if not compare_shape(x_shape, cache_shape, level):
continue
# TODO(qijun): dtype_to_size[x_dtype] and dtype_to_size[cache_dtype]
if PRINT_LOG:
print(
("!!! %d, %s => %s, cache idx %d, pool size %d"
% (counter, x + str(x_shape),
cache_var + str(cache_shape), index,
len(self.pool))))
counter += 1
self.pool.pop(index)
# Rename the var to the cache var already with
# memory allocated in order to reuse the memory.
_rename_arg_(self._ops, x, cache_var, begin_idx=i)
self._program.block(block_desc.id).var(cpt.to_text(
x)).desc = self._find_var(block_desc, cache_var,
is_forward)
self._program.block(block_desc.id).vars[cpt.to_text(x)] = \
Variable(self._program.block(block_desc.id), name=cpt.to_text(x))
self._update_graph(x, cache_var, begin_idx=i)
break
self._fill_pool(i, is_forward)
def _process_sub_block_pair(pdesc, sub_block_pair):
"""Creates a list of tuple each of which tracks info of a subblock.
Note: this function doesn't handle nested subblocks yet.
TODO(panyx0718): assert if case nested subblocks happen.
:param pdesc: ProgramDesc.
:param sub_block_pair: A list op pairs. Each op pair is the forward
op and backward op. The ops in the list are special that they contain
a subblock of ops.
:return: A list of tuples, each tuple is (all ops in a subblock pair
including forward and backward, number of forward ops,
all output args names of the ops in the subblock pairs).
"""
ops_list = []
block_desc = pdesc.block(0)
op_size = block_desc.op_size()
for fwd_op, bwd_op in sub_block_pair:
sub_block_ids = []
grad_sub_block_ids = []
sub_block_id_pair = []
sub_op_dict = {}
for i in range(op_size):
op = block_desc.op(i)
if op.type() == fwd_op:
sub_block_ids.append(op.attr("sub_block").id)
sub_op_dict[op.attr("sub_block").id] = op
elif op.type() == bwd_op:
grad_sub_block_ids.append(op.attr("sub_block").id)
sub_op_dict[op.attr("sub_block").id] = op
# Find fwd_op/bwd_op block pair
for grad_id in grad_sub_block_ids:
fwd_id = pdesc.block(grad_id).get_forward_block_idx()
if fwd_id in sub_block_ids:
sub_block_id_pair.append((fwd_id, grad_id))
sub_block_ids.remove(fwd_id)
# Get fwd_op/bwd_op block ops
for fwd_id, grad_id in sub_block_id_pair:
sub_block_ops = []
sub_block = pdesc.block(fwd_id)
block_op_size = sub_block.op_size()
for i in range(block_op_size):
sub_block_ops.append(sub_block.op(i))
grad_sub_block = pdesc.block(grad_id)
grad_sub_block_op_size = grad_sub_block.op_size()
for i in range(grad_sub_block_op_size):
sub_block_ops.append(grad_sub_block.op(i))
sub_op_output = set()
sub_op_output.update(sub_op_dict[fwd_id].output_arg_names())
sub_op_output.update(sub_op_dict[grad_id].output_arg_names())
sub_op_output.update(sub_op_dict[fwd_id].input_arg_names())
sub_op_output.update(sub_op_dict[grad_id].input_arg_names())
ops_list.append((sub_block_ops, block_op_size, sub_op_output))
# Process rest fwd_op block ops
for fwd_id in sub_block_ids:
sub_block_ops = []
sub_block = pdesc.block(fwd_id)
sub_block_op_size = sub_block.op_size()
for i in range(sub_block_op_size):
sub_block_ops.append(sub_block.op(i))
sub_op_output = set()
sub_op_output.update(sub_op_dict[fwd_id].output_arg_names())
sub_op_output.update(sub_op_dict[fwd_id].input_arg_names())
ops_list.append((sub_block_ops, sub_block_op_size, sub_op_output))
return ops_list
def _get_cfgs(input_program):
"""Process each block and create ControlFlowGraph for each of them.
:param input_program: Program object.
:return: A list of ControlFlowGraph, each corresponds to a block.
"""
ops_list = []
pdesc = input_program._get_desc()
block_desc = pdesc.block(0)
op_size = block_desc.op_size()
# Only process one level of nested subblock.
ops_list.extend(_process_sub_block_pair(pdesc, SUB_BLOCK_PAIR))
skip_opt_set = set()
for _, _, skip_opt in ops_list:
skip_opt_set.update(skip_opt)
# Get global block ops
ops_list.insert(
0, ([block_desc.op(i) for i in range(op_size)], op_size, skip_opt_set))
cfgs = [
ControlFlowGraph(input_program, ops, forward_num, skip_opt)
for ops, forward_num, skip_opt in ops_list
]
return cfgs
def _is_opt_role_op(op):
op_maker = core.op_proto_and_checker_maker
optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize
if op_maker.kOpRoleAttrName() in op.attr_names and \
int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role):
return True
def memory_optimize(input_program,
......@@ -554,49 +75,16 @@ def memory_optimize(input_program,
logging.warn(
'Caution! paddle.fluid.memory_optimize() is deprecated '
'and not maintained any more, since it is not stable!\n'
'Please use the newest and stable memory optimization strategies!\n'
' 1. Enable garbage collection strategy by exporting environment '
'variable FLAGS_eager_delete_tensor_gb=0\n'
' 2. Set build_strategy.enable_inplace=True (True is the default '
'value) when using CompiledProgram or ParallelExecutor.\n')
def to_name_str(var):
if isinstance(var, Variable):
return var.desc.name()
elif isinstance(var, str):
return var
elif isinstance(var, six.string_types):
return str(var)
else:
raise TypeError(str(var) + " should be Variable or str")
if level != 0 and level != 1:
raise ValueError("only support opt_level 0 or 1.")
if skip_opt_set is not None:
if isinstance(skip_opt_set, set) or isinstance(skip_opt_set, list):
skip_opt_set = set(skip_opt_set)
else:
raise ValueError("only support skip_opt_set as set.")
global PRINT_LOG
PRINT_LOG = print_log
if skip_grads:
grad_set = set()
OP_ROLE_VAR = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
for op in input_program.global_block().ops:
if _is_opt_role_op(op):
if op.attr(OP_ROLE_VAR):
grad_name = op.attr(OP_ROLE_VAR)[1]
grad_set.add(grad_name)
if not skip_opt_set:
skip_opt_set = grad_set
else:
skip_opt_set.update(grad_set)
if skip_opt_set is not None:
skip_opt_set = set(map(to_name_str, skip_opt_set))
cfgs = _get_cfgs(input_program)
input_program._is_mem_optimized = True
for cfg in cfgs:
cfg.memory_optimize(skip_opt_set=skip_opt_set, level=level)
'This API would not take any memory optimizations on your Program '
'now, since we have provided default strategies for you.\n'
'The newest and stable memory optimization strategies (they are all '
'enabled by default) are as follows:\n'
' 1. Garbage collection strategy, which is enabled by exporting '
'environment variable FLAGS_eager_delete_tensor_gb=0 (0 is the '
'default value).\n'
' 2. Inplace strategy, which is enabled by setting '
'build_strategy.enable_inplace=True (True is the default value) '
'when using CompiledProgram or ParallelExecutor.\n')
def release_memory(input_program, skip_opt_set=None):
......@@ -625,7 +113,5 @@ def release_memory(input_program, skip_opt_set=None):
fluid.release_memory(fluid.default_main_program())
"""
cfgs = _get_cfgs(input_program)
input_program._is_mem_optimized = True
for cfg in cfgs:
cfg.release_memory(skip_opt_set=skip_opt_set)
logging.warn('paddle.fluid.release_memory() is deprecated, it would not'
' take any memory release on your program')
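To make the warning text above concrete, here is a hedged sketch of the two replacement strategies it names. The flag name and defaults come from the warning itself; whether the environment variable must be set before importing fluid (rather than merely before executor startup) is an assumption.

# Sketch of the two strategies named in the deprecation warning.
import os
# 1) Garbage-collection strategy: eagerly free tensors once they are dead.
#    The warning states 0 is already the default; the export is shown for clarity.
os.environ.setdefault('FLAGS_eager_delete_tensor_gb', '0')

import paddle.fluid as fluid

# 2) Inplace strategy: also enabled by default; configured through BuildStrategy
#    and passed to CompiledProgram(...).with_data_parallel(...) as in the
#    updated unittests above.
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = True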