grpc_client.cc 18.6 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
G
gongweibao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

15
#include <stdlib.h>
Y
Yi Wang 已提交
16 17
#include <limits>

G
gongweibao 已提交
18
#include "glog/logging.h"  // For VLOG
Y
Yi Wang 已提交
19
#include "paddle/fluid/framework/threadpool.h"
W
Wu Yi 已提交
20 21
#include "paddle/fluid/operators/distributed/grpc/grpc_client.h"
#include "paddle/fluid/operators/distributed/grpc/grpc_serde.h"
22
#include "paddle/fluid/operators/distributed/request_handler.h"
P
peizhilin 已提交
23
#include "paddle/fluid/platform/port.h"
X
Xin Pan 已提交
24
#include "paddle/fluid/platform/profiler.h"
25

26 27
DECLARE_bool(rpc_disable_reuse_port);

G
gongweibao 已提交
28 29
namespace paddle {
namespace operators {
30
namespace distributed {
G
gongweibao 已提交
31

32
void GRPCClient::InitImpl() {
W
Wu Yi 已提交
33 34
  // start the client process thread
  // TODO(wuyi): can make this in a threadpool
35 36
  PADDLE_ENFORCE(client_thread_ == nullptr,
                 "please not re init proceed thread");
G
gongweibao 已提交
37
  client_thread_.reset(new std::thread(std::bind(&GRPCClient::Proceed, this)));
W
Wu Yi 已提交
38 39
}

Y
Yancey1989 已提交
40
void GRPCClient::SendComplete() {
Y
Yancey1989 已提交
41 42 43
  std::unique_lock<std::mutex> lk(completed_mutex_);
  if (!completed_) {
    for (auto& it : channels_) {
M
minqiyang 已提交
44
      VLOG(3) << "send complete message to " << it.first;
Y
Yancey1989 已提交
45 46 47 48
      this->AsyncSendComplete(it.first);
    }
    PADDLE_ENFORCE(this->Wait(), "internal grpc error");
    completed_ = true;
W
Wu Yi 已提交
49 50 51
  }
}

G
gongweibao 已提交
52
// Tears the client down: flags the Proceed() loop to stop, waits for the
// pending requests, shuts the completion queue down, drops every cached
// channel and finally joins the worker thread.
GRPCClient::~GRPCClient() {
  stopped_ = true;
  Wait();
  cq_.Shutdown();
  {
    // channels_ is guarded by chan_mutex_ (see GetChannel()).
    std::lock_guard<std::mutex> guard(chan_mutex_);
    for (auto& it : channels_) {
      it.second.reset();
    }
    channels_.clear();
  }
  // The worker thread only exists after InitImpl() ran; destroying a client
  // that was never initialised must not dereference a null thread pointer.
  if (client_thread_ != nullptr && client_thread_->joinable()) {
    client_thread_->join();
  }
}

66 67 68 69 70
// Asynchronously sends variable `var_name` (looked up in `scope`) to endpoint
// `ep` through the SendVariable RPC and returns the handle tracking it.
//
// When FLAGS_rpc_retry_times is positive the call waits on the handle and
// re-issues the request (after a short random sleep) while the handle is
// flagged `should_retry`, up to FLAGS_rpc_retry_times retries.
VarHandlePtr GRPCClient::AsyncSendVar(const std::string& ep,
                                      const platform::DeviceContext& ctx,
                                      const framework::Scope& scope,
                                      const std::string& var_name,
                                      int64_t time_out) {
  // Everything the async task needs is copied (or taken by pointer) up front
  // and captured by value below.
  const std::string method = kSendRPC;
  const std::string endpoint = ep;
  const std::string send_name = var_name;
  const platform::DeviceContext* dev_ctx = &ctx;
  const framework::Scope* var_scope = &scope;
  const auto channel = GetChannel(endpoint);

  int attempt = 0;
  for (;;) {
    // The processor is handed to the completion queue as the call tag;
    // Proceed() deletes the tag it receives once the call completes.
    SendProcessor* proc = new SendProcessor(channel);
    VarHandlePtr handle(
        new VarHandle(ep, method, send_name, dev_ctx, var_scope));
    proc->Prepare(handle, time_out);

    framework::AsyncIO(
        [send_name, var_scope, dev_ctx, proc, method, handle, this] {
          auto* var = var_scope->FindVar(send_name);

          ::grpc::ByteBuffer payload;
          SerializeToByteBuffer(send_name, var, *dev_ctx, &payload, "",
                                trainer_id_);

          VLOG(3) << proc->GetVarHandlePtr()->String() << " begin";

          // No response callback is installed for a send.
          proc->response_call_back_ = nullptr;

          platform::RecordRPCEvent record_event(method);

          auto call = proc->stub_g_.PrepareUnaryCall(
              proc->context_.get(), "/sendrecv.SendRecvService/SendVariable",
              payload, &cq_);
          call->StartCall();
          call->Finish(&proc->reply_, &proc->status_,
                       static_cast<void*>(proc));

          // With profiling on, wait on the handle before leaving the task.
          if (UNLIKELY(platform::IsProfileEnabled())) {
            handle->Wait();
          }
        });
    req_count_++;

    const bool retry_allowed =
        FLAGS_rpc_retry_times > 0 && attempt < FLAGS_rpc_retry_times;
    if (!retry_allowed) {
      return handle;
    }

    handle->Wait();
    if (!handle->should_retry) {
      return handle;
    }

    VLOG(3) << "rpc call failed, retry times " << attempt;
    attempt++;
    // Sleep 0-4 ms before the next attempt.
    std::random_device rd;
    std::this_thread::sleep_for(std::chrono::milliseconds(rd() % 5));
  }
}

void ProcGetResponse(const VarHandle& var_h,
126
                     const ::grpc::ByteBuffer& ret_msg) {
127
  VLOG(4) << "ProcGetResponse";
T
typhoonzero 已提交
128
  framework::Variable* outvar = nullptr;
W
Wu Yi 已提交
129 130 131 132
  // get response's trainer_id is not used
  int trainer_id;
  DeserializeFromByteBuffer(ret_msg, *var_h.ctx(), var_h.scope(), &outvar,
                            &trainer_id);
133 134 135 136 137
}

template <typename T>
void RequestToByteBuffer(const T& proto, ::grpc::ByteBuffer* result) {
  ::grpc::Slice slice(proto.ByteSizeLong());
Q
qiaolongfei 已提交
138
  proto.SerializeWithCachedSizesToArray(const_cast<uint8_t*>(slice.begin()));
139 140
  ::grpc::ByteBuffer tmp(&slice, 1);
  result->Swap(&tmp);
G
gongweibao 已提交
141 142
}

143 144 145 146
// Fetches `var_name` from endpoint `ep` through the GetVariable RPC; the
// work is delegated to _AsyncGetVar().
VarHandlePtr GRPCClient::AsyncGetVar(const std::string& ep,
                                     const platform::DeviceContext& ctx,
                                     const framework::Scope& scope,
                                     const std::string& var_name,
                                     const std::string& out_varname,
                                     const std::string& table_name,
                                     int64_t time_out) {
  const std::string rpc_path = "/sendrecv.SendRecvService/GetVariable";
  return _AsyncGetVar(ep, ctx, scope, kGetRPC, var_name, out_varname, rpc_path,
                      table_name, time_out);
}

155 156 157 158 159 160 161 162 163
// Fetches a variable through the GetVariableNoBarrier RPC. The requested
// name is `var_name` with WITHOUT_BARRIER_MESSAGE appended; no table name is
// sent.
VarHandlePtr GRPCClient::AsyncGetVarNoBarrier(
    const std::string& ep, const platform::DeviceContext& ctx,
    const framework::Scope& scope, const std::string& var_name,
    const std::string& out_varname, int64_t time_out) {
  const std::string rpc_path = "/sendrecv.SendRecvService/GetVariableNoBarrier";
  const std::string request_name =
      string::Sprintf("%s%s", var_name, WITHOUT_BARRIER_MESSAGE);

  return _AsyncGetVar(ep, ctx, scope, kGetNoBarrierRPC, request_name,
                      out_varname, rpc_path, "", time_out);
}

167 168 169 170
// Fetches `var_name` through the GetMonomerVariable RPC. The variable name
// is used both as the request name and as the output name; no table name is
// sent.
VarHandlePtr GRPCClient::AsyncGetMonomerVariable(
    const std::string& ep, const platform::DeviceContext& ctx,
    const framework::Scope& scope, const std::string& var_name,
    int64_t time_out) {
  const std::string rpc_path = "/sendrecv.SendRecvService/GetMonomerVariable";
  return _AsyncGetVar(ep, ctx, scope, kGetMonomerRPC, var_name, var_name,
                      rpc_path, "", time_out);
}

176 177 178 179
// Shared implementation of the "get" RPCs. Builds a VariableMessage carrying
// the requested name, output name, trainer id and table name, issues it on
// `rpc_path` and returns the handle tracking the call. The reply is handled
// by ProcGetResponse.
//
// When FLAGS_rpc_retry_times is positive the call waits on the handle and
// re-issues the request (after a short random sleep) while the handle is
// flagged `should_retry`, up to FLAGS_rpc_retry_times retries.
VarHandlePtr GRPCClient::_AsyncGetVar(
    const std::string& ep, const platform::DeviceContext& ctx,
    const framework::Scope& scope, const std::string& method,
    const std::string& var_name, const std::string& out_varname,
    const std::string& rpc_path, const std::string& table_name,
    int64_t time_out) {
  // Local copies / pointers that the async task captures by value.
  const std::string endpoint = ep;
  const std::string request_name = var_name;
  const std::string output_name = out_varname;
  const std::string table = table_name;
  const platform::DeviceContext* dev_ctx = &ctx;
  const framework::Scope* var_scope = &scope;
  const auto channel = GetChannel(endpoint);

  int attempt = 0;
  for (;;) {
    // The processor is handed to the completion queue as the call tag;
    // Proceed() deletes the tag it receives once the call completes.
    GetProcessor* proc = new GetProcessor(channel);

    VarHandlePtr handle(
        new VarHandle(ep, method, output_name, dev_ctx, var_scope));
    proc->Prepare(handle, time_out);

    framework::AsyncIO([request_name, output_name, table, proc, method,
                        dev_ctx, handle, rpc_path, this] {
      // Build and serialize the request message.
      sendrecv::VariableMessage request;
      request.set_varname(request_name);
      request.set_out_varname(output_name);
      request.set_trainer_id(trainer_id_);
      request.set_table_name(table);
      ::grpc::ByteBuffer payload;
      RequestToByteBuffer<sendrecv::VariableMessage>(request, &payload);

      VLOG(3) << proc->GetVarHandlePtr()->String() << " begin";

      // The reply is deserialized by ProcGetResponse.
      proc->response_call_back_ = ProcGetResponse;

      platform::RecordRPCEvent record_event(method);

      auto call = proc->stub_g_.PrepareUnaryCall(proc->context_.get(),
                                                 rpc_path, payload, &cq_);
      call->StartCall();
      call->Finish(&proc->reply_, &proc->status_, static_cast<void*>(proc));

      // With profiling on, wait on the handle before leaving the task.
      if (UNLIKELY(platform::IsProfileEnabled())) {
        handle->Wait();
      }
    });
    req_count_++;

    const bool retry_allowed =
        FLAGS_rpc_retry_times > 0 && attempt < FLAGS_rpc_retry_times;
    if (!retry_allowed) {
      return handle;
    }

    handle->Wait();
    if (!handle->should_retry) {
      return handle;
    }

    VLOG(3) << "rpc call failed, retry times " << attempt;
    attempt++;
    // Sleep 0-4 ms before the next attempt.
    std::random_device rd;
    std::this_thread::sleep_for(std::chrono::milliseconds(rd() % 5));
  }
}

242 243 244 245 246
// Sends variable `in_var_name` (looked up in `scope`) to endpoint `ep`
// through the PrefetchVariable RPC; the reply is handled by ProcGetResponse.
// The returned handle is created for `out_var_name`.
//
// When FLAGS_rpc_retry_times is positive the call waits on the handle and
// re-issues the request (after a short random sleep) while the handle is
// flagged `should_retry`, up to FLAGS_rpc_retry_times retries.
VarHandlePtr GRPCClient::AsyncPrefetchVar(const std::string& ep,
                                          const platform::DeviceContext& ctx,
                                          const framework::Scope& scope,
                                          const std::string& in_var_name,
                                          const std::string& out_var_name,
                                          const std::string& table_name,
                                          int64_t time_out) {
  // Local copies / pointers that the async task captures by value.
  const std::string method = kPrefetchRPC;
  const std::string endpoint = ep;
  const std::string input_name = in_var_name;
  const std::string output_name = out_var_name;
  const std::string table = table_name;
  const platform::DeviceContext* dev_ctx = &ctx;
  const framework::Scope* var_scope = &scope;
  const auto channel = GetChannel(endpoint);

  int attempt = 0;
  for (;;) {
    // The processor is handed to the completion queue as the call tag;
    // Proceed() deletes the tag it receives once the call completes.
    GetProcessor* proc = new GetProcessor(channel);
    VarHandlePtr handle(
        new VarHandle(ep, method, output_name, dev_ctx, var_scope));
    proc->Prepare(handle, time_out);

    framework::AsyncIO([input_name, output_name, endpoint, var_scope, dev_ctx,
                        proc, method, handle, table, this] {
      auto* var = var_scope->FindVar(input_name);

      // NOTE(review): the trainer id is hard-coded to 0 here while the other
      // senders in this file pass trainer_id_ -- confirm this is intended.
      ::grpc::ByteBuffer payload;
      SerializeToByteBuffer(input_name, var, *dev_ctx, &payload, output_name,
                            0, table);

      VLOG(3) << proc->GetVarHandlePtr()->String() << " begin";

      // The reply is deserialized by ProcGetResponse.
      proc->response_call_back_ = ProcGetResponse;

      platform::RecordRPCEvent record_event(method);

      auto call = proc->stub_g_.PrepareUnaryCall(
          proc->context_.get(), "/sendrecv.SendRecvService/PrefetchVariable",
          payload, &cq_);
      call->StartCall();
      call->Finish(&proc->reply_, &proc->status_, static_cast<void*>(proc));

      // With profiling on, wait on the handle before leaving the task.
      if (UNLIKELY(platform::IsProfileEnabled())) {
        handle->Wait();
      }
    });
    req_count_++;

    const bool retry_allowed =
        FLAGS_rpc_retry_times > 0 && attempt < FLAGS_rpc_retry_times;
    if (!retry_allowed) {
      return handle;
    }

    handle->Wait();
    if (!handle->should_retry) {
      return handle;
    }

    VLOG(3) << "rpc call failed, retry times " << attempt;
    attempt++;
    // Sleep 0-4 ms before the next attempt.
    std::random_device rd;
    std::this_thread::sleep_for(std::chrono::milliseconds(rd() % 5));
  }
}

307 308
// Sends BATCH_BARRIER_MESSAGE to endpoint `ep` through the SendVariable RPC
// and returns the handle tracking the call.
VarHandlePtr GRPCClient::AsyncSendBatchBarrier(const std::string& ep,
                                               int64_t time_out) {
  const std::string method = kBatchBarrierRPC;
  const auto channel = GetChannel(ep);

  // The processor is passed as the completion-queue tag; Proceed() deletes
  // the tag it receives once the call completes.
  BatchBarrierProcessor* proc = new BatchBarrierProcessor(channel);
  VarHandlePtr handle(
      new VarHandle(ep, method, BATCH_BARRIER_MESSAGE, nullptr, nullptr));
  proc->Prepare(handle, time_out);

  sendrecv::VariableMessage request;
  request.set_varname(BATCH_BARRIER_MESSAGE);

  platform::RecordRPCEvent record_event(method);

  auto rpc =
      proc->stub_->AsyncSendVariable(proc->context_.get(), request, &cq_);
  rpc->Finish(&proc->reply_, &proc->status_, static_cast<void*>(proc));
  req_count_++;

  // With profiling on, wait on the handle before returning.
  if (UNLIKELY(platform::IsProfileEnabled())) {
    handle->Wait();
  }
  return handle;
}
Y
Yancey 已提交
332

333 334
// Sends FETCH_BARRIER_MESSAGE to endpoint `ep` through the GetVariable RPC
// and returns the handle tracking the call.
VarHandlePtr GRPCClient::AsyncSendFetchBarrier(const std::string& ep,
                                               int64_t time_out) {
  const std::string method = kFetchBarrierRPC;
  const auto channel = GetChannel(ep);

  // The processor is passed as the completion-queue tag; Proceed() deletes
  // the tag it receives once the call completes.
  FetchBarrierProcessor* proc = new FetchBarrierProcessor(channel);
  VarHandlePtr handle(
      new VarHandle(ep, method, FETCH_BARRIER_MESSAGE, nullptr, nullptr));
  proc->Prepare(handle, time_out);

  sendrecv::VariableMessage request;
  request.set_varname(FETCH_BARRIER_MESSAGE);

  platform::RecordRPCEvent record_event(method);

  auto rpc =
      proc->stub_->AsyncGetVariable(proc->context_.get(), request, &cq_);
  rpc->Finish(&proc->reply_, &proc->status_, static_cast<void*>(proc));
  req_count_++;

  // With profiling on, wait on the handle before returning.
  if (UNLIKELY(platform::IsProfileEnabled())) {
    handle->Wait();
  }
  return handle;
}

358 359 360 361 362
// Sends a monomer barrier request for `var_name` to endpoint `ep` through the
// GetMonomerBarrier RPC and returns the handle tracking the call.
VarHandlePtr GRPCClient::AsyncGetMonomerBarrier(const std::string& ep,
                                                const std::string& var_name,
                                                int64_t time_out) {
  const auto ch = GetChannel(ep);
  // The processor is passed as the completion-queue tag; Proceed() deletes
  // the tag it receives once the call completes.
  BatchBarrierProcessor* s = new BatchBarrierProcessor(ch);
  const std::string method = kSendMonomerFetchBarrierRPC;
  VarHandlePtr h(new VarHandle(ep, method, var_name, nullptr, nullptr));
  s->Prepare(h, time_out);

  // Logged at level 3 like every other "begin" message in this file; the
  // previous level (30) kept this message from showing up alongside them.
  VLOG(3) << s->GetVarHandlePtr()->String() << " begin";

  sendrecv::VariableMessage req;
  req.set_varname(var_name);

  platform::RecordRPCEvent record_event(method);

  auto rpc = s->stub_->AsyncGetMonomerBarrier(s->context_.get(), req, &cq_);
  rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
  req_count_++;

  // With profiling on, wait on the handle before returning.
  if (UNLIKELY(platform::IsProfileEnabled())) {
    h->Wait();
  }

  return h;
}

385 386
// Sends COMPLETE_MESSAGE (tagged with this client's trainer id) to endpoint
// `ep` through the SendVariable RPC and returns the handle tracking the call.
VarHandlePtr GRPCClient::AsyncSendComplete(const std::string& ep,
                                           int64_t time_out) {
  const std::string method = kSendCompleteRPC;
  const auto channel = GetChannel(ep);

  // The processor is passed as the completion-queue tag; Proceed() deletes
  // the tag it receives once the call completes.
  BatchBarrierProcessor* proc = new BatchBarrierProcessor(channel);
  VarHandlePtr handle(
      new VarHandle(ep, method, COMPLETE_MESSAGE, nullptr, nullptr));
  proc->Prepare(handle, time_out);

  sendrecv::VariableMessage request;
  request.set_trainer_id(trainer_id_);
  request.set_varname(COMPLETE_MESSAGE);

  platform::RecordRPCEvent record_event(method);

  auto rpc =
      proc->stub_->AsyncSendVariable(proc->context_.get(), request, &cq_);
  rpc->Finish(&proc->reply_, &proc->status_, static_cast<void*>(proc));
  req_count_++;

  // With profiling on, wait on the handle before returning.
  if (UNLIKELY(platform::IsProfileEnabled())) {
    handle->Wait();
  }
  return handle;
}

411 412 413
// Sends CHECKPOINT_SAVE_MESSAGE to endpoint `ep` through the
// CheckpointNotify RPC; `dir` travels in the message's out_varname field.
// Returns the handle tracking the call.
VarHandlePtr GRPCClient::AsyncCheckpointNotify(const std::string& ep,
                                               const std::string& dir,
                                               int64_t time_out) {
  const std::string method = kCheckPointNotifyRPC;
  const auto channel = GetChannel(ep);

  // The processor is passed as the completion-queue tag; Proceed() deletes
  // the tag it receives once the call completes.
  CheckpointNotifyProcessor* proc = new CheckpointNotifyProcessor(channel);
  VarHandlePtr handle(
      new VarHandle(ep, method, CHECKPOINT_SAVE_MESSAGE, nullptr, nullptr));
  proc->Prepare(handle, time_out);

  sendrecv::VariableMessage request;
  request.set_varname(CHECKPOINT_SAVE_MESSAGE);
  request.set_out_varname(dir);

  platform::RecordRPCEvent record_event(method);

  auto rpc =
      proc->stub_->AsyncCheckpointNotify(proc->context_.get(), request, &cq_);
  rpc->Finish(&proc->reply_, &proc->status_, static_cast<void*>(proc));
  req_count_++;

  // With profiling on, wait on the handle before returning.
  if (UNLIKELY(platform::IsProfileEnabled())) {
    handle->Wait();
  }
  return handle;
}

1
123malin 已提交
441 442 443 444 445 446 447 448 449
// Asynchronously sends variable `var_name` (looked up in `scope`) to endpoint
// `ep` through the DistributeNotify RPC and returns the handle tracking the
// call. Unlike AsyncSendVar there is no retry loop here.
VarHandlePtr GRPCClient::AsyncDistributeNotify(
    const std::string& ep, const platform::DeviceContext& ctx,
    const framework::Scope& scope, const std::string& var_name,
    int64_t time_out) {
  // Local copies / pointers that the async task captures by value.
  const std::string method = kRequestNotify;
  const std::string endpoint = ep;
  const std::string notify_name = var_name;
  const platform::DeviceContext* dev_ctx = &ctx;
  const framework::Scope* var_scope = &scope;
  const auto channel = GetChannel(endpoint);

  // The processor is passed as the completion-queue tag; Proceed() deletes
  // the tag it receives once the call completes.
  SendProcessor* proc = new SendProcessor(channel);
  VarHandlePtr handle(
      new VarHandle(ep, method, notify_name, dev_ctx, var_scope));
  proc->Prepare(handle, time_out);

  framework::AsyncIO(
      [notify_name, var_scope, dev_ctx, proc, method, handle, this] {
        auto* var = var_scope->FindVar(notify_name);

        ::grpc::ByteBuffer payload;
        SerializeToByteBuffer(notify_name, var, *dev_ctx, &payload, "",
                              trainer_id_);

        VLOG(3) << proc->GetVarHandlePtr()->String() << " begin";

        // No response callback is installed for a notify.
        proc->response_call_back_ = nullptr;

        platform::RecordRPCEvent record_event(method);

        auto call = proc->stub_g_.PrepareUnaryCall(
            proc->context_.get(), "/sendrecv.SendRecvService/DistributeNotify",
            payload, &cq_);
        call->StartCall();
        call->Finish(&proc->reply_, &proc->status_, static_cast<void*>(proc));
      });
  req_count_++;

  // With profiling on, wait on the handle before returning.
  if (UNLIKELY(platform::IsProfileEnabled())) {
    handle->Wait();
  }
  return handle;
}

Y
Yancey1989 已提交
484
bool GRPCClient::Wait() {
W
Wu Yi 已提交
485
  std::unique_lock<std::mutex> lk(sync_mutex_);
Y
Yancey1989 已提交
486 487
  sync_cond_.wait(lk, [this] { return (req_count_ == 0 || ok_ == false); });
  return ok_;
G
gongweibao 已提交
488 489
}

G
gongweibao 已提交
490
void GRPCClient::Proceed() {
W
Wu Yi 已提交
491
  void* tag = nullptr;
G
gongweibao 已提交
492 493
  bool ok = false;

M
minqiyang 已提交
494
  VLOG(3) << "GRPCClient Proceed begin";
M
minqiyang 已提交
495
  while (!stopped_ && cq_.Next(&tag, &ok)) {
W
Wu Yi 已提交
496 497 498
    BaseProcessor* c = static_cast<BaseProcessor*>(tag);
    GPR_ASSERT(ok);
    PADDLE_ENFORCE(c);
G
gongweibao 已提交
499

W
Wu Yi 已提交
500
    if (c->status_.ok()) {
M
minqiyang 已提交
501
      VLOG(3) << c->GetVarHandlePtr()->String() << " process";
W
Wu Yi 已提交
502
      c->Process();
Y
Yancey1989 已提交
503
    } else if (c->status_.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) {
504
      LOG(FATAL) << c->GetVarHandlePtr()->String()
G
gongweibao 已提交
505 506 507
                 << " meets grpc error, error_code:" << c->status_.error_code()
                 << " error_message:" << c->status_.error_message()
                 << " error_details:" << c->status_.error_details();
Y
Yancey1989 已提交
508 509 510 511
      {
        std::lock_guard<std::mutex> lk(sync_mutex_);
        ok_ = false;
      }
512
      c->Finish(false);
513 514 515 516 517 518 519 520
    } else if (c->status_.error_code() == grpc::StatusCode::UNAVAILABLE) {
      VLOG(3) << c->GetVarHandlePtr()->String()
              << " meets grpc error, error_code:" << c->status_.error_code()
              << " error_message:" << c->status_.error_message()
              << " error_details:" << c->status_.error_details()
              << " should retry!";
      c->GetVarHandlePtr()->should_retry = true;
      c->Finish(false);
W
Wu Yi 已提交
521
    } else {
522
      LOG(FATAL) << c->GetVarHandlePtr()->String()
G
gongweibao 已提交
523 524 525 526
                 << " meets grpc error, error_code:" << c->status_.error_code()
                 << " error_message:" << c->status_.error_message()
                 << " error_details:" << c->status_.error_details();

527
      c->Finish(false);
W
Wu Yi 已提交
528
    }
529

G
gongweibao 已提交
530
    bool notify = false;
W
Wu Yi 已提交
531 532 533
    {
      std::lock_guard<std::mutex> lk(sync_mutex_);
      req_count_--;
G
gongweibao 已提交
534 535 536 537 538 539 540
      notify = (req_count_ <= 0 || !c->status_.ok());
    }

    delete c;

    if (notify) {
      sync_cond_.notify_all();
W
Wu Yi 已提交
541
    }
G
gongweibao 已提交
542
  }
543 544 545 546 547 548 549

  // Last log message
  // Avoid using VLOG() and LOG(): in the destructor of google::LogMessage() a
  // static Mutex log_mutex is used for synchronization, which might have been
  // destructed at this moment.
  if (FLAGS_v >= 3) {
    std::string msg("GRPCClient Proceed end");
550
    fwrite(msg.c_str(), msg.length(), 1, stderr);
551
  }
G
gongweibao 已提交
552
}
W
Wu Yi 已提交
553

G
gongweibao 已提交
554
std::shared_ptr<grpc::Channel> GRPCClient::GetChannel(const std::string& ep) {
W
Wu Yi 已提交
555
  std::lock_guard<std::mutex> guard(chan_mutex_);
Y
Yancey1989 已提交
556
  auto it = channels_.find(ep);
G
gongweibao 已提交
557 558 559 560
  if (it != channels_.end()) {
    return it->second;
  }

W
Wu Yi 已提交
561
  // Channel configurations:
G
gongweibao 已提交
562
  grpc::ChannelArguments args;
W
Wu Yi 已提交
563
  args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);
564 565 566
  if (FLAGS_rpc_disable_reuse_port) {
    args.SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
  }
567
  args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
G
gongweibao 已提交
568 569 570
  args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
  args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());

T
typhoonzero 已提交
571 572
  auto ch =
      grpc::CreateCustomChannel(ep, grpc::InsecureChannelCredentials(), args);
Y
Yancey1989 已提交
573
  channels_[ep] = ch;
G
gongweibao 已提交
574 575 576
  return ch;
}

577
}  // namespace distributed
G
gongweibao 已提交
578 579
}  // namespace operators
}  // namespace paddle