reduce_op_handle.cc 12.9 KB
Newer Older
C
chengduoZH 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
//   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/reduce_op_handle.h"
C
chengduoZH 已提交
16
#include "paddle/fluid/framework/details/container_cast.h"
C
chengduoZH 已提交
17
#include "paddle/fluid/framework/details/reduce_and_gather.h"
C
chengduoZH 已提交
18
#include "paddle/fluid/framework/details/variable_visitor.h"
19 20 21 22 23 24
#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
#include "paddle/fluid/operators/distributed/collective_client.h"
#include "paddle/fluid/operators/distributed/collective_server.h"
#include "paddle/fluid/operators/distributed/request_handler.h"
#endif
#include "paddle/fluid/operators/math/selected_rows_functor.h"
25
#include "paddle/fluid/platform/profiler.h"
C
chengduoZH 已提交
26

C
chengduo 已提交
27 28 29 30
// Boolean command-line flag `cpu_deterministic`, default false.
// ReduceOpHandle::RunImpl() reads it as FLAGS_cpu_deterministic: when set,
// the CPU LoDTensor reduction is accumulated into the output variable found
// through local_scopes_[0] and then copied to the real output if the two
// buffers differ.
DEFINE_bool(
    cpu_deterministic, false,
    "Whether to make the result of computation deterministic in CPU side.");

C
chengduoZH 已提交
31 32 33 34
namespace paddle {
namespace framework {
namespace details {

35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
// Out-of-class definitions of CollectiveContext's static data members.
// NOTE(review): presumably these back the one-time creation of the singleton
// returned by CollectiveContext::GetInstance() -- confirm against the header.
std::once_flag CollectiveContext::init_flag_;
std::unique_ptr<CollectiveContext> CollectiveContext::context_;

// Formats the per-trainer name of the merged temporary variable for
// `var_name`, i.e. "<var_name>_merged_tmp@trainer_<trainer_id>".
static inline std::string GetRemoteVarName(const std::string &var_name,
                                           int trainer_id) {
  std::string remote_name =
      string::Sprintf("%s_merged_tmp@trainer_%d", var_name, trainer_id);
  return remote_name;
}

// Calls Wait() on every device context stored in `dev_ctxes`, visiting the
// entries in map iteration order.
void ReduceOpHandle::Wait(
    const std::map<platform::Place, platform::DeviceContext *> &dev_ctxes) {
  // TODO(gongwb): use event wait?
  for (auto it = dev_ctxes.cbegin(); it != dev_ctxes.cend(); ++it) {
    platform::DeviceContext *ctx = it->second;
    ctx->Wait();
  }
}

#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
// Gathers SelectedRows from the local devices and from every other trainer
// listed in the CollectiveContext, and merges them into `dst_selected_rows`.
//
// Template parameters:
//   DevCtx   - concrete device context type; the context registered for
//              `out_place` in `dev_ctxes` must be of this type.
//   DataType - element type handed to MergeAdd.
//
// Parameters:
//   src_selected_rows - local inputs, one per entry of `in_places`.
//   in_places         - places of the local inputs.
//   dev_ctxes         - device contexts keyed by place.
//   out_var_handle    - output handle; its name is used to derive the names
//                       of the temporary variables and its scope_idx() selects
//                       the scope they are created in.
//   out_place         - place on which merging is performed.
//   dst_selected_rows - receives the final merged result.
//
// Raises (through PADDLE_ENFORCE*) when the device context for `out_place` is
// not a DevCtx, when the remote gather fails or returns an unexpected number
// of variables, or when a gathered remote variable is missing from the scope.
//
// The order of the Wait / EraseVars / Gather / WaitVarBarrier / ClearVar calls
// below is kept exactly as originally written.
template <typename DevCtx, typename DataType>
void ReduceOpHandle::GatherSelectedRows(
    const std::vector<const SelectedRows *> &src_selected_rows,
    const std::vector<platform::Place> &in_places,
    const std::map<platform::Place, platform::DeviceContext *> &dev_ctxes,
    VarHandle *out_var_handle, const platform::Place &out_place,
    SelectedRows *dst_selected_rows) {
  const CollectiveContext &collective_context =
      *CollectiveContext::GetInstance();

  // 1. gather local selected rows, merge them
  std::string gathered_var_name = out_var_handle->name() + "_gathered_tmp";
  auto scope = local_scopes_.at(out_var_handle->scope_idx());
  auto gathered_var_mid = scope->Var(gathered_var_name);
  auto gathered_select_rows =
      gathered_var_mid->GetMutable<framework::SelectedRows>();
  GatherLocalSelectedRows(src_selected_rows, in_places, dev_ctxes, out_place,
                          gathered_select_rows);
  // FIXME(gongwb): remove this Wait.
  Wait(dev_ctxes);

  // merge them
  auto merged_dev_ctx = dynamic_cast<DevCtx *>(dev_ctxes.at(out_place));
  // dynamic_cast yields nullptr when the stored context is not a DevCtx;
  // fail loudly here instead of dereferencing a null pointer below.
  PADDLE_ENFORCE_NOT_NULL(merged_dev_ctx);
  std::string merged_var_name =
      GetRemoteVarName(out_var_handle->name(), collective_context.trainer_id_);
  auto merged_select_rows =
      scope->Var(merged_var_name)->GetMutable<SelectedRows>();
  operators::math::scatter::MergeAdd<DevCtx, DataType> merge_func;
  merge_func(*merged_dev_ctx, *gathered_select_rows, merged_select_rows);

  // 2. start collective server if it doesn't exist
  operators::distributed::CollectiveServer *server =
      operators::distributed::CollectiveServer::GetInstance(
          collective_context.endpoints_[collective_context.trainer_id_],
          collective_context.endpoints_.size() - 1);

  auto rpc_server = server->GetRPCServer();
  rpc_server->RegisterVar(merged_var_name,
                          operators::distributed::kRequestGetMonomerVariable,
                          scope, merged_dev_ctx);

  // 3. gather them from all remote nodes.
  std::vector<const SelectedRows *> remote;
  operators::distributed::CollectiveClient *client =
      operators::distributed::CollectiveClient::GetInstance();

  const unsigned int self_id =
      static_cast<unsigned int>(collective_context.trainer_id_);
  std::vector<operators::distributed::RemoteVar> vars;
  vars.reserve(collective_context.endpoints_.size());
  for (unsigned int i = 0; i < collective_context.endpoints_.size(); i++) {
    // The local trainer's contribution is already in merged_select_rows.
    if (i == self_id) continue;

    operators::distributed::RemoteVar var;
    var.trainer_id_ = i;
    var.var_name_ = GetRemoteVarName(out_var_handle->name(), i);
    var.ep_ = collective_context.endpoints_[i];

    vars.push_back(var);
    VLOG(4) << "gather from:" << var.String();
  }

  // erase gathered vars
  merged_dev_ctx->Wait();
  scope->EraseVars(std::vector<std::string>{gathered_var_name});

  PADDLE_ENFORCE(client->Gather(vars, &remote, *merged_dev_ctx, scope));
  PADDLE_ENFORCE(remote.size() == vars.size());

  // 4. merged local selected rows.
  // `all` is indexed by trainer id; the slot of the local trainer is filled
  // with the locally merged rows, the others with the gathered remote rows.
  std::vector<const SelectedRows *> all;
  all.resize(collective_context.endpoints_.size());
  for (const auto &v : vars) {
    auto *remote_var = scope->FindVar(v.var_name_);
    PADDLE_ENFORCE_NOT_NULL(remote_var);
    all[v.trainer_id_] = remote_var->GetMutable<SelectedRows>();
  }
  all[collective_context.trainer_id_] = merged_select_rows;

  merge_func(*merged_dev_ctx, all, dst_selected_rows);

  rpc_server->WaitVarBarrier(merged_var_name);
  rpc_server->ClearVar(merged_var_name);

  // 5. clear mid vars
  std::vector<std::string> tmp_vars{merged_var_name};
  tmp_vars.reserve(vars.size() + 1);
  for (const auto &r : vars) {
    tmp_vars.push_back(r.var_name_);
  }
  scope->EraseVars(tmp_vars);
}
#endif

C
chengduoZH 已提交
141
void ReduceOpHandle::RunImpl() {
C
chengduo 已提交
142
  platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
Y
Yancey1989 已提交
143

C
chengduoZH 已提交
144
  if (places_.size() == 1) return;
C
chengduoZH 已提交
145
  // the input and output may have dummy var.
C
chengduoZH 已提交
146
  auto in_var_handles = DynamicCast<VarHandle>(inputs_);
C
chengduoZH 已提交
147 148 149 150 151

  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), places_.size(),
      "The number of output should equal to the number of places.");

C
chengduoZH 已提交
152 153 154 155
  VarHandle *out_var_handle;
  {
    auto out_var_handles = DynamicCast<VarHandle>(outputs_);

T
tensor-tang 已提交
156
    PADDLE_ENFORCE_EQ(out_var_handles.size(), 1UL,
C
chengduoZH 已提交
157 158 159
                      "The number of output should be one.");
    out_var_handle = out_var_handles.front();
  }
C
chengduoZH 已提交
160

C
chengduoZH 已提交
161
  auto in_0_handle = in_var_handles[0];
C
chengduoZH 已提交
162

C
chengduoZH 已提交
163 164 165 166 167 168
  std::vector<const Scope *> var_scopes;
  for (auto *s : local_scopes_) {
    var_scopes.emplace_back(s->FindVar(kLocalExecScopeName)->Get<Scope *>());
  }

  auto pre_in_var =
G
gongweibao 已提交
169
      var_scopes.at(in_0_handle->scope_idx())->FindVar(in_0_handle->name());
C
chengduoZH 已提交
170 171 172
  PADDLE_ENFORCE_NOT_NULL(pre_in_var);

  // Wait input done, this Wait is asynchronous operation
C
chengduoZH 已提交
173
  WaitInputVarGenerated();
C
chengduoZH 已提交
174

C
chengduoZH 已提交
175
  // NOTE: The Places of all input tensor must be all on CPU or all on GPU.
C
chengduoZH 已提交
176
  std::vector<platform::Place> in_places;  // used to get dev_ctx
C
chengduoZH 已提交
177
  for (auto *in_handle : in_var_handles) {
G
gongweibao 已提交
178
    in_places.emplace_back(in_handle->place());
C
chengduoZH 已提交
179
    auto in_var =
G
gongweibao 已提交
180
        var_scopes.at(in_handle->scope_idx())->FindVar(in_handle->name());
C
chengduoZH 已提交
181
    PADDLE_ENFORCE_NOT_NULL(in_var);
C
chengduoZH 已提交
182
    VariableVisitor::EnforceShapeAndDTypeEQ(*pre_in_var, *in_var);
C
chengduoZH 已提交
183
  }
C
chengduoZH 已提交
184

G
gongweibao 已提交
185 186
  auto out_var = var_scopes.at(out_var_handle->scope_idx())
                     ->FindVar(out_var_handle->name());
C
chengduoZH 已提交
187
  PADDLE_ENFORCE_NOT_NULL(out_var);
C
chengduoZH 已提交
188

C
chengduoZH 已提交
189 190 191 192 193
  // NOTE: The tensors' Place of input and output must be all on GPU or all on
  // CPU.
  auto in_p = VariableVisitor::GetMutableTensor(pre_in_var).place();
  platform::Place t_out_p;
  if (platform::is_gpu_place(in_p)) {
G
gongweibao 已提交
194
    PADDLE_ENFORCE(platform::is_gpu_place(out_var_handle->place()),
C
chengduoZH 已提交
195
                   "Places of input and output must be all on GPU.");
G
gongweibao 已提交
196
    t_out_p = out_var_handle->place();
C
chengduoZH 已提交
197 198 199
  } else {
    t_out_p = platform::CPUPlace();
  }
C
chengduoZH 已提交
200

C
chengduoZH 已提交
201
  if (pre_in_var->IsType<framework::SelectedRows>()) {
202
    this->RunAndRecordEvent([&] {
203 204
      std::vector<const SelectedRows *> in_selected_rows =
          GetInputValues<SelectedRows>(in_var_handles, var_scopes);
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220

      const CollectiveContext &collective_context =
          *CollectiveContext::GetInstance();
      VLOG(10) << "GatherSelectedRows CollectiveContext:"
               << collective_context.String();

      // TODO(gongwb): add cpu support
      if (collective_context.endpoints_.size() <= 1 ||
          is_cpu_place(in_places[0]) || is_cpu_place(t_out_p)) {
        GatherLocalSelectedRows(in_selected_rows, in_places, dev_ctxes_,
                                t_out_p,
                                out_var->GetMutable<framework::SelectedRows>());
        return;
      }

#if defined PADDLE_WITH_CUDA && defined PADDLE_WITH_DISTRIBUTE
Y
Yu Yang 已提交
221 222
      if (in_selected_rows[0]->value().type() ==
          framework::proto::VarType::FP32) {
223 224 225
        GatherSelectedRows<platform::CUDADeviceContext, float>(
            in_selected_rows, in_places, dev_ctxes_, out_var_handle, t_out_p,
            out_var->GetMutable<framework::SelectedRows>());
Y
Yu Yang 已提交
226 227
      } else if (in_selected_rows[0]->value().type() ==
                 framework::proto::VarType::FP64) {
228 229 230 231
        GatherSelectedRows<platform::CUDADeviceContext, double>(
            in_selected_rows, in_places, dev_ctxes_, out_var_handle, t_out_p,
            out_var->GetMutable<framework::SelectedRows>());
      } else {
Y
Yu Yang 已提交
232
        PADDLE_THROW("only support double or float when gather SelectedRows");
233 234
      }
#endif
235
    });
C
chengduoZH 已提交
236
  } else {
C
chengduoZH 已提交
237 238
    std::vector<const LoDTensor *> lod_tensors =
        GetInputValues<LoDTensor>(in_var_handles, var_scopes);
C
chengduo 已提交
239

C
chengduoZH 已提交
240
    if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
241
      this->RunAndRecordEvent([&] {
C
chengduo 已提交
242 243 244 245 246 247 248
        // FIXME(zcd): The order of summing is important,
        // especially when the type of data is float or double.
        // For example, the result of `a+b+c+d` may be different
        // with the result of `c+a+b+d`, so the summing order should be fixed.
        if (!FLAGS_cpu_deterministic) {
          ReduceLoDTensor func(lod_tensors,
                               out_var->GetMutable<framework::LoDTensor>());
Y
Yu Yang 已提交
249
          VisitDataType(lod_tensors[0]->type(), func);
C
chengduo 已提交
250 251 252 253 254 255
        } else {
          // We sum lod_tensors to reduce_sum_trg which is in local_scopes_0
          // here, but it doesn't mean reduce_sum_trg must be in local_scopes_0.
          auto &reduce_sum_trg = *this->local_scopes_[0]
                                      ->FindVar(kLocalExecScopeName)
                                      ->Get<Scope *>()
G
gongweibao 已提交
256
                                      ->FindVar(out_var_handle->name())
C
chengduo 已提交
257 258
                                      ->GetMutable<framework::LoDTensor>();
          ReduceLoDTensor func(lod_tensors, &reduce_sum_trg);
Y
Yu Yang 已提交
259
          VisitDataType(lod_tensors[0]->type(), func);
C
chengduo 已提交
260 261 262 263 264 265

          auto trg = out_var->GetMutable<framework::LoDTensor>();
          if (reduce_sum_trg.data<void>() != trg->data<void>()) {
            TensorCopy(reduce_sum_trg, platform::CPUPlace(), trg);
          }
        }
266
      });
C
chengduoZH 已提交
267
    } else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
P
peizhilin 已提交
268
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
C
chengduoZH 已提交
269 270 271
      auto pre_in = pre_in_var->Get<framework::LoDTensor>();
      VariableVisitor::ShareDimsAndLoD(*pre_in_var, out_var);
      VariableVisitor::GetMutableTensor(out_var).mutable_data(
G
gongweibao 已提交
272
          out_var_handle->place(), pre_in.type());
C
chengduoZH 已提交
273

G
gongweibao 已提交
274
      auto out_p = out_var_handle->place();
C
chengduoZH 已提交
275
      int root_id = boost::get<platform::CUDAPlace>(out_p).device;
C
chengduoZH 已提交
276
      std::vector<std::function<void()>> all_reduce_calls;
C
chengduoZH 已提交
277
      for (size_t i = 0; i < var_scopes.size(); ++i) {
C
chengduoZH 已提交
278
        auto &p = in_places[i];
C
chengduoZH 已提交
279
        auto &lod_tensor = *lod_tensors[i];
C
chengduoZH 已提交
280

C
chengduoZH 已提交
281
        int dev_id = boost::get<platform::CUDAPlace>(p).device;
C
chengduoZH 已提交
282
        auto &nccl_ctx = nccl_ctxs_->at(dev_id);
C
chengduoZH 已提交
283 284 285

        void *buffer = const_cast<void *>(lod_tensor.data<void>());
        void *recvbuffer = nullptr;
C
chengduoZH 已提交
286
        if (root_id == dev_id) {
C
chengduoZH 已提交
287 288
          recvbuffer =
              out_var->GetMutable<framework::LoDTensor>()->mutable_data(
G
gongweibao 已提交
289
                  out_var_handle->place());
C
chengduoZH 已提交
290 291
        }

C
chengduoZH 已提交
292
        int type = platform::ToNCCLDataType(lod_tensor.type());
C
chengduoZH 已提交
293 294 295 296 297 298 299
        size_t numel = static_cast<size_t>(lod_tensor.numel());
        all_reduce_calls.emplace_back(
            [buffer, recvbuffer, type, numel, root_id, &nccl_ctx] {
              PADDLE_ENFORCE(platform::dynload::ncclReduce(
                  buffer, recvbuffer, numel, static_cast<ncclDataType_t>(type),
                  ncclSum, root_id, nccl_ctx.comm_, nccl_ctx.stream()));
            });
C
chengduoZH 已提交
300 301
      }

C
chengduoZH 已提交
302 303 304 305 306 307
      this->RunAndRecordEvent([&] {
        platform::NCCLGroupGuard guard;
        for (auto &call : all_reduce_calls) {
          call();
        }
      });
C
chengduoZH 已提交
308
#else
C
chengduoZH 已提交
309
      PADDLE_THROW("CUDA is not enabled.");
C
chengduoZH 已提交
310 311
#endif
    } else {
C
chengduoZH 已提交
312
      PADDLE_THROW("Place should be CPUPlace or CUDAPlace.");
C
chengduoZH 已提交
313 314 315
    }
  }
}
C
chengduoZH 已提交
316

C
chengduoZH 已提交
317 318 319 320 321 322
// Collects, for every input handle, a pointer to the value of type T held by
// the variable named by the handle in the scope selected by its scope_idx().
//
// Parameters:
//   in_var_handles - input handles to resolve.
//   var_scopes     - scopes indexed by VarHandle::scope_idx().
// Returns: one `const T *` per handle, in the order of `in_var_handles`.
// Raises: std::out_of_range (from vector::at) for an invalid scope index, and
// a PADDLE_ENFORCE_NOT_NULL failure when a variable cannot be found.
template <typename T>
std::vector<const T *> ReduceOpHandle::GetInputValues(
    const std::vector<VarHandle *> &in_var_handles,
    const std::vector<const Scope *> &var_scopes) const {
  std::vector<const T *> in_values;
  in_values.reserve(in_var_handles.size());
  for (auto *in_handle : in_var_handles) {
    const Variable *in_var =
        var_scopes.at(in_handle->scope_idx())->FindVar(in_handle->name());
    // FindVar may return nullptr; report it instead of dereferencing.
    PADDLE_ENFORCE_NOT_NULL(in_var);
    in_values.emplace_back(&in_var->Get<T>());
  }
  return in_values;
}

C
chengduoZH 已提交
331 332 333 334
// Returns the fixed name of this op handle, "reduce".
std::string ReduceOpHandle::Name() const {
  return std::string("reduce");
}
}  // namespace details
}  // namespace framework
}  // namespace paddle