// File: tcp_store.h
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <chrono>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/distributed/store/tcp_utils.h"

namespace paddle {
namespace distributed {

// Reply codes of the TCP store protocol.
// The enumerators keep their implicit values (WAITING == 0, STOP_WAIT == 1);
// do not reorder them, since code outside this header may depend on the
// numeric values.
// NOTE(review): presumably sent back to a client blocked in a WAIT request;
// the exact usage is in the implementation file - confirm there.
enum class ReplyType {
  WAITING,
  STOP_WAIT,
};
// Request kinds of the TCP store protocol.
// The enumerators keep their implicit values (ADD == 0 ... STOP == 4); do not
// reorder them, since code outside this header may depend on the numeric
// values.
// NOTE(review): the names line up with the _do_add/_do_get/_do_set/_do_wait/
// _do_stop handlers declared on detail::MasterDaemon; the dispatch itself is
// not visible in this header - confirm in the implementation file.
enum class Command {
  ADD,
  GET,
  SET,
  WAIT,
  STOP,
};

namespace detail {

class MasterDaemon {
 public:
36
  static std::unique_ptr<MasterDaemon> start(SocketType listen_socket,
37 38
                                             int nranks,
                                             int stop_check_timeout);
39
  MasterDaemon() = delete;
40 41
  explicit MasterDaemon(SocketType listen_socket, int nranks,
                        int stop_check_timeout);
42 43 44 45 46 47 48
  ~MasterDaemon();

 private:
  void run();
  void _do_add(SocketType socket);
  void _do_wait(SocketType socket);
  void _do_get(SocketType socket);
49
  void _do_set(SocketType socket);
50 51 52 53 54
  void _do_stop(SocketType socket);
  SocketType _listen_socket;
  std::vector<SocketType> _sockets;
  std::unordered_map<std::string, std::vector<uint8_t>> _store;
  std::thread _background_thread{};
55
  int _nranks;
56 57 58 59
  int _stop_check_timeout;
  bool _stop = false;  // all workers stopped
  std::chrono::time_point<std::chrono::system_clock> _stop_time;
  bool _has_stop = false;  // at least one worker stopped
60 61 62 63 64
};

class TCPServer {
 public:
  TCPServer() = default;
65 66
  static std::unique_ptr<TCPServer> create(std::uint16_t port, int nranks,
                                           int stop_check_timeout);
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101

 private:
  std::unique_ptr<MasterDaemon> _master_daemon;
};

// Client end of a TCP store connection. Wraps one socket and closes it with
// tcputils::close_socket when the object is destroyed.
// The send/receive members are only declared here; their definitions are
// outside this view.
class TCPClient {
 public:
  // Takes over `socket`; the destructor will close it.
  explicit TCPClient(SocketType socket) : _socket{socket} {}

  // Factory returning a heap-allocated client for `host`:`port`.
  static std::unique_ptr<TCPClient> connect(const std::string host,
                                            uint16_t port);

  // Copying is disabled: a copy would hold the same socket handle, and both
  // destructors would call close_socket on it.
  TCPClient(const TCPClient&) = delete;
  TCPClient& operator=(const TCPClient&) = delete;

  ~TCPClient() { tcputils::close_socket(_socket); }

  // Sends a request of kind `type` that refers to `key`.
  void send_command_for_key(Command type, const std::string& key);

  template <typename T>
  void send_value(const T& value);

  template <typename T>
  void send_vector(const std::vector<T>& value);

  template <typename T>
  std::vector<T> receive_vector();

  template <typename T>
  T receive_value();

 private:
  SocketType _socket;
};

}  // namespace detail

class TCPStore : public Store {
 public:
  static constexpr std::uint16_t kDefaultPort = 6170;
  explicit TCPStore(std::string host, uint16_t port = kDefaultPort,
                    bool is_master = false, size_t num_workers = 1,
102 103
                    std::chrono::seconds timeout = tcputils::kDefaultTimeout,
                    int stop_check_timeout = 900);
104 105 106 107 108 109

  ~TCPStore();

  int64_t add(const std::string& key, int64_t value) override;
  std::vector<uint8_t> get(const std::string& key) override;
  void wait(const std::string& key) override;
110
  void set(const std::string& key, const std::vector<uint8_t>& value) override;
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125

 private:
  void waitWorkers();
  std::unique_ptr<detail::TCPServer> _server;
  std::unique_ptr<detail::TCPClient> _client;

  const std::string _init_key = "init/";
  const std::string _key_prefix = "/";
  std::chrono::seconds _timeout;
  bool _is_master;
  int _num_workers;
};

}  // namespace distributed
}  // namespace paddle