From 32d81909dc6901be0779e2ddbd9e53d42b84d003 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 17 Jul 2018 10:55:53 +0800 Subject: [PATCH] fix pserver with condition block --- paddle/fluid/operators/listen_and_serv_op.cc | 14 ++++-- .../tests/unittests/test_dist_transpiler.py | 48 +++++++++++++++++-- .../fluid/transpiler/distribute_transpiler.py | 3 ++ 3 files changed, 56 insertions(+), 9 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 56e39649b40..438b44b42aa 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -61,6 +61,8 @@ static void ParallelExecuteBlocks( framework::Async([&executor, &prepared, &program, &scope, idx]() { int run_block = idx; // thread local try { + VLOG(3) << "running server block: " << run_block + << " pointer: " << prepared[run_block].get(); executor->RunPreparedContext(prepared[run_block].get(), scope); } catch (const std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); @@ -107,12 +109,14 @@ void ListenAndServOp::RunSyncLoop( PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); - std::vector<int> optimize_blocks_idx; - for (auto blk : optimize_blocks) { - optimize_blocks_idx.push_back(blk->ID()); + // Prepare all the server blocks + std::vector<int> optimize_blocks_list; + for (size_t i = 1; i < program->Size(); ++i) { + optimize_blocks_list.push_back(i); } - auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx); - // Insert placeholder for block0 which holds current op itself. + auto optimize_prepared = executor->Prepare(*program, optimize_blocks_list); + // Insert placeholder for block0 which holds current op itself, + // NOTE the first block in `optimize_prepared` should never be run. 
optimize_prepared.insert( optimize_prepared.begin(), std::shared_ptr<framework::ExecutorPrepareContext>(nullptr)); diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index f307e737e2e..9dbef0693bb 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -304,10 +304,50 @@ class TestL2Decay(TranspilerTest): # TODO(typhoonzero): test clipping and L2Decay ops are removed from trainer - # FIXME(typhoonzero): need to add test for async case: - # see https://github.com/PaddlePaddle/Paddle/issues/11691 -class TestAsyncSGD(TranspilerTest): - pass +class TestL2DecayWithPiecewise(TranspilerTest): + def net_conf(self): + x = fluid.layers.data(name='x', shape=[1000], dtype='float32') + y_predict = fluid.layers.fc(input=x, + size=1000, + act=None, + param_attr=fluid.ParamAttr(name='fc_w'), + bias_attr=fluid.ParamAttr(name='fc_b')) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + base_lr = 1.0 + bd = [1, 10, 20, 30] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + sgd_optimizer = fluid.optimizer.Momentum( + learning_rate=fluid.layers.piecewise_decay( + boundaries=bd, values=lr), + momentum=0.9, + regularization=fluid.regularizer.L2Decay(1e-4)) + sgd_optimizer.minimize(avg_cost) + return + + def test_transpiler(self): + pserver, startup = self.get_pserver(self.pserver1_ep) + trainer = self.get_trainer() + + self.assertEqual(len(pserver.blocks), 9) + self.assertEqual([op.type for op in pserver.blocks[1].ops], [ + "increment", "cast", "fill_constant", "fill_constant", "less_than", + "logical_not", "conditional_block", "fill_constant", + "fill_constant", "less_than", "logical_not", "logical_and", + "logical_and", "conditional_block", "fill_constant", + "fill_constant", "less_than", "logical_not", "logical_and", + 
"logical_and", "conditional_block", "fill_constant", + "fill_constant", "less_than", "logical_not", "logical_and", + "logical_and", "conditional_block", "fill_constant", + "conditional_block" + ]) + self.assertEqual( + [op.type for op in pserver.blocks[7].ops], + ["sum", "scale", "scale", "elementwise_add", "momentum"]) + self.assertEqual( + [op.type for op in pserver.blocks[8].ops], + ["sum", "scale", "scale", "elementwise_add", "momentum"]) if __name__ == "__main__": diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index c4995fa09c7..006510a7d3f 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -461,6 +461,8 @@ class DistributeTranspiler(object): per_opt_block = pserver_program.create_block(pre_block_idx) optimize_blocks.append(per_opt_block) # append grad merging ops before clip and weight decay + # cases may be like: + # L2Decay op -> clip op -> optimize for _, op in enumerate(self.optimize_ops): # find the origin @GRAD var before clipping grad_varname_for_block = __op_have_grad_input__(op) @@ -468,6 +470,7 @@ class DistributeTranspiler(object): merged_var = self._append_pserver_grad_merge_ops( per_opt_block, grad_varname_for_block, endpoint, grad_to_block_id, self.origin_program) + break # append optimize op once then append other ops. for _, op in enumerate(self.optimize_ops): # optimizer is connected to itself if ufind.is_connected(op, opt_op) and op not in global_ops: -- GitLab