/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/collective/send_v2_op.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/phi/api/include/tensor.h"

namespace paddle {
namespace operators {

#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
    NCCL_VERSION_CODE >= 2703
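// Sends the shape of `x` to `peer` before the payload so that the matching
// recv_v2 kernel can allocate a correctly sized buffer when dynamic_shape is
// enabled. The number of dimensions is sent first, then the dimension values,
// either through the given ProcessGroup (if non-null) or via raw ncclSend
// calls on the provided communicator and stream.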
void send_shape_info(const framework::Tensor& x, const platform::Place& place,
                     const gpuStream_t& stream, platform::NCCLComm* comm,
                     const int& peer, distributed::ProcessGroup* group) {
  if (!group) {
    PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr), true,
                      platform::errors::InvalidArgument(
                          "NCCLComm and Stream should be provided if NCCL is "
                          "used to send the shape info."));
  }
  paddle::experimental::DataType shape_dtype =
      paddle::experimental::DataType::INT32;
  ncclDataType_t nccl_dtype =
      platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dtype));
  auto dims = x.dims();
  int shape_size = dims.size();

  // step1: send the shape size
  framework::Tensor cpu_shape_size_tensor(shape_dtype);
  cpu_shape_size_tensor.Resize({1});
  cpu_shape_size_tensor.mutable_data(platform::CPUPlace(), shape_dtype);
  auto* cpu_data = cpu_shape_size_tensor.data<int>();
  cpu_data[0] = shape_size;

  if (group) {
    std::vector<framework::Tensor> shape_size_tensor;
    shape_size_tensor.template emplace_back(cpu_shape_size_tensor);
    auto shape_size_task = group->Send(shape_size_tensor, peer);
  } else {
    // copy the shape size tensor to gpu and send
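    // Note: the staging tensor is heap-allocated and not freed here,
    // presumably so the device buffer stays valid until the asynchronous
    // ncclSend on `stream` has consumed it.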
    framework::Tensor* gpu_shape_size_tensor =
        new framework::Tensor(shape_dtype);
    gpu_shape_size_tensor->Resize({1});
    gpu_shape_size_tensor->mutable_data(place, shape_dtype);
    framework::TensorCopySync(cpu_shape_size_tensor, place,
                              gpu_shape_size_tensor);
    PADDLE_ENFORCE_GPU_SUCCESS(
        platform::dynload::ncclSend(gpu_shape_size_tensor->data<int>(), 1,
                                    nccl_dtype, peer, comm->comm(), stream));
  }
  VLOG(3) << "send the shape size: " << shape_size << " to peer";

  // step2: send the shape
  framework::Tensor cpu_shape_tensor(shape_dtype);
  cpu_shape_tensor.Resize({shape_size});
  cpu_shape_tensor.mutable_data(platform::CPUPlace(), shape_dtype);
  auto* cpu_shape_data = cpu_shape_tensor.data<int>();
  for (int i = 0; i < shape_size; ++i) {
    cpu_shape_data[i] = dims[i];
  }

  if (group) {
    std::vector<framework::Tensor> shape_tensor;
    shape_tensor.template emplace_back(cpu_shape_tensor);
    auto shape_task = group->Send(shape_tensor, peer);
  } else {
    // copy the shape tensor to gpu and send
    framework::Tensor* gpu_shape_tensor = new framework::Tensor(shape_dtype);
    gpu_shape_tensor->Resize({shape_size});
    gpu_shape_tensor->mutable_data(place, shape_dtype);
    framework::TensorCopySync(cpu_shape_tensor, place, gpu_shape_tensor);
    PADDLE_ENFORCE_GPU_SUCCESS(
        platform::dynload::ncclSend(gpu_shape_tensor->data<int>(), shape_size,
                                    nccl_dtype, peer, comm->comm(), stream));
  }
  VLOG(3) << "send the shape: (" << dims << ") to peer";
}
#endif

template <typename T>
class SendOpV2CUDAKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
    NCCL_VERSION_CODE >= 2703
    int rid = ctx.Attr<int>("ring_id");
    bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
    PADDLE_ENFORCE_GE(
        rid, 0,
        platform::errors::InvalidArgument(
            "The ring_id (%d) for send_v2 op must be non-negative.", rid));

    int peer = ctx.Attr<int>("peer");
    PADDLE_ENFORCE_GE(
        peer, 0,
        platform::errors::InvalidArgument(
            "The peer (%d) for send_v2 op must be non-negative.", peer));
    auto map = distributed::ProcessGroupMapFromGid::getInstance();
    if (map->has(rid)) {
      // Use ProcessGroup
      distributed::ProcessGroup* pg = map->get(rid);
      auto x = ctx.Input<framework::LoDTensor>("X");

      if (dynamic_shape) {
        // dynamic shape for switch send/recv
        VLOG(3) << "send_v2 will use dynamic shape with recv_v2 for switch";
        send_shape_info(*x, ctx.GetPlace(),
                        /* gpuStream_t */ nullptr,
                        /* NCCLComm* */ nullptr, peer, pg);
      }

      std::vector<phi::DenseTensor> in_tensor;
      in_tensor.push_back(*x);
      auto task = pg->Send(in_tensor, peer);
      return;
    }
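    // No ProcessGroup registered for this ring_id: fall back to the raw NCCL
    // communicator, issuing the send on either the calculation stream or the
    // communicator's dedicated stream.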
    gpuStream_t stream = nullptr;
    auto place = ctx.GetPlace();
    auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
    if (ctx.Attr<bool>("use_calc_stream")) {
      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
      stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
    } else {
      stream = comm->stream();
    }
    PADDLE_ENFORCE_LT(
        peer, comm->nranks(),
        platform::errors::InvalidArgument("The value of peer (%d) you set must "
                                          "be less than comm->nranks (%d).",
                                          peer, comm->nranks()));

    auto* x_var = ctx.InputVar("X");
    if (x_var->IsType<framework::LoDTensorArray>()) {
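      // LoDTensorArray input: send every tensor in the array with its own
      // ncclSend call.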
      PADDLE_ENFORCE_EQ(
          dynamic_shape, false,
          platform::errors::InvalidArgument(
              "Dynamic shape for send/recv does not support LoDTensorArray "
              "for now."));
      auto& x_array = x_var->Get<framework::LoDTensorArray>();
      for (size_t idx = 0; idx < x_array.size(); idx++) {
        VLOG(3) << "LodTensorArray: idx(" << idx << ")";
        auto& x = x_array.at(idx);
        int numel = x.numel();
        ncclDataType_t dtype =
            platform::ToNCCLDataType(framework::TransToProtoVarType(x.dtype()));
        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
            x.data<T>(), numel, dtype, peer, comm->comm(), stream));
        VLOG(3) << "rank " << comm->rank() << " send " << phi::product(x.dims())
                << " to " << peer;
      }
      return;
    }
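    // Plain LoDTensor input: optionally send the shape first (dynamic_shape),
    // then send the tensor data.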
    auto x = ctx.Input<framework::LoDTensor>("X");
    int numel = x->numel();

    if (dynamic_shape) {
      VLOG(3) << "send_v2 will use dynamic shape with recv_v2";
      send_shape_info(*x, place, stream, comm, peer,
                      /* ProcessGroup* */ nullptr);
    }

    ncclDataType_t dtype =
        platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
        x->data<T>(), numel, dtype, peer, comm->comm(), stream));
    VLOG(3) << "rank " << comm->rank() << " send " << phi::product(x->dims())
            << " to " << peer;
#else
    PADDLE_THROW(platform::errors::Unavailable(
        "PaddlePaddle should be compiled with NCCL "
        "and NCCL version >= 2.7.3 is needed."));
#endif
  }
};

}  // namespace operators
}  // namespace paddle

namespace ops = paddle::operators;
namespace plat = paddle::platform;

REGISTER_OP_CUDA_KERNEL(send_v2, ops::SendOpV2CUDAKernel<float>,
                        ops::SendOpV2CUDAKernel<double>,
                        ops::SendOpV2CUDAKernel<int>,
                        ops::SendOpV2CUDAKernel<int64_t>,
                        ops::SendOpV2CUDAKernel<int8_t>,
                        ops::SendOpV2CUDAKernel<plat::float16>);