diff --git a/python/paddle/fluid/memory_optimization_transpiler.py b/python/paddle/fluid/memory_optimization_transpiler.py
index 6952ca7fe49931b9ebc84e214569c47d632d2a06..708ca08b17c85efbb25ecaf2580b7141421e25b9 100644
--- a/python/paddle/fluid/memory_optimization_transpiler.py
+++ b/python/paddle/fluid/memory_optimization_transpiler.py
@@ -29,6 +29,8 @@ dtype_to_size = {
     core.VarDesc.VarType.BOOL: 1
 }
 
+sub_block_ops = ["while", "while_grad", "parallel_do", "parallel_do_grad"]
+
 
 class ControlFlowGraph(object):
     def __init__(self, Program, ops, forward_num, skip_opt):
@@ -141,7 +143,7 @@ class ControlFlowGraph(object):
         self.pool = []
         for i in range(self.op_size):
             op = self._ops[i]
-            if op.type() == "while" or op.type() == "while_grad":
+            if op.type() in sub_block_ops:
                 continue
             block_desc = op.block()
             is_forward = i < self._forward_num
@@ -198,67 +200,75 @@ class ControlFlowGraph(object):
                                 block_desc, var_name, is_forward).shape()))
 
 
-def get_cfgs(input_program):
+def _process_sub_block_pair(pdesc, sub_block_pair):
     ops_list = []
-    pdesc = input_program.get_desc()
     block_desc = pdesc.block(0)
     op_size = block_desc.op_size()
-    # Get global block ops
-    ops_list.append(
-        ([block_desc.op(i) for i in range(op_size)], op_size, set()))
-
-    while_sub_block_ids = []
-    while_grad_sub_block_ids = []
-    while_block_id_pair = []
-    while_op_dict = {}
+    for fwd_op, bwd_op in sub_block_pair:
+        sub_block_ids = []
+        grad_sub_block_ids = []
+        sub_block_id_pair = []
+        sub_op_dict = {}
+        for i in range(op_size):
+            op = block_desc.op(i)
+            if op.type() == fwd_op:
+                sub_block_ids.append(op.attr("sub_block").id)
+                sub_op_dict[op.attr("sub_block").id] = op
+            elif op.type() == bwd_op:
+                grad_sub_block_ids.append(op.attr("sub_block").id)
+                sub_op_dict[op.attr("sub_block").id] = op
 
-    for i in range(op_size):
-        op = block_desc.op(i)
-        if op.type() == "while":
-            while_sub_block_ids.append(op.attr("sub_block").id)
-            while_op_dict[op.attr("sub_block").id] = op
-        elif op.type() == "while_grad":
-            while_grad_sub_block_ids.append(op.attr("sub_block").id)
-            while_op_dict[op.attr("sub_block").id] = op
+        # Find fwd_op/bwd_op block pair
+        for grad_id in grad_sub_block_ids:
+            fwd_id = pdesc.block(grad_id).get_forward_block_idx()
+            if fwd_id in sub_block_ids:
+                sub_block_id_pair.append((fwd_id, grad_id))
+                sub_block_ids.remove(fwd_id)
 
-    # Find while/while_grad block pair
-    for grad_id in while_grad_sub_block_ids:
-        forward_id = pdesc.block(grad_id).get_forward_block_idx()
-        if forward_id in while_sub_block_ids:
-            while_block_id_pair.append((forward_id, grad_id))
-            while_sub_block_ids.remove(forward_id)
+        # Get fwd_op/bwd_op block ops
+        for fwd_id, grad_id in sub_block_id_pair:
+            sub_block_ops = []
+            sub_block = pdesc.block(fwd_id)
+            block_op_size = sub_block.op_size()
+            for i in range(block_op_size):
+                sub_block_ops.append(sub_block.op(i))
 
-    # Get while/while_grad block ops
-    for forward_id, grad_id in while_block_id_pair:
-        while_block_ops = []
-        while_block = pdesc.block(forward_id)
-        while_block_op_size = while_block.op_size()
-        for i in range(while_block_op_size):
-            while_block_ops.append(while_block.op(i))
+            grad_sub_block = pdesc.block(grad_id)
+            grad_sub_block_op_size = grad_sub_block.op_size()
+            for i in range(grad_sub_block_op_size):
+                sub_block_ops.append(grad_sub_block.op(i))
 
-        while_grad_block = pdesc.block(grad_id)
-        while_grad_block_op_size = while_grad_block.op_size()
-        for i in range(while_grad_block_op_size):
-            while_block_ops.append(while_grad_block.op(i))
+            sub_op_output = set()
+            sub_op_output.update(sub_op_dict[fwd_id].output_arg_names())
+            sub_op_output.update(sub_op_dict[grad_id].output_arg_names())
+            ops_list.append((sub_block_ops, block_op_size, sub_op_output))
 
-        while_op_output = set()
-        while_op_output.update(while_op_dict[forward_id].output_arg_names())
-        while_op_output.update(while_op_dict[grad_id].output_arg_names())
+        # Process rest fwd_op block ops
+        for fwd_id in sub_block_ids:
+            sub_block_ops = []
+            sub_block = pdesc.block(fwd_id)
+            sub_block_op_size = sub_block.op_size()
+            for i in range(sub_block_op_size):
+                sub_block_ops.append(sub_block.op(i))
+            sub_op_output = set()
+            sub_op_output.update(sub_op_dict[fwd_id].output_arg_names())
+            ops_list.append((sub_block_ops, sub_block_op_size, sub_op_output))
+    return ops_list
 
-        ops_list.append((while_block_ops, while_block_op_size, while_op_output))
 
-    # Process rest while block ops
-    for forward_id in while_sub_block_ids:
-        while_block_ops = []
-        while_block = pdesc.block(forward_id)
-        while_block_op_size = while_block.op_size()
-        for i in range(while_block_op_size):
-            while_block_ops.append(while_block.op(i))
+def _get_cfgs(input_program):
+    ops_list = []
+    pdesc = input_program.get_desc()
+    block_desc = pdesc.block(0)
+    op_size = block_desc.op_size()
+    # Get global block ops
+    ops_list.append(
+        ([block_desc.op(i) for i in range(op_size)], op_size, set()))
 
-        while_op_output = set()
-        while_op_output.update(while_op_dict[forward_id].output_arg_names())
+    sub_block_pair = [("while", "while_grad"), ("parallel_do",
+                                                "parallel_do_grad")]
 
-        ops_list.append((while_block_ops, while_block_op_size, while_op_output))
+    ops_list.extend(_process_sub_block_pair(pdesc, sub_block_pair))
 
     cfgs = [
         ControlFlowGraph(input_program, ops, forward_num, skip_opt)
@@ -268,6 +278,6 @@ def get_cfgs(input_program):
 
 
 def memory_optimize(input_program):
-    cfgs = get_cfgs(input_program)
+    cfgs = _get_cfgs(input_program)
     for cfg in cfgs:
         cfg.memory_optimize()
diff --git a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py
index 784cfe58dfebd5451918789a3bd156b092978bd5..a99d13330b84e07a04a4499d749df24ec99e7ec0 100644
--- a/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py
+++ b/python/paddle/fluid/tests/book_memory_optimization/test_memopt_fit_a_line.py
@@ -24,15 +24,29 @@ import sys
 fluid.default_startup_program().random_seed = 111
 
 x = fluid.layers.data(name='x', shape=[13], dtype='float32')
-
-y_predict = fluid.layers.fc(input=x, size=1, act=None)
-
 y = fluid.layers.data(name='y', shape=[1], dtype='float32')
 
-cost = fluid.layers.square_error_cost(input=y_predict, label=y)
-avg_cost = fluid.layers.mean(cost)
+device_type = 'CPU'
+use_nccl = False
+place = fluid.CPUPlace()
+if fluid.core.is_compiled_with_cuda():
+    device_type = 'CUDA'
+    use_nccl = True
+    place = fluid.CUDAPlace(0)
 
-sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
+places = fluid.layers.get_places(device_count=0, device_type=device_type)
+pd = fluid.layers.ParallelDo(places, use_nccl=use_nccl)
+with pd.do():
+    x_ = pd.read_input(x)
+    y_ = pd.read_input(y)
+    y_predict = fluid.layers.fc(input=x_, size=1, act=None)
+    cost = fluid.layers.square_error_cost(input=y_predict, label=y_)
+    avg_cost = fluid.layers.mean(x=cost)
+    pd.write_output(avg_cost)
+
+cost = pd()
+avg_cost = fluid.layers.mean(x=cost)
+sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.01)
 sgd_optimizer.minimize(avg_cost)
 
 fluid.memory_optimize(fluid.default_main_program())
@@ -48,7 +62,6 @@ train_reader = paddle.batch(
 #     paddle.dataset.uci_housing.train(), buf_size=500),
 #     batch_size=BATCH_SIZE)
 
-place = fluid.CPUPlace()
 feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
 exe = fluid.Executor(place)
 
@@ -65,6 +78,7 @@ for pass_id in range(PASS_NUM):
 
         if avg_loss_value[0] < 10.0:
             exit(0)  # if avg cost less than 10.0, we think our code is good.
+        print avg_loss_value[0]
         if math.isnan(float(avg_loss_value)):
             sys.exit("got NaN loss, training failed.")
 exit(1)
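
Usage sketch (illustrative, not part of the patch): the snippet below shows the minimal way the changed entry point is driven, using only fluid calls that already appear in the test above; the tiny regression network is an assumed example, not the library's prescribed setup.

import paddle.fluid as fluid

# Build a small regression network in the default main program.
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None)
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(x=cost)
fluid.optimizer.SGD(learning_rate=0.01).minimize(avg_cost)

# Rewrite the program in place so variables with non-overlapping lifetimes
# can reuse memory; with this patch, while/while_grad and
# parallel_do/parallel_do_grad sub-blocks are scanned as well.
fluid.memory_optimize(fluid.default_main_program())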