#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

from collections import defaultdict, deque
from functools import reduce

from six.moves import range

from .. import core
from ... import compat as cpt
from ..framework import Program, default_main_program, Parameter
from ..backward import _rename_arg_

# Element size in bytes of each supported tensor data type.  The TODO(qijun)
# note in ControlFlowGraph.memory_optimize proposes comparing these sizes;
# for now, memory reuse requires the two dtypes to be exactly equal.
dtype_to_size = {
    core.VarDesc.VarType.FP16: 2,
    core.VarDesc.VarType.FP32: 4,
    core.VarDesc.VarType.FP64: 8,
    core.VarDesc.VarType.INT16: 2,
    core.VarDesc.VarType.INT32: 4,
    core.VarDesc.VarType.INT64: 8,
    core.VarDesc.VarType.BOOL: 1,
    core.VarDesc.VarType.UINT8: 1,
}

# (forward op type, backward op type) pairs of ops that own a subblock of ops.
SUB_BLOCK_PAIR = [("while", "while_grad"), ("parallel_do", "parallel_do_grad"),
                  ("conditional_block", "conditional_block_grad")]

# Every op type that owns a subblock.  It is derived from SUB_BLOCK_PAIR so the
# two lists cannot drift apart.  The per-op passes in ControlFlowGraph skip
# ops of these types.
# A generator (not a list comprehension) is used so that Python 2 does not
# leak the loop variables into the module namespace.
SUB_BLOCK_OPS = list(op_type for pair in SUB_BLOCK_PAIR for op_type in pair)

# When True, ControlFlowGraph.memory_optimize prints one line per reused
# variable.  Set through memory_optimize(print_log=...).
PRINT_LOG = False


class ControlFlowGraph(object):
    """Liveness-based memory reuse/release over a flat sequence of ops.

    The ops are treated as a straight-line graph in which op ``i`` flows into
    op ``i + 1``.  A backward liveness analysis computes, for every op, the
    variables live on entry (``_live_in``) and on exit (``_live_out``).
    Variables that are live into an op but not out of it die at that op.
    They are either offered for reuse by later outputs (``memory_optimize``)
    or dropped by an inserted ``delete_var`` op (``release_memory``).

    :param program: Program owning the ops.  It is used to rebind a
        variable's desc when that variable is renamed onto a cached one.
    :param ops: op descs to analyse, in execution order.
    :param forward_num: number of leading entries of ``ops`` that are looked
        up with the non-recursive ``has_var``/``find_var``.  The remaining
        ops use the ``*_recursive`` variants.
    :param skip_opt: set of variable names that must never be reused or
        released.  It is stored by reference and extended in place.
    """

    def __init__(self, program, ops, forward_num, skip_opt):
        self._program = program
        self._ops = ops
        self._forward_num = forward_num
        self._successors = defaultdict(set)
        self._presuccessors = defaultdict(set)
        self._uses = defaultdict(set)
        self._defs = defaultdict(set)
        self._live_in = defaultdict(set)
        self._live_out = defaultdict(set)
        self._skip_opt = skip_opt
        # (var_name, shape) pairs of dead variables available for reuse.
        # Rebuilt by memory_optimize().
        self.pool = []

    def _add_connections(self, connections):
        """Populates _successors and _presuccessors for two neighbor nodes."""
        for node1, node2 in connections:
            self._add(node1, node2)

    def _add(self, node1, node2):
        """Record the edge ``node1 -> node2`` in both directions."""
        self._successors[node1].add(node2)
        self._presuccessors[node2].add(node1)

    # TODO(panyx0718): We need to have a unified way of building intermediate
    # representation.
    def _build_graph(self):
        """Build the straight-line graph and seed the use/def/live-in sets."""
        self.op_size = len(self._ops)
        op_node_connections = [(i, i + 1) for i in range(self.op_size - 1)]
        self._add_connections(op_node_connections)
        for i in range(self.op_size):
            self._uses[i].update(self._ops[i].input_arg_names())
            self._defs[i].update(self._ops[i].output_arg_names())
            # Copy instead of aliasing: _update_graph edits _uses and
            # _live_in as independent sets.
            self._live_in[i] = set(self._uses[i])

    def _update_graph(self, old_name, new_name, begin_idx=0):
        """Rename ``old_name`` to ``new_name`` in the use, def, live-in and
        live-out sets of every op from ``begin_idx`` to the end."""
        for i in range(begin_idx, self.op_size):
            for name_sets in (self._uses, self._defs, self._live_in,
                              self._live_out):
                names = name_sets[i]
                if old_name in names:
                    names.remove(old_name)
                    names.add(new_name)

    def _reach_fixed_point(self, live_in, live_out):
        """Check if the liveness set has stabilized.

        Compares the given snapshots against the current sets.  The worklist
        analysis in _dataflow_analyze does not call this; it tracks changes
        per node instead.
        """
        if len(live_in) != len(self._live_in):
            return False
        if len(live_out) != len(self._live_out):
            return False
        for i in range(self.op_size):
            if (live_in[i] != self._live_in[i] or
                    live_out[i] != self._live_out[i]):
                return False
        return True

    def _dataflow_analyze(self):
        """Run backward liveness analysis until it reaches a fixed point.

        Worklist algorithm.  ``live_out[i]`` accumulates the ``live_in`` of
        every successor, and ``live_in[i] = uses[i] | (live_out[i] -
        defs[i])``.  Whenever ``live_in[i]`` changes, the predecessors of
        ``i`` are queued again.
        """
        self._build_graph()
        # Seed in reverse order: on a straight-line graph most information
        # then propagates in a single backward sweep.
        worklist = deque(range(self.op_size - 1, -1, -1))
        queued = set(worklist)
        while worklist:
            i = worklist.popleft()
            queued.discard(i)
            # _live_in[i] is rebound (not mutated) below, so this reference
            # keeps the previous value.
            prev_live_in = self._live_in[i]
            for s in self._successors[i]:
                self._live_out[i] |= self._live_in[s]
            self._live_in[i] = self._uses[i] | (
                self._live_out[i] - self._defs[i])
            if self._live_in[i] != prev_live_in:
                for p in self._presuccessors[i]:
                    if p not in queued:
                        queued.add(p)
                        worklist.append(p)

    def _get_diff(self, a, b):
        """Return ``(a - b, b - a)``."""
        u = a & b
        return a - u, b - u

    def _has_var(self, block_desc, var_name, is_forward):
        """Check for ``var_name`` with ``has_var`` (forward ops) or
        ``has_var_recursive`` (other ops; presumably also searches the
        enclosing blocks)."""
        if is_forward:
            return block_desc.has_var(cpt.to_bytes(var_name))
        else:
            return block_desc.has_var_recursive(cpt.to_bytes(var_name))

    def _find_var(self, block_desc, var_name, is_forward):
        """Look up ``var_name`` with ``find_var`` (forward ops) or
        ``find_var_recursive`` (other ops)."""
        if is_forward:
            return block_desc.find_var(cpt.to_bytes(var_name))
        else:
            return block_desc.find_var_recursive(cpt.to_bytes(var_name))

    def _check_var_validity(self, block_desc, x, is_forward):
        """Return True if variable ``x`` may take part in reuse or release.

        Rejected: the ``@EMPTY@`` placeholder, unknown or persistable
        variables, non-LOD_TENSOR variables, names in the skip set, and
        variables with an empty shape.
        """
        if str(x) == "@EMPTY@":
            return False
        if not self._has_var(block_desc, x, is_forward):
            return False
        var_desc = self._find_var(block_desc, x, is_forward)
        if var_desc.persistable():
            return False
        if var_desc.type() != core.VarDesc.VarType.LOD_TENSOR:
            return False
        if x in self._skip_opt:
            return False
        if not var_desc.shape():
            return False
        return True

    # TODO(panyx0718): This needs to be less hacky. It seems memory optimization
    # doesn't consider vars copied between cpu and gpu.
    def _update_skip_opt_set(self):
        """Add the outputs of ``fill_constant`` ops with ``force_cpu`` set to
        the skip set."""
        for i in range(self.op_size):
            op = self._ops[i]
            if op.type() == "fill_constant" and op.attr("force_cpu") == True:
                self._skip_opt.update(op.output_arg_names())

    def _collect_dead_vars(self, i, block_desc, is_forward):
        """Return the names of valid variables that die at op ``i``, sorted.

        Sorting matters: set iteration order over strings depends on the
        hash seed.  Without it, the pool order (and so which cached variable
        is reused) could change from run to run.
        """
        in_diff, _ = self._get_diff(self._live_in[i], self._live_out[i])
        return [
            x for x in sorted(in_diff)
            if self._check_var_validity(block_desc, x, is_forward)
        ]

    def _fill_pool(self, i, block_desc, is_forward):
        """Append each variable dying at op ``i`` to ``self.pool`` unless a
        pool entry with that name already exists."""
        cached = set(name for name, _ in self.pool)
        for var_name in self._collect_dead_vars(i, block_desc, is_forward):
            if var_name in cached:
                continue
            cached.add(var_name)
            self.pool.append((var_name, self._find_var(
                block_desc, var_name, is_forward).shape()))

    def release_memory(self, skip_opt_set=None):
        """Insert a ``delete_var`` op right after the op that last uses each
        dead variable.

        :param skip_opt_set: extra variable names that must not be deleted.
        """
        self._dataflow_analyze()
        self._update_skip_opt_set()
        if skip_opt_set:
            self._skip_opt.update(skip_opt_set)
        fwd_id = 0
        bwd_id = 0
        for i in range(self.op_size):
            op = self._ops[i]
            if op.type() in SUB_BLOCK_OPS:
                continue
            block_desc = op.block()
            is_forward = i < self._forward_num
            can_optimize = self._collect_dead_vars(i, block_desc, is_forward)
            if not can_optimize:
                continue
            # Insert right after op i within its own block, shifted by the
            # delete_var ops already inserted into that block.  Ops from
            # index forward_num on are indexed relative to their own block,
            # which starts at forward_num in self._ops.
            if is_forward:
                index = i + fwd_id + 1
                fwd_id += 1
            else:
                index = i - self._forward_num + bwd_id + 1
                bwd_id += 1
            delete_op = block_desc._insert_op(index)
            delete_op.set_type("delete_var")
            delete_op.set_input("X", can_optimize)

    def memory_optimize(self, skip_opt_set=None, level=0):
        """Rename op outputs onto dead variables of compatible shape/dtype.

        Walks the ops in order.  Variables that die at an op go into
        ``self.pool``.  An output of that op or a later one may take over
        the first pool entry with a compatible shape and the same dtype,
        unless the output is also one of its op's inputs.  On a match, the
        output is renamed to the cached variable in this and all following
        ops, its variable desc is replaced, and the liveness sets are
        updated.

        :param skip_opt_set: extra variable names that must not be reused.
        :param level: 0 reuses only identical shapes.  1 also reuses a cached
            variable with at least as many elements, provided both shapes
            agree on whether the first dimension is -1.
        :raises ValueError: if ``level`` is neither 0 nor 1.  This is raised
            lazily, the first time two shapes are compared.
        """

        def compare_shape(x_shape, cache_shape, opt_level):
            """Return True if ``x_shape`` can live in ``cache_shape``'s
            memory at the given optimization level."""
            if opt_level == 0:
                return x_shape == cache_shape
            elif opt_level == 1:
                # -1 marks a dimension unknown at compile time (presumably
                # the batch size).  Only compare element counts when both or
                # neither shape has it.
                if (x_shape[0] == -1) ^ (cache_shape[0] == -1):
                    return False
                x_size = abs(reduce(lambda x, y: x * y, x_shape))
                cache_size = abs(reduce(lambda x, y: x * y, cache_shape))
                if x_size <= cache_size:
                    return True
            else:
                raise ValueError("only support opt_level 0 or 1.")
            return False

        self._dataflow_analyze()
        self._update_skip_opt_set()
        # update skip set to meet users' demand
        if skip_opt_set:
            self._skip_opt.update(skip_opt_set)

        self.pool = []
        for i in range(self.op_size):
            op = self._ops[i]
            if op.type() in SUB_BLOCK_OPS:
                continue
            block_desc = op.block()
            is_forward = i < self._forward_num
            # Inputs that die at this op may be reused by its own outputs.
            self._fill_pool(i, block_desc, is_forward)
            if self.pool:
                defs_can_optimize = [
                    x for x in sorted(self._defs[i])
                    if self._check_var_validity(block_desc, x, is_forward)
                ]
                out_pair = [
                    (x, self._find_var(block_desc, x, is_forward).shape())
                    for x in defs_can_optimize
                ]
                for x, x_shape in out_pair:
                    # If x is both in uses and defs, it can not be optimized!
                    if x in self._uses[i]:
                        continue
                    for index, (cache_var,
                                cache_shape) in enumerate(self.pool):
                        if not compare_shape(x_shape, cache_shape, level):
                            continue

                        if not self._has_var(block_desc, cache_var, is_forward):
                            continue

                        x_dtype = self._find_var(block_desc, x,
                                                 is_forward).dtype()
                        cache_dtype = self._find_var(block_desc, cache_var,
                                                     is_forward).dtype()
                        # TODO(qijun): actually, we should compare
                        # dtype_to_size[x_dtype] and dtype_to_size[cache_dtype]
                        if x_dtype != cache_dtype:
                            continue

                        # Popping while enumerating is safe: every path
                        # below leaves this loop immediately.
                        self.pool.pop(index)
                        if x == cache_var:
                            # x is simply defined again here; retire its
                            # stale pool entry without renaming anything.
                            break

                        if PRINT_LOG:
                            print(("Hit Cache !!!! cache pool index "
                                   "is %d, var name is %s, "
                                   "cached var name is %s, "
                                   "var shape is %s ") % (index, x, cache_var,
                                                          str(cache_shape)))
                        # Rename the var to the cache var already with
                        # memory allocated in order to reuse the memory.
                        _rename_arg_(self._ops, x, cache_var, begin_idx=i)
                        self._program.block(block_desc.id).var(cpt.to_text(
                            x)).desc = self._find_var(block_desc, cache_var,
                                                      is_forward)
                        self._update_graph(x, cache_var, begin_idx=i)
                        break

            # Renaming may have changed this op's liveness sets; offer
            # whatever still dies here (already-pooled names are skipped).
            self._fill_pool(i, block_desc, is_forward)


def _process_sub_block_pair(pdesc, sub_block_pair):
326 327 328 329 330 331 332 333 334 335 336 337 338
    """Creates a list of tuple each of which tracks info of a subblock.

      Note: this function doesn't handle nested subblocks yet.
      TODO(panyx0718): assert if case nested subblocks happen.

    :param pdesc: ProgramDesc.
    :param sub_block_pair: A list op pairs. Each op pair is the forward
        op and backward op. The ops in the list are special that they contain
        a subblock of ops.
    :return: A list of tuples, each tuple is (all ops in a subblock pair
        including forward and backward, number of forward ops,
        all output args names of the ops in the subblock pairs).
    """
339 340 341
    ops_list = []
    block_desc = pdesc.block(0)
    op_size = block_desc.op_size()
342 343 344 345 346 347 348 349 350 351 352 353 354
    for fwd_op, bwd_op in sub_block_pair:
        sub_block_ids = []
        grad_sub_block_ids = []
        sub_block_id_pair = []
        sub_op_dict = {}
        for i in range(op_size):
            op = block_desc.op(i)
            if op.type() == fwd_op:
                sub_block_ids.append(op.attr("sub_block").id)
                sub_op_dict[op.attr("sub_block").id] = op
            elif op.type() == bwd_op:
                grad_sub_block_ids.append(op.attr("sub_block").id)
                sub_op_dict[op.attr("sub_block").id] = op
355

356 357
        # Find fwd_op/bwd_op block pair
        for grad_id in grad_sub_block_ids:
Q
qijun 已提交
358 359 360 361
            fwd_id = pdesc.block(grad_id).get_forward_block_idx()
            if fwd_id in sub_block_ids:
                sub_block_id_pair.append((fwd_id, grad_id))
                sub_block_ids.remove(fwd_id)
362

363
        # Get fwd_op/bwd_op block ops
Q
qijun 已提交
364
        for fwd_id, grad_id in sub_block_id_pair:
365
            sub_block_ops = []
Q
qijun 已提交
366
            sub_block = pdesc.block(fwd_id)
367 368 369
            block_op_size = sub_block.op_size()
            for i in range(block_op_size):
                sub_block_ops.append(sub_block.op(i))
370

371 372 373 374
            grad_sub_block = pdesc.block(grad_id)
            grad_sub_block_op_size = grad_sub_block.op_size()
            for i in range(grad_sub_block_op_size):
                sub_block_ops.append(grad_sub_block.op(i))
375

376
            sub_op_output = set()
Q
qijun 已提交
377
            sub_op_output.update(sub_op_dict[fwd_id].output_arg_names())
378
            sub_op_output.update(sub_op_dict[grad_id].output_arg_names())
379 380
            sub_op_output.update(sub_op_dict[fwd_id].input_arg_names())
            sub_op_output.update(sub_op_dict[grad_id].input_arg_names())
381
            ops_list.append((sub_block_ops, block_op_size, sub_op_output))
382

383
        # Process rest fwd_op block ops
Q
qijun 已提交
384
        for fwd_id in sub_block_ids:
385
            sub_block_ops = []
Q
qijun 已提交
386
            sub_block = pdesc.block(fwd_id)
387 388 389 390
            sub_block_op_size = sub_block.op_size()
            for i in range(sub_block_op_size):
                sub_block_ops.append(sub_block.op(i))
            sub_op_output = set()
Q
qijun 已提交
391
            sub_op_output.update(sub_op_dict[fwd_id].output_arg_names())
392
            sub_op_output.update(sub_op_dict[fwd_id].input_arg_names())
393 394
            ops_list.append((sub_block_ops, sub_block_op_size, sub_op_output))
    return ops_list
395

396

397
def _get_cfgs(input_program):
    """Process each block and create ControlFlowGraph for each of them.

    The graph for the global block (block 0) comes first.  It is followed by
    one graph per subblock (or forward/backward subblock pair) returned by
    ``_process_sub_block_pair``.  Only one level of subblock nesting is
    handled.

    :param input_program: Program object.
    :return: A list of ControlFlowGraph, each corresponds to a block or a
        forward/backward subblock pair.
    """
    pdesc = input_program.get_desc()
    block_desc = pdesc.block(0)
    op_size = block_desc.op_size()

    # Only process one level of nested subblock.
    sub_block_list = _process_sub_block_pair(pdesc, SUB_BLOCK_PAIR)

    # Inputs/outputs of the ops owning a subblock cross the block boundary.
    # They are put in the global block's skip set so they are never reused
    # or released there.
    skip_opt_set = set()
    for _, _, skip_opt in sub_block_list:
        skip_opt_set.update(skip_opt)

    # Get global block ops
    global_ops = [block_desc.op(i) for i in range(op_size)]
    ops_list = [(global_ops, op_size, skip_opt_set)] + sub_block_list

    return [
        ControlFlowGraph(input_program, ops, forward_num, skip_opt)
        for ops, forward_num, skip_opt in ops_list
    ]


def memory_optimize(input_program, skip_opt_set=None, print_log=False, level=0):
    """Optimize memory by reusing var memory.

      Note: it does not support subblock nested in subblock.

    :param input_program: Input Program. It is modified in place.
    :param skip_opt_set: names of variables whose memory must not be reused.
    :param print_log: whether to print debug log.
    :param level: If level=0, reuse only when the shapes are exactly equal.
        If level=1, also reuse a variable with at least as many elements,
        as long as both shapes agree on whether the first dimension is -1.
    :return: None.
    :raises ValueError: if ``level`` is neither 0 nor 1.
    """
    if level not in (0, 1):
        raise ValueError("only support opt_level 0 or 1.")
    global PRINT_LOG
    PRINT_LOG = print_log
    for cfg in _get_cfgs(input_program):
        cfg.memory_optimize(skip_opt_set=skip_opt_set, level=level)


def release_memory(input_program, skip_opt_set=None):
    """
    Modify the input program and insert :code:`delete_var` ops to drop
    variables early, right after their last use. The modification is
    performed in place.

    Notes: This is an experimental API and could be removed in next few
    releases. Users should not use this API.

    Args:
        input_program(Program): The program into which :code:`delete_var`
            ops will be inserted.
        skip_opt_set(set|None): Names of variables that must not be deleted.
    """
    for cfg in _get_cfgs(input_program):
        cfg.release_memory(skip_opt_set=skip_opt_set)