diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h
index 5c278f0ed765566cba0c1579995e2f7e0e2809ea..f891c75dbc81a5cdb5274bbae84e9e85f42464fb 100644
--- a/paddle/fluid/operators/detail/grpc_server.h
+++ b/paddle/fluid/operators/detail/grpc_server.h
@@ -21,14 +21,11 @@ limitations under the License. */
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/selected_rows.h"
 #include "paddle/fluid/framework/var_type.h"
-#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
-#include "paddle/fluid/operators/detail/simple_block_queue.h"
-
+#include "paddle/fluid/operators/detail/grpc_service.h"
 #include "paddle/fluid/operators/detail/send_recv.grpc.pb.h"
 #include "paddle/fluid/operators/detail/send_recv.pb.h"
-#include "paddle/fluid/operators/detail/grpc_service.h"
-
 
 namespace paddle {
 namespace operators {
 namespace detail {
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 31ea2a7e581950b5399a7a5efc9ae38b8ea3c52d..d8a3c45ac5bf64c0613fa5f3de1684954d3c056a 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -93,6 +93,12 @@ class ListenAndServOp : public framework::OperatorBase {
                    "server program should have at least 2 blocks");
 
     framework::Executor executor(dev_place);
+    std::vector<framework::ExecutorPrepareContext *> blk_ctx_list;
+    blk_ctx_list.push_back(nullptr);  // block0 is not used.
+    for (int blkid = 1; blkid < num_blocks; ++blkid) {
+      auto *exe_ctx = executor.Prepare(*program, blkid);
+      blk_ctx_list.push_back(exe_ctx);
+    }
 
     // TODO(typhoonzero): change this to a while_op for every cluster-batch.
     bool exit_flag = false;
@@ -139,26 +145,27 @@ class ListenAndServOp : public framework::OperatorBase {
       // should be global ops.
       // NOTE: if is_gpu_place, CUDA kernels are laugched by multiple threads
       // and this will still work.
+      std::vector<std::future<void>> fs;
       // block0 contains only listen_and_serv op, start run from block1.
       for (int blkid = 1; blkid < num_blocks - 1; ++blkid) {
-        fs.push_back(framework::Async([&executor, &program, &recv_scope,
-                                       blkid]() {
-          int run_block = blkid;  // thread local
-          try {
-            executor.Run(*program, &recv_scope, run_block,
-                         false /*create_local_scope*/, false /*create_vars*/);
-          } catch (std::exception &e) {
-            LOG(ERROR) << "run sub program error " << e.what();
-          }
-        }));
+        fs.push_back(framework::Async(
+            [&executor, &program, &recv_scope, &blk_ctx_list, blkid]() {
+              int run_block = blkid;  // thread local
+              try {
+                executor.RunPreparedContext(blk_ctx_list[run_block],
+                                            &recv_scope, false, false);
+              } catch (std::exception &e) {
+                LOG(ERROR) << "run sub program error " << e.what();
+              }
+            }));
       }
       for (int i = 0; i < num_blocks - 2; ++i) fs[i].wait();
 
       // Run global block at final step, or block1 if there are only 2 blocks
       if (num_blocks >= 2) {
         try {
-          executor.Run(*program, &recv_scope, num_blocks - 1,
-                       false /*create_local_scope*/, false /*create_vars*/);
+          executor.RunPreparedContext(blk_ctx_list[num_blocks - 1], &recv_scope,
+                                      false, false);
         } catch (std::exception &e) {
           LOG(ERROR) << "run sub program error " << e.what();
         }
@@ -177,6 +184,10 @@ class ListenAndServOp : public framework::OperatorBase {
       rpc_service_->WaitClientGet(fan_in);
       sparse_vars.clear();
     }  // while(true)
+
+    for (int i = 0; i < num_blocks; ++i) {
+      delete blk_ctx_list[i];
+    }
   }
 
 protected:
diff --git a/paddle/fluid/operators/send_op.cc b/paddle/fluid/operators/send_op.cc
index 443f40e803ea31c3961ed77842bd0775e0f74f35..a77c38f633c776359648a7a3eb6fab0ada6de997 100644
--- a/paddle/fluid/operators/send_op.cc
+++ b/paddle/fluid/operators/send_op.cc
@@ -68,7 +68,7 @@ class SendOp : public framework::OperatorBase {
 
     for (size_t i = 0; i < ins.size(); i++) {
       if (NeedSend(scope, ins[i])) {
-        VLOG(3) << "sending " << ins[i] << " to " << epmap[i];
+        VLOG(2) << "sending " << ins[i] << " to " << epmap[i];
         rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]);
       } else {
         VLOG(3) << "don't send no-initialied variable: " << ins[i];
@@ -77,20 +77,20 @@ class SendOp : public framework::OperatorBase {
     PADDLE_ENFORCE(rpc_client->Wait());
 
     for (auto& ep : endpoints) {
-      VLOG(3) << "batch barrier, ep: " << ep;
+      VLOG(2) << "batch barrier, ep: " << ep;
       rpc_client->AsyncSendBatchBarrier(ep);
     }
     PADDLE_ENFORCE(rpc_client->Wait());
 
     if (outs.size() > 0) {
       for (size_t i = 0; i < outs.size(); i++) {
-        VLOG(3) << "getting " << outs[i] << " from " << epmap[i];
+        VLOG(2) << "getting " << outs[i] << " from " << epmap[i];
         rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
       }
       PADDLE_ENFORCE(rpc_client->Wait());
       // tell pservers that current trainer have called fetch
       for (auto& ep : endpoints) {
-        VLOG(3) << "send fetch barrier, ep: " << ep;
+        VLOG(2) << "send fetch barrier, ep: " << ep;
         rpc_client->AsyncSendFetchBarrier(ep);
       }
       PADDLE_ENFORCE(rpc_client->Wait());
diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py
index 33cea96421bf93f1693bc06e7412b561f1bd2a32..62147d325b699a62bd39cfbaca44874b7fc19a0f 100644
--- a/python/paddle/fluid/distribute_transpiler.py
+++ b/python/paddle/fluid/distribute_transpiler.py
@@ -565,6 +565,8 @@ class DistributeTranspiler:
         orig_var_name = ""
         if suff_idx >= 0:
             orig_var_name = varname[:suff_idx]
+        else:
+            orig_var_name = varname
         return orig_var_name
 
     def _append_pserver_ops(self, optimize_block, opt_op, endpoint,
@@ -579,7 +581,8 @@ class DistributeTranspiler:
             grad_block = None
             for g in self.param_grad_ep_mapping[endpoint]["grads"]:
                 if same_or_split_var(
-                        self._orig_varname(g.name), opt_op.input(key)[0]):
+                        self._orig_varname(g.name),
+                        self._orig_varname(opt_op.input(key)[0])):
                     grad_block = g
                     break
             if not grad_block:
@@ -750,7 +753,7 @@ class DistributeTranspiler:
         param_names = [
            p.name for p in self.param_grad_ep_mapping[endpoint]["params"]
         ]
-        if op.input("Param") in param_names:
+        if op.input("Param")[0] in param_names:
             return True
         else:
             for n in param_names:
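
Note on the listen_and_serv_op change above: the point of blk_ctx_list is to hoist the per-block Executor::Prepare work out of the while(true) serving loop, so each recv iteration only pays for RunPreparedContext instead of re-analyzing the block every time. The self-contained sketch below mimics that prepare-once/run-many structure using only the C++ standard library; BlockContext, Prepare, and RunPrepared are hypothetical stand-ins for the Paddle types (framework::ExecutorPrepareContext, Executor::Prepare, Executor::RunPreparedContext), not the real API.

#include <future>
#include <iostream>
#include <memory>
#include <vector>

// Hypothetical stand-in for framework::ExecutorPrepareContext: whatever
// per-block state is expensive to build (op instances, variable info).
struct BlockContext {
  int block_id;
};

// Stand-in for Executor::Prepare: runs once per block, outside the loop.
std::unique_ptr<BlockContext> Prepare(int block_id) {
  return std::make_unique<BlockContext>(BlockContext{block_id});
}

// Stand-in for Executor::RunPreparedContext: cheap, reuses the context.
void RunPrepared(const BlockContext &ctx) {
  std::cout << "running block " << ctx.block_id << "\n";
}

int main() {
  const int num_blocks = 4;

  // Prepare every sub-block once; slot 0 stays empty because block0
  // holds only the listen_and_serv op itself, mirroring the patch.
  std::vector<std::unique_ptr<BlockContext>> blk_ctx_list;
  blk_ctx_list.push_back(nullptr);  // block0 is not used.
  for (int blkid = 1; blkid < num_blocks; ++blkid) {
    blk_ctx_list.push_back(Prepare(blkid));
  }

  for (int step = 0; step < 2; ++step) {  // stands in for while(true)
    // Run blocks 1..num_blocks-2 concurrently, as the patch does with
    // framework::Async.
    std::vector<std::future<void>> fs;
    for (int blkid = 1; blkid < num_blocks - 1; ++blkid) {
      fs.push_back(std::async(std::launch::async, [&blk_ctx_list, blkid]() {
        RunPrepared(*blk_ctx_list[blkid]);
      }));
    }
    for (auto &f : fs) f.wait();
    // The global block runs last, after all other blocks have finished.
    RunPrepared(*blk_ctx_list[num_blocks - 1]);
  }
  return 0;
}

One difference worth noting: the patch stores raw ExecutorPrepareContext pointers and deletes them manually after the while(true) loop, whereas the sketch uses std::unique_ptr so the contexts are reclaimed without an explicit delete loop.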