reduce_op_handle.cc 12.7 KB
Newer Older
C
chengduoZH 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/reduce_op_handle.h"
16
#include <memory>
C
chengduoZH 已提交
17
#include "paddle/fluid/framework/details/container_cast.h"
C
chengduoZH 已提交
18
#include "paddle/fluid/framework/details/reduce_and_gather.h"
C
chengduoZH 已提交
19
#include "paddle/fluid/framework/details/variable_visitor.h"
20 21 22 23 24 25
#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
#include "paddle/fluid/operators/distributed/collective_client.h"
#include "paddle/fluid/operators/distributed/collective_server.h"
#include "paddle/fluid/operators/distributed/request_handler.h"
#endif
#include "paddle/fluid/operators/math/selected_rows_functor.h"
26
#include "paddle/fluid/platform/profiler.h"
C
chengduoZH 已提交
27

C
chengduo 已提交
28 29 30 31
// Defines the boolean flag FLAGS_cpu_deterministic (default: false).
// ReduceOpHandle::RunImpl() reads it on the CPU LoDTensor path: when set, the
// inputs are first summed into the output variable found in
// local_exec_scopes_[0] and then copied to the real output if the buffers
// differ.
DEFINE_bool(
    cpu_deterministic, false,
    "Whether to make the result of computation deterministic in CPU side.");

C
chengduoZH 已提交
32 33 34 35
namespace paddle {
namespace framework {
namespace details {

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
// Out-of-class definitions of CollectiveContext's static data members.
// NOTE(review): presumably init_flag_ guards the one-time creation of context_
// inside CollectiveContext::GetInstance() — confirm against the header.
std::once_flag CollectiveContext::init_flag_;
std::unique_ptr<CollectiveContext> CollectiveContext::context_;

// Builds the per-trainer name of the merged temporary variable for `var_name`.
// GatherSelectedRows uses it both for the variable this trainer registers with
// its RPC server and for the variables it requests from the other trainers.
static inline std::string GetRemoteVarName(const std::string &var_name,
                                           int trainer_id) {
  const char *const name_format = "%s_merged_tmp@trainer_%d";
  return string::Sprintf(name_format, var_name, trainer_id);
}

// Calls Wait() on every device context in `dev_ctxes`, one after another, in
// the map's iteration order.
void ReduceOpHandle::Wait(
    const std::map<platform::Place, platform::DeviceContext *> &dev_ctxes) {
  // TODO(gongwb): use event wait?
  for (const auto &place_and_ctx : dev_ctxes) {
    platform::DeviceContext *ctx = place_and_ctx.second;
    ctx->Wait();
  }
}

#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
// Gathers SelectedRows from the local devices and from every other trainer
// endpoint, and merges them into `dst_selected_rows`.
//
// Template parameters:
//   DevCtx   - concrete device context type; the context registered for
//              `out_place` in `dev_ctxes` must be of this type.
//   DataType - element type handed to operators::math::scatter::MergeAdd.
//
// Parameters:
//   src_selected_rows - local inputs to gather.
//   in_places         - places of the local inputs (forwarded to
//                       GatherLocalSelectedRows).
//   dev_ctxes         - device contexts keyed by place.
//   out_var_handle    - output handle; its name is the base for the temporary
//                       variable names and its scope_idx selects the scope.
//   out_place         - place on which merging is performed.
//   dst_selected_rows - receives the final merged result.
//
// Temporary variables "<out>_gathered_tmp" and the per-trainer
// "<out>_merged_tmp@trainer_<id>" are created in the scope and erased again
// before returning.
template <typename DevCtx, typename DataType>
void ReduceOpHandle::GatherSelectedRows(
    const std::vector<const SelectedRows *> &src_selected_rows,
    const std::vector<platform::Place> &in_places,
    const std::map<platform::Place, platform::DeviceContext *> &dev_ctxes,
    VarHandle *out_var_handle, const platform::Place &out_place,
    SelectedRows *dst_selected_rows) {
  const CollectiveContext &collective_context =
      *CollectiveContext::GetInstance();

  // 1. gather local selected rows, merge them
  std::string gathered_var_name = out_var_handle->name() + "_gathered_tmp";
  auto scope = local_scopes_.at(out_var_handle->scope_idx());
  auto gathered_var_mid = scope->Var(gathered_var_name);
  auto gathered_select_rows =
      gathered_var_mid->GetMutable<framework::SelectedRows>();
  GatherLocalSelectedRows(src_selected_rows, in_places, dev_ctxes, out_place,
                          gathered_select_rows);
  // FIXME(gongwb): remove this Wait.
  Wait(dev_ctxes);

  // merge them
  // dynamic_cast yields nullptr when the registered context is not a DevCtx;
  // fail loudly here instead of dereferencing a null pointer below.
  auto merged_dev_ctx = dynamic_cast<DevCtx *>(dev_ctxes.at(out_place));
  PADDLE_ENFORCE_NOT_NULL(merged_dev_ctx);
  std::string merged_var_name =
      GetRemoteVarName(out_var_handle->name(), collective_context.trainer_id_);
  auto merged_select_rows =
      scope->Var(merged_var_name)->GetMutable<SelectedRows>();
  operators::math::scatter::MergeAdd<DevCtx, DataType> merge_func;
  merge_func(*merged_dev_ctx, *gathered_select_rows, merged_select_rows);

  // 2. start collective server if it doesn't exist
  // NOTE(review): the second argument is the number of other endpoints;
  // presumably the count of peers the server expects — confirm.
  operators::distributed::CollectiveServer *server =
      operators::distributed::CollectiveServer::GetInstance(
          collective_context.endpoints_[collective_context.trainer_id_],
          collective_context.endpoints_.size() - 1);

  auto rpc_server = server->GetRPCServer();
  rpc_server->RegisterVar(merged_var_name,
                          operators::distributed::kRequestGetMonomerVariable,
                          scope, merged_dev_ctx);

  // 3. gather them from all remote nodes.
  std::vector<const SelectedRows *> remote;
  operators::distributed::CollectiveClient *client =
      operators::distributed::CollectiveClient::GetInstance();

  std::vector<operators::distributed::RemoteVar> vars;
  for (unsigned int i = 0; i < collective_context.endpoints_.size(); i++) {
    // Skip this trainer: its contribution is merged_select_rows.
    if (i == static_cast<unsigned int>(collective_context.trainer_id_)) {
      continue;
    }

    operators::distributed::RemoteVar var;
    var.trainer_id_ = i;
    var.var_name_ = GetRemoteVarName(out_var_handle->name(), i);
    var.ep_ = collective_context.endpoints_[i];

    vars.push_back(var);
    VLOG(4) << "gather from:" << var.String();
  }

  // erase gathered vars
  merged_dev_ctx->Wait();
  scope->EraseVars(std::vector<std::string>{gathered_var_name});

  PADDLE_ENFORCE(client->Gather(vars, &remote, *merged_dev_ctx, scope));
  PADDLE_ENFORCE(remote.size() == vars.size());

  // 4. merged local selected rows.
  std::vector<const SelectedRows *> all;
  all.resize(collective_context.endpoints_.size());
  for (const auto &v : vars) {
    // The remote variable is looked up by name in the local scope; make sure
    // it is actually there before touching it.
    auto *remote_var = scope->FindVar(v.var_name_);
    PADDLE_ENFORCE_NOT_NULL(remote_var);
    all[v.trainer_id_] = remote_var->GetMutable<SelectedRows>();
  }
  all[collective_context.trainer_id_] = merged_select_rows;

  merge_func(*merged_dev_ctx, all, dst_selected_rows);

  rpc_server->WaitVarBarrier(merged_var_name);
  rpc_server->ClearVar(merged_var_name);

  // 5. clear mid vars
  std::vector<std::string> tmp_vars{merged_var_name};
  for (const auto &r : vars) {
    tmp_vars.push_back(r.var_name_);
  }
  scope->EraseVars(tmp_vars);
}
#endif

C
chengduoZH 已提交
142
void ReduceOpHandle::RunImpl() {
143
  platform::RecordEvent record_event(Name());
Y
Yancey1989 已提交
144

C
chengduoZH 已提交
145
  if (places_.size() == 1) return;
C
chengduoZH 已提交
146
  // the input and output may have dummy var.
C
chengduoZH 已提交
147
  auto in_var_handles = DynamicCast<VarHandle>(inputs_);
C
chengduoZH 已提交
148 149 150 151 152

  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), places_.size(),
      "The number of output should equal to the number of places.");

C
chengduoZH 已提交
153 154 155 156
  VarHandle *out_var_handle;
  {
    auto out_var_handles = DynamicCast<VarHandle>(outputs_);

T
tensor-tang 已提交
157
    PADDLE_ENFORCE_EQ(out_var_handles.size(), 1UL,
C
chengduoZH 已提交
158 159 160
                      "The number of output should be one.");
    out_var_handle = out_var_handles.front();
  }
C
chengduoZH 已提交
161

C
chengduoZH 已提交
162
  auto in_0_handle = in_var_handles[0];
C
chengduoZH 已提交
163

164
  auto &var_scopes = local_exec_scopes_;
C
chengduoZH 已提交
165 166

  auto pre_in_var =
G
gongweibao 已提交
167
      var_scopes.at(in_0_handle->scope_idx())->FindVar(in_0_handle->name());
C
chengduoZH 已提交
168 169 170
  PADDLE_ENFORCE_NOT_NULL(pre_in_var);

  // Wait input done, this Wait is asynchronous operation
C
chengduoZH 已提交
171
  WaitInputVarGenerated();
C
chengduoZH 已提交
172

C
chengduoZH 已提交
173
  // NOTE: The Places of all input tensor must be all on CPU or all on GPU.
C
chengduoZH 已提交
174
  std::vector<platform::Place> in_places;  // used to get dev_ctx
C
chengduoZH 已提交
175
  for (auto *in_handle : in_var_handles) {
G
gongweibao 已提交
176
    in_places.emplace_back(in_handle->place());
C
chengduoZH 已提交
177
    auto in_var =
G
gongweibao 已提交
178
        var_scopes.at(in_handle->scope_idx())->FindVar(in_handle->name());
C
chengduoZH 已提交
179
    PADDLE_ENFORCE_NOT_NULL(in_var);
C
chengduoZH 已提交
180
    VariableVisitor::EnforceShapeAndDTypeEQ(*pre_in_var, *in_var);
C
chengduoZH 已提交
181
  }
C
chengduoZH 已提交
182

G
gongweibao 已提交
183 184
  auto out_var = var_scopes.at(out_var_handle->scope_idx())
                     ->FindVar(out_var_handle->name());
C
chengduoZH 已提交
185
  PADDLE_ENFORCE_NOT_NULL(out_var);
C
chengduoZH 已提交
186

C
chengduoZH 已提交
187 188 189 190 191
  // NOTE: The tensors' Place of input and output must be all on GPU or all on
  // CPU.
  auto in_p = VariableVisitor::GetMutableTensor(pre_in_var).place();
  platform::Place t_out_p;
  if (platform::is_gpu_place(in_p)) {
G
gongweibao 已提交
192
    PADDLE_ENFORCE(platform::is_gpu_place(out_var_handle->place()),
C
chengduoZH 已提交
193
                   "Places of input and output must be all on GPU.");
G
gongweibao 已提交
194
    t_out_p = out_var_handle->place();
C
chengduoZH 已提交
195 196 197
  } else {
    t_out_p = platform::CPUPlace();
  }
C
chengduoZH 已提交
198

C
chengduoZH 已提交
199
  if (pre_in_var->IsType<framework::SelectedRows>()) {
200
    this->RunAndRecordEvent([&] {
201 202
      std::vector<const SelectedRows *> in_selected_rows =
          GetInputValues<SelectedRows>(in_var_handles, var_scopes);
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218

      const CollectiveContext &collective_context =
          *CollectiveContext::GetInstance();
      VLOG(10) << "GatherSelectedRows CollectiveContext:"
               << collective_context.String();

      // TODO(gongwb): add cpu support
      if (collective_context.endpoints_.size() <= 1 ||
          is_cpu_place(in_places[0]) || is_cpu_place(t_out_p)) {
        GatherLocalSelectedRows(in_selected_rows, in_places, dev_ctxes_,
                                t_out_p,
                                out_var->GetMutable<framework::SelectedRows>());
        return;
      }

#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
Y
Yu Yang 已提交
219 220
      if (in_selected_rows[0]->value().type() ==
          framework::proto::VarType::FP32) {
221 222 223
        GatherSelectedRows<platform::CUDADeviceContext, float>(
            in_selected_rows, in_places, dev_ctxes_, out_var_handle, t_out_p,
            out_var->GetMutable<framework::SelectedRows>());
Y
Yu Yang 已提交
224 225
      } else if (in_selected_rows[0]->value().type() ==
                 framework::proto::VarType::FP64) {
226 227 228 229
        GatherSelectedRows<platform::CUDADeviceContext, double>(
            in_selected_rows, in_places, dev_ctxes_, out_var_handle, t_out_p,
            out_var->GetMutable<framework::SelectedRows>());
      } else {
Y
Yu Yang 已提交
230
        PADDLE_THROW("only support double or float when gather SelectedRows");
231 232
      }
#endif
233
    });
C
chengduoZH 已提交
234
  } else {
C
chengduoZH 已提交
235 236
    std::vector<const LoDTensor *> lod_tensors =
        GetInputValues<LoDTensor>(in_var_handles, var_scopes);
C
chengduo 已提交
237

C
chengduoZH 已提交
238
    if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
239
      this->RunAndRecordEvent([&] {
C
chengduo 已提交
240 241 242 243 244 245 246
        // FIXME(zcd): The order of summing is important,
        // especially when the type of data is float or double.
        // For example, the result of `a+b+c+d` may be different
        // with the result of `c+a+b+d`, so the summing order should be fixed.
        if (!FLAGS_cpu_deterministic) {
          ReduceLoDTensor func(lod_tensors,
                               out_var->GetMutable<framework::LoDTensor>());
Y
Yu Yang 已提交
247
          VisitDataType(lod_tensors[0]->type(), func);
C
chengduo 已提交
248 249 250
        } else {
          // We sum lod_tensors to reduce_sum_trg which is in local_scopes_0
          // here, but it doesn't mean reduce_sum_trg must be in local_scopes_0.
251
          auto &reduce_sum_trg = *this->local_exec_scopes_[0]
G
gongweibao 已提交
252
                                      ->FindVar(out_var_handle->name())
C
chengduo 已提交
253 254
                                      ->GetMutable<framework::LoDTensor>();
          ReduceLoDTensor func(lod_tensors, &reduce_sum_trg);
Y
Yu Yang 已提交
255
          VisitDataType(lod_tensors[0]->type(), func);
C
chengduo 已提交
256 257 258 259 260 261

          auto trg = out_var->GetMutable<framework::LoDTensor>();
          if (reduce_sum_trg.data<void>() != trg->data<void>()) {
            TensorCopy(reduce_sum_trg, platform::CPUPlace(), trg);
          }
        }
262
      });
C
chengduoZH 已提交
263
    } else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
P
peizhilin 已提交
264
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
C
chengduoZH 已提交
265 266 267
      auto pre_in = pre_in_var->Get<framework::LoDTensor>();
      VariableVisitor::ShareDimsAndLoD(*pre_in_var, out_var);
      VariableVisitor::GetMutableTensor(out_var).mutable_data(
G
gongweibao 已提交
268
          out_var_handle->place(), pre_in.type());
C
chengduoZH 已提交
269

G
gongweibao 已提交
270
      auto out_p = out_var_handle->place();
C
chengduoZH 已提交
271
      int root_id = boost::get<platform::CUDAPlace>(out_p).device;
C
chengduoZH 已提交
272
      std::vector<std::function<void()>> all_reduce_calls;
C
chengduoZH 已提交
273
      for (size_t i = 0; i < var_scopes.size(); ++i) {
C
chengduoZH 已提交
274
        auto &p = in_places[i];
C
chengduoZH 已提交
275
        auto &lod_tensor = *lod_tensors[i];
C
chengduoZH 已提交
276

C
chengduoZH 已提交
277
        int dev_id = boost::get<platform::CUDAPlace>(p).device;
C
chengduoZH 已提交
278
        auto &nccl_ctx = nccl_ctxs_->at(dev_id);
C
chengduoZH 已提交
279 280 281

        void *buffer = const_cast<void *>(lod_tensor.data<void>());
        void *recvbuffer = nullptr;
C
chengduoZH 已提交
282
        if (root_id == dev_id) {
C
chengduoZH 已提交
283 284
          recvbuffer =
              out_var->GetMutable<framework::LoDTensor>()->mutable_data(
G
gongweibao 已提交
285
                  out_var_handle->place());
C
chengduoZH 已提交
286 287
        }

C
chengduoZH 已提交
288
        int type = platform::ToNCCLDataType(lod_tensor.type());
C
chengduoZH 已提交
289 290 291 292 293 294 295
        size_t numel = static_cast<size_t>(lod_tensor.numel());
        all_reduce_calls.emplace_back(
            [buffer, recvbuffer, type, numel, root_id, &nccl_ctx] {
              PADDLE_ENFORCE(platform::dynload::ncclReduce(
                  buffer, recvbuffer, numel, static_cast<ncclDataType_t>(type),
                  ncclSum, root_id, nccl_ctx.comm_, nccl_ctx.stream()));
            });
C
chengduoZH 已提交
296 297
      }

C
chengduoZH 已提交
298 299 300 301 302 303
      this->RunAndRecordEvent([&] {
        platform::NCCLGroupGuard guard;
        for (auto &call : all_reduce_calls) {
          call();
        }
      });
C
chengduoZH 已提交
304
#else
C
chengduoZH 已提交
305
      PADDLE_THROW("CUDA is not enabled.");
C
chengduoZH 已提交
306 307
#endif
    } else {
C
chengduoZH 已提交
308
      PADDLE_THROW("Place should be CPUPlace or CUDAPlace.");
C
chengduoZH 已提交
309 310 311
    }
  }
}
C
chengduoZH 已提交
312

C
chengduoZH 已提交
313 314 315
// For each handle in `in_var_handles`, looks up the variable named by the
// handle in `var_scopes[handle->scope_idx()]` and collects a pointer to the T
// it holds. The returned pointers follow the order of `in_var_handles`.
// The looked-up variables are not null-checked here.
template <typename T>
std::vector<const T *> ReduceOpHandle::GetInputValues(
    const std::vector<VarHandle *> &in_var_handles,
    const std::vector<Scope *> &var_scopes) const {
  std::vector<const T *> values;
  for (auto *handle : in_var_handles) {
    auto *scope = var_scopes.at(handle->scope_idx());
    auto *var = scope->FindVar(handle->name());
    const auto &value = var->Get<T>();
    values.emplace_back(&value);
  }
  return values;
}

C
chengduoZH 已提交
327 328 329 330
// Returns the fixed name of this op handle; RunImpl() passes it to
// platform::RecordEvent.
std::string ReduceOpHandle::Name() const {
  return std::string("reduce");
}
}  // namespace details
}  // namespace framework
}  // namespace paddle