# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import six from collections import defaultdict, MutableSet from .. import core from ... import compat as cpt from ..framework import Program, default_main_program, Parameter, Variable, core from ..backward import _rename_arg_ from functools import reduce from six.moves import range dtype_to_size = { core.VarDesc.VarType.FP16: 2, core.VarDesc.VarType.FP32: 4, core.VarDesc.VarType.FP64: 8, core.VarDesc.VarType.INT16: 2, core.VarDesc.VarType.INT32: 4, core.VarDesc.VarType.INT64: 8, core.VarDesc.VarType.BOOL: 1, core.VarDesc.VarType.UINT8: 1, } SUB_BLOCK_OPS = [ "while", "while_grad", "conditional_block", "conditional_block_grad" ] SUB_BLOCK_PAIR = [("while", "while_grad"), ("conditional_block", "conditional_block_grad")] PRINT_LOG = False FLAGS_memory_optimize = "" class OrderedSet(MutableSet): def __init__(self, iterable=None): self.end = end = [] end += [None, end, end] # sentinel node for doubly linked list self.map = {} # key --> [key, prev, next] if iterable is not None: self |= iterable def __len__(self): return len(self.map) def __contains__(self, key): return key in self.map def add(self, key): if key not in self.map: end = self.end curr = end[1] curr[2] = end[1] = self.map[key] = [key, curr, end] def update(self, other): for e in other: self.add(e) def discard(self, key): if key in self.map: key, prev, next = self.map.pop(key) prev[2] = next next[1] = prev def remove(self, key): self.discard(key) def __iter__(self): end = self.end curr = end[2] while curr is not end: yield curr[0] curr = curr[2] def __reversed__(self): end = self.end curr = end[1] while curr is not end: yield curr[0] curr = curr[1] def pop(self, last=True): if not self: raise KeyError('set is empty') key = self.end[1][0] if last else self.end[2][0] self.discard(key) return key def __repr__(self): if not self: return '%s()' % (self.__class__.__name__, ) return '%s(%r)' % (self.__class__.__name__, list(self)) def __eq__(self, other): if isinstance(other, OrderedSet): return len(self) == len(other) and list(self) == list(other) return set(self) == set(other) class ControlFlowGraph(object): def __init__(self, program, ops, forward_num, skip_opt): self._program = program self._ops = ops self._forward_num = forward_num self._successors = defaultdict(OrderedSet) self._presuccessors = defaultdict(OrderedSet) self._uses = defaultdict(OrderedSet) self._defs = defaultdict(OrderedSet) self._live_in = defaultdict(OrderedSet) self._live_out = defaultdict(OrderedSet) self._skip_opt = skip_opt self.pool = [] def _add_connections(self, connections): """Populates _successors and _presuccessors for two neighbor nodes.""" for node1, node2 in connections: self._add(node1, node2) def _add(self, node1, node2): self._successors[node1].add(node2) self._presuccessors[node2].add(node1) # TODO(panyx0718): We need to have a unified way of building intermediate # representation. def _build_graph(self): """Build a graph based on op sequence. """ self.op_size = len(self._ops) op_node_connections = [(i, i + 1) for i in range(self.op_size - 1)] self._add_connections(op_node_connections) for i in range(self.op_size): self._uses[i].update(self._ops[i].input_arg_names()) self._defs[i].update(self._ops[i].output_arg_names()) def _update_graph(self, old_name, new_name, begin_idx=0): for i in range(begin_idx, self.op_size): if old_name in self._uses[i]: self._uses[i].remove(old_name) self._uses[i].add(new_name) if old_name in self._defs[i]: self._defs[i].remove(old_name) self._defs[i].add(new_name) if old_name in self._live_in[i]: self._live_in[i].remove(old_name) self._live_in[i].add(new_name) if old_name in self._live_out[i]: self._live_out[i].remove(old_name) self._live_out[i].add(new_name) def _dataflow_analyze(self): self._build_graph() live_in = defaultdict(set) worklist = list(range(len(self._ops) - 1, -1, -1)) while worklist: i = worklist.pop(0) live_in[i] = set(self._live_in[i]) for s in self._successors[i]: self._live_out[i] |= self._live_in[s] self._live_in[i] = self._uses[i] | ( self._live_out[i] - self._defs[i]) if live_in[i] != set(self._live_in[i]): for d in self._presuccessors[i]: worklist.append(d) def _fill_pool(self, i, is_forward): def comparator(x, cache): x_shape = x[1] cache_shape = cache[1] x_size = abs(reduce(lambda x, y: x * y, x_shape)) cache_size = abs(reduce(lambda x, y: x * y, cache_shape)) if (x_shape[0] == -1 and cache_shape[0] == -1) or \ (x_shape[0] != -1 and cache_shape[0] != -1) : return x_size <= cache_size else: return False def find_var_in_block(x): known_vars = set() for op in self._ops: known_vars.update(op.output_arg_names()) return x in known_vars block_desc = self._ops[i].block() in_diff, _ = self._get_diff(self._live_in[i], self._live_out[i]) # NOTE: must sort the in_diff set for cases that get different cache var. # FIXME(typhoonzero): maybe use a "sorted set" is better than this. can_optimize = [ x for x in sorted(in_diff) if self._check_var_validity(block_desc, x, is_forward) ] if can_optimize: for var_name in can_optimize: cache = (var_name, self._find_var(block_desc, var_name, is_forward).shape()) if cache not in self.pool and find_var_in_block(var_name): i = 0 while i < len(self.pool): mycache = self.pool[i] mysize = mycache[1][0] cache_size = cache[1][0] if (mysize == -1 and cache_size == -1) or \ (mysize != -1 and cache_size != -1): if comparator(mycache, cache): i += 1 else: break elif mysize == -1 and cache_size != -1: i += 1 elif mysize != -1 and cache_size == -1: break self.pool.insert(i, cache) def _get_diff(self, a, b): u = a & b return a - u, b - u def _has_var(self, block_desc, var_name, is_forward): if is_forward: return block_desc.has_var(cpt.to_bytes(var_name)) else: return block_desc.has_var_recursive(cpt.to_bytes(var_name)) def _find_var(self, block_desc, var_name, is_forward): if is_forward: return block_desc.find_var(cpt.to_bytes(var_name)) else: return block_desc.find_var_recursive(cpt.to_bytes(var_name)) def _check_var_validity(self, block_desc, x, is_forward): if str(x) == "@EMPTY@": return False if not self._has_var(block_desc, x, is_forward): return False if self._find_var(block_desc, x, is_forward).persistable(): return False if self._find_var(block_desc, x, is_forward).type() != core.VarDesc.VarType.LOD_TENSOR: return False if x in self._skip_opt: return False if not self._find_var(block_desc, x, is_forward).shape(): return False return True # TODO(panyx0718): This needs to be less hacky. It seems memory optimization # doesn't consider vars copied between cpu and gpu. def _update_skip_opt_set(self): for i in range(self.op_size): op = self._ops[i] if op.has_attr("force_cpu") and op.attr("force_cpu") == True: self._skip_opt.update(op.output_arg_names()) def release_memory(self, skip_opt_set=None): self._dataflow_analyze() self._update_skip_opt_set() if skip_opt_set: self._skip_opt.update(skip_opt_set) fwd_id = 0 bwd_id = 0 for i in range(self.op_size): op = self._ops[i] if op.type() in SUB_BLOCK_OPS: continue block_desc = op.block() is_forward = i < self._forward_num in_diff, out_diff = self._get_diff(self._live_in[i], self._live_out[i]) can_optimize = [ x for x in in_diff if self._check_var_validity(block_desc, x, is_forward) ] if can_optimize: index = i + fwd_id + 1 if is_forward else i - self._forward_num + bwd_id + 1 delete_op = block_desc._insert_op(index) delete_op.set_type("delete_var") delete_op.set_input("X", can_optimize) if is_forward: fwd_id += 1 else: bwd_id += 1 def memory_optimize(self, skip_opt_set=None, level=0): def compare_shape(x_shape, cache_shape, opt_level): if opt_level == 0: return x_shape == cache_shape elif opt_level == 1: if (x_shape[0] == -1) ^ (cache_shape[0] == -1): return False x_size = abs(reduce(lambda x, y: x * y, x_shape)) cache_size = abs(reduce(lambda x, y: x * y, cache_shape)) if x_size <= cache_size: return True else: raise ValueError("only support opt_level 0 or 1.") return False self._dataflow_analyze() self._update_skip_opt_set() # update skip set to meet users' demand if skip_opt_set: self._skip_opt.update(skip_opt_set) counter = 0 for i in range(self.op_size): op = self._ops[i] if op.type() in SUB_BLOCK_OPS: continue block_desc = op.block() is_forward = i < self._forward_num if self.pool: # NOTE: must sort the in_diff set for cases that get different cache var. defs_can_optimize = [ x for x in self._defs[i] if self._check_var_validity(block_desc, x, is_forward) ] out_pair = [ (x, self._find_var(block_desc, x, is_forward).shape()) for x in defs_can_optimize ] for x, x_shape in out_pair: # If x is both in uses and defs, it can not be optimized! if x in self._uses[i]: continue if x == FLAGS_memory_optimize: print("start match var ", x, " of op ", op.type()) print(self.pool) for index, cache_pair in enumerate(self.pool): cache_var = cache_pair[0] cache_shape = cache_pair[1] if not self._has_var(block_desc, cache_var, is_forward): if PRINT_LOG: print("cache %s not exists!" % (cpt.to_text(cache_var))) continue if x == cache_var: if PRINT_LOG: print("x : ", cpt.to_text(x), " cache : ", cpt.to_text(cache_var), " is same var!") break x_dtype = self._find_var(block_desc, x, is_forward).dtype() cache_dtype = self._find_var(block_desc, cache_var, is_forward).dtype() if x_dtype != cache_dtype: if PRINT_LOG: print("x_dtype and cache_dtype are different!") continue if not compare_shape(x_shape, cache_shape, level): continue # TODO(qijun): dtype_to_size[x_dtype] and dtype_to_size[cache_dtype] if PRINT_LOG: print( ("!!! %d, %s => %s, cache idx %d, pool size %d" % (counter, x + str(x_shape), cache_var + str(cache_shape), index, len(self.pool)))) counter += 1 self.pool.pop(index) # Rename the var to the cache var already with # memory allocated in order to reuse the memory. _rename_arg_(self._ops, x, cache_var, begin_idx=i) self._program.block(block_desc.id).var(cpt.to_text( x)).desc = self._find_var(block_desc, cache_var, is_forward) self._program.block(block_desc.id).vars[cpt.to_text(x)] = \ Variable(self._program.block(block_desc.id), name=cpt.to_text(x)) self._update_graph(x, cache_var, begin_idx=i) break self._fill_pool(i, is_forward) def _process_sub_block_pair(pdesc, sub_block_pair): """Creates a list of tuple each of which tracks info of a subblock. Note: this function doesn't handle nested subblocks yet. TODO(panyx0718): assert if case nested subblocks happen. :param pdesc: ProgramDesc. :param sub_block_pair: A list op pairs. Each op pair is the forward op and backward op. The ops in the list are special that they contain a subblock of ops. :return: A list of tuples, each tuple is (all ops in a subblock pair including forward and backward, number of forward ops, all output args names of the ops in the subblock pairs). """ ops_list = [] block_desc = pdesc.block(0) op_size = block_desc.op_size() for fwd_op, bwd_op in sub_block_pair: sub_block_ids = [] grad_sub_block_ids = [] sub_block_id_pair = [] sub_op_dict = {} for i in range(op_size): op = block_desc.op(i) if op.type() == fwd_op: sub_block_ids.append(op.attr("sub_block").id) sub_op_dict[op.attr("sub_block").id] = op elif op.type() == bwd_op: grad_sub_block_ids.append(op.attr("sub_block").id) sub_op_dict[op.attr("sub_block").id] = op # Find fwd_op/bwd_op block pair for grad_id in grad_sub_block_ids: fwd_id = pdesc.block(grad_id).get_forward_block_idx() if fwd_id in sub_block_ids: sub_block_id_pair.append((fwd_id, grad_id)) sub_block_ids.remove(fwd_id) # Get fwd_op/bwd_op block ops for fwd_id, grad_id in sub_block_id_pair: sub_block_ops = [] sub_block = pdesc.block(fwd_id) block_op_size = sub_block.op_size() for i in range(block_op_size): sub_block_ops.append(sub_block.op(i)) grad_sub_block = pdesc.block(grad_id) grad_sub_block_op_size = grad_sub_block.op_size() for i in range(grad_sub_block_op_size): sub_block_ops.append(grad_sub_block.op(i)) sub_op_output = set() sub_op_output.update(sub_op_dict[fwd_id].output_arg_names()) sub_op_output.update(sub_op_dict[grad_id].output_arg_names()) sub_op_output.update(sub_op_dict[fwd_id].input_arg_names()) sub_op_output.update(sub_op_dict[grad_id].input_arg_names()) ops_list.append((sub_block_ops, block_op_size, sub_op_output)) # Process rest fwd_op block ops for fwd_id in sub_block_ids: sub_block_ops = [] sub_block = pdesc.block(fwd_id) sub_block_op_size = sub_block.op_size() for i in range(sub_block_op_size): sub_block_ops.append(sub_block.op(i)) sub_op_output = set() sub_op_output.update(sub_op_dict[fwd_id].output_arg_names()) sub_op_output.update(sub_op_dict[fwd_id].input_arg_names()) ops_list.append((sub_block_ops, sub_block_op_size, sub_op_output)) return ops_list def _get_cfgs(input_program): """Process each block and create ControlFlowGraph for each of them. :param input_program: Program object. :return: A list of ControlFlowGraph, each corresponds to a block. """ ops_list = [] pdesc = input_program._get_desc() block_desc = pdesc.block(0) op_size = block_desc.op_size() # Only process one level of nested subblock. ops_list.extend(_process_sub_block_pair(pdesc, SUB_BLOCK_PAIR)) skip_opt_set = set() for _, _, skip_opt in ops_list: skip_opt_set.update(skip_opt) # Get global block ops ops_list.insert( 0, ([block_desc.op(i) for i in range(op_size)], op_size, skip_opt_set)) cfgs = [ ControlFlowGraph(input_program, ops, forward_num, skip_opt) for ops, forward_num, skip_opt in ops_list ] return cfgs def _is_opt_role_op(op): op_maker = core.op_proto_and_checker_maker optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize if op_maker.kOpRoleAttrName() in op.attr_names and \ int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role): return True def memory_optimize(input_program, skip_opt_set=None, print_log=False, level=0, skip_grads=False): """Optimize memory by reusing var memory. Note: it doesn't not support subblock nested in subblock. Args: input_program(str): Input Program skip_opt_set(set): vars wil be skipped in memory optimze print_log(bool): whether to print debug log. level(int): If level=0, reuse if the shape is completely equal, o Returns: None """ def to_name_str(var): if isinstance(var, Variable): return var.desc.name() elif isinstance(var, str): return var elif isinstance(var, six.string_types): return str(var) else: raise TypeError(str(var) + " should be Variable or str") if level != 0 and level != 1: raise ValueError("only support opt_level 0 or 1.") if skip_opt_set is not None: if isinstance(skip_opt_set, set) or isinstance(skip_opt_set, list): skip_opt_set = set(skip_opt_set) else: raise ValueError("only support skip_opt_set as set.") global PRINT_LOG PRINT_LOG = print_log if skip_grads: grad_set = set() OP_ROLE_VAR = core.op_proto_and_checker_maker.kOpRoleVarAttrName() for op in input_program.global_block().ops: if _is_opt_role_op(op): if op.attr(OP_ROLE_VAR): grad_name = op.attr(OP_ROLE_VAR)[1] grad_set.add(grad_name) if not skip_opt_set: skip_opt_set = grad_set else: skip_opt_set.update(grad_set) if skip_opt_set is not None: skip_opt_set = set(map(to_name_str, skip_opt_set)) cfgs = _get_cfgs(input_program) input_program._is_mem_optimized = True for cfg in cfgs: cfg.memory_optimize(skip_opt_set=skip_opt_set, level=level) def release_memory(input_program, skip_opt_set=None): """ Modify the input program and insert :code:`delete_op` to early drop not used variables. The modification will be performed inplace. Notes: This is an experimental API and could be removed in next few releases. Users should not use this API. Args: input_program(Program): The program will be inserted :code:`delete_op`. skip_opt_set(set): vars wil be skipped in memory optimze Returns: None """ cfgs = _get_cfgs(input_program) input_program._is_mem_optimized = True for cfg in cfgs: cfg.release_memory(skip_opt_set=skip_opt_set)