// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/distributed/store/tcp_utils.h"

namespace paddle {
namespace distributed {

// Reply codes with two states: WAITING and STOP_WAIT.
// NOTE(review): presumably returned by the master daemon in answer to a
// WAIT request -- confirm against the implementation file.
enum class ReplyType { WAITING, STOP_WAIT };

// Request kinds accepted by detail::TCPClient::send_command_for_key().
// The enumerators keep their implicit values (ADD == 0 ... STOP == 4).
// NOTE(review): these values presumably travel over the socket, so do not
// reorder or insert enumerators without checking both ends -- confirm.
enum class Command { ADD, GET, SET, WAIT, STOP };

namespace detail {

class MasterDaemon {
 public:
36 37
  static std::unique_ptr<MasterDaemon> start(SocketType listen_socket,
                                             int nranks);
38
  MasterDaemon() = delete;
39
  explicit MasterDaemon(SocketType listen_socket, int nranks);
40 41 42 43 44 45 46
  ~MasterDaemon();

 private:
  void run();
  void _do_add(SocketType socket);
  void _do_wait(SocketType socket);
  void _do_get(SocketType socket);
47
  void _do_set(SocketType socket);
48 49 50 51 52
  void _do_stop(SocketType socket);
  SocketType _listen_socket;
  std::vector<SocketType> _sockets;
  std::unordered_map<std::string, std::vector<uint8_t>> _store;
  std::thread _background_thread{};
53
  int _nranks;
54 55 56 57 58 59
  bool _stop = false;
};

class TCPServer {
 public:
  TCPServer() = default;
60
  static std::unique_ptr<TCPServer> create(std::uint16_t port, int nranks);
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102

 private:
  std::unique_ptr<MasterDaemon> _master_daemon;
};

// Client end of a connection. The destructor closes `_socket` through
// tcputils::close_socket(), so an instance is the sole owner of its socket.
class TCPClient {
 public:
  // Wraps an existing socket; the new object closes it on destruction.
  explicit TCPClient(SocketType socket) : _socket{socket} {}

  // Factory: returns a uniquely-owned client for `host`:`port`.
  // NOTE(review): `host` is taken by const value, which copies the string;
  // switching to a const reference requires changing the definition too.
  static std::unique_ptr<TCPClient> connect(const std::string host,
                                            uint16_t port);

  ~TCPClient() { tcputils::close_socket(_socket); }

  // Copying would leave two objects closing the same socket in their
  // destructors, so copy operations are disabled. Instances are handed
  // around through std::unique_ptr (see connect()).
  TCPClient(const TCPClient&) = delete;
  TCPClient& operator=(const TCPClient&) = delete;

  // Sends a request of kind `type` for `key`.
  void send_command_for_key(Command type, const std::string& key);

  template <typename T>
  void send_value(const T& value);

  template <typename T>
  void send_vector(const std::vector<T>& value);

  template <typename T>
  std::vector<T> receive_vector();

  template <typename T>
  T receive_value();

 private:
  SocketType _socket;
};

}  // namespace detail

class TCPStore : public Store {
 public:
  static constexpr std::uint16_t kDefaultPort = 6170;
  explicit TCPStore(std::string host, uint16_t port = kDefaultPort,
                    bool is_master = false, size_t num_workers = 1,
                    std::chrono::seconds timeout = tcputils::kDefaultTimeout);

  ~TCPStore();

  int64_t add(const std::string& key, int64_t value) override;
  std::vector<uint8_t> get(const std::string& key) override;
  void wait(const std::string& key) override;
103
  void set(const std::string& key, const std::vector<uint8_t>& value) override;
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118

 private:
  void waitWorkers();
  std::unique_ptr<detail::TCPServer> _server;
  std::unique_ptr<detail::TCPClient> _client;

  const std::string _init_key = "init/";
  const std::string _key_prefix = "/";
  std::chrono::seconds _timeout;
  bool _is_master;
  int _num_workers;
};

}  // namespace distributed
}  // namespace paddle