提交 081b7824 编写于 作者: Y Yancey1989

update by comment

上级 2a4221ac
......@@ -53,7 +53,7 @@ class SendVarsOp : public framework::OperatorBase {
auto ins = Inputs("X");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
int flag_wait = Attr<int>("wait");
int sync_send = Attr<int>("sync_sent");
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
......@@ -68,12 +68,14 @@ class SendVarsOp : public framework::OperatorBase {
for (size_t i = 0; i < ins.size(); i++) {
if (NeedSend(scope, ins[i])) {
VLOG(3) << "sending " << ins[i] << " to " << epmap[i];
// TODO(Yancey1989): we need to use an IO threadpool which has
// a larger number of threads than the computing threadpool.
rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]);
} else {
VLOG(3) << "don't send no-initialied variable: " << ins[i];
}
}
if (flag_wait) {
if (sync_send) {
rpc_client->Wait();
}
}
......@@ -86,16 +88,16 @@ class SendVarsOpMaker : public framework::OpProtoAndCheckerMaker {
AddInput("X", "(Tensor, SelectedRows) Input variables to be sent")
.AsDuplicable();
AddOutput("RPCClient",
"(RPCClient) The RPC client object which is"
"(RPCClient) The RPC client object which will be"
"initialized at most once.");
AddComment(R"DOC(
Send operator
This operator will send variables to listen_and_serve op at the parameter server.
)DOC");
AddAttr<int>("wait",
AddAttr<int>("ync_send",
"(int, default 0)"
"whether watting for all send request have been sent.")
"sync send or async send.")
.SetDefault(0);
AddAttr<std::vector<std::string>>("epmap",
"(string vector, default 127.0.0.1:6164)"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册