Commit 32d81909 authored by typhoonzero

fix pserver with condition block

Parent 0b9abcbe
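
The scenario this commit targets, as a minimal sketch mirroring the net_conf() of the new TestL2DecayWithPiecewise test below; variable names here are illustrative. A piecewise_decay learning-rate schedule is lowered into increment/less_than/conditional_block ops, and after transpiling these end up in the parameter-server program, which is why the server must be able to prepare and run every block, not only the top-level optimize blocks.

import paddle.fluid as fluid

# Build a small regression net whose optimizer uses a piecewise-decayed
# learning rate; this mirrors the net_conf() of the new test below.
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1000, act=None)
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
avg_cost = fluid.layers.mean(
    fluid.layers.square_error_cost(input=y_predict, label=y))

bd = [1, 10, 20, 30]
lr = [1.0 * (0.1**i) for i in range(len(bd) + 1)]
optimizer = fluid.optimizer.Momentum(
    # piecewise_decay is lowered to increment/less_than/conditional_block ops,
    # which is what the pserver must now be able to run.
    learning_rate=fluid.layers.piecewise_decay(boundaries=bd, values=lr),
    momentum=0.9,
    regularization=fluid.regularizer.L2Decay(1e-4))
optimizer.minimize(avg_cost)
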
@@ -61,6 +61,8 @@ static void ParallelExecuteBlocks(
       framework::Async([&executor, &prepared, &program, &scope, idx]() {
         int run_block = idx;  // thread local
         try {
+          VLOG(3) << "running server block: " << run_block
+                  << "pointer: " << prepared[run_block].get();
           executor->RunPreparedContext(prepared[run_block].get(), scope);
         } catch (const std::exception &e) {
           LOG(ERROR) << "run sub program error " << e.what();
@@ -107,12 +109,14 @@ void ListenAndServOp::RunSyncLoop(
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");

-  std::vector<int> optimize_blocks_idx;
-  for (auto blk : optimize_blocks) {
-    optimize_blocks_idx.push_back(blk->ID());
+  // Prepare all the server blocks in advance.
+  std::vector<int> optimize_blocks_list;
+  for (size_t i = 1; i < program->Size(); ++i) {
+    optimize_blocks_list.push_back(i);
   }
-  auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx);
-  // Insert placeholder for block0 which holds current op itself.
+  auto optimize_prepared = executor->Prepare(*program, optimize_blocks_list);
+  // Insert placeholder for block0, which holds the current op itself.
+  // NOTE: the first block in `optimize_prepared` should never be run.
   optimize_prepared.insert(
       optimize_prepared.begin(),
       std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
......
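
A hedged sketch of how the transpiled server-side program can be inspected, assuming the DistributeTranspiler API of this Fluid version, a placeholder endpoint string, and that the network and optimizer from the sketch above have already been built into the default main program. Block 0 holds the listen_and_serv op itself (hence the nullptr placeholder inserted above), and the remaining blocks carry the learning-rate decay logic, its conditional_block sub-blocks, and the per-parameter optimize blocks, which is what RunSyncLoop now prepares in full.

import paddle.fluid as fluid

# Hypothetical endpoint for illustration only.
pserver_ep = "127.0.0.1:6174"

t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=0,
    program=fluid.default_main_program(),
    pservers=pserver_ep,
    trainers=1)
pserver_prog = t.get_pserver_program(pserver_ep)

# Print every server-side block; with a piecewise_decay schedule the op lists
# should include conditional_block ops, as asserted in the new test below.
for i, block in enumerate(pserver_prog.blocks):
    print(i, [op.type for op in block.ops])
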
@@ -304,10 +304,50 @@ class TestL2Decay(TranspilerTest):
     # TODO(typhoonzero): test clipping and L2Decay ops are removed from trainer


-# FIXME(typhoonzero): need to add test for async case:
-# see https://github.com/PaddlePaddle/Paddle/issues/11691
-class TestAsyncSGD(TranspilerTest):
-    pass
+class TestL2DecayWithPiecewise(TranspilerTest):
+    def net_conf(self):
+        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
+        y_predict = fluid.layers.fc(input=x,
+                                    size=1000,
+                                    act=None,
+                                    param_attr=fluid.ParamAttr(name='fc_w'),
+                                    bias_attr=fluid.ParamAttr(name='fc_b'))
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        base_lr = 1.0
+        bd = [1, 10, 20, 30]
+        lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
+        sgd_optimizer = fluid.optimizer.Momentum(
+            learning_rate=fluid.layers.piecewise_decay(
+                boundaries=bd, values=lr),
+            momentum=0.9,
+            regularization=fluid.regularizer.L2Decay(1e-4))
+        sgd_optimizer.minimize(avg_cost)
+        return
+
+    def test_transpiler(self):
+        pserver, startup = self.get_pserver(self.pserver1_ep)
+        trainer = self.get_trainer()
+
+        self.assertEqual(len(pserver.blocks), 9)
+        self.assertEqual([op.type for op in pserver.blocks[1].ops], [
+            "increment", "cast", "fill_constant", "fill_constant", "less_than",
+            "logical_not", "conditional_block", "fill_constant",
+            "fill_constant", "less_than", "logical_not", "logical_and",
+            "logical_and", "conditional_block", "fill_constant",
+            "fill_constant", "less_than", "logical_not", "logical_and",
+            "logical_and", "conditional_block", "fill_constant",
+            "fill_constant", "less_than", "logical_not", "logical_and",
+            "logical_and", "conditional_block", "fill_constant",
+            "conditional_block"
+        ])
+        self.assertEqual(
+            [op.type for op in pserver.blocks[7].ops],
+            ["sum", "scale", "scale", "elementwise_add", "momentum"])
+        self.assertEqual(
+            [op.type for op in pserver.blocks[8].ops],
+            ["sum", "scale", "scale", "elementwise_add", "momentum"])


 if __name__ == "__main__":
......
@@ -461,6 +461,8 @@ class DistributeTranspiler(object):
             per_opt_block = pserver_program.create_block(pre_block_idx)
             optimize_blocks.append(per_opt_block)
             # append grad merging ops before clip and weight decay
+            # a case may look like:
+            # L2Decay op -> clip op -> optimize
             for _, op in enumerate(self.optimize_ops):
                 # find the origin @GRAD var before clipping
                 grad_varname_for_block = __op_have_grad_input__(op)
@@ -468,6 +470,7 @@ class DistributeTranspiler(object):
                     merged_var = self._append_pserver_grad_merge_ops(
                         per_opt_block, grad_varname_for_block, endpoint,
                         grad_to_block_id, self.origin_program)
+                    break  # append optimize op once, then append other ops.
             for _, op in enumerate(self.optimize_ops):
                 # optimizer is connected to itself
                 if ufind.is_connected(op, opt_op) and op not in global_ops:
......
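
A hedged reading of the per-parameter optimize blocks asserted in the new test (blocks 7 and 8): the grad merging ops appended above come first, then the L2Decay regularization, then the Momentum update. The per-op roles in the comments are inferred, not taken from this commit.

# Interpretation of the op sequence asserted for pserver.blocks[7] and [8];
# the annotations are inferred roles, not part of this commit.
expected_per_param_block_ops = [
    "sum",              # merge the gradient splits received from the trainers
    "scale",            # average the merged gradient
    "scale",            # L2Decay: scale the parameter by the decay coefficient
    "elementwise_add",  # add the decay term onto the merged gradient
    "momentum",         # apply the Momentum update with the decayed learning rate
]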