send_v2_op.cu.cc 8.8 KB
Newer Older
L
lilong12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/collective/send_v2_op.h"

17
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
L
lilong12 已提交
18
#include "paddle/fluid/platform/collective_helper.h"
19
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
L
lilong12 已提交
20
#endif
21 22
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/phi/api/include/tensor.h"
L
lilong12 已提交
23 24 25 26

namespace paddle {
namespace operators {

27 28
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
    NCCL_VERSION_CODE >= 2703
29 30 31 32 33 34
void send_shape_info(const framework::Tensor& x,
                     const platform::Place& place,
                     const gpuStream_t& stream,
                     platform::NCCLComm* comm,
                     const int& peer,
                     distributed::ProcessGroup* group) {
35
  if (!group) {
36 37
    PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr),
                      true,
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
                      platform::errors::InvalidArgument(
                          "NCCLComm and Stream should be provided if use NCCL "
                          "to send the shape info."));
  }
  paddle::experimental::DataType shape_dytpe =
      paddle::experimental::DataType::INT32;
  ncclDataType_t nccl_dtype =
      platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dytpe));
  auto dims = x.dims();
  int shape_size = dims.size();

  // step1: send the shape size
  framework::Tensor cpu_shape_size_tensor(shape_dytpe);
  cpu_shape_size_tensor.Resize({1});
  cpu_shape_size_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
  auto* cpu_data = cpu_shape_size_tensor.data<int>();
  cpu_data[0] = shape_size;

  if (group) {
    std::vector<framework::Tensor> shape_size_tensor;
    shape_size_tensor.template emplace_back(cpu_shape_size_tensor);
    auto shape_size_task = group->Send(shape_size_tensor, peer);
  } else {
    // copy the shape size tensor to gpu and send
    framework::Tensor* gpu_shape_size_tensor =
        new framework::Tensor(shape_dytpe);
    gpu_shape_size_tensor->Resize({1});
    gpu_shape_size_tensor->mutable_data(place, shape_dytpe);
66 67
    framework::TensorCopySync(
        cpu_shape_size_tensor, place, gpu_shape_size_tensor);
68
    PADDLE_ENFORCE_GPU_SUCCESS(
69 70 71 72 73 74
        platform::dynload::ncclSend(gpu_shape_size_tensor->data<int>(),
                                    1,
                                    nccl_dtype,
                                    peer,
                                    comm->comm(),
                                    stream));
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
  }
  VLOG(3) << "send the shape size: " << shape_size << " to peer";

  // step2: send the shape
  framework::Tensor cpu_shape_tensor(shape_dytpe);
  cpu_shape_tensor.Resize({shape_size});
  cpu_shape_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
  auto* cpu_shape_data = cpu_shape_tensor.data<int>();
  for (int i = 0; i < shape_size; ++i) {
    cpu_shape_data[i] = dims[i];
  }

  if (group) {
    std::vector<framework::Tensor> shape_tensor;
    shape_tensor.template emplace_back(cpu_shape_tensor);
    auto shape_task = group->Send(shape_tensor, peer);
  } else {
    // copy the shape tensor to gpu and send
    framework::Tensor* gpu_shape_tensor = new framework::Tensor(shape_dytpe);
    gpu_shape_tensor->Resize({shape_size});
    gpu_shape_tensor->mutable_data(place, shape_dytpe);
    framework::TensorCopySync(cpu_shape_tensor, place, gpu_shape_tensor);
    PADDLE_ENFORCE_GPU_SUCCESS(
98 99 100 101 102 103
        platform::dynload::ncclSend(gpu_shape_tensor->data<int>(),
                                    shape_size,
                                    nccl_dtype,
                                    peer,
                                    comm->comm(),
                                    stream));
104 105 106 107 108
  }
  VLOG(3) << "send the shape: (" << dims << ") to peer";
}
#endif

L
lilong12 已提交
109 110 111 112
template <typename T>
class SendOpV2CUDAKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
113 114
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
    NCCL_VERSION_CODE >= 2703
L
lilong12 已提交
115
    int rid = ctx.Attr<int>("ring_id");
116
    bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
L
lilong12 已提交
117
    PADDLE_ENFORCE_GE(
118 119
        rid,
        0,
L
lilong12 已提交
120 121 122 123 124
        platform::errors::InvalidArgument(
            "The ring_id (%d) for send_v2 op must be non-negative.", rid));

    int peer = ctx.Attr<int>("peer");
    PADDLE_ENFORCE_GE(
125 126
        peer,
        0,
L
lilong12 已提交
127 128
        platform::errors::InvalidArgument(
            "The peer (%d) for send_v2 op must be non-negative.", peer));
129 130 131 132 133
    auto map = distributed::ProcessGroupMapFromGid::getInstance();
    if (map->has(rid)) {
      // Use ProcessGroup
      distributed::ProcessGroup* pg = map->get(rid);
      auto x = ctx.Input<framework::LoDTensor>("X");
134 135 136 137

      if (dynamic_shape) {
        // dynamic shape for switch send/recv
        VLOG(3) << "send_v2 will use dynamic shape with recv_v2 for switch";
138 139
        send_shape_info(*x,
                        ctx.GetPlace(),
140
                        /* gpuStream_t */ nullptr,
141 142 143
                        /* NCCLComm* */ nullptr,
                        peer,
                        pg);
144 145 146
      }

      std::vector<phi::DenseTensor> in_tensor;
147 148 149 150
      in_tensor.push_back(*x);
      auto task = pg->Send(in_tensor, peer);
      return;
    }
151
    gpuStream_t stream = nullptr;
L
lilong12 已提交
152 153 154 155
    auto place = ctx.GetPlace();
    auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
    if (ctx.Attr<bool>("use_calc_stream")) {
      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
L
Leo Chen 已提交
156
      stream = static_cast<phi::GPUContext*>(dev_ctx)->stream();
L
lilong12 已提交
157 158 159 160
    } else {
      stream = comm->stream();
    }
    PADDLE_ENFORCE_LT(
161 162
        peer,
        comm->nranks(),
L
lilong12 已提交
163 164
        platform::errors::InvalidArgument("The value of peer (%d) you set must "
                                          "be less than comm->nranks (%d).",
165 166
                                          peer,
                                          comm->nranks()));
167 168 169

    auto* x_var = ctx.InputVar("X");
    if (x_var->IsType<framework::LoDTensorArray>()) {
170
      PADDLE_ENFORCE_EQ(
171 172
          dynamic_shape,
          false,
173 174
          platform::errors::InvalidArgument("Dynamic shape for send/recv not "
                                            "support LoDTensorArray for now."));
175 176 177 178 179
      auto& x_array = x_var->Get<framework::LoDTensorArray>();
      for (size_t idx = 0; idx < x_array.size(); idx++) {
        VLOG(3) << "LodTensorArray: idx(" << idx << ")";
        auto& x = x_array.at(idx);
        int numel = x.numel();
180 181
        ncclDataType_t dtype =
            platform::ToNCCLDataType(framework::TransToProtoVarType(x.dtype()));
182
        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
183
            x.data<T>(), numel, dtype, peer, comm->comm(), stream));
184 185
        VLOG(3) << "rank " << comm->rank() << " send " << phi::product(x.dims())
                << " to " << peer;
186 187 188 189 190 191
      }
      return;
    }
    auto x = ctx.Input<framework::LoDTensor>("X");
    int numel = x->numel();

192 193
    if (dynamic_shape) {
      VLOG(3) << "send_v2 will use dynamic shape with recv_v2";
194 195 196 197 198
      send_shape_info(*x,
                      place,
                      stream,
                      comm,
                      peer,
199 200 201
                      /* ProcessGroup* */ nullptr);
    }

202 203
    ncclDataType_t dtype =
        platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
204
    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
L
lilong12 已提交
205
        x->data<T>(), numel, dtype, peer, comm->comm(), stream));
206
    VLOG(3) << "rank " << comm->rank() << " send " << phi::product(x->dims())
207
            << " to " << peer;
L
lilong12 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220 221
#else
    PADDLE_THROW(platform::errors::Unavailable(
        "PaddlePaddle should be compiled with NCCL "
        "and NCCL version >= 2.7.3 is needed."));
#endif
  }
};

}  // namespace operators
}  // namespace paddle

namespace ops = paddle::operators;
namespace plat = paddle::platform;

222 223
REGISTER_OP_CUDA_KERNEL(send_v2,
                        ops::SendOpV2CUDAKernel<float>,
L
lilong12 已提交
224
                        ops::SendOpV2CUDAKernel<double>,
shaojie_wang's avatar
shaojie_wang 已提交
225
#if NCCL_VERSION_CODE >= 21000
226 227
                        ops::SendOpV2CUDAKernel<plat::bfloat16>,
#endif
L
lilong12 已提交
228 229
                        ops::SendOpV2CUDAKernel<int>,
                        ops::SendOpV2CUDAKernel<int64_t>,
L
lilong12 已提交
230
                        ops::SendOpV2CUDAKernel<int8_t>,
L
lilong12 已提交
231
                        ops::SendOpV2CUDAKernel<plat::float16>);