// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#ifndef _WIN32
#include <unistd.h>
#endif

#include <array>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "paddle/fluid/distributed/store/socket.h"
#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/distributed/store/tcp_utils.h"

namespace paddle {
namespace distributed {

// Reply codes with two states, WAITING and STOP_WAIT.
// NOTE(review): not referenced anywhere else in this header; presumably the
// daemon's answer to a WAIT request -- confirm against tcp_store.cc.
// Enumerator values are implicit (0, 1), so reordering changes them.
enum class ReplyType { WAITING, STOP_WAIT };

// Operation selector passed to detail::TCPClient::send_command_for_key().
// Enumerator values are implicit (ADD = 0 ... STOP = 4).
// NOTE(review): if these values travel over the socket, reordering or
// inserting enumerators changes the protocol -- confirm before editing.
enum class Command { ADD, GET, SET, WAIT, STOP };

namespace detail {

class MasterDaemon {
 public:
47
  static std::unique_ptr<MasterDaemon> start(SocketType listen_socket,
48
                                             int nranks,
G
gongweibao 已提交
49
                                             int timeout);
50
  MasterDaemon() = delete;
G
gongweibao 已提交
51 52
  explicit MasterDaemon(SocketType listen_socket,
                        int nranks,
53
                        int stop_check_timeout);
54 55 56 57
  ~MasterDaemon();

 private:
  void run();
G
gongweibao 已提交
58
  void ProcessCommands(std::vector<struct pollfd>* p_fds);
59 60 61
  void _do_add(SocketType socket);
  void _do_wait(SocketType socket);
  void _do_get(SocketType socket);
62
  void _do_set(SocketType socket);
63 64 65 66
  SocketType _listen_socket;
  std::vector<SocketType> _sockets;
  std::unordered_map<std::string, std::vector<uint8_t>> _store;
  std::thread _background_thread{};
G
gongweibao 已提交
67 68 69 70 71 72 73
  int _nranks = -1;
  int _timeout = 0;

  void InitControlFd();
  void CloseControlFd();
  void StopByControlFd();
#ifdef _WIN32
L
LiYuRio 已提交
74
  HANDLE ghStopEvent_{};
G
gongweibao 已提交
75 76 77
#else
  std::array<int, 2> _control_fd{{-1, -1}};
#endif
78 79 80 81 82
};

class TCPServer {
 public:
  TCPServer() = default;
G
gongweibao 已提交
83 84
  static std::unique_ptr<TCPServer> create(std::uint16_t port,
                                           int nranks,
85
                                           int stop_check_timeout);
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115

 private:
  std::unique_ptr<MasterDaemon> _master_daemon;
};

// Client side of the store connection, wrapping a single socket.
//
// The destructor closes `_socket`, so each TCPClient is the sole owner of its
// handle. Copying is disabled: a copy would hold the same handle and close it
// a second time on destruction. Instances are meant to be held through the
// std::unique_ptr returned by connect().
class TCPClient {
 public:
  // Takes over `socket`; it is closed when this object is destroyed.
  explicit TCPClient(SocketType socket) : _socket{socket} {}
  TCPClient(const TCPClient&) = delete;
  TCPClient& operator=(const TCPClient&) = delete;

  // Factory returning a heap-allocated client for `host`:`port`.
  static std::unique_ptr<TCPClient> connect(const std::string host,
                                            uint16_t port);
  ~TCPClient() { tcputils::close_socket(_socket); }
  // Sends `type` together with `key`.
  void send_command_for_key(Command type, const std::string& key);

  // Typed send/receive helpers. Only declared here; their definitions (and
  // therefore the byte layout they use) live outside this header section.
  template <typename T>
  void send_value(const T& value);

  template <typename T>
  void send_vector(const std::vector<T>& value);
  template <typename T>
  std::vector<T> receive_vector();

  template <typename T>
  T receive_value();

 private:
  SocketType _socket;
};

}  // namespace detail

// TODO(gongwb): Add IPv6 support.
117 118 119
class TCPStore : public Store {
 public:
  static constexpr std::uint16_t kDefaultPort = 6170;
G
gongweibao 已提交
120 121 122 123 124
  explicit TCPStore(std::string host,
                    uint16_t port = kDefaultPort,
                    bool is_master = false,
                    size_t num_workers = 1,
                    int timeout = 900);
125 126 127 128 129 130

  ~TCPStore();

  int64_t add(const std::string& key, int64_t value) override;
  std::vector<uint8_t> get(const std::string& key) override;
  void wait(const std::string& key) override;
131
  void set(const std::string& key, const std::vector<uint8_t>& value) override;
132 133 134 135 136 137 138 139

 private:
  void waitWorkers();
  std::unique_ptr<detail::TCPServer> _server;
  std::unique_ptr<detail::TCPClient> _client;

  const std::string _init_key = "init/";
  const std::string _key_prefix = "/";
G
gongweibao 已提交
140

141 142 143 144 145 146
  bool _is_master;
  int _num_workers;
};

}  // namespace distributed
}  // namespace paddle