grpc_client.h 7.3 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
G
gongweibao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <time.h>
Y
Yi Wang 已提交
18

W
Wu Yi 已提交
19 20
#include <chrono>              // NOLINT
#include <condition_variable>  // NOLINT
G
gongweibao 已提交
21 22 23 24
#include <ctime>
#include <functional>
#include <iostream>
#include <map>
Y
Yancey1989 已提交
25
#include <mutex>  // NOLINT
G
gongweibao 已提交
26
#include <string>
W
Wu Yi 已提交
27
#include <thread>  // NOLINT
G
gongweibao 已提交
28 29
#include <vector>

W
Wu Yi 已提交
30
#include "grpc++/channel.h"
Y
Yi Wang 已提交
31 32 33 34 35
#include "grpc++/generic/generic_stub.h"
#include "grpc++/grpc++.h"
#include "grpc++/support/byte_buffer.h"
#include "grpc++/support/slice.h"
#include "grpc/support/log.h"
T
typhoonzero 已提交
36
#include "paddle/fluid/framework/blocking_queue.h"
Y
Yi Wang 已提交
37 38 39 40
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
41
#include "paddle/fluid/operators/distributed/request_handler.h"
42
#include "paddle/fluid/operators/distributed/rpc_client.h"
43 44
#include "paddle/fluid/operators/distributed/send_recv.grpc.pb.h"
#include "paddle/fluid/operators/distributed/send_recv.pb.h"
45
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
Y
Yancey1989 已提交
46
#include "paddle/fluid/platform/macros.h"  // for DISABLE_COPY_AND_ASSIGN
G
gongweibao 已提交
47 48 49

namespace paddle {
namespace operators {
50
namespace distributed {
G
gongweibao 已提交
51

52
// Response handler taking the variable handle of a request and the raw gRPC
// byte buffer received for it. Only declared in this header; the definition
// lives elsewhere. GetProcessor installs it as its default
// response_call_back_.
void ProcGetResponse(const VarHandle& var_h, const grpc::ByteBuffer& msg);
G
gongweibao 已提交
53

54
class BaseProcessor {
G
gongweibao 已提交
55
 public:
56
  BaseProcessor() { context_ = nullptr; }
G
gongweibao 已提交
57

58
  virtual ~BaseProcessor() {}
G
gongweibao 已提交
59

60 61 62
  virtual void Prepare(VarHandlePtr h, int64_t time_out) {
    var_h_ = h;

G
gongweibao 已提交
63
    context_.reset(new grpc::ClientContext());
W
Wu Yi 已提交
64
    context_->set_wait_for_ready(true);
Y
Yancey1989 已提交
65 66 67 68 69 70
    if (time_out) {
      std::chrono::system_clock::time_point deadline =
          std::chrono::system_clock::now() +
          std::chrono::milliseconds(time_out);
      context_->set_deadline(deadline);
    }
G
gongweibao 已提交
71 72
  }

73 74 75
  void Process() {
    ProcessImpl();
    var_h_->Finish(true);
Y
Yancey 已提交
76 77
  }

78 79 80 81
  VarHandlePtr GetVarHandlePtr() { return var_h_; }
  bool Wait() { return var_h_->Wait(); }
  void Finish(bool ok) { return var_h_->Finish(ok); }
  virtual void ProcessImpl() = 0;
G
gongweibao 已提交
82 83 84

  std::unique_ptr<grpc::ClientContext> context_;
  grpc::Status status_;
85 86 87

 protected:
  VarHandlePtr var_h_;
G
gongweibao 已提交
88 89
};

90
typedef std::function<void(const VarHandle&, const ::grpc::ByteBuffer&)>
G
gongweibao 已提交
91 92
    RequestSendCallBack;

93
class SendProcessor : public BaseProcessor {
G
gongweibao 已提交
94
 public:
95
  explicit SendProcessor(std::shared_ptr<grpc::Channel> ch)
96
      : BaseProcessor(), stub_g_(ch) {}
G
gongweibao 已提交
97 98 99

  virtual ~SendProcessor() {}

100
  void ProcessImpl() override {
G
gongweibao 已提交
101
    if (response_call_back_) {
102
      response_call_back_(*var_h_.get(), reply_);
G
gongweibao 已提交
103 104 105
    }
  }

106 107
  ::grpc::GenericStub stub_g_;
  ::grpc::ByteBuffer reply_;
T
typhoonzero 已提交
108
  RequestSendCallBack response_call_back_ = nullptr;
G
gongweibao 已提交
109 110
};

111
typedef std::function<void(const VarHandle&, const ::grpc::ByteBuffer&)>
G
gongweibao 已提交
112 113
    RequestGetCallBack;

114
class GetProcessor : public BaseProcessor {
G
gongweibao 已提交
115
 public:
116
  explicit GetProcessor(std::shared_ptr<grpc::Channel> ch)
117
      : BaseProcessor(), stub_g_(ch) {}
G
gongweibao 已提交
118 119 120

  virtual ~GetProcessor() {}

121
  void ProcessImpl() override {
G
gongweibao 已提交
122
    if (response_call_back_) {
123
      response_call_back_(*var_h_.get(), reply_);
G
gongweibao 已提交
124 125 126
    }
  }

127 128
  ::grpc::ByteBuffer reply_;
  ::grpc::GenericStub stub_g_;
G
gongweibao 已提交
129 130 131
  RequestGetCallBack response_call_back_ = ProcGetResponse;
};

132
class BatchBarrierProcessor : public BaseProcessor {
Y
Yancey 已提交
133 134
 public:
  explicit BatchBarrierProcessor(std::shared_ptr<grpc::Channel> ch)
135
      : BaseProcessor() {
136 137
    stub_ = sendrecv::SendRecvService::NewStub(ch);
  }
Y
Yancey 已提交
138 139 140

  virtual ~BatchBarrierProcessor() {}

141
  void ProcessImpl() override {}
Y
Yancey 已提交
142
  sendrecv::VoidMessage reply_;
143
  std::unique_ptr<sendrecv::SendRecvService::Stub> stub_;
Y
Yancey 已提交
144 145
};

146 147 148
class FetchBarrierProcessor : public BaseProcessor {
 public:
  explicit FetchBarrierProcessor(std::shared_ptr<grpc::Channel> ch)
149
      : BaseProcessor() {
150 151
    stub_ = sendrecv::SendRecvService::NewStub(ch);
  }
152 153 154

  virtual ~FetchBarrierProcessor() {}

155
  void ProcessImpl() override {}
156
  sendrecv::VariableMessage reply_;
157
  std::unique_ptr<sendrecv::SendRecvService::Stub> stub_;
158 159
};

T
tangwei12 已提交
160 161 162
class CheckpointNotifyProcessor : public BaseProcessor {
 public:
  explicit CheckpointNotifyProcessor(std::shared_ptr<grpc::Channel> ch)
163
      : BaseProcessor() {
T
tangwei12 已提交
164 165 166 167 168
    stub_ = sendrecv::SendRecvService::NewStub(ch);
  }

  virtual ~CheckpointNotifyProcessor() {}

169
  void ProcessImpl() override {}
T
tangwei12 已提交
170 171
  sendrecv::VoidMessage reply_;
  std::unique_ptr<sendrecv::SendRecvService::Stub> stub_;
T
bug fix  
tangwei12 已提交
172
};
T
tangwei12 已提交
173

G
gongweibao 已提交
174
class GRPCClient : public RPCClient {
G
gongweibao 已提交
175
 public:
M
minqiyang 已提交
176
  GRPCClient() : ok_(true), completed_(false), stopped_(false) {}
G
gongweibao 已提交
177
  virtual ~GRPCClient();
Y
Yancey1989 已提交
178

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
  VarHandlePtr AsyncSendVar(const std::string& ep,
                            const platform::DeviceContext& ctx,
                            const framework::Scope& scope,
                            const std::string& var_name,
                            int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncGetVar(const std::string& ep,
                           const platform::DeviceContext& ctx,
                           const framework::Scope& scope,
                           const std::string& var_name,
                           int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncPrefetchVar(const std::string& ep,
                                const platform::DeviceContext& ctx,
                                const framework::Scope& scope,
                                const std::string& in_var_name,
                                const std::string& out_var_name,
                                int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncSendBatchBarrier(
      const std::string& ep, int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncSendFetchBarrier(
      const std::string& ep, int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncCheckpointNotify(
      const std::string& ep, const std::string& dir,
      int64_t time_out = FLAGS_rpc_deadline) override;

  VarHandlePtr AsyncSendComplete(
      const std::string& ep, int64_t time_out = FLAGS_rpc_deadline) override;
Y
Yancey1989 已提交
210

Y
Yancey1989 已提交
211
  bool Wait() override;
Y
Yancey 已提交
212

Y
Yancey1989 已提交
213
  void SendComplete() override;
W
Wu Yi 已提交
214

G
gongweibao 已提交
215 216 217 218
 protected:
  void InitImpl() override;

 private:
W
Wu Yi 已提交
219 220
  // InitEventLoop should only be called by Init()
  void InitEventLoop();
G
gongweibao 已提交
221

W
Wu Yi 已提交
222
  void Proceed();
G
gongweibao 已提交
223

Y
Yancey1989 已提交
224
  std::shared_ptr<grpc::Channel> GetChannel(const std::string& ep);
G
gongweibao 已提交
225 226 227

 private:
  grpc::CompletionQueue cq_;
W
Wu Yi 已提交
228 229 230 231 232 233
  std::unordered_map<std::string, std::shared_ptr<grpc::Channel>> channels_;
  std::unique_ptr<std::thread> client_thread_;

  // mutex for Wait client sync
  std::mutex sync_mutex_;
  std::condition_variable sync_cond_;
Y
Yancey1989 已提交
234
  std::atomic<int64_t> req_count_{0};
Y
Yancey1989 已提交
235
  bool ok_;
W
Wu Yi 已提交
236 237 238

  // mutex for GetChannel thread safety
  std::mutex chan_mutex_;
G
gongweibao 已提交
239
  DISABLE_COPY_AND_ASSIGN(GRPCClient);
Y
Yancey1989 已提交
240 241 242 243

  // mutex for sending complete message only once
  std::mutex completed_mutex_;
  bool completed_;
M
minqiyang 已提交
244

M
minqiyang 已提交
245
  volatile bool stopped_;
G
gongweibao 已提交
246 247
};

248
}  // namespace distributed
G
gongweibao 已提交
249 250
}  // namespace operators
}  // namespace paddle