/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/collective/send_v2_op.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/phi/api/include/tensor.h"

namespace paddle {
namespace operators {

#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
    NCCL_VERSION_CODE >= 2703
void send_shape_info(const phi::DenseTensor& x,
30 31 32 33 34
                     const platform::Place& place,
                     const gpuStream_t& stream,
                     platform::NCCLComm* comm,
                     const int& peer,
                     distributed::ProcessGroup* group) {
35
  if (!group) {
36 37
    PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr),
                      true,
38 39 40 41 42 43 44 45 46 47 48 49
                      platform::errors::InvalidArgument(
                          "NCCLComm and Stream should be provided if use NCCL "
                          "to send the shape info."));
  }
  paddle::experimental::DataType shape_dytpe =
      paddle::experimental::DataType::INT32;
  ncclDataType_t nccl_dtype =
      platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dytpe));
  auto dims = x.dims();
  int shape_size = dims.size();

  // step1: send the shape size
50
  phi::DenseTensor cpu_shape_size_tensor(shape_dytpe);
51 52 53 54 55 56
  cpu_shape_size_tensor.Resize({1});
  cpu_shape_size_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
  auto* cpu_data = cpu_shape_size_tensor.data<int>();
  cpu_data[0] = shape_size;

  if (group) {
57
    std::vector<phi::DenseTensor> shape_size_tensor;
58 59 60 61
    shape_size_tensor.template emplace_back(cpu_shape_size_tensor);
    auto shape_size_task = group->Send(shape_size_tensor, peer);
  } else {
    // copy the shape size tensor to gpu and send
62
    phi::DenseTensor* gpu_shape_size_tensor = new phi::DenseTensor(shape_dytpe);
63 64
    gpu_shape_size_tensor->Resize({1});
    gpu_shape_size_tensor->mutable_data(place, shape_dytpe);
65 66
    framework::TensorCopySync(
        cpu_shape_size_tensor, place, gpu_shape_size_tensor);
67
    PADDLE_ENFORCE_GPU_SUCCESS(
68 69 70 71 72 73
        platform::dynload::ncclSend(gpu_shape_size_tensor->data<int>(),
                                    1,
                                    nccl_dtype,
                                    peer,
                                    comm->comm(),
                                    stream));
74 75 76 77
  }
  VLOG(3) << "send the shape size: " << shape_size << " to peer";

  // step2: send the shape
78
  phi::DenseTensor cpu_shape_tensor(shape_dytpe);
79 80 81 82 83 84 85 86
  cpu_shape_tensor.Resize({shape_size});
  cpu_shape_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
  auto* cpu_shape_data = cpu_shape_tensor.data<int>();
  for (int i = 0; i < shape_size; ++i) {
    cpu_shape_data[i] = dims[i];
  }

  if (group) {
87
    std::vector<phi::DenseTensor> shape_tensor;
88 89 90 91
    shape_tensor.template emplace_back(cpu_shape_tensor);
    auto shape_task = group->Send(shape_tensor, peer);
  } else {
    // copy the shape tensor to gpu and send
92
    phi::DenseTensor* gpu_shape_tensor = new phi::DenseTensor(shape_dytpe);
93 94 95 96
    gpu_shape_tensor->Resize({shape_size});
    gpu_shape_tensor->mutable_data(place, shape_dytpe);
    framework::TensorCopySync(cpu_shape_tensor, place, gpu_shape_tensor);
    PADDLE_ENFORCE_GPU_SUCCESS(
97 98 99 100 101 102
        platform::dynload::ncclSend(gpu_shape_tensor->data<int>(),
                                    shape_size,
                                    nccl_dtype,
                                    peer,
                                    comm->comm(),
                                    stream));
103 104 105 106 107
  }
  VLOG(3) << "send the shape: (" << dims << ") to peer";
}
#endif

// CUDA kernel of the send_v2 op: sends input "X" to rank `peer`.
//
// Attributes read: "ring_id", "peer", "dynamic_shape", "use_calc_stream".
//
// Dispatch:
//   * If a ProcessGroup is registered for ring_id, the tensor is sent through
//     ProcessGroup::Send (preceded by its shape when dynamic_shape is set).
//   * Otherwise the NCCL communicator for (ring_id, place) is used. The send
//     is enqueued on the device context's stream when use_calc_stream is
//     true, else on the communicator's own stream. "X" may be a
//     LoDTensorArray (each element is sent in order; dynamic_shape is not
//     supported) or a single DenseTensor.
//
// Raises InvalidArgument for a negative ring_id or peer, for
// peer >= comm->nranks(), and for dynamic_shape with a LoDTensorArray input.
// Raises Unavailable when built without NCCL/RCCL or with
// NCCL_VERSION_CODE < 2703.
template <typename T>
class SendOpV2CUDAKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
    NCCL_VERSION_CODE >= 2703
    int rid = ctx.Attr<int>("ring_id");
    bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
    PADDLE_ENFORCE_GE(
        rid,
        0,
        platform::errors::InvalidArgument(
            "The ring_id (%d) for send_v2 op must be non-negative.", rid));

    int peer = ctx.Attr<int>("peer");
    PADDLE_ENFORCE_GE(
        peer,
        0,
        platform::errors::InvalidArgument(
            "The peer (%d) for send_v2 op must be non-negative.", peer));

    auto map = distributed::ProcessGroupMapFromGid::getInstance();
    if (map->has(rid)) {
      // Use ProcessGroup
      distributed::ProcessGroup* pg = map->get(rid);
      auto x = ctx.Input<phi::DenseTensor>("X");

      if (dynamic_shape) {
        // dynamic shape for switch send/recv
        VLOG(3) << "send_v2 will use dynamic shape with recv_v2 for switch";
        send_shape_info(*x,
                        ctx.GetPlace(),
                        /* gpuStream_t */ nullptr,
                        /* NCCLComm* */ nullptr,
                        peer,
                        pg);
      }

      std::vector<phi::DenseTensor> in_tensor;
      in_tensor.push_back(*x);
      auto task = pg->Send(in_tensor, peer);
      return;
    }

    // No ProcessGroup for this ring: fall back to the raw NCCL communicator.
    gpuStream_t stream = nullptr;
    auto place = ctx.GetPlace();
    auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
    if (ctx.Attr<bool>("use_calc_stream")) {
      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
      stream = static_cast<phi::GPUContext*>(dev_ctx)->stream();
    } else {
      stream = comm->stream();
    }
    PADDLE_ENFORCE_LT(
        peer,
        comm->nranks(),
        platform::errors::InvalidArgument("The value of peer (%d) you set must "
                                          "be less than comm->nranks (%d).",
                                          peer,
                                          comm->nranks()));

    auto* x_var = ctx.InputVar("X");
    if (x_var->IsType<framework::LoDTensorArray>()) {
      PADDLE_ENFORCE_EQ(
          dynamic_shape,
          false,
          platform::errors::InvalidArgument("Dynamic shape for send/recv not "
                                            "support LoDTensorArray for now."));
      auto& x_array = x_var->Get<framework::LoDTensorArray>();
      for (size_t idx = 0; idx < x_array.size(); idx++) {
        VLOG(3) << "LodTensorArray: idx(" << idx << ")";
        auto& x = x_array.at(idx);
        // Keep the element count 64-bit: ncclSend takes a size_t count, so
        // narrowing to int would corrupt the count for very large tensors.
        const int64_t numel = x.numel();
        ncclDataType_t dtype =
            platform::ToNCCLDataType(framework::TransToProtoVarType(x.dtype()));
        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
            x.data<T>(), numel, dtype, peer, comm->comm(), stream));
        VLOG(3) << "rank " << comm->rank() << " send " << phi::product(x.dims())
                << " to " << peer;
      }
      return;
    }

    auto x = ctx.Input<phi::DenseTensor>("X");
    // 64-bit element count for the same reason as in the array branch above.
    const int64_t numel = x->numel();

    if (dynamic_shape) {
      VLOG(3) << "send_v2 will use dynamic shape with recv_v2";
      send_shape_info(*x,
                      place,
                      stream,
                      comm,
                      peer,
                      /* ProcessGroup* */ nullptr);
    }

    ncclDataType_t dtype =
        platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
        x->data<T>(), numel, dtype, peer, comm->comm(), stream));
    VLOG(3) << "rank " << comm->rank() << " send " << phi::product(x->dims())
            << " to " << peer;
#else
    PADDLE_THROW(platform::errors::Unavailable(
        "PaddlePaddle should be compiled with NCCL "
        "and NCCL version >= 2.7.3 is needed."));
#endif
  }
};

}  // namespace operators
}  // namespace paddle

namespace ops = paddle::operators;
namespace plat = paddle::platform;

221 222
REGISTER_OP_CUDA_KERNEL(send_v2,
                        ops::SendOpV2CUDAKernel<float>,
L
lilong12 已提交
223
                        ops::SendOpV2CUDAKernel<double>,
224 225 226
#if CUDNN_VERSION_MIN(8, 1, 0) && NCCL_VERSION_CODE >= 21000
                        ops::SendOpV2CUDAKernel<plat::bfloat16>,
#endif
L
lilong12 已提交
227 228
                        ops::SendOpV2CUDAKernel<int>,
                        ops::SendOpV2CUDAKernel<int64_t>,
L
lilong12 已提交
229
                        ops::SendOpV2CUDAKernel<int8_t>,
L
lilong12 已提交
230
                        ops::SendOpV2CUDAKernel<plat::float16>);