/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
#include <algorithm>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/details/async_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass_helper.h"
#include "paddle/fluid/platform/profiler.h"

#ifdef WITH_GPERFTOOLS
#include "gperftools/profiler.h"
#endif

DEFINE_string(pe_profile_fname, "",
              "Profiler filename for PE, which is generated by gperftools. "
              "Only valid when compiled with `WITH_PROFILER=ON`. Empty if disabled.");
DEFINE_bool(enable_parallel_graph, false,
            "Force disable parallel graph execution mode if set to false.");

namespace paddle {
namespace framework {

static std::once_flag gProfileOnce;
#ifdef WITH_GPERFTOOLS
static bool gProfileStarted = false;
#endif

class ParallelExecutorPrivate {
 public:
  explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
      : places_(places) {
    if (!FLAGS_pe_profile_fname.empty()) {
      std::call_once(gProfileOnce, [] {
#ifdef WITH_GPERFTOOLS
        ProfilerStart(FLAGS_pe_profile_fname.c_str());
        gProfileStarted = true;
#else
        LOG(WARNING) << "Paddle is not compiled with gperftools. "
          "FLAGS_pe_profile_fname will be ignored";
#endif
      });
    }
  }

  ~ParallelExecutorPrivate() {
    if (own_local_scope_) {
      for (size_t i = 1; i < local_scopes_.size(); ++i) {
        // Skip the first scope, since it is the global scope.
        Scope *local_scope = local_scopes_[i];
        if (global_scope_->HasKid(local_scope)) {
          global_scope_->DeleteScope(local_scope);
        }
      }
    }
  }

  ir::Graph *PrepareGCAndRefCnts(ir::Graph *graph, size_t max_memory_size);

  inline bool HasGarbageCollectors() const { return !gcs_.empty(); }

  void ResetRuntimeReferenceCount(const std::vector<std::string> &fetch_tensors,
                                  const std::string &fetched_var_name) {
    for (size_t i = 0; i < runtime_ref_cnts_.size(); ++i) {
      for (auto &pair : global_ref_cnts_[i]) {
        runtime_ref_cnts_[i][pair.first] = pair.second;
      }

      for (auto &fetch_name : fetch_tensors) {
        runtime_ref_cnts_[i].erase(fetch_name);
      }
      runtime_ref_cnts_[i].erase(fetched_var_name);
    }
  }
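
  // Note on the eager-deletion bookkeeping: global_ref_cnts_ (declared below)
  // holds the per-place reference counts computed once at construction time,
  // while runtime_ref_cnts_ is the working copy that is decremented during a
  // run. ResetRuntimeReferenceCount restores the working copy before every
  // iteration and drops the fetched variables so they are never
  // garbage-collected while the caller still needs them. A minimal sketch of
  // the idea (illustrative only, not the real map types):
  //
  //   std::unordered_map<std::string, int> global_cnt{{"x", 2}, {"y", 1}};
  //   auto runtime_cnt = global_cnt;        // reset before an iteration
  //   runtime_cnt.erase("fetched_var");     // keep fetched vars alive
  //   if (--runtime_cnt["x"] == 0) { /* "x" can be deleted eagerly */ }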

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  void InitNCCLCtxs(framework::Scope *scope, const BuildStrategy &bst) {
    VLOG(1) << "nccl comm num:" << bst.nccl_comm_num_ << ", nranks:" << nranks_
            << ", num_trainers:" << bst.num_trainers_
            << ", trainer_id:" << bst.trainer_id_;

    if (bst.use_hierarchical_allreduce_) {
      VLOG(1) << ", use_hierarchical_allreduce:"
              << bst.use_hierarchical_allreduce_ << ", inter_trainers_num:"
              << bst.hierarchical_allreduce_inter_nranks_
              << ", exter_trainers_num:"
              << bst.hierarchical_allreduce_exter_nranks_;
    }

    std::vector<ncclUniqueId *> flat_nccl_ids;
    if (nranks_ == 1) {
      // FIXME(gongwb): need not to create ncclid when nranks==1
      nccl_ctxs_->InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_,
                               bst.trainer_id_);
      return;
    }

    if (bst.enable_parallel_graph_) {
      VLOG(1) << "use only one ncclid in pg model";

      ncclUniqueId *nccl_id = nullptr;

      std::string var_name = platform::GetFlatNCCLVarName(0);
      auto nccl_id_var = scope->FindVar(var_name);
      if (nccl_id_var) {
        nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
      } else {
        nccl_id = new ncclUniqueId();
        PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(nccl_id));
      }

      flat_nccl_ids.push_back(nccl_id);

      nccl_ctxs_->InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_,
                               bst.trainer_id_);
      VLOG(1) << "init bst nccl context complete!";
      return;
    }

    // num_trainers ==1 && places > 1
    if (bst.num_trainers_ == 1) {
      nccl_ctxs_->InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_,
                               bst.trainer_id_);
      return;
    }

    for (int i = 0; i < static_cast<int>(bst.nccl_comm_num_); i++) {
      std::string var_name = platform::GetFlatNCCLVarName(i);
      auto nccl_id_var = scope->FindVar(var_name);
      PADDLE_ENFORCE(nccl_id_var, "can't find %s nccl_id_var", var_name);
      auto nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
      flat_nccl_ids.push_back(nccl_id);
    }

    nccl_ctxs_->InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_,
                             bst.trainer_id_);

    if (bst.use_hierarchical_allreduce_) {
      std::vector<ncclUniqueId *> inter_nccl_ids;
      for (int i = 0; i < static_cast<int>(bst.nccl_comm_num_); i++) {
        std::string var_name = platform::GetHierarchicalInterNCCLVarName(i);
        auto nccl_id_var = scope->FindVar(var_name);
        PADDLE_ENFORCE(nccl_id_var, "can't find %s nccl_id_var", var_name);
        auto inter_nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
        inter_nccl_ids.push_back(inter_nccl_id);
      }

      std::vector<ncclUniqueId *> exter_nccl_ids;
      for (int i = 0; i < static_cast<int>(bst.nccl_comm_num_); i++) {
        std::string var_name = platform::GetHierarchicalExterNCCLVarName(i);
        auto nccl_id_var = scope->FindVar(var_name);
        PADDLE_ENFORCE(nccl_id_var, "can't find %s nccl_id_var", var_name);
        auto nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
        exter_nccl_ids.push_back(nccl_id);
      }

      nccl_ctxs_->InitHierarchicalCtxs(
          places_, inter_nccl_ids, exter_nccl_ids, bst.num_trainers_,
          bst.trainer_id_, bst.hierarchical_allreduce_inter_nranks_,
          bst.hierarchical_allreduce_exter_nranks_);
    }
  }
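
  // Summary of the paths above: with nranks_ == 1 no NCCL id is needed;
  // parallel-graph mode reuses a single flat NCCL id for all contexts;
  // single-trainer multi-place training also needs no ids from the scope;
  // otherwise one flat id per nccl_comm_num_ is read from the scope, plus
  // inter/exter ids when hierarchical allreduce is enabled.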

  void InitOrGetNCCLCommunicator(framework::Scope *scope,
                                 const BuildStrategy &bst) {
    const std::string var_name = "NCCLCommunicator";
    auto var = scope->FindVar(var_name);
    if (var != nullptr) {
      PADDLE_ENFORCE(var->IsInitialized(),
                     "if %s exists, it must be initialized", var_name);
      VLOG(1) << "find " << var_name
              << " in scope, so use it and do not recreate!";
      nccl_ctxs_ = var->GetMutable<platform::NCCLCommunicator>();
      return;
    }

    VLOG(1) << "can not find " << var_name << " in scope, so recreate it!";
    nccl_ctxs_ = scope->Var(var_name)->GetMutable<platform::NCCLCommunicator>();
    InitNCCLCtxs(scope, bst);
  }
#endif
  inline bool IsPersistable(const std::string &name) const {
    auto iter = is_persistable_.find(name);
    return iter != is_persistable_.end() && iter->second;
  }

  BuildStrategy build_strategy_;
  std::vector<platform::Place> places_;
  std::vector<Scope *> local_scopes_;
  Scope *global_scope_;  // not owned
  std::unique_ptr<details::SSAGraphExecutor> executor_;
  std::unordered_map<std::string, bool> is_persistable_;

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  platform::NCCLCommunicator *nccl_ctxs_{nullptr};
#endif
  bool own_local_scope_;
  bool use_cuda_;
  bool use_all_reduce_;
  size_t nranks_;

  // global_ref_cnts_ is initialized only when the ParallelExecutor is
  // constructed, and then remains unchanged.
  // Before each iteration, runtime_ref_cnts_ is reset to global_ref_cnts_.
  std::vector<ir::ReferenceCountMap> global_ref_cnts_;
  std::vector<ir::AtomicReferenceCountMap> runtime_ref_cnts_;
  ir::GarbageCollectorMap gcs_;
};

ir::Graph *ParallelExecutorPrivate::PrepareGCAndRefCnts(
    ir::Graph *graph, size_t max_memory_size) {
  for (size_t i = 0; i < places_.size(); ++i) {
    auto &place = places_[i];
    if (gcs_.count(place) > 0) {
      continue;
    }
    std::unique_ptr<GarbageCollector> gc;
#ifdef PADDLE_WITH_CUDA
    if (platform::is_gpu_place(place)) {
      if (IsFastEagerDeletionModeEnabled()) {
        gc.reset(new UnsafeFastGPUGarbageCollector(
            boost::get<platform::CUDAPlace>(place), max_memory_size));
      } else {
        gc.reset(new StreamGarbageCollector(
            boost::get<platform::CUDAPlace>(place), max_memory_size));
      }
      VLOG(10) << "Created " << i << "-th GarbageCollector at " << place;
    } else {
#endif
      if (platform::is_cpu_place(place)) {
        gc.reset(new CPUGarbageCollector(boost::get<platform::CPUPlace>(place),
                                         max_memory_size));
        VLOG(10) << "Created GarbageCollector at " << place;
      } else {
        PADDLE_THROW("Unsupported place for garbage collection");
      }
#ifdef PADDLE_WITH_CUDA
    }
#endif

    gcs_.emplace(place, std::move(gc));
  }

  if (!gcs_.empty()) {
    std::vector<ir::LastLiveOpsOfVars> last_live_ops_of_vars;

    auto ref_cnt_pass =
        ir::PassRegistry::Instance().Get("reference_count_pass");
    ref_cnt_pass->SetNotOwned(ir::kGlobalReferenceCount, &global_ref_cnts_);
    ref_cnt_pass->SetNotOwned(ir::kLastLiveOpsOfVars, &last_live_ops_of_vars);
    graph = ref_cnt_pass->Apply(graph);
    VLOG(10) << "ReferenceCountPass Applied";

    auto eager_deletion_pass =
        ir::PassRegistry::Instance().Get("eager_deletion_pass");
    eager_deletion_pass->SetNotOwned(ir::kRuntimeReferenceCount,
                                     &runtime_ref_cnts_);
    eager_deletion_pass->SetNotOwned(ir::kGarbageCollector, &gcs_);
    eager_deletion_pass->SetNotOwned(ir::kLastLiveOpsOfVars,
                                     &last_live_ops_of_vars);
    eager_deletion_pass->SetNotOwned(ir::kAllPlaces, &places_);
    graph = eager_deletion_pass->Apply(graph);
    VLOG(10) << "EagerDeletionPass Applied";
  }
  return graph;
}
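
// PrepareGCAndRefCnts wires two IR passes together: reference_count_pass
// computes how many pending ops still reference each variable, and
// eager_deletion_pass inserts deletion ops that hand dead variables to the
// per-place GarbageCollector created above. A hedged usage sketch (the
// ParallelExecutor constructor below is the real caller):
//
//   auto max_memory_size = GetEagerDeletionThreshold();
//   if (max_memory_size >= 0) {
//     graph = member_->PrepareGCAndRefCnts(
//         graph, static_cast<size_t>(max_memory_size));
//   }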

std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
  return member_->local_scopes_;
}

void ParallelExecutor::DropLocalExeScopes() {
  auto executor = dynamic_cast<details::ScopeBufferedSSAGraphExecutor *>(
      member_->executor_.get());
  if (executor) {
    executor->DropLocalExeScopes();
  }
}

bool ParallelExecutor::NeedCreateLocalExeScope() {
  auto executor = dynamic_cast<details::ScopeBufferedSSAGraphExecutor *>(
      member_->executor_.get());
  return executor && executor->NeedCreateLocalExeScope();
}

ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
                                   const std::vector<std::string> &bcast_vars,
                                   const std::string &loss_var_name,
                                   Scope *scope,
                                   const std::vector<Scope *> &local_scopes,
                                   const ExecutionStrategy &exec_strategy,
                                   const BuildStrategy &build_strategy,
                                   ir::Graph *graph)
    : member_(new ParallelExecutorPrivate(places)) {
  member_->global_scope_ = scope;
  member_->use_cuda_ = exec_strategy.use_cuda_;
  member_->build_strategy_ = build_strategy;
  member_->use_all_reduce_ = member_->build_strategy_.reduce_ ==
                             BuildStrategy::ReduceStrategy::kAllReduce;
  member_->nranks_ = build_strategy.num_trainers_ * places.size();
  if (!member_->use_all_reduce_ && member_->nranks_ == 1) {
    LOG(INFO) << "If you set build_strategy.reduce with 'Reduce', "
                 "the number of places should be greater than 1.";
    member_->build_strategy_.reduce_ =
        BuildStrategy::ReduceStrategy::kAllReduce;
    member_->use_all_reduce_ = true;
  }
#if defined(PADDLE_WITH_CUDA) && defined(_WIN32)
  if (member_->use_cuda_) {
    PADDLE_ENFORCE(places.size() == 1, "Windows can support Single GPU only.");
  }
#endif
  LOG(INFO) << string::Sprintf(
      "The number of %s, which is used in ParallelExecutor, is %lu. And "
      "the Program will be copied into %lu copies",
      (member_->use_cuda_ ? "CUDAPlace" : "CPUPlace"), places.size(),
      places.size());
  // Step 1. Bcast the bcast_vars to devs.
  // Create local scopes
  if (local_scopes.empty()) {
    member_->own_local_scope_ = true;
    member_->local_scopes_.emplace_back(member_->global_scope_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&scope->NewScope());
    }
  } else {
    member_->own_local_scope_ = false;
    PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
    }
  }

  std::vector<ir::Graph *> graphs;
  if (member_->build_strategy_.async_mode_) {
    PADDLE_ENFORCE(!member_->use_cuda_,
                   "gpu mode does not support async_mode_ now!");
    graphs.push_back(graph);
    for (size_t i = 1; i < places.size(); ++i) {
      auto *tmp_graph = new ir::Graph(graph->OriginProgram());
      async_graphs_.emplace_back(tmp_graph);
      graphs.push_back(tmp_graph);
    }
  }

  // FIXME(Yancey1989): parallel graph mode gets better performance
  // in GPU allreduce distributed training. Need an elegant way to
  // choose the execution strategy.
  member_->build_strategy_.enable_parallel_graph_ =
      EnableParallelGraphExecution(*graph, exec_strategy,
                                   member_->build_strategy_);
  if (member_->build_strategy_.enable_parallel_graph_) {
    LOG(INFO) << "The Executor will execute the graph with ParallelGraph "
                 "Execution, which can get better performance. "
              << "You can force it off with the environment variable "
                 "FLAGS_enable_parallel_graph=0";
  }

  if (member_->use_cuda_ && member_->nranks_ > 1) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    member_->InitOrGetNCCLCommunicator(scope, member_->build_strategy_);

    // Initialize each device context's nccl comm; it will be used by normal
    // Operators like sync_batch_norm, and collective ops.
    // NOTE: if more than one ParallelExecutor uses the same place, the nccl
    // comm will be rewritten and there will be some problems.
    // NOTE: NCCL group-calls and non-group-calls can not use the same
    // NCCL communicator, so for ParallelGraph and Multi-Process mode, re-use
    // the same communicators.
    auto *nccl_ctxs =
        member_->nccl_ctxs_->GetSyncBatchNormCtx(scope, member_->places_);
    for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) {
      platform::DeviceContextPool &pool =
          platform::DeviceContextPool::Instance();
      auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(
          pool.Get(member_->places_[dev_id]));
      auto &nccl_ctx = nccl_ctxs->at(member_->places_[dev_id]);
      dev_ctx->set_nccl_comm(nccl_ctx.comm());
    }
#endif
  }
  // broadcast parameters from the 0th device to others:
  auto need_broadcast = [&]() -> bool {
    if (member_->build_strategy_.num_trainers_ > 1) {
      // 1. num_trainers would be greater than 1 for nccl distributed training.
      return true;
    } else if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
      // 2. Only one trainer process, but ParallelExecutor holds multiple
      // devices.
      return true;
    }
    return false;
  };
  // Bcast Parameters to all GPUs
  if (need_broadcast()) {
    BCastParamsToDevices(bcast_vars, member_->build_strategy_.trainer_id_);
  }

  // Startup Program has been run. All local scopes have correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  std::vector<ir::Graph *> async_graphs(places.size());
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  if (member_->build_strategy_.async_mode_) {
    VLOG(3) << "use local async mode";
    graph = member_->build_strategy_.Apply(
        graph, {member_->places_[0]}, loss_var_name,
        {member_->local_scopes_[0]}, 1, member_->use_cuda_,
        member_->nccl_ctxs_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      graphs[i] = member_->build_strategy_.Apply(
          graphs[i], {member_->places_[i]}, loss_var_name,
          {member_->local_scopes_[i]}, 1, member_->use_cuda_,
          member_->nccl_ctxs_);
      async_graphs[i] = graphs[i];
    }
  } else {
    graph = member_->build_strategy_.Apply(
        graph, member_->places_, loss_var_name, member_->local_scopes_,
        member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_);
  }
#else
  if (member_->build_strategy_.async_mode_) {
    VLOG(3) << "use local async mode";
    graph = member_->build_strategy_.Apply(
        graph, {member_->places_[0]}, loss_var_name,
        {member_->local_scopes_[0]}, 1, member_->use_cuda_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      graphs[i] = member_->build_strategy_.Apply(
          graphs[i], {member_->places_[i]}, loss_var_name,
          {member_->local_scopes_[i]}, 1, member_->use_cuda_);
      async_graphs[i] = graphs[i];
    }
  } else {
    graph = member_->build_strategy_.Apply(
        graph, member_->places_, loss_var_name, member_->local_scopes_,
        member_->nranks_, member_->use_cuda_);
  }
#endif

  auto max_memory_size = GetEagerDeletionThreshold();
  VLOG(10) << "Eager Deletion Threshold "
           << static_cast<float>(max_memory_size) / (1 << 30);
  if (max_memory_size >= 0) {
    graph = member_->PrepareGCAndRefCnts(graph,
                                         static_cast<size_t>(max_memory_size));
  }

  async_graphs[0] = graph;

  // Step 3. Create vars in each scope. Passes may also create new vars.
  //         skip control vars and empty vars
  std::vector<details::VariableInfo> var_infos;
  for (auto &node : graph->Nodes()) {
    if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
      var_infos.emplace_back();
      var_infos.back().name_ = node->Var()->Name();
      var_infos.back().type_ = node->Var()->GetType();
      var_infos.back().persistable_ = node->Var()->Persistable();
      member_->is_persistable_.emplace(node->Var()->Name(),
                                       node->Var()->Persistable());
    }
  }

  // If the loss_var_name is given, the number of graphs should be only one.
  if (loss_var_name.size()) {
    size_t graph_num = ir::GraphNum(*graph);
    if (graph_num > 1) {
      LOG(WARNING)
          << "The number of graphs should be only one, "
             "but the current graph has "
          << ir::GraphNum(*graph)
          << " sub_graphs. If you want to see the nodes of the "
             "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
             "to specify the output dir. NOTES: if you are not doing "
             "training, please don't pass loss_var_name.";
    }
  }

  if (member_->build_strategy_.async_mode_) {
    VLOG(3) << "use AsyncSSAGraphExecutor";
    member_->executor_.reset(new details::AsyncSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->places_, async_graphs));
  } else if (member_->build_strategy_.enable_parallel_graph_) {
    VLOG(3) << "use ParallelSSAGraphExecutor";
#ifdef PADDLE_WITH_CUDA
    // TODO(Yancey1989): Remove passing in the main_program when
    // allreduce_seq_pass doesn't need it as the attr.
    member_->executor_.reset(new details::ParallelSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->places_, graph));
#else
    PADDLE_THROW(
        "Paddle should be compiled with CUDA for ParallelGraph Execution.");
#endif
  } else {
    if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
      VLOG(3) << "use ThreadedSSAGraphExecutor";
      member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
          exec_strategy, member_->local_scopes_, member_->places_, graph));
    } else {
      VLOG(3) << "use FastThreadedSSAGraphExecutor";
      member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
          exec_strategy, member_->local_scopes_, member_->places_, graph));
    }
  }

  VLOG(3) << "use ScopeBufferedSSAGraphExecutor";
  if (!member_->build_strategy_.async_mode_) {
    member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, std::move(var_infos),
        member_->places_, std::move(member_->executor_)));
  }
}
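
// A hedged sketch of driving this constructor and Run() from C++ (the Python
// API is the usual entry point; `scope` and `graph` must be prepared by the
// caller, and the variable names here are illustrative):
//
//   std::vector<platform::Place> places = {platform::CUDAPlace(0),
//                                          platform::CUDAPlace(1)};
//   ExecutionStrategy exec_strategy;
//   BuildStrategy build_strategy;
//   ParallelExecutor pe(places, /*bcast_vars=*/{}, /*loss_var_name=*/"loss",
//                       &scope, /*local_scopes=*/{}, exec_strategy,
//                       build_strategy, graph);
//   pe.Run(/*fetch_tensors=*/{"loss"}, /*fetched_var_name=*/"fetched_vars");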

void ParallelExecutor::BCastParamsToDevices(
    const std::vector<std::string> &vars, int trainer_id) const {
  VLOG(3) << "BCastParamsToDevices";
  // the initializing bcast, all vars would be bcast from device(0).
  for (auto &var : vars) {
    framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
    if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
      continue;
    }

    auto &main_tensor = main_var->Get<LoDTensor>();
    if (!main_tensor.IsInitialized()) {
      VLOG(3) << "one input var is not initialized, skip!";
      continue;
    }
    auto &dims = main_tensor.dims();
    if (paddle::platform::is_gpu_place(main_tensor.place())) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
      std::vector<void *> buffers;
      buffers.reserve(member_->places_.size());
      size_t numel = main_tensor.numel();
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;

        if (i == 0 && trainer_id == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
          auto local_scope = member_->local_scopes_[i];
          auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }
        buffers.push_back(buffer);
      }

      PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(),
                        "variables' buffer size to bcast NOT equal to places");
      {
        auto *nccl_ctxs = member_->nccl_ctxs_->DefaultFlatCtx();
        platform::NCCLGroupGuard guard;
        for (size_t i = 0; i < member_->places_.size(); ++i) {
          auto &nccl_ctx = nccl_ctxs->at(member_->places_[i]);
          platform::dynload::ncclBcast(buffers[i], numel, data_type, 0,
                                       nccl_ctx.comm_, nccl_ctx.stream());
        }
        nccl_ctxs->WaitAll();
      }
#endif
    } else {
      platform::CPUPlace cpu;
      for (size_t i = 1; i < member_->places_.size(); ++i) {
        auto local_scope = member_->local_scopes_[i];
        auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();

        auto copy_memory = [&] {
          t->Resize(dims);
          t->mutable_data(cpu, main_tensor.type());
          paddle::framework::TensorCopy(main_tensor, cpu, t);
        };

        auto share_memory = [&] { t->ShareDataWith(main_tensor); };

        // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
        if (member_->build_strategy_.async_mode_) {
          share_memory();
        } else if (member_->use_all_reduce_ || member_->use_cuda_ ||
                   var == "@LR_DECAY_COUNTER@") {
          copy_memory();
        } else {
          share_memory();
        }
      }
    }
  }
}
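
// Illustration of the broadcast above: on GPU places the 0th device's buffer
// feeds an ncclBcast to every place, while on CPU places the parameter is
// either deep-copied or shared into each local scope. A hedged call sketch
// (the variable name is illustrative):
//
//   pe.BCastParamsToDevices({"fc_0.w_0"}, /*trainer_id=*/0);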

void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
  VLOG(3) << "enter ParallelExecutor Run";
#ifdef WITH_GPERFTOOLS
  if (gProfileStarted) {
    ProfilerFlush();
  }
#endif

  platform::RecordBlock b(0);
  if (member_->HasGarbageCollectors()) {
    platform::RecordEvent event("PrepareGarbageCollectors");
    member_->ResetRuntimeReferenceCount(fetch_tensors, fetched_var_name);
  }

  VLOG(3) << "ParallelExecutor begin to run member_->executor_->Run";
  auto fetch_data = member_->executor_->Run(fetch_tensors);
  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetch_data;
}
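
// After Run(), the fetched results live in the global scope under
// fetched_var_name as a FeedFetchList. A hedged retrieval sketch (names are
// illustrative; the element order follows fetch_tensors):
//
//   auto &fetched =
//       *scope.FindVar("fetched_vars")->GetMutable<FeedFetchList>();
//   auto &loss_tensor = fetched.at(0);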

void ParallelExecutor::FeedTensorsIntoLocalScopes(
    const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
  PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());

  for (size_t i = 0; i < tensors.size(); ++i) {
    auto &map = tensors[i];
    auto *scope = member_->local_scopes_[i];
    for (auto &pair : map) {
      auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
      trg->ShareDataWith(pair.second);
      trg->set_lod(pair.second.lod());
    }
  }
}

void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
    const std::unordered_map<std::string, LoDTensor> &tensors) {
  size_t num_places = member_->places_.size();
  for (auto &pair : tensors) {
    bool is_persistable = member_->IsPersistable(pair.first);
    VLOG(3) << "Split " << (is_persistable ? "persistable" : "no persistable")
            << " data (" << pair.first << "), dim:" << pair.second.dims()
            << ", place: " << pair.second.place();
    auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
    bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
    if (!is_persistable && num_places != lod_tensors.size()) {
      auto error_info = string::Sprintf(
          "The number(%d) of samples[%s] of the current batch is less than "
          "the count(%d) of devices(%s); currently, this is not allowed. ",
          lod_tensors.size(), pair.first, num_places,
          (is_cpu_place ? "CPU" : "GPU"));
      if (is_cpu_place) {
        error_info +=
            "You should set the environment variable CPU_NUM in the system "
            "to determine the number of devices you need.";
      }
      PADDLE_THROW(error_info);
    } else if (is_persistable) {
      if (lod_tensors.size() == 1) {
        lod_tensors.reserve(num_places);
        auto &tensor = lod_tensors.front();
        PADDLE_ENFORCE_EQ(tensor.dims(), pair.second.dims(),
                          "The dim doesn't match.");
        PADDLE_ENFORCE_EQ(tensor.place(), member_->places_.at(0),
                          "The place doesn't match.");
        for (size_t i = 1; i < num_places; ++i) {
          lod_tensors.emplace_back();
          auto &tmp = lod_tensors.back();
          framework::TensorCopy(pair.second, member_->places_.at(i), &tmp);
        }
      }
      if (lod_tensors.size() != num_places) {
        auto error_info = string::Sprintf(
            "The number(%d) of samples[%s] of the current batch does not match "
            "the count(%d) of devices(%s). Because %s is a persistable "
            "variable, you can feed just one sample; in that case, the input "
            "sample will be copied in %d copies and sent to different "
            "places separately. If you need different places to have different "
            "values, you should feed %d samples.",
            lod_tensors.size(), pair.first, num_places,
            (is_cpu_place ? "CPU" : "GPU"), pair.first, num_places, num_places);
        PADDLE_THROW(error_info);
      }
    }
    PADDLE_ENFORCE_EQ(
        lod_tensors.size(), num_places,
        "The number(%d) of samples of the current batch does not match the "
        "count(%d) of devices.",
        lod_tensors.size(), num_places);
    for (size_t j = 0; j < member_->places_.size(); ++j) {
      // TODO(panxy0718): Do I need to delete this var?
      auto t =
          member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
      t->ShareDataWith(lod_tensors[j]);
      t->set_lod(lod_tensors[j].lod());
    }
  }
}
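
// A hedged sketch of feeding one batch that gets split across the devices
// (tensor names and contents are illustrative):
//
//   std::unordered_map<std::string, LoDTensor> feed;
//   feed["image"] = image_batch;  // batch dimension is split over places
//   feed["label"] = label_batch;
//   pe.FeedAndSplitTensorIntoLocalScopes(feed);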

ParallelExecutor::~ParallelExecutor() {
  for (auto &p : member_->places_) {
    platform::DeviceContextPool::Instance().Get(p)->Wait();
  }
  delete member_;
}

bool ParallelExecutor::EnableParallelGraphExecution(
    const ir::Graph &graph, const ExecutionStrategy &exec_strategy,
    const BuildStrategy &build_strategy) const {
  if (!FLAGS_enable_parallel_graph) {
    return false;
  }

  bool enable_parallel_graph = true;

  for (ir::Node *node : graph.Nodes()) {
    if (node->IsVar() && node->Var()) {
      // TODO(Yancey1989): support sparse update in ParallelGraph mode.
      if (node->Var()->GetType() == proto::VarType::SELECTED_ROWS) {
        enable_parallel_graph = false;
        break;
      }
    } else if (node->IsOp() && node->Op()) {
      // TODO(Yancey1989): support pserver mode
      if (node->Op()->Type() == "send" || node->Op()->Type() == "recv") {
        enable_parallel_graph = false;
        break;
      }
    }
  }

  if (!member_->use_all_reduce_ || !member_->use_cuda_) {
    if (build_strategy.enable_sequential_execution_ ||
        exec_strategy.type_ == ExecutionStrategy::ExecutorType::kExperimental) {
      enable_parallel_graph = false;
    }
  }

#ifdef WIN32
  VLOG(1) << "Windows has no support to parallel graph, enable_parallel_graph "
             "would be forced to false.";
  enable_parallel_graph = false;
#endif

  return enable_parallel_graph;
}
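
// In short, parallel graph execution is only chosen when
// FLAGS_enable_parallel_graph is set, the graph contains no SELECTED_ROWS
// variables and no send/recv ops, and (outside pure all-reduce CUDA training)
// neither sequential execution nor the experimental executor is requested;
// on Windows it is always forced off.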

}  // namespace framework
}  // namespace paddle

USE_PASS(reference_count_pass);
USE_PASS(eager_deletion_pass);