c_allreduce_op.h 15.6 KB
Newer Older
1
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
2 3 4 5 6 7 8 9 10 11 12 13 14 15

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once
16 17

#include <cmath>
#include <limits>
#include <string>
#include <vector>
18 19 20 21

#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
22 23
#include "paddle/fluid/memory/memcpy.h"
#include "paddle/fluid/memory/memory.h"
24
#include "paddle/fluid/platform/device/npu/npu_op_runner.h"
25

Z
zn 已提交
26 27 28
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) ||          \
    defined(PADDLE_WITH_ASCEND_CL) || defined(PADDLE_WITH_XPU_BKCL) || \
    defined(PADDLE_WITH_CNCL)
29
#include "paddle/fluid/platform/collective_helper.h"
30 31 32
#endif

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
33
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
34 35
#endif

36
#if defined(PADDLE_WITH_XPU_BKCL)
37
#include "paddle/fluid/platform/device/xpu/bkcl_helper.h"
38 39
#endif

40 41 42 43 44
#if defined(PADDLE_WITH_GLOO)
#include <gloo/allreduce.h>
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#endif

45
#if defined(PADDLE_WITH_ASCEND_CL)
46
#include "paddle/fluid/platform/device/npu/hccl_helper.h"
47 48
#endif

Z
zn 已提交
49 50 51 52
#if defined(PADDLE_WITH_CNCL)
#include "paddle/fluid/platform/device/mlu/cncl_helper.h"
#endif

53 54 55 56
#if defined(PADDLE_WITH_ASCEND_CL)
DECLARE_bool(hccl_check_nan);
#endif

57 58 59
namespace paddle {
namespace operators {

60 61 62 63 64 65 66 67 68 69 70 71 72
// Reduction applied across ranks by the c_allreduce_* kernels.
// Kept as an unscoped enum: the enumerators are used as non-type template
// arguments of the kernels and are formatted with "%d" in error messages.
enum ReduceType {
  kRedSum = 0,   // sum
  kRedMax = 1,   // maximum
  kRedMin = 2,   // minimum
  kRedProd = 3,  // product
};

// Operator definition for the all-reduce ops: output "Out" has the shape of
// input "X", and the kernel is selected from the data type of "X" and the
// execution place.
class CAllReduceOp : public framework::OperatorWithKernel {
 public:
  using framework::OperatorWithKernel::OperatorWithKernel;

  // Propagates the dims of "X" to "Out" unchanged.
  void InferShape(framework::InferShapeContext* ctx) const override {
    const auto x_dims = ctx->GetInputDim("X");
    ctx->SetOutputDim("Out", x_dims);
  }

 protected:
  // Kernel key = (data type of "X", place the op executes on).
  framework::OpKernelType GetExpectedKernelType(
      const framework::ExecutionContext& ctx) const override {
    const auto x_data_type = OperatorWithKernel::IndicateVarDataType(ctx, "X");
    return framework::OpKernelType(x_data_type, ctx.GetPlace());
  }
};

// CPU all-reduce kernel implemented with Gloo.
//
// Reads input "X", allocates "Out" with the dims of "X" on the execution
// place, and calls gloo::allreduce with the reduction selected by `red_type`.
//
// Errors raised:
//   - PreconditionNotMet if the Gloo wrapper reports it is not initialized.
//   - InvalidArgument if `red_type` is not one of the ReduceType enumerators.
//   - Unavailable if Paddle was built without PADDLE_WITH_GLOO.
template <ReduceType red_type, typename T>
class CAllReduceOpCPUKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_GLOO)
    // Function-pointer type the gloo reduction templates are cast to before
    // being handed to setReduceFunction.
    using ReduceFn = void (*)(void*, const void*, const void*, size_t);

    auto in = ctx.Input<framework::Tensor>("X");
    auto out = ctx.Output<framework::Tensor>("Out");

    auto place = ctx.GetPlace();
    int64_t send_numel = in->numel();
    const T* send_buff = in->data<T>();
    T* recv_buff = out->mutable_data<T>(in->dims(), place);

    auto gloo = paddle::framework::GlooWrapper::GetInstance();
    PADDLE_ENFORCE_EQ(
        gloo->IsInitialized(), true,
        platform::errors::PreconditionNotMet(
            "You must initialize the gloo environment first to use it."));

    gloo::AllreduceOptions opts(gloo->GetContext());
    // setInput is given a non-const pointer, hence the const_cast on the
    // read-only input buffer.
    opts.setInput(const_cast<T*>(send_buff), send_numel);
    opts.setOutput(recv_buff, send_numel);

    switch (red_type) {
      case kRedSum:
        opts.setReduceFunction(static_cast<ReduceFn>(&gloo::sum<T>));
        break;
      case kRedMax:
        opts.setReduceFunction(static_cast<ReduceFn>(&gloo::max<T>));
        break;
      case kRedMin:
        opts.setReduceFunction(static_cast<ReduceFn>(&gloo::min<T>));
        break;
      case kRedProd:
        opts.setReduceFunction(static_cast<ReduceFn>(&gloo::product<T>));
        break;
      default:
        // Same error-raising style as the device kernels in this file
        // (previously spelled PADDLE_ENFORCE_EQ(true, false, ...)).
        PADDLE_THROW(platform::errors::InvalidArgument(
            "Invalid reduce type: %d.", red_type));
    }
    gloo::allreduce(opts);
#else
    PADDLE_THROW(platform::errors::Unavailable(
        "PaddlePaddle should compile with GLOO by setting WITH_GLOO=ON"));
#endif
  }
};

132
#if defined(PADDLE_WITH_ASCEND_CL)
// Probes `in` for NaN via a one-element "mean" tensor copied back to host.
//
// Returns true when the fetched value is NaN, or when anything inside the
// probe throws (the failure is logged and treated as "found"). Returns false
// otherwise; an infinite value is only logged, it does NOT make the function
// return true.
//
// `mean` is allocated as float regardless of in->type().
//
// NOTE(review): the "ReduceMeanD" runner below is constructed but never run,
// and `stream` is not used, so `mean` looks like it is copied back without
// having been written. The caller in this file discards the return value and
// marks the call "performance relating, DO NOT REMOVE", so the behaviour is
// intentionally left untouched here -- confirm intent with the owners.
inline bool ContainsNan(const paddle::platform::NPUDeviceContext& dev_ctx,
                        aclrtStream stream,
                        const paddle::framework::Tensor* in) {
  using Tensor = paddle::framework::Tensor;

  // One-element tensor meant to receive the reduction result.
  Tensor mean(in->type());
  mean.Resize({1});
  mean.mutable_data<float>(dev_ctx.GetPlace());

  // Reduce over every axis of the input: axes = [0, rank).
  const int rank = in->dims().size();
  std::vector<int> axes;
  axes.reserve(static_cast<size_t>(rank));
  for (int axis = 0; axis < rank; ++axis) {
    axes.push_back(axis);
  }

  std::vector<float> vec;
  try {
    const auto& runner_mean = paddle::operators::NpuOpRunner(
        "ReduceMeanD", {*in}, {mean}, {{"axes", axes}, {"keep_dims", false}});
    paddle::framework::TensorToVector(mean, dev_ctx, &vec);
  } catch (...) {
    // Conservative: any failure while probing is reported as "found".
    LOG(WARNING) << "ContainsNan catch exception";
    return true;
  }

  const float mean_value = vec[0];
  VLOG(4) << "reducemeand result:" << mean_value;
  if (std::isnan(mean_value)) {
    LOG(WARNING) << "ContainsNan detects nan";
    return true;
  }

  if (std::isinf(mean_value)) {
    // Logged only; inf is not reported through the return value.
    LOG(WARNING) << "ContainsNan detects inf";
  }

  return false;
}

#endif

173 174 175 176 177
// HCCL all-reduce kernel for Ascend NPU builds.
//
// Sends input "X" and receives into "Out" (allocated with the dims of "X")
// through HcclAllReduce, using the communicator registered for attribute
// "ring_id". The call is issued on the device context's stream when attribute
// "use_calc_stream" is true, otherwise on the communicator's stream.
// Without PADDLE_WITH_ASCEND_CL the kernel throws PreconditionNotMet.
template <ReduceType red_type, typename T>
class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_ASCEND_CL)
    auto in = ctx.Input<framework::Tensor>("X");
    auto out = ctx.Output<framework::Tensor>("Out");
    auto place = ctx.GetPlace();

    const auto var_type = framework::TransToProtoVarType(in->dtype());
    HcclDataType dtype = platform::ToHCCLDataType(var_type);
    int64_t numel = in->numel();

    // The send pointer is taken before the output is (re)allocated.
    void* sendbuff = const_cast<T*>(in->data<T>());
    out->mutable_data<T>(in->dims(), place);
    void* recvbuff = out->data<T>();

    int ring_id = ctx.Attr<int>("ring_id");
    std::string group =
        std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
    auto comm =
        paddle::platform::HCCLCommContext::Instance().Get(ring_id, place);

    auto dev_ctx = static_cast<platform::NPUDeviceContext*>(
        platform::DeviceContextPool::Instance().Get(place));

    // Pick the stream the collective is issued on.
    const bool use_calc_stream = ctx.Attr<bool>("use_calc_stream");
    aclrtStream stream = nullptr;
    if (!use_calc_stream) {
      stream = comm->stream();
    } else {
      stream = dev_ctx->stream();
    }

    // Map the compile-time reduce type onto the HCCL enumerator.
    HcclReduceOp hccl_red_type = HCCL_REDUCE_SUM;
    switch (red_type) {
      case kRedSum:
        hccl_red_type = HCCL_REDUCE_SUM;
        break;
      case kRedMax:
        hccl_red_type = HCCL_REDUCE_MAX;
        break;
      case kRedMin:
        hccl_red_type = HCCL_REDUCE_MIN;
        break;
      case kRedProd:
        hccl_red_type = HCCL_REDUCE_PROD;
        break;
      default:
        PADDLE_THROW(platform::errors::InvalidArgument(
            "Invalid reduce type: %d", red_type));
    }

    VLOG(3) << "hccl allreduce, parameter is: "
            << "input num: " << in->dims() << "dtype: " << dtype
            << "hccl_red_type: " << hccl_red_type << ", group is: " << group
            << ", sendbuff:" << sendbuff << ", recvbuff:" << recvbuff
            << ", out_size:" << out->memory_size()
            << ", use_calc_stream:" << use_calc_stream
            << ", stream:" << stream;

    // Scratch tensor of 8 floats on the execution place; nothing below reads
    // it.
    framework::Tensor tmp;
    tmp.mutable_data<float>({8}, place);

    // NOTE(review): found_nan is never assigned after this initialization, so
    // the inf-fill branch further down is not taken as the code stands.
    bool found_nan = false;

    // The probe only runs for FP32 inputs and only when the
    // hccl_check_nan flag is set; its return value is discarded.
    if (var_type == framework::proto::VarType::FP32 && FLAGS_hccl_check_nan) {
      VLOG(3) << "prepare to FoundNanInf";
      // NOTE: performance relating, DO NOT REMOVE!
      ContainsNan(*dev_ctx, dev_ctx->stream(), in);
    }

    if (found_nan) {
      // Overwrite the input with +inf, then restore its dims.
      T inf = static_cast<T>(std::numeric_limits<float>::infinity());
      VLOG(4) << "fill input data constant inf";
      auto saved_dims = in->dims();
      auto writable_in = const_cast<framework::Tensor*>(in);
      FillNpuTensorWithConstant<T>(writable_in, inf);
      writable_in->Resize(saved_dims);
    }

    VLOG(3) << "hccl allreduce, parameter is: "
            << "input num: " << numel << "dtype: " << dtype
            << "hccl_red_type: " << hccl_red_type << ", group is: " << group
            << ", sendbuff:" << sendbuff << ", recvbuff:" << recvbuff
            << ", out_size:" << out->memory_size();

    PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::HcclAllReduce(
        sendbuff, recvbuff, numel, dtype, hccl_red_type, comm->comm(),
        reinterpret_cast<void*>(stream)));

    out->Resize(in->dims());
#else
    PADDLE_THROW(platform::errors::PreconditionNotMet(
        "PaddlePaddle should compile with NPU."));
#endif
  }
};

284 285 286 287 288 289 290 291 292
// BKCL all-reduce kernel for XPU builds.
//
// Reduces input "X" across the communicator registered for attribute
// "ring_id" and writes the result to "Out" (resized to the dims of "X").
// The call is issued on the XPU device context's stream when attribute
// "use_calc_stream" is true, otherwise on the communicator's stream.
// Without PADDLE_WITH_XPU_BKCL the kernel throws PreconditionNotMet.
template <ReduceType red_type, typename T>
class CAllReduceOpXPUKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_XPU_BKCL)
    auto x = ctx.Input<framework::Tensor>("X");
    auto y = ctx.Output<framework::Tensor>("Out");
    auto place = ctx.GetPlace();

    const auto var_type = framework::TransToProtoVarType(x->dtype());
    BKCLDataType dtype = platform::ToBKCLDataType(var_type);
    int64_t count = x->numel();

    const void* sendbuff = x->data<T>();
    y->Resize(x->dims());
    void* recvbuff = y->mutable_data<T>(place);

    int ring_id = ctx.Attr<int>("ring_id");
    auto comm = platform::BKCLCommContext::Instance().Get(ring_id, place);

    // Pick the stream the collective is issued on.
    XPUStream stream = nullptr;
    if (!ctx.Attr<bool>("use_calc_stream")) {
      stream = comm->stream();
    } else {
      auto xpu_ctx = static_cast<platform::XPUDeviceContext*>(
          platform::DeviceContextPool::Instance().Get(place));
      stream = xpu_ctx->x_context()->xpu_stream;
    }

    // Map the compile-time reduce type onto the BKCL enumerator.
    BKCLOp bkcl_red_type = BKCL_ADD;
    switch (red_type) {
      case kRedSum:
        bkcl_red_type = BKCL_ADD;
        break;
      case kRedMax:
        bkcl_red_type = BKCL_MAX;
        break;
      case kRedMin:
        bkcl_red_type = BKCL_MIN;
        break;
      case kRedProd:
        bkcl_red_type = BKCL_PRODUCT;
        break;
      default:
        PADDLE_THROW(platform::errors::InvalidArgument(
            "Invalid reduce type: %d", red_type));
    }

    PADDLE_ENFORCE_EQ(bkcl_all_reduce(comm->comm(), sendbuff, recvbuff, count,
                                      dtype, bkcl_red_type, stream),
                      BKCL_SUCCESS, platform::errors::PreconditionNotMet(
                                        "BKCL all reduce failed"));
#else
    PADDLE_THROW(platform::errors::PreconditionNotMet(
        "PaddlePaddle should be compiled with XPU."));
#endif
  }
};

347 348
// NCCL/RCCL all-reduce kernel for GPU builds.
//
// Reduces input "X" across the communicator registered for attribute
// "ring_id" and writes the result to "Out" (resized to the dims of "X").
// The call is issued on the CUDA device context's stream when attribute
// "use_calc_stream" is true, otherwise on the communicator's stream.
// Without PADDLE_WITH_NCCL / PADDLE_WITH_RCCL the kernel throws
// PreconditionNotMet.
template <ReduceType red_type, typename T>
class CAllReduceOpCUDAKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
    auto x = ctx.Input<framework::Tensor>("X");
    auto y = ctx.Output<framework::Tensor>("Out");
    auto place = ctx.GetPlace();

    const auto var_type = framework::TransToProtoVarType(x->dtype());
    ncclDataType_t dtype = platform::ToNCCLDataType(var_type);
    int64_t count = x->numel();

    const void* sendbuff = x->data<T>();
    y->Resize(x->dims());
    void* recvbuff = y->mutable_data<T>(place);

    int ring_id = ctx.Attr<int>("ring_id");
    auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);

    // Pick the stream the collective is issued on.
    gpuStream_t stream = nullptr;
    if (!ctx.Attr<bool>("use_calc_stream")) {
      stream = comm->stream();
    } else {
      auto gpu_ctx = static_cast<platform::CUDADeviceContext*>(
          platform::DeviceContextPool::Instance().Get(place));
      stream = gpu_ctx->stream();
    }

    // Map the compile-time reduce type onto the NCCL enumerator.
    ncclRedOp_t nccl_red_type = ncclSum;
    switch (red_type) {
      case kRedSum:
        nccl_red_type = ncclSum;
        break;
      case kRedMax:
        nccl_red_type = ncclMax;
        break;
      case kRedMin:
        nccl_red_type = ncclMin;
        break;
      case kRedProd:
        nccl_red_type = ncclProd;
        break;
      default:
        PADDLE_THROW(platform::errors::InvalidArgument(
            "Invalid reduce type: %d", red_type));
    }

    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
        sendbuff, recvbuff, count, dtype, nccl_red_type, comm->comm(), stream));
#else
    PADDLE_THROW(platform::errors::PreconditionNotMet(
        "PaddlePaddle should compile with GPU."));
#endif
  }
};

Z
zn 已提交
406 407 408 409 410 411 412 413 414 415
// CNCL all-reduce kernel for MLU builds.
//
// Reduces input "X" across the communicator registered for attribute
// "ring_id" and writes the result to "Out" (resized to the dims of "X").
// The call is issued on the MLU device context's stream when attribute
// "use_calc_stream" is true, otherwise on the communicator's stream.
// Without PADDLE_WITH_CNCL the kernel throws PreconditionNotMet.
template <ReduceType red_type, typename T>
class CAllReduceOpMLUKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_CNCL)
    auto x = ctx.Input<framework::Tensor>("X");
    auto y = ctx.Output<framework::Tensor>("Out");
    auto place = ctx.GetPlace();

    const auto var_type = framework::TransToProtoVarType(x->dtype());
    cnclDataType_t dtype = platform::ToCNCLDataType(var_type);
    int64_t count = x->numel();

    const void* sendbuff = x->data<T>();
    y->Resize(x->dims());
    void* recvbuff = y->mutable_data<T>(place);

    int ring_id = ctx.Attr<int>("ring_id");
    auto comm = platform::CNCLCommContext::Instance().Get(ring_id, place);

    // Pick the stream the collective is issued on.
    mluStream stream = nullptr;
    if (!ctx.Attr<bool>("use_calc_stream")) {
      stream = comm->stream();
    } else {
      auto mlu_ctx = static_cast<platform::MLUDeviceContext*>(
          platform::DeviceContextPool::Instance().Get(place));
      stream = mlu_ctx->stream();
    }

    // Map the compile-time reduce type onto the CNCL enumerator.
    cnclReduceOp_t cncl_red_type = cnclSum;
    switch (red_type) {
      case kRedSum:
        cncl_red_type = cnclSum;
        break;
      case kRedMax:
        cncl_red_type = cnclMax;
        break;
      case kRedMin:
        cncl_red_type = cnclMin;
        break;
      case kRedProd:
        cncl_red_type = cnclProd;
        break;
      default:
        PADDLE_THROW(platform::errors::InvalidArgument(
            "Invalid reduce type: %d", red_type));
    }

    PADDLE_ENFORCE_MLU_SUCCESS(cnclAllReduce(
        sendbuff, recvbuff, count, dtype, cncl_red_type, comm->comm(), stream));
#else
    PADDLE_THROW(platform::errors::PreconditionNotMet(
        "PaddlePaddle should compile with MLU."));
#endif
  }
};

465 466 467 468 469 470 471
// Proto/attribute maker shared by the all-reduce ops. Subclasses supply the
// reduce-type name through GetName(), which is substituted into the op
// comment.
class CAllReduceOpMaker : public framework::OpProtoAndCheckerMaker {
 public:
  // Declares the op's input, output and attributes. Marked `override` so the
  // compiler verifies it matches the base-class virtual.
  void Make() override {
    AddInput("X", "(Tensor), tensor to be allreduced.");
    AddOutput("Out", "(Tensor) the allreduced result.");
    AddAttr<int>("ring_id", "(int default 0) communication ring id.")
        .SetDefault(0);
#if defined(PADDLE_WITH_ASCEND_CL)
    // The "tag" attribute is only registered in Ascend builds.
    AddAttr<std::string>("tag", "(string default tag) tag for all reduce.")
        .SetDefault("tag");
#endif
    AddAttr<bool>(
        "use_calc_stream",
        "(bool default false) eject CUDA operations to calculation stream.")
        .SetDefault(false);
    AddAttr<bool>(
        "use_model_parallel",
        "(bool default false) use this op with model parallel mode. In model "
        "parallel mode, the backward is c_identity which returns itself for "
        "c_allreduce_sum.")
        .SetDefault(false);
    AddComment(string::Sprintf(R"DOC(
CAllReduce %s Operator

Call collective AllReduce with reduce type %s. If input and output are
the same variable, in-place allreduce will be used.
Reference: https://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/docs/usage/operations.html#allreduce
)DOC",
                               GetName(), GetName()));
  }

 protected:
  // Name of the reduce type, inserted into the op comment above.
  virtual std::string GetName() const = 0;
};

500 501
}  // namespace operators
}  // namespace paddle