/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

#include <grpc++/grpc++.h>
#include <grpc/support/log.h>

#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/detail/send_recv.grpc.pb.h"
#include "paddle/fluid/operators/detail/send_recv.pb.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
#include "paddle/fluid/operators/detail/simple_block_queue.h"

namespace paddle {
namespace operators {
namespace detail {

// Forward declaration only; the definition is not in the visible part of this
// header.
class RequestBase;

// Element type of the server's message queues: a std::string paired with a
// sendrecv::VariableMessage (going by the alias name, the string is the
// variable's name).
using MessageWithName = std::pair<std::string, sendrecv::VariableMessage>;

class AsyncGRPCServer final : public sendrecv::SendRecvService::Service {
 public:
Y
Yancey1989 已提交
40
  explicit AsyncGRPCServer(const std::string &address) : address_(address) {}
G
gongweibao 已提交
41 42 43

  void RunSyncUpdate();

T
typhoonzero 已提交
44
  // functions to sync server barrier status.
T
typhoonzero 已提交
45 46
  void WaitCond(int cond);
  void SetCond(int cond);
T
typhoonzero 已提交
47
  void WaitClientGet(int count);
G
gongweibao 已提交
48 49 50

  void SetScope(framework::Scope *scope) { scope_ = scope; }

Y
Yancey1989 已提交
51 52
  void SetDevCtx(const platform::DeviceContext *dev_ctx) { dev_ctx_ = dev_ctx; }

G
gongweibao 已提交
53 54 55 56 57 58 59
  const MessageWithName Get() { return this->var_recv_queue_.Pop(); }

  void Push(const MessageWithName &msg) { this->var_recv_queue_.Push(msg); }

  void ShutDown();

 protected:
Y
Yancey 已提交
60
  void HandleRequest(grpc::ServerCompletionQueue *cq, std::string cq_name,
G
gongweibao 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
                     std::function<void()> TryToRegisterNewOne);
  void TryToRegisterNewSendOne();
  void TryToRegisterNewGetOne();
  void ShutdownQueue();

 private:
  std::mutex cq_mutex_;
  volatile bool is_shut_down_ = false;
  std::unique_ptr<grpc::ServerCompletionQueue> cq_send_;
  std::unique_ptr<grpc::ServerCompletionQueue> cq_get_;

  sendrecv::SendRecvService::AsyncService service_;
  std::unique_ptr<grpc::Server> server_;

  std::string address_;
  framework::Scope *scope_;
Y
Yancey1989 已提交
77
  const platform::DeviceContext *dev_ctx_;
G
gongweibao 已提交
78 79
  // received variable from RPC, operators fetch variable from this queue.
  SimpleBlockQueue<MessageWithName> var_recv_queue_;
80
  SimpleBlockQueue<MessageWithName> var_get_queue_;
G
gongweibao 已提交
81 82

  // condition of the sub program
T
typhoonzero 已提交
83 84 85
  std::mutex barrier_mutex_;
  mutable int barrier_cond_step_;
  std::condition_variable barrier_condition_;
G
gongweibao 已提交
86 87 88 89 90 91 92 93

  std::unique_ptr<std::thread> t_send_;
  std::unique_ptr<std::thread> t_get_;
};

};  // namespace detail
};  // namespace operators
};  // namespace paddle