//   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/imperative/nccl_context.h"
#include "paddle/fluid/platform/collective_helper.h"

namespace paddle {
namespace imperative {
#if defined(PADDLE_WITH_NCCL)
void NCCLParallelContext::RecvNCCLID(const std::string &ep,
                                     ncclUniqueId *nccl_id) {
  auto addr = paddle::string::Split(ep, ':');
  PADDLE_ENFORCE_EQ(addr.size(), 2UL,
                    "The endpoint should contain host and port: %s", ep);
  std::string host = addr[0];
  int port = std::stoi(addr[1]);

  int server_fd, new_socket;
  struct sockaddr_in address;
  int addrlen = sizeof(address);
  char buffer[1024] = {0};
  int opt = 0;
  // creating socket fd
  if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0)
    PADDLE_THROW("create server fd failed");
37
  if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)))
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
    PADDLE_THROW("set socket opt failed");

  address.sin_family = AF_INET;
  address.sin_addr.s_addr = INADDR_ANY;
  address.sin_port = htons(port);

  if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
    PADDLE_THROW("binding failed on ep: %s", ep);
  VLOG(3) << "listening on: " << ep;
  if (listen(server_fd, 3) < 0) PADDLE_THROW("listen on server fd failed");

  if ((new_socket =
           accept(server_fd, reinterpret_cast<struct sockaddr *>(&address),
                  reinterpret_cast<socklen_t *>(&addrlen))) < 0)
    PADDLE_THROW("accept the new socket fd failed");

  if (read(new_socket, buffer, 1024) < 0)
    PADDLE_THROW("reading the ncclUniqueId from socket failed");
  VLOG(3) << "recevived the ncclUniqueId";
  memcpy(nccl_id, buffer, NCCL_UNIQUE_ID_BYTES);

  VLOG(3) << "closing the socket server: " << ep;
  close(server_fd);
}

void NCCLParallelContext::SendNCCLID(const std::string &ep,
                                     ncclUniqueId *nccl_id) {
  auto addr = paddle::string::Split(ep, ':');
  PADDLE_ENFORCE_EQ(addr.size(), 2UL,
                    "The endpoint should contain host and port: %s", ep);
  std::string host = addr[0];
  int port = std::stoi(addr[1]);
  // struct sockaddr_in address;
  int sock = 0;
  struct sockaddr_in serv_addr;
  char buffer[1024] = {0};

  memcpy(buffer, nccl_id, NCCL_UNIQUE_ID_BYTES);
  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    PADDLE_THROW("create socket failed");

  memset(&serv_addr, '0', sizeof(serv_addr));
  serv_addr.sin_family = AF_INET;
  serv_addr.sin_port = htons(port);

  if (inet_pton(AF_INET, host.c_str(), &serv_addr.sin_addr) <= 0)
    PADDLE_THROW("invalied address: %s", ep);

86
  int try_times = 0;
87 88 89
  while (true) {
    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
      VLOG(0) << "worker: " << ep
90 91 92 93 94 95
              << (try_times < 5 ? " is not ready, will retry after 3 seconds..."
                                : " is not ready. Maybe that some process "
                                  "is occupied the GPUs of this node now, "
                                  "and you should kill those process manually. "
                                  "Will retry after 3 seconds...");

96
      std::this_thread::sleep_for(std::chrono::seconds(3));
97
      ++try_times;
98 99 100 101 102 103
      continue;
    }
    VLOG(3) << "sending the ncclUniqueId to " << ep;
    send(sock, buffer, NCCL_UNIQUE_ID_BYTES, 0);
    break;
  }
C
chengduo 已提交
104
  close(sock);
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
}

// Distributes `*nccl_id` from the worker whose local rank equals `root`.
//
// On that worker the id is sent, one endpoint after another, to every entry
// of strategy_.trainer_endpoints_ that differs from
// strategy_.current_endpoint_. On every other worker the call blocks in
// RecvNCCLID on strategy_.current_endpoint_ until the id has been received
// into `*nccl_id`.
void NCCLParallelContext::BcastNCCLId(ncclUniqueId *nccl_id, int root) {
  if (strategy_.local_rank_ == root) {
    // Bind by const reference: the endpoints are only read, so there is no
    // need to copy each string.
    for (const auto &ep : strategy_.trainer_endpoints_) {
      if (ep != strategy_.current_endpoint_) SendNCCLID(ep, nccl_id);
    }
  } else {
    RecvNCCLID(strategy_.current_endpoint_, nccl_id);
  }
}

void NCCLParallelContext::Init() {
  ncclUniqueId nccl_id;
  if (strategy_.local_rank_ == 0) {
    // generate the unique ncclid on the root worker
    platform::dynload::ncclGetUniqueId(&nccl_id);
    BcastNCCLId(&nccl_id, 0);
  } else {
    BcastNCCLId(&nccl_id, 0);
  }
  int gpu_id = boost::get<platform::CUDAPlace>(place_).device;
  VLOG(0) << "init nccl context nranks: " << strategy_.nranks_
          << " local rank: " << strategy_.local_rank_ << " gpu id: " << gpu_id;

130 131 132
  // it will assign nccl_comm in CUDADeviceContext within ring_id 0
  platform::NCCLCommContext::Instance().CreateNCCLComm(
      &nccl_id, strategy_.nranks_, strategy_.local_rank_, gpu_id, 0);
133 134 135 136 137
}
#endif

}  //  namespace imperative
}  //  namespace paddle