// all_reduce_op_handle.cc
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
15

C
chengduoZH 已提交
16
#include "paddle/fluid/framework/details/container_cast.h"
C
chengduoZH 已提交
17
#include "paddle/fluid/framework/details/reduce_and_gather.h"
18
#include "paddle/fluid/platform/profiler.h"
Y
Stash  
Yu Yang 已提交
19

20
#ifdef PADDLE_WITH_NCCL
21 22
DECLARE_bool(sync_nccl_allreduce);
#endif
Y
Yancey1989 已提交
23

Y
Yu Yang 已提交
24 25 26
namespace paddle {
namespace framework {
namespace details {
C
chengduoZH 已提交
27

28
#if defined(PADDLE_WITH_NCCL)
X
Xin Pan 已提交
29 30
// Constructs the all-reduce op handle for builds with PADDLE_WITH_NCCL.
//
// node:         graph node forwarded to NCCLOpHandleBase.
// local_scopes: scopes copied into local_scopes_.
// places:       places forwarded to NCCLOpHandleBase.
// ctxs:         NCCL communicator forwarded to NCCLOpHandleBase.
//
// Raises InvalidArgument (through PADDLE_ENFORCE_EQ) when places_ and
// local_scopes_ do not have the same number of elements.
AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
                                     const std::vector<Scope *> &local_scopes,
                                     const std::vector<platform::Place> &places,
                                     const platform::NCCLCommunicator *ctxs)
    : NCCLOpHandleBase(node, places, ctxs), local_scopes_(local_scopes) {
  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size(),
                    platform::errors::InvalidArgument(
                        "The number of places and the number of local scopes "
                        "should be equal, but got number of places is %d and "
                        "number of local scopes is %d.",
                        places_.size(), local_scopes_.size()));
}
41 42 43 44 45 46 47 48 49 50 51 52 53
#elif defined(PADDLE_WITH_XPU_BKCL)
// Constructs the all-reduce op handle for builds with PADDLE_WITH_XPU_BKCL.
//
// node, places and ctxs are handed to BKCLOpHandleBase; local_scopes is
// copied into local_scopes_. An InvalidArgument error is raised (through
// PADDLE_ENFORCE_EQ) when places_ and local_scopes_ differ in size.
AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
                                     const std::vector<Scope *> &local_scopes,
                                     const std::vector<platform::Place> &places,
                                     const platform::BKCLCommunicator *ctxs)
    : BKCLOpHandleBase(node, places, ctxs), local_scopes_(local_scopes) {
  // Name both sizes once so the comparison and the message use the same values.
  const auto place_count = places_.size();
  const auto scope_count = local_scopes_.size();
  PADDLE_ENFORCE_EQ(place_count, scope_count,
                    platform::errors::InvalidArgument(
                        "The number of places and the number of local scopes "
                        "should be equal, but got number of places is %d and "
                        "number of local scopes is %d.",
                        place_count, scope_count));
}
C
chengduoZH 已提交
54
#else
X
Xin Pan 已提交
55 56
// Constructs the all-reduce op handle for builds without NCCL or BKCL.
//
// node:         graph node forwarded to OpHandleBase.
// local_scopes: scopes copied into local_scopes_.
// places:       places copied into places_.
//
// Raises InvalidArgument (through PADDLE_ENFORCE_EQ) when places_ and
// local_scopes_ do not have the same number of elements.
AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
                                     const std::vector<Scope *> &local_scopes,
                                     const std::vector<platform::Place> &places)
    : OpHandleBase(node), local_scopes_(local_scopes), places_(places) {
  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size(),
                    platform::errors::InvalidArgument(
                        "The number of places and the number of local scopes "
                        "should be equal, but got number of places is %d and "
                        "number of local scopes is %d.",
                        places_.size(), local_scopes_.size()));
}
C
chengduoZH 已提交
66
#endif
Y
Yu Yang 已提交
67

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
// Entry point of the op handle: records a profiler event named after this
// handle, waits for the input variables, casts the input/output handles to
// VarHandle and delegates to AllReduceImpl.
void AllReduceOpHandle::RunImpl() {
  platform::RecordEvent record_event(Name());

  WaitInputVarGenerated();

  // Inputs are cast before outputs, matching the original statement order.
  const auto typed_inputs = DynamicCast<VarHandle>(this->Inputs());
  const auto typed_outputs = DynamicCast<VarHandle>(this->Outputs());
  AllReduceImpl(typed_inputs, typed_outputs);
}

// Validates the per-place variable handles, collects the tensor buffers that
// take part in the reduction and forwards them to AllReduceFunc.
//
// in_var_handles:  input handles; entry i is looked up in
//                  local_exec_scopes_[i].
// out_var_handles: output handles; entry i must have the same name as the
//                  corresponding input handle.
//
// Raises (through PADDLE_ENFORCE_*):
//   - InvalidArgument when the handle counts, the number of places and the
//     number of local execution scopes do not all agree, or when an input
//     and output name differ.
//   - NotFound when an input variable is missing from its local scope.
//   - PreconditionNotMet when the first tensor has numel <= 0, or when the
//     tensors differ in numel, dtype or place type (GPU / XPU) across scopes.
void AllReduceOpHandle::AllReduceImpl(
    const std::vector<VarHandle *> &in_var_handles,
    const std::vector<VarHandle *> &out_var_handles) {
  size_t num_places = places_.size();
  PADDLE_ENFORCE_EQ(in_var_handles.size(), num_places,
                    platform::errors::InvalidArgument(
                        "The NoDummyInputSize should be equal "
                        "to the number of places, but got NoDummyInputSize is "
                        "%d and the number of places is %d.",
                        in_var_handles.size(), num_places));
  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), out_var_handles.size(),
      platform::errors::InvalidArgument(
          "The NoDummyInputSize and NoDummyOutputSize should be "
          "equal, but got NoDummyInputSize is %d and NoDummyOutputSize is %d.",
          in_var_handles.size(), out_var_handles.size()));
  // Report the value that is actually being compared: the number of local
  // execution scopes, not the number of input handles.
  PADDLE_ENFORCE_EQ(
      local_exec_scopes_.size(), num_places,
      platform::errors::InvalidArgument(
          "The number of local scopes should be equal "
          "to the number of places, but got the number of local scopes is "
          "%d and the number of places is %d.",
          local_exec_scopes_.size(), num_places));

  std::vector<const void *> lod_tensor_data;
  std::vector<platform::Place> places;
  lod_tensor_data.reserve(num_places);
  places.reserve(num_places);
  int64_t numel = -1;
  bool is_gpu_place = false;
#if defined(PADDLE_WITH_XPU_BKCL)
  bool is_xpu_place = false;
#endif
  auto dtype = static_cast<framework::proto::VarType::Type>(0);
  for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
    auto &local_scope = local_exec_scopes_[i];
    auto var = local_scope->FindVar(in_var_handles[i]->name());
    PADDLE_ENFORCE_NOT_NULL(var, platform::errors::NotFound(
                                     "Variable %s is not found in local scope.",
                                     in_var_handles[i]->name()));
    auto &lod_tensor = var->Get<LoDTensor>();

    if (i == 0) {
      // The first tensor defines the reference numel, dtype and place type
      // that every other scope's tensor is compared against below.
      numel = static_cast<int64_t>(lod_tensor.numel());
      // Only place 0 is checked for numel > 0; the remaining places are
      // required to have the same numel as place 0.
      PADDLE_ENFORCE_GT(
          numel, 0,
          platform::errors::PreconditionNotMet(
              "The numel of tensor %s should be > 0, but got numel is %d.",
              in_var_handles[i]->name(), numel));
      dtype = lod_tensor.type();
      is_gpu_place = platform::is_gpu_place(lod_tensor.place());
#if defined(PADDLE_WITH_XPU_BKCL)
      is_xpu_place = platform::is_xpu_place(lod_tensor.place());
#endif
    }
    PADDLE_ENFORCE_EQ(
        numel, static_cast<int64_t>(lod_tensor.numel()),
        platform::errors::PreconditionNotMet(
            "The size of tensors of the same variable in different local "
            "scopes should be equal."));
    PADDLE_ENFORCE_EQ(
        dtype, lod_tensor.type(),
        platform::errors::PreconditionNotMet(
            "The dtype of tensors of the same variable in different local "
            "scopes should be equal."));
#if defined(PADDLE_WITH_XPU_BKCL)
    PADDLE_ENFORCE_EQ(is_xpu_place, platform::is_xpu_place(lod_tensor.place()),
                      platform::errors::PreconditionNotMet(
                          "The place type of tensors of the same variable "
                          "in different local scopes should be equal."));
#endif
    PADDLE_ENFORCE_EQ(is_gpu_place, platform::is_gpu_place(lod_tensor.place()),
                      platform::errors::PreconditionNotMet(
                          "The place type of tensors of the same variable "
                          "in different local scopes should be equal."));

    lod_tensor_data.emplace_back(lod_tensor.data<void>());
    places.emplace_back(lod_tensor.place());

    VLOG(10) << "place:" << i << ", input_name:" << in_var_handles[i]->name()
             << ", out_name:" << out_var_handles[i]->name();

    PADDLE_ENFORCE_EQ(
        in_var_handles[i]->name(), out_var_handles[i]->name(),
        platform::errors::InvalidArgument(
            "The name of input and output of all_reduce op should be equal, "
            "but got input is %s and output is %s.",
            in_var_handles[i]->name(), out_var_handles[i]->name()));
  }

  // Output variable names, one per place, passed on to AllReduceFunc.
  std::vector<std::string> grad_var_names;
  grad_var_names.reserve(num_places);
  for (auto &out_var : out_var_handles) {
    grad_var_names.emplace_back(out_var->Name());
  }

  AllReduceFunc(lod_tensor_data, dtype, numel, places, grad_var_names);
}

// Performs the all-reduce over the given buffers, dispatching on the place
// type of places[0].
//
// lod_tensor_data: one buffer pointer per local execution scope.
// dtype:           element type shared by all buffers.
// numel:           number of elements in each buffer.
// places:          place of each buffer; places[0] selects the backend.
// out_var_names:   per-scope output variable names; only used on the CPU
//                  path to locate the destination tensors.
//
// GPU place: one NCCLAllReduce closure per scope (same buffer passed as both
//   source and destination, ncclSum) is handed to NCCLAllReduceFunc. Throws
//   PreconditionNotMet when not compiled with PADDLE_WITH_NCCL.
// XPU place: the same scheme with BKCLAllReduce / BKCL_ADD and
//   BKCLAllReduceFunc. Throws PreconditionNotMet when not compiled with
//   PADDLE_WITH_XPU_BKCL.
// Otherwise (CPU): ReduceBufferData is applied with scope 0's output tensor
//   as target, then that tensor's bytes are copied into the output variable
//   of every other scope.
//
// Raises InvalidArgument when the NCCL/BKCL context is null, and NotFound when
// an output variable is missing from its local scope on the CPU path.
void AllReduceOpHandle::AllReduceFunc(
    std::vector<const void *> lod_tensor_data,
    const framework::proto::VarType::Type &dtype, int64_t numel,
    const std::vector<platform::Place> &places,
    const std::vector<std::string> &out_var_names) {
  if (is_gpu_place(places[0])) {
#if defined(PADDLE_WITH_NCCL)
    PADDLE_ENFORCE_NOT_NULL(nccl_ctxs_,
                            platform::errors::InvalidArgument(
                                "The nccl context should not be NULL."));
    ncclDataType_t nccl_dtype = platform::ToNCCLDataType(dtype);
    std::vector<std::function<void()>> all_reduce_calls;
    for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
      auto &p = places[i];
      void *buffer = const_cast<void *>(lod_tensor_data.at(i));
      // Capture by copy: the closure is invoked later by NCCLAllReduceFunc.
      all_reduce_calls.emplace_back([=] {
        NCCLAllReduce(p, buffer, buffer, numel, nccl_dtype, ncclSum);
      });
    }
    NCCLAllReduceFunc(all_reduce_calls);
#else
    PADDLE_THROW(
        platform::errors::PreconditionNotMet("Not compiled with CUDA."));
#endif
  } else if (is_xpu_place(places[0])) {
#if defined(PADDLE_WITH_XPU_BKCL)
    PADDLE_ENFORCE_NOT_NULL(bkcl_ctxs_,
                            platform::errors::InvalidArgument(
                                "The bkcl context should not be NULL."));
    BKCLDataType bkcl_dtype = platform::ToBKCLDataType(dtype);
    std::vector<std::function<void()>> all_reduce_calls;
    for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
      auto &p = places[i];
      void *buffer = const_cast<void *>(lod_tensor_data.at(i));
      // Capture by copy: the closure is invoked later by BKCLAllReduceFunc.
      all_reduce_calls.emplace_back([=] {
        BKCLAllReduce(p, buffer, buffer, numel, bkcl_dtype, BKCL_ADD);
      });
    }
    BKCLAllReduceFunc(all_reduce_calls);
#else
    PADDLE_THROW(
        platform::errors::PreconditionNotMet("Not compiled with BKCL."));
#endif
  } else {  // Special handle CPU only Operator's gradient. Like CRF
    // Fail with a descriptive error instead of dereferencing a null pointer
    // when the target variable is missing.
    auto *trg_var = local_exec_scopes_[0]->FindVar(out_var_names[0]);
    PADDLE_ENFORCE_NOT_NULL(
        trg_var,
        platform::errors::NotFound("Variable %s is not found in local scope.",
                                   out_var_names[0]));
    auto &trg = *trg_var->GetMutable<LoDTensor>();

    // Reduce All Tensor to trg in CPU
    ReduceBufferData func(lod_tensor_data, trg.data<void>(), numel);
    VisitDataType(trg.type(), func);

    // Copy the reduced result from scope 0 into every other scope's output.
    for (size_t i = 1; i < local_exec_scopes_.size(); ++i) {
      auto &scope = local_exec_scopes_[i];
      auto &p = places[i];
      auto *var = scope->FindVar(out_var_names[i]);
      PADDLE_ENFORCE_NOT_NULL(
          var,
          platform::errors::NotFound("Variable %s is not found in local scope.",
                                     out_var_names[i]));

      size_t size = numel * SizeOfType(trg.type());
      RunAndRecordEvent(p, [&trg, var, p, size] {
        auto dst_ptr = var->GetMutable<framework::LoDTensor>()->data<void>();
        platform::CPUPlace cpu_place;
        memory::Copy(cpu_place, dst_ptr, cpu_place, trg.data<void>(), size);
      });
    }
  }
  VLOG(10) << Name() << " size:" << numel * SizeOfType(dtype);
}

247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
#if defined(PADDLE_WITH_XPU_BKCL)
// Executes the prepared BKCL all-reduce closures inside RunAndRecordEvent.
//
// A single closure is invoked directly. Several closures are bracketed by
// bkcl_group_start() / bkcl_group_end(); a PreconditionNotMet error is raised
// when either call does not return BKCL_SUCCESS.
void AllReduceOpHandle::BKCLAllReduceFunc(
    const std::vector<std::function<void()>> &all_reduce_calls) {
  this->RunAndRecordEvent([&] {
    // Fast path: a lone call needs no group bracketing.
    if (all_reduce_calls.size() == 1UL) {
      all_reduce_calls[0]();
      return;
    }

    PADDLE_ENFORCE_EQ(
        bkcl_group_start(), BKCL_SUCCESS,
        platform::errors::PreconditionNotMet("bkcl_group_start failed"));
    for (const auto &reduce_call : all_reduce_calls) {
      reduce_call();
    }
    PADDLE_ENFORCE_EQ(
        bkcl_group_end(), BKCL_SUCCESS,
        platform::errors::PreconditionNotMet("bkcl_group_end failed"));
  });
}
#endif

268
#if defined(PADDLE_WITH_NCCL)
269
void AllReduceOpHandle::NCCLAllReduceFunc(
G
gongweibao 已提交
270
    const std::vector<std::function<void()>> &all_reduce_calls) {
271 272 273 274 275 276 277 278 279 280 281 282
  this->RunAndRecordEvent([&] {
    if (all_reduce_calls.size() == 1UL) {
      // Do not use NCCLGroup when manage NCCL by per thread per device
      all_reduce_calls[0]();
    } else {
      platform::NCCLGroupGuard guard;
      for (auto &call : all_reduce_calls) {
        call();
      }
    }
  });

283 284 285 286
  SyncNCCLAllReduce();
}

void AllReduceOpHandle::SyncNCCLAllReduce() {
287 288
  if (FLAGS_sync_nccl_allreduce) {
    for (auto &p : places_) {
289
      int dev_id = BOOST_GET_CONST(platform::CUDAPlace, p).device;
290 291 292
      auto *nccl_ctxs =
          nccl_ctxs_->GetRunEnvNCCLCtx(run_order_, use_hierarchical_allreduce_);
      auto &nccl_ctx = nccl_ctxs->at(dev_id);
293
      auto stream = nccl_ctx.stream();
294 295
      PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream));
      PADDLE_ENFORCE_CUDA_SUCCESS(cudaGetLastError());
296 297 298 299 300
    }
  }
}
#endif

C
chengduoZH 已提交
301
// Returns the fixed name of this op handle; RunImpl and AllReduceFunc use it
// for the profiler event and the VLOG line.
std::string AllReduceOpHandle::Name() const {
  return std::string("all_reduce");
}
Y
Yu Yang 已提交
302 303 304
}  // namespace details
}  // namespace framework
}  // namespace paddle