/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <time.h>

#include <atomic>
#include <chrono>              // NOLINT
#include <condition_variable>  // NOLINT
#include <ctime>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>  // NOLINT
#include <string>
#include <thread>  // NOLINT
#include <unordered_map>
#include <utility>
#include <vector>

#include "grpc++/channel.h"
#include "grpc++/generic/generic_stub.h"
#include "grpc++/grpc++.h"
#include "grpc++/support/byte_buffer.h"
#include "grpc++/support/slice.h"
#include "grpc/support/log.h"
#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/operators/distributed/distributed_pb.h"
#include "paddle/fluid/operators/distributed/request_handler.h"
#include "paddle/fluid/operators/distributed/rpc_client.h"
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
#include "paddle/fluid/platform/macros.h"  // for DISABLE_COPY_AND_ASSIGN
G
gongweibao 已提交
47 48 49

namespace paddle {
namespace operators {
50
namespace distributed {
G
gongweibao 已提交
51

52
// Declared here and defined elsewhere. GetProcessor installs this function as
// its default response callback, so it is called with the request's variable
// handle and the raw reply buffer.
void ProcGetResponse(const VarHandle& var_h, const grpc::ByteBuffer& msg);
G
gongweibao 已提交
53

54
class BaseProcessor {
G
gongweibao 已提交
55
 public:
56
  BaseProcessor() { context_ = nullptr; }
G
gongweibao 已提交
57

58
  virtual ~BaseProcessor() {}
G
gongweibao 已提交
59

60 61 62
  virtual void Prepare(VarHandlePtr h, int64_t time_out) {
    var_h_ = h;

G
gongweibao 已提交
63
    context_.reset(new grpc::ClientContext());
W
Wu Yi 已提交
64
    context_->set_wait_for_ready(true);
Y
Yancey1989 已提交
65 66 67 68 69 70
    if (time_out) {
      std::chrono::system_clock::time_point deadline =
          std::chrono::system_clock::now() +
          std::chrono::milliseconds(time_out);
      context_->set_deadline(deadline);
    }
G
gongweibao 已提交
71 72
  }

73 74 75
  void Process() {
    ProcessImpl();
    var_h_->Finish(true);
Y
Yancey 已提交
76 77
  }

78 79 80 81
  VarHandlePtr GetVarHandlePtr() { return var_h_; }
  bool Wait() { return var_h_->Wait(); }
  void Finish(bool ok) { return var_h_->Finish(ok); }
  virtual void ProcessImpl() = 0;
G
gongweibao 已提交
82 83 84

  std::unique_ptr<grpc::ClientContext> context_;
  grpc::Status status_;
85 86 87

 protected:
  VarHandlePtr var_h_;
G
gongweibao 已提交
88 89
};

90
typedef std::function<void(const VarHandle&, const ::grpc::ByteBuffer&)>
G
gongweibao 已提交
91 92
    RequestSendCallBack;

93
class SendProcessor : public BaseProcessor {
G
gongweibao 已提交
94
 public:
95
  explicit SendProcessor(std::shared_ptr<grpc::Channel> ch)
96
      : BaseProcessor(), stub_g_(ch) {}
G
gongweibao 已提交
97 98 99

  virtual ~SendProcessor() {}

100
  void ProcessImpl() override {
G
gongweibao 已提交
101
    if (response_call_back_) {
102
      response_call_back_(*var_h_.get(), reply_);
G
gongweibao 已提交
103 104 105
    }
  }

106 107
  ::grpc::GenericStub stub_g_;
  ::grpc::ByteBuffer reply_;
T
typhoonzero 已提交
108
  RequestSendCallBack response_call_back_ = nullptr;
G
gongweibao 已提交
109 110
};

111
typedef std::function<void(const VarHandle&, const ::grpc::ByteBuffer&)>
G
gongweibao 已提交
112 113
    RequestGetCallBack;

114
class GetProcessor : public BaseProcessor {
G
gongweibao 已提交
115
 public:
116
  explicit GetProcessor(std::shared_ptr<grpc::Channel> ch)
117
      : BaseProcessor(), stub_g_(ch) {}
G
gongweibao 已提交
118 119 120

  virtual ~GetProcessor() {}

121
  void ProcessImpl() override {
G
gongweibao 已提交
122
    if (response_call_back_) {
123
      response_call_back_(*var_h_.get(), reply_);
G
gongweibao 已提交
124 125 126
    }
  }

127 128
  ::grpc::ByteBuffer reply_;
  ::grpc::GenericStub stub_g_;
G
gongweibao 已提交
129 130 131
  RequestGetCallBack response_call_back_ = ProcGetResponse;
};

132
class BatchBarrierProcessor : public BaseProcessor {
Y
Yancey 已提交
133 134
 public:
  explicit BatchBarrierProcessor(std::shared_ptr<grpc::Channel> ch)
135
      : BaseProcessor() {
136 137
    stub_ = sendrecv::SendRecvService::NewStub(ch);
  }
Y
Yancey 已提交
138 139 140

  virtual ~BatchBarrierProcessor() {}

141
  void ProcessImpl() override {}
Y
Yancey 已提交
142
  sendrecv::VoidMessage reply_;
143
  std::unique_ptr<sendrecv::SendRecvService::Stub> stub_;
Y
Yancey 已提交
144 145
};

146 147 148
class FetchBarrierProcessor : public BaseProcessor {
 public:
  explicit FetchBarrierProcessor(std::shared_ptr<grpc::Channel> ch)
149
      : BaseProcessor() {
150 151
    stub_ = sendrecv::SendRecvService::NewStub(ch);
  }
152 153 154

  virtual ~FetchBarrierProcessor() {}

155
  void ProcessImpl() override {}
156
  sendrecv::VariableMessage reply_;
157
  std::unique_ptr<sendrecv::SendRecvService::Stub> stub_;
158 159
};

T
tangwei12 已提交
160 161 162
class CheckpointNotifyProcessor : public BaseProcessor {
 public:
  explicit CheckpointNotifyProcessor(std::shared_ptr<grpc::Channel> ch)
163
      : BaseProcessor() {
T
tangwei12 已提交
164 165 166 167 168
    stub_ = sendrecv::SendRecvService::NewStub(ch);
  }

  virtual ~CheckpointNotifyProcessor() {}

169
  void ProcessImpl() override {}
T
tangwei12 已提交
170 171
  sendrecv::VoidMessage reply_;
  std::unique_ptr<sendrecv::SendRecvService::Stub> stub_;
T
bug fix  
tangwei12 已提交
172
};
T
tangwei12 已提交
173

G
gongweibao 已提交
174
class GRPCClient : public RPCClient {
G
gongweibao 已提交
175
 public:
M
minqiyang 已提交
176
  GRPCClient() : ok_(true), completed_(false), stopped_(false) {}
G
gongweibao 已提交
177
  virtual ~GRPCClient();
Y
Yancey1989 已提交
178

179 180 181 182 183 184 185 186 187 188
  VarHandlePtr AsyncSendVar(const std::string& ep,
                            const platform::DeviceContext& ctx,
                            const framework::Scope& scope,
                            const std::string& var_name,
                            int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncGetVar(const std::string& ep,
                           const platform::DeviceContext& ctx,
                           const framework::Scope& scope,
                           const std::string& var_name,
189
                           const std::string& out_varname,
Q
Qiao Longfei 已提交
190
                           const std::string& table_name = "",
191 192
                           int64_t time_out = FLAGS_rpc_deadline) override;

193 194 195 196 197 198
  VarHandlePtr AsyncGetVarNoBarrier(
      const std::string& ep, const platform::DeviceContext& ctx,
      const framework::Scope& scope, const std::string& var_name,
      const std::string& out_varname,
      int64_t time_out = FLAGS_rpc_deadline) override;

199 200 201 202 203
  VarHandlePtr AsyncGetMonomerVariable(
      const std::string& ep, const platform::DeviceContext& ctx,
      const framework::Scope& scope, const std::string& var_name,
      int64_t time_out = FLAGS_rpc_deadline) override;

204 205 206 207 208
  VarHandlePtr AsyncPrefetchVar(const std::string& ep,
                                const platform::DeviceContext& ctx,
                                const framework::Scope& scope,
                                const std::string& in_var_name,
                                const std::string& out_var_name,
Q
Qiao Longfei 已提交
209
                                const std::string& table_name = "",
210 211 212 213 214
                                int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncSendBatchBarrier(
      const std::string& ep, int64_t time_out = FLAGS_rpc_deadline) override;

215 216 217 218 219 220
  VarHandlePtr AsyncSendFetchBarrier(const std::string& ep,
                                     int64_t time_out) override;

  VarHandlePtr AsyncGetMonomerBarrier(
      const std::string& ep, const std::string& var_name,
      int64_t time_out = FLAGS_rpc_deadline) override;
221 222 223 224 225 226 227

  VarHandlePtr AsyncCheckpointNotify(
      const std::string& ep, const std::string& dir,
      int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncSendComplete(
      const std::string& ep, int64_t time_out = FLAGS_rpc_deadline) override;
Y
Yancey1989 已提交
228

Y
Yancey1989 已提交
229
  bool Wait() override;
Y
Yancey 已提交
230

Y
Yancey1989 已提交
231
  void SendComplete() override;
W
Wu Yi 已提交
232

G
gongweibao 已提交
233 234 235
  void InitImpl() override;

 private:
W
Wu Yi 已提交
236
  void Proceed();
G
gongweibao 已提交
237

Y
Yancey1989 已提交
238
  std::shared_ptr<grpc::Channel> GetChannel(const std::string& ep);
239 240 241 242
  VarHandlePtr _AsyncGetVar(
      const std::string& ep, const platform::DeviceContext& ctx,
      const framework::Scope& scope, const std::string& method,
      const std::string& var_name, const std::string& out_varname,
Q
Qiao Longfei 已提交
243 244
      const std::string& rpc_path, const std::string& table_name = "",
      int64_t time_out = FLAGS_rpc_deadline);
G
gongweibao 已提交
245 246 247

 private:
  grpc::CompletionQueue cq_;
W
Wu Yi 已提交
248
  std::unordered_map<std::string, std::shared_ptr<grpc::Channel>> channels_;
249
  std::unique_ptr<std::thread> client_thread_{nullptr};
W
Wu Yi 已提交
250 251 252 253

  // mutex for Wait client sync
  std::mutex sync_mutex_;
  std::condition_variable sync_cond_;
Y
Yancey1989 已提交
254
  std::atomic<int64_t> req_count_{0};
Y
Yancey1989 已提交
255
  bool ok_;
W
Wu Yi 已提交
256 257 258

  // mutex for GetChannel thread safety
  std::mutex chan_mutex_;
G
gongweibao 已提交
259
  DISABLE_COPY_AND_ASSIGN(GRPCClient);
Y
Yancey1989 已提交
260 261 262 263

  // mutex for sending complete message only once
  std::mutex completed_mutex_;
  bool completed_;
M
minqiyang 已提交
264

M
minqiyang 已提交
265
  volatile bool stopped_;
G
gongweibao 已提交
266 267
};

268
}  // namespace distributed
G
gongweibao 已提交
269 270
}  // namespace operators
}  // namespace paddle