nccl_context.cc 7.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
//   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/imperative/nccl_context.h"
16

17 18
namespace paddle {
namespace imperative {
19
#if defined(PADDLE_WITH_NCCL)
20 21 22 23
void NCCLParallelContext::RecvNCCLID(
    const std::string &ep,
    std::vector<ncclUniqueId> &nccl_ids) {  // NOLINT
  int nrings = nccl_ids.size();
24
  auto addr = paddle::string::Split(ep, ':');
25 26 27 28
  PADDLE_ENFORCE_EQ(
      addr.size(), 2UL,
      platform::errors::InvalidArgument(
          "The endpoint should contain host and port, but got %s.", ep));
29 30 31 32 33 34 35 36 37
  std::string host = addr[0];
  int port = std::stoi(addr[1]);

  int server_fd, new_socket;
  struct sockaddr_in address;
  int addrlen = sizeof(address);
  char buffer[1024] = {0};
  int opt = 0;
  // creating socket fd
38 39 40 41 42 43 44 45
  if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
    PADDLE_THROW(
        platform::errors::Unavailable("Create server file descriptor failed."));
  }

  if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
    PADDLE_THROW(platform::errors::Unavailable("Set socket options failed."));
  }
46 47 48 49 50

  address.sin_family = AF_INET;
  address.sin_addr.s_addr = INADDR_ANY;
  address.sin_port = htons(port);

51
  int try_times = 0;
52
  int retry_time = 0;
53 54
  while (true) {
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
55
      retry_time = 3 * (try_times + 1);
56
      LOG(WARNING) << "Socket bind worker " << ep
57 58 59 60 61 62 63 64 65
                   << (try_times < 9
                           ? " failed, try again after " +
                                 std::to_string(retry_time) + " seconds."
                           : " failed, try again after " +
                                 std::to_string(retry_time) +
                                 " seconds. Bind on endpoint " + ep +
                                 " failed. Please confirm whether the "
                                 "communication port or GPU card is occupied.");
      std::this_thread::sleep_for(std::chrono::seconds(retry_time));
66 67 68 69
      ++try_times;
      continue;
    }
    break;
70 71
  }

72
  VLOG(3) << "listening on: " << ep;
73 74 75 76
  if (listen(server_fd, 3) < 0) {
    PADDLE_THROW(platform::errors::Unavailable(
        "Listen on server file descriptor failed."));
  }
77 78 79

  if ((new_socket =
           accept(server_fd, reinterpret_cast<struct sockaddr *>(&address),
80 81 82 83 84 85 86 87
                  reinterpret_cast<socklen_t *>(&addrlen))) < 0) {
    PADDLE_THROW(platform::errors::Unavailable(
        "Accept the new socket file descriptor failed."));
  }

  if (read(new_socket, buffer, 1024) < 0) {
    PADDLE_THROW(platform::errors::Unavailable("Read from socket failed."));
  }
88 89

  VLOG(3) << "recevived the ncclUniqueId";
90 91

  memcpy(&nccl_ids[0], buffer, nrings * NCCL_UNIQUE_ID_BYTES);
92 93 94 95 96

  VLOG(3) << "closing the socket server: " << ep;
  close(server_fd);
}

97 98 99
void NCCLParallelContext::SendNCCLID(
    const std::string &ep, const std::vector<ncclUniqueId> &nccl_ids) {
  int nrings = nccl_ids.size();
100
  auto addr = paddle::string::Split(ep, ':');
101 102 103 104
  PADDLE_ENFORCE_EQ(
      addr.size(), 2UL,
      platform::errors::InvalidArgument(
          "The endpoint should contain host and port, but got %s.", ep));
105 106 107 108 109 110
  std::string host = addr[0];
  int port = std::stoi(addr[1]);
  int sock = 0;
  struct sockaddr_in serv_addr;
  char buffer[1024] = {0};

111 112
  memcpy(buffer, &nccl_ids[0], nrings * NCCL_UNIQUE_ID_BYTES);

113 114 115
  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    PADDLE_THROW(platform::errors::Unavailable("Create socket failed."));
  }
116 117 118 119 120

  memset(&serv_addr, '0', sizeof(serv_addr));
  serv_addr.sin_family = AF_INET;
  serv_addr.sin_port = htons(port);

121 122 123 124 125 126 127 128 129 130 131 132 133
  char *ip = NULL;
  struct hostent *hp;
  if ((hp = gethostbyname(host.c_str())) == NULL) {
    PADDLE_THROW(platform::errors::InvalidArgument(
        "Fail to get host by name %s.", host));
  }
  int i = 0;
  while (hp->h_addr_list[i] != NULL) {
    ip = inet_ntoa(*(struct in_addr *)hp->h_addr_list[i]);
    VLOG(3) << "gethostbyname  host:" << host << "  ->ip: " << ip;
    break;
  }
  if (inet_pton(AF_INET, ip, &serv_addr.sin_addr) <= 0) {
134 135
    PADDLE_THROW(platform::errors::Unavailable("Open address %s failed.", ep));
  }
136

137
  int try_times = 0;
138
  int retry_time = 0;
139 140
  while (true) {
    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
141
      retry_time = 3 * (try_times + 1);
142 143
      LOG(WARNING)
          << "Socket connect worker " << ep
144 145 146 147 148 149 150 151
          << (try_times < 9
                  ? " failed, try again after " + std::to_string(retry_time) +
                        " seconds."
                  : " failed, try again after " + std::to_string(retry_time) +
                        " seconds. Maybe that some process is occupied the "
                        "GPUs of this node now, and you should kill those "
                        "process manually.");
      std::this_thread::sleep_for(std::chrono::seconds(retry_time));
152
      ++try_times;
153 154 155
      continue;
    }
    VLOG(3) << "sending the ncclUniqueId to " << ep;
156
    send(sock, buffer, NCCL_UNIQUE_ID_BYTES * nrings, 0);
157 158
    break;
  }
C
chengduo 已提交
159
  close(sock);
160 161
}

162 163 164
void NCCLParallelContext::BcastNCCLId(
    std::vector<ncclUniqueId> &nccl_ids,  // NOLINT
    int root) {
165 166
  if (strategy_.local_rank_ == root) {
    for (auto ep : strategy_.trainer_endpoints_) {
167
      if (ep != strategy_.current_endpoint_) SendNCCLID(ep, nccl_ids);
168 169
    }
  } else {
170
    RecvNCCLID(strategy_.current_endpoint_, nccl_ids);
171 172 173 174
  }
}

void NCCLParallelContext::Init() {
175 176 177 178 179 180
  std::vector<ncclUniqueId> nccl_ids;
  nccl_ids.resize(strategy_.nrings_);
  if (strategy_.local_rank_ == 0) {
    // generate the unique ncclid on the root worker
    for (size_t i = 0; i < nccl_ids.size(); ++i) {
      platform::dynload::ncclGetUniqueId(&nccl_ids[i]);
181
    }
182 183 184 185 186 187 188
    BcastNCCLId(nccl_ids, 0);
  } else {
    BcastNCCLId(nccl_ids, 0);
  }

  int gpu_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device;
  for (int ring_id = 0; ring_id < strategy_.nrings_; ring_id++) {
189 190 191 192 193
    VLOG(0) << "init nccl context nranks: " << strategy_.nranks_
            << " local rank: " << strategy_.local_rank_ << " gpu id: " << gpu_id
            << " ring id: " << ring_id;
    // it will assign nccl_comm in CUDADeviceContext within ring_id
    platform::NCCLCommContext::Instance().CreateNCCLComm(
194 195
        &nccl_ids[ring_id], strategy_.nranks_, strategy_.local_rank_, gpu_id,
        ring_id);
196 197 198 199 200 201 202 203 204 205
  }
}

void NCCLParallelContext::AllReduceByStream(const framework::Variable &src,
                                            framework::Variable *dst,
                                            int ring_id, bool use_calc_stream) {
  PADDLE_ENFORCE_EQ(
      platform::is_gpu_place(place_), true,
      platform::errors::Unimplemented(
          "Dynamic graph mode does not support multi-CPU training yet."));
206
  AllReduce(src, dst, strategy_, ring_id, use_calc_stream);
207
}
208

209 210 211 212 213
paddle::platform::CUDADeviceContext *NCCLParallelContext::GetDeviceContext(
    int ring_id) {
  return platform::NCCLCommContext::Instance()
      .Get(ring_id, place_)
      ->dev_context();
214
}
215

216 217 218 219
#endif

}  //  namespace imperative
}  //  namespace paddle