epoll_comm_network.cpp 9.1 KB
Newer Older
W
willzhang4a58 已提交
1
#include "oneflow/core/comm_network/epoll/epoll_comm_network.h"
W
willzhang4a58 已提交
2
#include "oneflow/core/control/ctrl_client.h"
W
willzhang4a58 已提交
3
#include "oneflow/core/job/machine_context.h"
W
willzhang4a58 已提交
4 5 6 7 8 9 10

#ifdef PLATFORM_POSIX

namespace oneflow {

namespace {

11
sockaddr_in GetSockAddr(const std::string& addr, uint16_t port) {
W
willzhang4a58 已提交
12 13 14 15 16 17 18
  sockaddr_in sa;
  sa.sin_family = AF_INET;
  sa.sin_port = htons(port);
  PCHECK(inet_pton(AF_INET, addr.c_str(), &(sa.sin_addr)) == 1);
  return sa;
}

S
Shiyuan Shang-Guan 已提交
19
int32_t SockListen(int32_t listen_sockfd, uint16_t listen_port, int32_t total_machine_num) {
20
  sockaddr_in sa = GetSockAddr("0.0.0.0", listen_port);
S
Shiyuan Shang-Guan 已提交
21
  int32_t bind_result = bind(listen_sockfd, reinterpret_cast<sockaddr*>(&sa), sizeof(sa));
22 23 24 25 26 27 28 29 30 31
  if (bind_result == 0) {
    PCHECK(listen(listen_sockfd, total_machine_num) == 0);
    LOG(INFO) << "CommNet:Epoll listening on "
              << "0.0.0.0:" + std::to_string(listen_port);
  } else {
    PCHECK(errno == EACCES || errno == EADDRINUSE);
  }
  return bind_result;
}

W
willzhang4a58 已提交
32 33 34 35
int64_t GetMachineId(const sockaddr_in& sa) {
  char addr[INET_ADDRSTRLEN];
  memset(addr, '\0', sizeof(addr));
  PCHECK(inet_ntop(AF_INET, &(sa.sin_addr), addr, INET_ADDRSTRLEN));
36
  for (int64_t i = 0; i < Global<JobDesc>::Get()->TotalMachineNum(); ++i) {
W
willzhang4a58 已提交
37
    if (Global<JobDesc>::Get()->resource().machine(i).addr() == addr) { return i; }
W
willzhang4a58 已提交
38
  }
W
willzhang4a58 已提交
39
  UNIMPLEMENTED();
W
willzhang4a58 已提交
40 41
}

W
willzhang4a58 已提交
42
// Control-store key under which machine `machine_id` publishes its epoll listening port.
std::string GenPortKey(int64_t machine_id) {
  std::string key("EpollPort/");
  key.append(std::to_string(machine_id));
  return key;
}
W
willzhang4a58 已提交
43
void PushPort(int64_t machine_id, uint16_t port) {
W
willzhang4a58 已提交
44
  Global<CtrlClient>::Get()->PushKV(GenPortKey(machine_id), std::to_string(port));
W
willzhang4a58 已提交
45
}
W
willzhang4a58 已提交
46
// Removes the port entry published by PushPort() for `machine_id`.
void ClearPort(int64_t machine_id) {
  const std::string key = GenPortKey(machine_id);
  Global<CtrlClient>::Get()->ClearKV(key);
}
W
willzhang4a58 已提交
47
uint16_t PullPort(int64_t machine_id) {
W
willzhang4a58 已提交
48
  uint16_t port = 0;
49
  Global<CtrlClient>::Get()->PullKV(
W
willzhang4a58 已提交
50
      GenPortKey(machine_id), [&](const std::string& v) { port = oneflow_cast<uint16_t>(v); });
W
willzhang4a58 已提交
51
  return port;
W
willzhang4a58 已提交
52 53
}

W
willzhang4a58 已提交
54 55
}  // namespace

W
willzhang4a58 已提交
56
// Tears the comm net down in a fixed order:
//   1. stop every poller, logging one line per poller;
//   2. OF_BARRIER() (presumably a cross-machine barrier; confirm);
//   3. delete the pollers, then the SocketHelpers owned via sockfd2helper_.
EpollCommNet::~EpollCommNet() {
  size_t thread_idx = 0;
  for (IOEventPoller* poller : pollers_) {
    LOG(INFO) << "CommNet Thread " << thread_idx << " finish";
    poller->Stop();
    ++thread_idx;
  }
  OF_BARRIER();
  for (IOEventPoller* poller : pollers_) { delete poller; }
  for (auto& fd_and_helper : sockfd2helper_) { delete fd_and_helper.second; }
}

W
willzhang4a58 已提交
66
void EpollCommNet::RegisterMemoryDone() {
S
Shiyuan Shang-Guan 已提交
67
  for (void* dst_token : mem_descs()) { dst_token2part_done_cnt_[dst_token] = 0; }
W
willzhang4a58 已提交
68 69
}

S
Shiyuan Shang-Guan 已提交
70
void EpollCommNet::SendActorMsg(int64_t dst_machine_id, const ActorMsg& actor_msg) const {
W
Will Zhang 已提交
71 72 73
  SocketMsg msg;
  msg.msg_type = SocketMsgType::kActor;
  msg.actor_msg = actor_msg;
S
Shiyuan Shang-Guan 已提交
74
  GetSocketHelper(dst_machine_id, epoll_conf_.link_num() - 1)->AsyncWrite(msg);
W
willzhang4a58 已提交
75 76
}

S
Shiyuan Shang-Guan 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
void EpollCommNet::RequestRead(int64_t dst_machine_id, void* src_token, void* dst_token,
                               void* read_id) const {
  int32_t total_byte_size = static_cast<const SocketMemDesc*>(src_token)->byte_size;
  CHECK_GT(total_byte_size, 0);

  int32_t part_length = (total_byte_size + epoll_conf_.link_num() - 1) / epoll_conf_.link_num();
  part_length = RoundUp(part_length, epoll_conf_.msg_segment_kbyte() * 1024);
  int32_t part_num = (total_byte_size + part_length - 1) / part_length;
  CHECK_LE(part_num, epoll_conf_.link_num());

  for (int32_t link_i = 0; link_i < part_num; ++link_i) {
    int32_t byte_size = (total_byte_size > part_length) ? (part_length) : (total_byte_size);
    CHECK_GT(byte_size, 0);
    total_byte_size -= byte_size;
    SocketMsg msg;
    msg.msg_type = SocketMsgType::kRequestRead;
S
Shiyuan Shang-Guan 已提交
93
    msg.request_read_msg.src_machine_id = Global<MachineCtx>::Get()->this_machine_id();
S
Shiyuan Shang-Guan 已提交
94 95 96 97 98 99 100 101 102
    msg.request_read_msg.src_token = src_token;
    msg.request_read_msg.dst_token = dst_token;
    msg.request_read_msg.offset = link_i * part_length;
    msg.request_read_msg.byte_size = byte_size;
    msg.request_read_msg.read_id = read_id;
    msg.request_read_msg.part_num = part_num;
    GetSocketHelper(dst_machine_id, link_i)->AsyncWrite(msg);
  }
  CHECK_EQ(total_byte_size, 0);
W
willzhang4a58 已提交
103 104
}

S
Shiyuan Shang-Guan 已提交
105
// Heap-allocates (with `new`) a SocketMemDesc recording `ptr` and `byte_size`.
// The caller owns the returned object (presumably released by the CommNetIf base; confirm).
SocketMemDesc* EpollCommNet::NewMemDesc(void* ptr, size_t byte_size) const {
  auto* desc = new SocketMemDesc;
  desc->byte_size = byte_size;
  desc->mem_ptr = ptr;
  return desc;
}

S
Shiyuan Shang-Guan 已提交
112 113
// Creates CommNetWorkerNum() pollers, establishes all peer sockets via InitSockets(), and
// only then starts the pollers. Every socket therefore has its helper registered before any
// poller runs.
EpollCommNet::EpollCommNet(const Plan& plan)
    : CommNetIf(plan), epoll_conf_(Global<JobDesc>::Get()->epoll_conf()) {
  const auto worker_num = Global<JobDesc>::Get()->CommNetWorkerNum();
  pollers_.resize(worker_num, nullptr);
  for (IOEventPoller*& poller : pollers_) { poller = new IOEventPoller; }
  InitSockets();
  for (IOEventPoller* poller : pollers_) { poller->Start(); }
}

W
willzhang4a58 已提交
120
void EpollCommNet::InitSockets() {
121
  int64_t this_machine_id = Global<MachineCtx>::Get()->this_machine_id();
122
  auto this_machine = Global<JobDesc>::Get()->resource().machine(this_machine_id);
123
  int64_t total_machine_num = Global<JobDesc>::Get()->TotalMachineNum();
S
Shiyuan Shang-Guan 已提交
124
  machine_link_id2sockfds_.assign(total_machine_num * epoll_conf_.link_num(), -1);
W
willzhang4a58 已提交
125
  sockfd2helper_.clear();
W
willzhang4a58 已提交
126
  size_t poller_idx = 0;
S
Shiyuan Shang-Guan 已提交
127 128
  auto NewSocketHelper = [&](int32_t sockfd) {
    IOEventPoller* poller = pollers_.at(poller_idx);
W
willzhang4a58 已提交
129 130
    poller_idx = (poller_idx + 1) % pollers_.size();
    return new SocketHelper(sockfd, poller);
W
Will Zhang 已提交
131
  };
132

W
willzhang4a58 已提交
133
  // listen
S
Shiyuan Shang-Guan 已提交
134
  int32_t listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
135 136 137 138 139 140 141 142 143 144 145 146
  int32_t this_listen_port = Global<JobDesc>::Get()->resource().data_port();
  if (this_listen_port != -1) {
    CHECK_EQ(SockListen(listen_sockfd, this_listen_port, total_machine_num), 0);
    PushPort(this_machine_id,
             ((this_machine.data_port_agent() != -1) ? (this_machine.data_port_agent())
                                                     : (this_listen_port)));
  } else {
    for (this_listen_port = 1024; this_listen_port < MaxVal<uint16_t>(); ++this_listen_port) {
      if (SockListen(listen_sockfd, this_listen_port, total_machine_num) == 0) {
        PushPort(this_machine_id, this_listen_port);
        break;
      }
W
willzhang4a58 已提交
147
    }
148
    CHECK_LT(this_listen_port, MaxVal<uint16_t>());
W
willzhang4a58 已提交
149
  }
D
Daniel Sun 已提交
150
  int32_t src_machine_count = 0;
151

W
willzhang4a58 已提交
152
  // connect
S
Shiyuan Shang-Guan 已提交
153 154
  for (int64_t peer_mchn_id : peer_machine_id()) {
    if (peer_mchn_id < this_machine_id) {
D
Daniel Sun 已提交
155 156 157
      ++src_machine_count;
      continue;
    }
S
Shiyuan Shang-Guan 已提交
158 159
    uint16_t peer_port = PullPort(peer_mchn_id);
    auto peer_machine = Global<JobDesc>::Get()->resource().machine(peer_mchn_id);
160
    sockaddr_in peer_sockaddr = GetSockAddr(peer_machine.addr(), peer_port);
S
Shiyuan Shang-Guan 已提交
161 162 163 164 165 166 167
    for (int32_t link_i = 0; link_i < epoll_conf_.link_num(); ++link_i) {
      int32_t sockfd = socket(AF_INET, SOCK_STREAM, 0);
      PCHECK(connect(sockfd, reinterpret_cast<sockaddr*>(&peer_sockaddr), sizeof(peer_sockaddr))
             == 0);
      CHECK(sockfd2helper_.emplace(sockfd, NewSocketHelper(sockfd)).second);
      machine_link_id2sockfds_.at(peer_mchn_id * epoll_conf_.link_num() + link_i) = sockfd;
    }
W
willzhang4a58 已提交
168
  }
169

W
willzhang4a58 已提交
170
  // accept
D
Daniel Sun 已提交
171
  FOR_RANGE(int32_t, idx, 0, src_machine_count) {
W
willzhang4a58 已提交
172
    sockaddr_in peer_sockaddr;
W
willzhang4a58 已提交
173
    socklen_t len = sizeof(peer_sockaddr);
S
Shiyuan Shang-Guan 已提交
174 175 176 177 178 179 180
    for (int32_t link_i = 0; link_i < epoll_conf_.link_num(); ++link_i) {
      int32_t sockfd = accept(listen_sockfd, reinterpret_cast<sockaddr*>(&peer_sockaddr), &len);
      PCHECK(sockfd != -1);
      CHECK(sockfd2helper_.emplace(sockfd, NewSocketHelper(sockfd)).second);
      int64_t peer_mchn_id = GetMachineId(peer_sockaddr);
      machine_link_id2sockfds_.at(peer_mchn_id * epoll_conf_.link_num() + link_i) = sockfd;
    }
W
Will Zhang 已提交
181
  }
W
willzhang4a58 已提交
182
  PCHECK(close(listen_sockfd) == 0);
W
willzhang4a58 已提交
183
  ClearPort(this_machine_id);
184

W
willzhang4a58 已提交
185
  // useful log
S
Shiyuan Shang-Guan 已提交
186 187 188 189 190 191 192
  for (int64_t peer_mchn_id : peer_machine_id()) {
    FOR_RANGE(int32_t, link_i, 0, epoll_conf_.link_num()) {
      int32_t sockfd = machine_link_id2sockfds_.at(peer_mchn_id * epoll_conf_.link_num() + link_i);
      CHECK_GT(sockfd, 0);
      LOG(INFO) << "machine: " << peer_mchn_id << ", link index: " << link_i
                << ", sockfd: " << sockfd;
    }
W
willzhang4a58 已提交
193
  }
W
Will Zhang 已提交
194 195
}

S
Shiyuan Shang-Guan 已提交
196 197
// Returns the SocketHelper of link `link_index` to `machine_id`.
// Sockets are stored row-major: link_num() consecutive slots per machine. Out-of-range
// indices or unknown fds throw via .at().
SocketHelper* EpollCommNet::GetSocketHelper(int64_t machine_id, int32_t link_index) const {
  const int64_t slot = machine_id * epoll_conf_.link_num() + link_index;
  const int32_t sockfd = machine_link_id2sockfds_.at(slot);
  return sockfd2helper_.at(sockfd);
}

W
willzhang4a58 已提交
201
void EpollCommNet::DoRead(void* read_id, int64_t src_machine_id, void* src_token, void* dst_token) {
202 203 204
  SocketMsg msg;
  msg.msg_type = SocketMsgType::kRequestWrite;
  msg.request_write_msg.src_token = src_token;
W
willzhang4a58 已提交
205
  msg.request_write_msg.dst_machine_id = Global<MachineCtx>::Get()->this_machine_id();
206 207
  msg.request_write_msg.dst_token = dst_token;
  msg.request_write_msg.read_id = read_id;
S
Shiyuan Shang-Guan 已提交
208 209
  dst_token2part_done_cnt_.at(dst_token) = 0;
  GetSocketHelper(src_machine_id, epoll_conf_.link_num() - 1)->AsyncWrite(msg);
S
Shiyuan Shang-Guan 已提交
210 211 212 213 214
  {
    std::unique_lock<std::mutex> lck(read_done_mtx_);
    machine_id2read_done_order_[src_machine_id].push(read_id);
    read_id2done_status_.emplace(read_id, false);
  }
S
Shiyuan Shang-Guan 已提交
215 216
}

S
Shiyuan Shang-Guan 已提交
217 218
void EpollCommNet::PartReadDone(void* read_id, int64_t src_machine_id, void* dst_token,
                                int32_t part_num) {
S
Shiyuan Shang-Guan 已提交
219 220
  if (dst_token2part_done_cnt_.at(dst_token).fetch_add(1, std::memory_order_relaxed)
      == (part_num - 1)) {
S
Shiyuan Shang-Guan 已提交
221 222
    {
      std::unique_lock<std::mutex> lck(read_done_mtx_);
S
update  
Shiyuan Shang-Guan 已提交
223
      read_id2done_status_.at(read_id) = true;
S
Shiyuan Shang-Guan 已提交
224 225 226 227 228 229 230 231
      auto& read_done_order = machine_id2read_done_order_.at(src_machine_id);
      while (read_id2done_status_.at(read_done_order.front())) {
        void* item = read_done_order.front();
        ReadDone(item);
        read_id2done_status_.erase(item);
        read_done_order.pop();
      }
    }
S
Shiyuan Shang-Guan 已提交
232
  }
233 234
}

W
willzhang4a58 已提交
235 236 237
}  // namespace oneflow

#endif  // PLATFORM_POSIX