reduce_op_handle.cc 12.7 KB
Newer Older
C
chengduoZH 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/reduce_op_handle.h"
16
#include <memory>
C
chengduoZH 已提交
17
#include "paddle/fluid/framework/details/container_cast.h"
C
chengduoZH 已提交
18
#include "paddle/fluid/framework/details/reduce_and_gather.h"
C
chengduoZH 已提交
19
#include "paddle/fluid/framework/details/variable_visitor.h"
20 21 22 23 24 25
#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
#include "paddle/fluid/operators/distributed/collective_client.h"
#include "paddle/fluid/operators/distributed/collective_server.h"
#include "paddle/fluid/operators/distributed/request_handler.h"
#endif
#include "paddle/fluid/operators/math/selected_rows_functor.h"
26
#include "paddle/fluid/platform/profiler.h"
C
chengduoZH 已提交
27

C
chengduo 已提交
28 29 30 31
// Command-line flag consulted by ReduceOpHandle::RunImpl on the CPU LoDTensor
// path: when true, the sum is accumulated into the output variable looked up
// in local_exec_scopes_[0] and then copied to the real output if the buffers
// differ; when false, the sum is written directly into the output variable.
DEFINE_bool(
    cpu_deterministic, false,
    "Whether to make the result of computation deterministic in CPU side.");

C
chengduoZH 已提交
32 33 34 35
namespace paddle {
namespace framework {
namespace details {

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
// Out-of-class definitions of CollectiveContext's static data members.
// NOTE(review): presumably init_flag_ guards one-time creation of context_
// inside CollectiveContext::GetInstance() -- confirm against the header.
std::once_flag CollectiveContext::init_flag_;
std::unique_ptr<CollectiveContext> CollectiveContext::context_;

// Builds the scope variable name used for trainer `trainer_id`'s merged copy
// of `var_name`, i.e. "<var_name>_merged_tmp@trainer_<trainer_id>".
static inline std::string GetRemoteVarName(const std::string &var_name,
                                           int trainer_id) {
  std::string remote_name =
      string::Sprintf("%s_merged_tmp@trainer_%d", var_name, trainer_id);
  return remote_name;
}

// Calls Wait() on every device context in `dev_ctxes`, in map order.
void ReduceOpHandle::Wait(
    const std::map<platform::Place, platform::DeviceContext *> &dev_ctxes) {
  // TODO(gongwb): use event wait?
  for (auto it = dev_ctxes.begin(); it != dev_ctxes.end(); ++it) {
    platform::DeviceContext *ctx = it->second;
    ctx->Wait();
  }
}

#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
// Gathers SelectedRows from the local devices and from every other trainer
// listed in the CollectiveContext, and merges them into `dst_selected_rows`.
//
// Template parameters:
//   DevCtx    device context type that dev_ctxes.at(out_place) is cast to.
//   DataType  element type handed to scatter::MergeAdd.
//
// Parameters:
//   src_selected_rows  local inputs, one per entry of `in_places`.
//   in_places          places of the local inputs.
//   dev_ctxes          device contexts keyed by place; must contain
//                      `out_place`.
//   out_var_handle     output handle; its name seeds the temporary variable
//                      names and its scope_idx selects the working scope.
//   out_place          place on which merging is performed.
//   dst_selected_rows  receives the final merged result.
//
// Steps (statement order matters; do not reorder):
//   1. gather the local inputs into a temporary variable and MergeAdd them
//      into "<name>_merged_tmp@trainer_<id>";
//   2. register that merged variable with the collective RPC server;
//   3. fetch the corresponding merged variable from every other endpoint;
//   4. MergeAdd the local and remote pieces into `dst_selected_rows`;
//   5. wait on the server barrier, unregister and erase the temporaries.
//
// NOTE(review): this function uses `local_scopes_` while RunImpl uses
// `local_exec_scopes_` -- confirm both refer to the intended scopes.
template <typename DevCtx, typename DataType>
void ReduceOpHandle::GatherSelectedRows(
    const std::vector<const SelectedRows *> &src_selected_rows,
    const std::vector<platform::Place> &in_places,
    const std::map<platform::Place, platform::DeviceContext *> &dev_ctxes,
    VarHandle *out_var_handle, const platform::Place &out_place,
    SelectedRows *dst_selected_rows) {
  const CollectiveContext &collective_context =
      *CollectiveContext::GetInstance();

  // 1. gather local selected rows, merge them
  std::string gathered_var_name = out_var_handle->name() + "_gathered_tmp";
  auto scope = local_scopes_.at(out_var_handle->scope_idx());
  auto gathered_var_mid = scope->Var(gathered_var_name);
  auto gathered_select_rows =
      gathered_var_mid->GetMutable<framework::SelectedRows>();
  GatherLocalSelectedRowsFunctor functor(
      src_selected_rows, in_places, dev_ctxes, out_place, gathered_select_rows);
  WaitInputVarGenerated();
  functor();

  // FIXME(gongwb): remove this Wait.
  Wait(dev_ctxes);

  // merge them
  auto merged_dev_ctx = dynamic_cast<DevCtx *>(dev_ctxes.at(out_place));
  // The cast yields nullptr when the stored context is not a DevCtx; fail
  // loudly here instead of dereferencing a null pointer below.
  PADDLE_ENFORCE_NOT_NULL(merged_dev_ctx);
  std::string merged_var_name =
      GetRemoteVarName(out_var_handle->name(), collective_context.trainer_id_);
  auto merged_select_rows =
      scope->Var(merged_var_name)->GetMutable<SelectedRows>();
  operators::math::scatter::MergeAdd<DevCtx, DataType> merge_func;
  merge_func(*merged_dev_ctx, *gathered_select_rows, merged_select_rows);

  // 2. start collective server if it doesn't exist
  operators::distributed::CollectiveServer *server =
      operators::distributed::CollectiveServer::GetInstance(
          collective_context.endpoints_[collective_context.trainer_id_],
          collective_context.endpoints_.size() - 1);

  auto rpc_server = server->GetRPCServer();
  rpc_server->RegisterVar(merged_var_name,
                          operators::distributed::kRequestGetMonomerVariable,
                          scope, merged_dev_ctx);

  // 3. gather them from all remote nodes.
  std::vector<const SelectedRows *> remote;
  operators::distributed::CollectiveClient *client =
      operators::distributed::CollectiveClient::GetInstance();

  std::vector<operators::distributed::RemoteVar> vars;
  vars.reserve(collective_context.endpoints_.size());
  for (unsigned int i = 0; i < collective_context.endpoints_.size(); i++) {
    // Skip this trainer: its contribution is merged_select_rows.
    if (i == static_cast<unsigned int>(collective_context.trainer_id_)) {
      continue;
    }

    operators::distributed::RemoteVar var;
    var.trainer_id_ = i;
    var.var_name_ = GetRemoteVarName(out_var_handle->name(), i);
    var.ep_ = collective_context.endpoints_[i];

    vars.push_back(var);
    VLOG(4) << "gather from:" << var.String();
  }

  // erase gathered vars
  merged_dev_ctx->Wait();
  scope->EraseVars(std::vector<std::string>{gathered_var_name});

  PADDLE_ENFORCE(client->Gather(vars, &remote, *merged_dev_ctx, scope));
  PADDLE_ENFORCE(remote.size() == vars.size());

  // 4. merged local selected rows.
  std::vector<const SelectedRows *> all;
  all.resize(collective_context.endpoints_.size());
  for (const auto &v : vars) {
    auto *remote_var = scope->FindVar(v.var_name_);
    // Gather is expected to have created this variable in `scope`.
    PADDLE_ENFORCE_NOT_NULL(remote_var);
    all[v.trainer_id_] = remote_var->GetMutable<SelectedRows>();
  }
  all[collective_context.trainer_id_] = merged_select_rows;

  merge_func(*merged_dev_ctx, all, dst_selected_rows);

  rpc_server->WaitVarBarrier(merged_var_name);
  rpc_server->ClearVar(merged_var_name);

  // 5. clear mid vars
  std::vector<std::string> tmp_vars;
  tmp_vars.reserve(vars.size() + 1);
  tmp_vars.push_back(merged_var_name);
  for (const auto &r : vars) {
    tmp_vars.push_back(r.var_name_);
  }
  scope->EraseVars(tmp_vars);
}
#endif

C
chengduoZH 已提交
145
void ReduceOpHandle::RunImpl() {
146
  platform::RecordEvent record_event(Name());
Y
Yancey1989 已提交
147

C
chengduoZH 已提交
148
  if (places_.size() == 1) return;
C
chengduoZH 已提交
149
  // the input and output may have dummy var.
C
chengduoZH 已提交
150
  auto in_var_handles = DynamicCast<VarHandle>(inputs_);
C
chengduoZH 已提交
151 152 153 154 155

  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), places_.size(),
      "The number of output should equal to the number of places.");

C
chengduoZH 已提交
156 157 158 159
  VarHandle *out_var_handle;
  {
    auto out_var_handles = DynamicCast<VarHandle>(outputs_);

T
tensor-tang 已提交
160
    PADDLE_ENFORCE_EQ(out_var_handles.size(), 1UL,
C
chengduoZH 已提交
161 162 163
                      "The number of output should be one.");
    out_var_handle = out_var_handles.front();
  }
C
chengduoZH 已提交
164

C
chengduoZH 已提交
165
  auto in_0_handle = in_var_handles[0];
C
chengduoZH 已提交
166

167
  auto &var_scopes = local_exec_scopes_;
C
chengduoZH 已提交
168 169

  auto pre_in_var =
G
gongweibao 已提交
170
      var_scopes.at(in_0_handle->scope_idx())->FindVar(in_0_handle->name());
C
chengduoZH 已提交
171 172
  PADDLE_ENFORCE_NOT_NULL(pre_in_var);

C
chengduoZH 已提交
173
  // NOTE: The Places of all input tensor must be all on CPU or all on GPU.
C
chengduoZH 已提交
174
  std::vector<platform::Place> in_places;  // used to get dev_ctx
C
chengduoZH 已提交
175
  for (auto *in_handle : in_var_handles) {
G
gongweibao 已提交
176
    in_places.emplace_back(in_handle->place());
C
chengduoZH 已提交
177
    auto in_var =
G
gongweibao 已提交
178
        var_scopes.at(in_handle->scope_idx())->FindVar(in_handle->name());
C
chengduoZH 已提交
179
    PADDLE_ENFORCE_NOT_NULL(in_var);
C
chengduoZH 已提交
180
    VariableVisitor::EnforceShapeAndDTypeEQ(*pre_in_var, *in_var);
C
chengduoZH 已提交
181
  }
C
chengduoZH 已提交
182

G
gongweibao 已提交
183 184
  auto out_var = var_scopes.at(out_var_handle->scope_idx())
                     ->FindVar(out_var_handle->name());
C
chengduoZH 已提交
185
  PADDLE_ENFORCE_NOT_NULL(out_var);
C
chengduoZH 已提交
186

C
chengduoZH 已提交
187 188 189 190 191
  // NOTE: The tensors' Place of input and output must be all on GPU or all on
  // CPU.
  auto in_p = VariableVisitor::GetMutableTensor(pre_in_var).place();
  platform::Place t_out_p;
  if (platform::is_gpu_place(in_p)) {
G
gongweibao 已提交
192
    PADDLE_ENFORCE(platform::is_gpu_place(out_var_handle->place()),
C
chengduoZH 已提交
193
                   "Places of input and output must be all on GPU.");
G
gongweibao 已提交
194
    t_out_p = out_var_handle->place();
C
chengduoZH 已提交
195 196 197
  } else {
    t_out_p = platform::CPUPlace();
  }
C
chengduoZH 已提交
198

C
chengduoZH 已提交
199
  if (pre_in_var->IsType<framework::SelectedRows>()) {
200
    this->RunAndRecordEvent([&] {
201 202
      std::vector<const SelectedRows *> in_selected_rows =
          GetInputValues<SelectedRows>(in_var_handles, var_scopes);
203 204 205 206 207 208 209 210 211

      const CollectiveContext &collective_context =
          *CollectiveContext::GetInstance();
      VLOG(10) << "GatherSelectedRows CollectiveContext:"
               << collective_context.String();

      // TODO(gongwb): add cpu support
      if (collective_context.endpoints_.size() <= 1 ||
          is_cpu_place(in_places[0]) || is_cpu_place(t_out_p)) {
212 213 214 215 216
        GatherLocalSelectedRowsFunctor functor(
            in_selected_rows, in_places, dev_ctxes_, t_out_p,
            out_var->GetMutable<framework::SelectedRows>());
        WaitInputVarGenerated();
        functor();
217 218 219 220
        return;
      }

#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
Y
Yu Yang 已提交
221 222
      if (in_selected_rows[0]->value().type() ==
          framework::proto::VarType::FP32) {
223 224 225
        GatherSelectedRows<platform::CUDADeviceContext, float>(
            in_selected_rows, in_places, dev_ctxes_, out_var_handle, t_out_p,
            out_var->GetMutable<framework::SelectedRows>());
Y
Yu Yang 已提交
226 227
      } else if (in_selected_rows[0]->value().type() ==
                 framework::proto::VarType::FP64) {
228 229 230 231
        GatherSelectedRows<platform::CUDADeviceContext, double>(
            in_selected_rows, in_places, dev_ctxes_, out_var_handle, t_out_p,
            out_var->GetMutable<framework::SelectedRows>());
      } else {
Y
Yu Yang 已提交
232
        PADDLE_THROW("only support double or float when gather SelectedRows");
233 234
      }
#endif
235
    });
C
chengduoZH 已提交
236
  } else {
C
chengduoZH 已提交
237 238
    std::vector<const LoDTensor *> lod_tensors =
        GetInputValues<LoDTensor>(in_var_handles, var_scopes);
C
chengduo 已提交
239

C
chengduoZH 已提交
240
    if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
241
      WaitInputVarGenerated();
242
      this->RunAndRecordEvent([&] {
C
chengduo 已提交
243 244 245 246 247 248 249
        // FIXME(zcd): The order of summing is important,
        // especially when the type of data is float or double.
        // For example, the result of `a+b+c+d` may be different
        // with the result of `c+a+b+d`, so the summing order should be fixed.
        if (!FLAGS_cpu_deterministic) {
          ReduceLoDTensor func(lod_tensors,
                               out_var->GetMutable<framework::LoDTensor>());
Y
Yu Yang 已提交
250
          VisitDataType(lod_tensors[0]->type(), func);
C
chengduo 已提交
251 252 253
        } else {
          // We sum lod_tensors to reduce_sum_trg which is in local_scopes_0
          // here, but it doesn't mean reduce_sum_trg must be in local_scopes_0.
254
          auto &reduce_sum_trg = *this->local_exec_scopes_[0]
G
gongweibao 已提交
255
                                      ->FindVar(out_var_handle->name())
C
chengduo 已提交
256 257
                                      ->GetMutable<framework::LoDTensor>();
          ReduceLoDTensor func(lod_tensors, &reduce_sum_trg);
Y
Yu Yang 已提交
258
          VisitDataType(lod_tensors[0]->type(), func);
C
chengduo 已提交
259 260 261 262 263 264

          auto trg = out_var->GetMutable<framework::LoDTensor>();
          if (reduce_sum_trg.data<void>() != trg->data<void>()) {
            TensorCopy(reduce_sum_trg, platform::CPUPlace(), trg);
          }
        }
265
      });
C
chengduoZH 已提交
266
    } else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
267
#if defined(PADDLE_WITH_NCCL)
C
chengduoZH 已提交
268 269 270
      auto pre_in = pre_in_var->Get<framework::LoDTensor>();
      VariableVisitor::ShareDimsAndLoD(*pre_in_var, out_var);
      VariableVisitor::GetMutableTensor(out_var).mutable_data(
G
gongweibao 已提交
271
          out_var_handle->place(), pre_in.type());
C
chengduoZH 已提交
272

G
gongweibao 已提交
273
      auto out_p = out_var_handle->place();
C
chengduoZH 已提交
274
      int root_id = boost::get<platform::CUDAPlace>(out_p).device;
C
chengduoZH 已提交
275
      std::vector<std::function<void()>> all_reduce_calls;
C
chengduoZH 已提交
276
      for (size_t i = 0; i < var_scopes.size(); ++i) {
C
chengduoZH 已提交
277
        auto &p = in_places[i];
C
chengduoZH 已提交
278
        auto &lod_tensor = *lod_tensors[i];
C
chengduoZH 已提交
279

C
chengduoZH 已提交
280
        int dev_id = boost::get<platform::CUDAPlace>(p).device;
C
chengduoZH 已提交
281
        auto &nccl_ctx = nccl_ctxs_->at(dev_id);
C
chengduoZH 已提交
282 283 284

        void *buffer = const_cast<void *>(lod_tensor.data<void>());
        void *recvbuffer = nullptr;
C
chengduoZH 已提交
285
        if (root_id == dev_id) {
C
chengduoZH 已提交
286 287
          recvbuffer =
              out_var->GetMutable<framework::LoDTensor>()->mutable_data(
G
gongweibao 已提交
288
                  out_var_handle->place());
C
chengduoZH 已提交
289 290
        }

C
chengduoZH 已提交
291
        int type = platform::ToNCCLDataType(lod_tensor.type());
C
chengduoZH 已提交
292 293 294 295 296 297 298
        size_t numel = static_cast<size_t>(lod_tensor.numel());
        all_reduce_calls.emplace_back(
            [buffer, recvbuffer, type, numel, root_id, &nccl_ctx] {
              PADDLE_ENFORCE(platform::dynload::ncclReduce(
                  buffer, recvbuffer, numel, static_cast<ncclDataType_t>(type),
                  ncclSum, root_id, nccl_ctx.comm_, nccl_ctx.stream()));
            });
C
chengduoZH 已提交
299 300
      }

301
      WaitInputVarGenerated();
C
chengduoZH 已提交
302 303 304 305 306 307
      this->RunAndRecordEvent([&] {
        platform::NCCLGroupGuard guard;
        for (auto &call : all_reduce_calls) {
          call();
        }
      });
C
chengduoZH 已提交
308
#else
C
chengduoZH 已提交
309
      PADDLE_THROW("CUDA is not enabled.");
C
chengduoZH 已提交
310 311
#endif
    } else {
C
chengduoZH 已提交
312
      PADDLE_THROW("Place should be CPUPlace or CUDAPlace.");
C
chengduoZH 已提交
313 314 315
    }
  }
}
C
chengduoZH 已提交
316

C
chengduoZH 已提交
317 318 319
// Collects a const pointer to the T stored in each input handle's variable.
//
// For every handle in `in_var_handles`, the variable named handle->name() is
// looked up in var_scopes.at(handle->scope_idx()) and a pointer to its value
// of type T is appended. The result has one entry per handle, in the same
// order. The pointers refer to objects owned by the scopes' variables.
//
// Throws (via PADDLE_ENFORCE_NOT_NULL) if a variable is missing from its
// scope, and std::out_of_range if a scope index is out of bounds.
template <typename T>
std::vector<const T *> ReduceOpHandle::GetInputValues(
    const std::vector<VarHandle *> &in_var_handles,
    const std::vector<Scope *> &var_scopes) const {
  std::vector<const T *> in_values;
  in_values.reserve(in_var_handles.size());
  for (auto *in_handle : in_var_handles) {
    auto *in_var =
        var_scopes.at(in_handle->scope_idx())->FindVar(in_handle->name());
    // FindVar returns nullptr for an unknown name; do not dereference it.
    PADDLE_ENFORCE_NOT_NULL(in_var);
    auto &in_value = in_var->Get<T>();
    in_values.emplace_back(&in_value);
  }
  return in_values;
}

C
chengduoZH 已提交
331 332 333 334
// Returns the fixed display name of this op handle.
std::string ReduceOpHandle::Name() const {
  return std::string("reduce");
}
}  // namespace details
}  // namespace framework
}  // namespace paddle