//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/reduce_op_handle.h"

#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/flags.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
PADDLE_DEFINE_EXPORTED_bool(
    cpu_deterministic,
    false,
    "Whether to make the result of computation deterministic in CPU side.");

namespace paddle {
namespace framework {
namespace details {

std::once_flag CollectiveContext::init_flag_;
std::unique_ptr<CollectiveContext> CollectiveContext::context_;

static inline std::string GetRemoteVarName(const std::string &var_name,
                                           int trainer_id) {
  return string::Sprintf("%s_merged_tmp@trainer_%d", var_name, trainer_id);
}

void ReduceOpHandle::Wait(
    const std::map<platform::Place, platform::DeviceContext *> &dev_ctxes) {
  // TODO(gongwb): use event wait?
  for (auto &dev_ctx : dev_ctxes) {
    dev_ctx.second->Wait();
  }
}

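// RunImpl reduces (sums) the same variable from every local scope onto one
// destination place: dense CPU tensors are summed directly, dense GPU tensors
// go through ncclReduce with the output device as the root, XPU tensors go
// through bkcl_reduce, and SelectedRows inputs are gathered instead of summed.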
void ReduceOpHandle::RunImpl() {
  platform::RecordEvent record_event(
      Name(), platform::TracerEventType::Communication, 1);

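  // Nothing to reduce when there is only one place.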
  if (places_.size() == 1) return;
  // The inputs and outputs may contain dummy variables.
  auto in_var_handles = DynamicCast<VarHandle>(inputs_);

  PADDLE_ENFORCE_EQ(
      in_var_handles.size(),
      places_.size(),
      platform::errors::InvalidArgument(
          "The number of inputs should equal to the number of places, but got "
          "the number of inputs is %d and the number of places is %d.",
          in_var_handles.size(),
          places_.size()));

  VarHandle *out_var_handle;
  {
    auto out_var_handles = DynamicCast<VarHandle>(outputs_);

    PADDLE_ENFORCE_EQ(out_var_handles.size(),
                      1UL,
                      platform::errors::InvalidArgument(
                          "The number of output should be one, but got %d.",
                          out_var_handles.size()));
    out_var_handle = out_var_handles.front();
  }

  auto in_0_handle = in_var_handles[0];

  auto &var_scopes = local_exec_scopes_;

  auto pre_in_var =
      var_scopes.at(in_0_handle->scope_idx())->FindVar(in_0_handle->name());

  PADDLE_ENFORCE_NOT_NULL(
      pre_in_var,
      platform::errors::NotFound("Variable %s is not found in scope.",
                                 in_0_handle->name()));

  // NOTE: The places of all input tensors must be all on CPU or all on GPU.
  std::vector<platform::Place> in_places;  // used to get dev_ctx
  for (auto *in_handle : in_var_handles) {
    in_places.emplace_back(in_handle->place());
    auto in_var =
        var_scopes.at(in_handle->scope_idx())->FindVar(in_handle->name());

    PADDLE_ENFORCE_NOT_NULL(
        in_var,
        platform::errors::NotFound("Variable %s is not found in scope.",
                                   in_handle->name()));

    VariableVisitor::EnforceShapeAndDTypeEQ(*pre_in_var, *in_var);
  }

  auto out_var = var_scopes.at(out_var_handle->scope_idx())
                     ->FindVar(out_var_handle->name());

  PADDLE_ENFORCE_NOT_NULL(
      out_var,
      platform::errors::NotFound("Variable %s is not found in scope.",
                                 out_var_handle->name()));

  // NOTE: The input and output tensors must all be on GPU or all on CPU.
  auto in_p = VariableVisitor::GetMutableTensor(pre_in_var).place();
  platform::Place t_out_p;
  if (platform::is_gpu_place(in_p)) {
    PADDLE_ENFORCE_EQ(platform::is_gpu_place(out_var_handle->place()),
                      true,
                      platform::errors::PreconditionNotMet(
                          "Places of input and output must be all on GPU."));
    t_out_p = out_var_handle->place();
  } else {
    t_out_p = platform::CPUPlace();
  }

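  // Dispatch on the input's runtime type: SelectedRows inputs are gathered
  // into a single SelectedRows on the destination place, while dense tensors
  // are summed element-wise on CPU, via NCCL, or via BKCL.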
  if (pre_in_var->IsType<phi::SelectedRows>()) {
    this->RunAndRecordEvent([&] {
      std::vector<const phi::SelectedRows *> in_selected_rows =
          GetInputValues<phi::SelectedRows>(in_var_handles, var_scopes);

      const CollectiveContext &collective_context =
          *CollectiveContext::GetInstance();
      VLOG(10) << "GatherSelectedRows CollectiveContext:"
               << collective_context.String();

      // TODO(gongwb): add cpu support
      if (collective_context.endpoints_.size() <= 1 ||
          platform::is_cpu_place(in_places[0]) ||
          platform::is_cpu_place(t_out_p)) {
        GatherLocalSelectedRowsFunctor functor(
            in_selected_rows,
            in_places,
            dev_ctxes_,
            t_out_p,
            out_var->GetMutable<phi::SelectedRows>());
        WaitInputVarGenerated();
        functor();
        return;
      }
    });
  } else {
    std::vector<const phi::DenseTensor *> lod_tensors =
        GetInputValues<phi::DenseTensor>(in_var_handles, var_scopes);

    if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
      WaitInputVarGenerated();
      this->RunAndRecordEvent([&] {
        // FIXME(zcd): The order of summing is important,
        // especially when the type of data is float or double.
        // For example, the result of `a+b+c+d` may be different
        // with the result of `c+a+b+d`, so the summing order should be fixed.
        if (!FLAGS_cpu_deterministic) {
          ReduceLoDTensor func(lod_tensors,
                               out_var->GetMutable<phi::DenseTensor>());
          VisitDataType(framework::TransToProtoVarType(lod_tensors[0]->dtype()),
                        func);
        } else {
          // To keep the summation order fixed, always accumulate into
          // reduce_sum_trg, which lives in local_exec_scopes_[0]; the real
          // output may live in another scope, so the result is copied below
          // when the two buffers differ.
          auto &reduce_sum_trg = *this->local_exec_scopes_[0]
                                      ->FindVar(out_var_handle->name())
                                      ->GetMutable<phi::DenseTensor>();
          ReduceLoDTensor func(lod_tensors, &reduce_sum_trg);
          VisitDataType(framework::TransToProtoVarType(lod_tensors[0]->dtype()),
                        func);

          auto trg = out_var->GetMutable<phi::DenseTensor>();
          if (reduce_sum_trg.data() != trg->data()) {
            TensorCopy(reduce_sum_trg, platform::CPUPlace(), trg);
          }
        }
      });
    } else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
      auto pre_in = pre_in_var->Get<phi::DenseTensor>();
      VariableVisitor::ShareDimsAndLoD(*pre_in_var, out_var);
      VariableVisitor::GetMutableTensor(out_var).mutable_data(
          out_var_handle->place(), pre_in.dtype());

      auto out_p = out_var_handle->place();
      int root_id = out_p.device;  // NOLINT
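      // Collect one deferred ncclReduce call per device; they are launched
      // together under NCCLGroupGuard once all inputs are generated.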
      std::vector<std::function<void()>> all_reduce_calls;
      for (size_t i = 0; i < var_scopes.size(); ++i) {
        auto &p = in_places[i];
        auto &lod_tensor = *lod_tensors[i];

        int dev_id = p.device;  // NOLINT
        auto &nccl_ctx = nccl_ctxs_->at(dev_id);

        void *buffer = const_cast<void *>(lod_tensor.data());
        void *recvbuffer = nullptr;
        if (root_id == dev_id) {
          recvbuffer = out_var->GetMutable<phi::DenseTensor>()->mutable_data(
              out_var_handle->place());
        }

        int type = platform::ToNCCLDataType(
            framework::TransToProtoVarType(lod_tensor.dtype()));
        size_t numel = static_cast<size_t>(lod_tensor.numel());
        all_reduce_calls.emplace_back(
            [buffer, recvbuffer, type, numel, root_id, &nccl_ctx] {
              PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclReduce(
                  buffer,
                  recvbuffer,
                  numel,
                  static_cast<ncclDataType_t>(type),
                  ncclSum,
                  root_id,
                  nccl_ctx.comm_,
                  nccl_ctx.stream()));
            });
      }

      WaitInputVarGenerated();
      this->RunAndRecordEvent([&] {
        platform::NCCLGroupGuard guard;
        for (auto &call : all_reduce_calls) {
          call();
        }
      });
#else
      PADDLE_THROW(
          platform::errors::PreconditionNotMet("Not compiled with CUDA."));
#endif
    } else if (paddle::platform::is_xpu_place(lod_tensors[0]->place())) {
#if defined(PADDLE_WITH_XPU_BKCL)
      auto pre_in = pre_in_var->Get<phi::DenseTensor>();
      VariableVisitor::ShareDimsAndLoD(*pre_in_var, out_var);
      VariableVisitor::GetMutableTensor(out_var).mutable_data(
          out_var_handle->place(), pre_in.dtype());

      auto out_p = out_var_handle->place();
      int root_id = out_p.device;
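      // XPU counterpart of the NCCL branch above: one bkcl_reduce call per
      // device, issued between bkcl_group_start() and bkcl_group_end().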
      std::vector<std::function<void()>> all_reduce_calls;
      for (size_t i = 0; i < var_scopes.size(); ++i) {
        auto &p = in_places[i];
        auto &lod_tensor = *lod_tensors[i];

        int dev_id = p.device;
        auto &bkcl_ctx = bkcl_ctxs_->at(dev_id);

        void *buffer = const_cast<void *>(lod_tensor.data());
        void *recvbuffer = nullptr;
        if (root_id == dev_id) {
          recvbuffer = out_var->GetMutable<phi::DenseTensor>()->mutable_data(
              out_var_handle->place());
        }

        int type = platform::ToBKCLDataType(
            framework::TransToProtoVarType(lod_tensor.dtype()));
        size_t numel = static_cast<size_t>(lod_tensor.numel());
        all_reduce_calls.emplace_back(
            [buffer, recvbuffer, type, numel, root_id, &bkcl_ctx] {
              PADDLE_ENFORCE_EQ(
                  bkcl_reduce(bkcl_ctx.comm(),
                              buffer,
                              recvbuffer,
                              numel,
                              static_cast<BKCLDataType>(type),
                              BKCL_ADD,
                              root_id,
                              nullptr),
                  BKCL_SUCCESS,
                  platform::errors::Unavailable("bkcl_all_reduce failed"));
            });
      }

      WaitInputVarGenerated();
      this->RunAndRecordEvent([&] {
        PADDLE_ENFORCE_EQ(
            bkcl_group_start(),
            BKCL_SUCCESS,
            platform::errors::Unavailable("bkcl_group_start failed"));
        for (auto &call : all_reduce_calls) {
          call();
        }
        PADDLE_ENFORCE_EQ(
            bkcl_group_end(),
            BKCL_SUCCESS,
            platform::errors::Unavailable("bkcl_group_end failed"));
      });
#else
      PADDLE_THROW(
          platform::errors::PreconditionNotMet("Not compiled with XPU."));
#endif
    } else {
      PADDLE_THROW(platform::errors::InvalidArgument(
          "The place of tensor should be CPUPlace, CUDAPlace or XPUPlace, but "
          "got %s.",
          lod_tensors[0]->place()));
    }
  }
}

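// Collects a pointer to the payload of type T (phi::SelectedRows or
// phi::DenseTensor) for every input handle from its local scope.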
template <typename T>
std::vector<const T *> ReduceOpHandle::GetInputValues(
    const std::vector<VarHandle *> &in_var_handles,
    const std::vector<Scope *> &var_scopes) const {
  std::vector<const T *> in_selected_rows;
  for (auto *in_handle : in_var_handles) {
    auto &in_sr = var_scopes.at(in_handle->scope_idx())
                      ->FindVar(in_handle->name())
                      ->Get<T>();
    in_selected_rows.emplace_back(&in_sr);
  }
  return in_selected_rows;
}

std::string ReduceOpHandle::Name() const { return "reduce"; }
}  // namespace details
}  // namespace framework
}  // namespace paddle