parallel_executor.cc 24.9 KB
Newer Older
Y
Yang Yang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
D
dzhwinter 已提交
16
#include <algorithm>
Q
qingqing01 已提交
17
#include <memory>
C
chengduoZH 已提交
18
#include <string>
19
#include <tuple>
Q
Qiao Longfei 已提交
20
#include <utility>
Q
qiaolongfei 已提交
21
#include <vector>
Q
Qiao Longfei 已提交
22
#include "paddle/fluid/framework/details/async_ssa_graph_executor.h"
Y
yuyang18 已提交
23
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
24
#include "paddle/fluid/framework/details/multi_devices_helper.h"
Y
Yancey1989 已提交
25
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
Y
yuyang18 已提交
26
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
Y
Yu Yang 已提交
27
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
28 29
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
30
#include "paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass_helper.h"
31
#include "paddle/fluid/platform/profiler.h"
Y
Yu Yang 已提交
32

Y
Yu Yang 已提交
33
#ifdef WITH_GPERFTOOLS
Y
Yu Yang 已提交
34
#include "gperftools/profiler.h"
Y
Yu Yang 已提交
35
#endif
Y
Yu Yang 已提交
36
// File name for the gperftools CPU profile of ParallelExecutor. Only honored
// when Paddle is built with WITH_GPERFTOOLS; empty disables profiling.
DEFINE_string(pe_profile_fname, "",
              "Profiler filename for PE, which is generated by gperftools. "
              "Only valid when compiled `WITH_PROFILER=ON`. Leave empty to "
              "disable.");
// Opt-in switch for ParallelGraph execution; when false,
// EnableParallelGraphExecution always returns false.
DEFINE_bool(enable_parallel_graph, false,
            "Force disable parallel graph execution mode if set false.");
Y
Yu Yang 已提交
41

Y
Yang Yang 已提交
42
namespace paddle {
Y
Yu Yang 已提交
43 44
namespace framework {

Y
Yu Yang 已提交
45
// Ensures the profiler start-up logic in ParallelExecutorPrivate's constructor
// runs at most once per process, no matter how many executors are created.
static std::once_flag gProfileOnce;
#ifdef WITH_GPERFTOOLS
// Becomes true right after ProfilerStart(); ParallelExecutor::Run only calls
// ProfilerFlush() when it is set.
static bool gProfileStarted{false};
#endif
49

Y
Yu Yang 已提交
50 51 52
class ParallelExecutorPrivate {
 public:
  explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
Y
Yu Yang 已提交
53
      : places_(places) {
Y
Yu Yang 已提交
54
    if (!FLAGS_pe_profile_fname.empty()) {
Y
Yu Yang 已提交
55 56
      std::call_once(gProfileOnce, [] {
#ifdef WITH_GPERFTOOLS
Y
Yu Yang 已提交
57
        ProfilerStart(FLAGS_pe_profile_fname.c_str());
Y
Yu Yang 已提交
58 59 60
        gProfileStarted = true;
#else
        LOG(WARNING) << "Paddle is not compiled with gperftools. "
61
          "FLAGS_pe_profile_fname will be ignored";
Y
Yu Yang 已提交
62 63 64 65
#endif
      });
    }
  }
Y
Yu Yang 已提交
66

67 68 69 70 71 72 73 74 75 76 77
  ~ParallelExecutorPrivate() {
    if (own_local_scope_) {
      for (size_t i = 1; i < local_scopes_.size(); ++i) {
        // Skip the first scope, since it is the global scope.
        Scope *local_scope = local_scopes_[i];
        if (global_scope_->HasKid(local_scope)) {
          global_scope_->DeleteScope(local_scope);
        }
      }
    }
  }
S
sneaxiy 已提交
78

79
  ir::Graph *PrepareGCAndRefCnts(ir::Graph *graph, size_t max_memory_size);
S
sneaxiy 已提交
80 81 82 83 84 85 86 87 88 89 90 91

  inline bool HasGarbageCollectors() const { return !gcs_.empty(); }

  void ResetRuntimeReferenceCount(const std::vector<std::string> &fetch_tensors,
                                  const std::string &fetched_var_name) {
    for (size_t i = 0; i < runtime_ref_cnts_.size(); ++i) {
      for (auto &pair : global_ref_cnts_[i]) {
        runtime_ref_cnts_[i][pair.first] = pair.second;
      }

      for (auto &fetch_name : fetch_tensors) {
        runtime_ref_cnts_[i].erase(fetch_name);
S
sneaxiy 已提交
92
      }
S
sneaxiy 已提交
93
      runtime_ref_cnts_[i].erase(fetched_var_name);
S
sneaxiy 已提交
94 95 96
    }
  }

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  void InitNCCLCtxs(framework::Scope *scope, const BuildStrategy &bst) {
    VLOG(1) << "nccl comm num:" << bst.nccl_comm_num_ << ", nranks:" << nranks_
            << ", num_trainers:" << bst.num_trainers_
            << ", trainer_id:" << bst.trainer_id_;

    if (bst.use_hierarchical_allreduce_) {
      VLOG(1) << ", use_hierarchical_allreduce:"
              << bst.use_hierarchical_allreduce_ << ", inter_trainers_num:"
              << bst.hierarchical_allreduce_inter_nranks_
              << ", exter_trainers_num:"
              << bst.hierarchical_allreduce_exter_nranks_;
    }

    std::vector<ncclUniqueId *> flat_nccl_ids;
    if (nranks_ == 1) {
      // FIXME(gongwb): need not to create ncclid when nranks==1
      nccl_ctxs_.InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_,
                              bst.trainer_id_);
      return;
    }

    if (bst.enable_parallel_graph_) {
      VLOG(1) << "use only one ncclid in pg model";

      ncclUniqueId *nccl_id = nullptr;

      std::string var_name = platform::GetFlatNCCLVarName(0);
      auto nccl_id_var = scope->FindVar(var_name);
      if (nccl_id_var) {
        nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
      } else {
        nccl_id = new ncclUniqueId();
        PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(nccl_id));
      }

      flat_nccl_ids.push_back(nccl_id);

      nccl_ctxs_.InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_,
                              bst.trainer_id_);
      VLOG(1) << "init bst nccl context complete!";
      return;
    }

    // num_trainers ==1 && places > 1
    if (bst.num_trainers_ == 1) {
      nccl_ctxs_.InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_,
                              bst.trainer_id_);
      return;
    }

    for (int i = 0; i < static_cast<int>(bst.nccl_comm_num_); i++) {
      std::string var_name = platform::GetFlatNCCLVarName(i);
      auto nccl_id_var = scope->FindVar(var_name);
      PADDLE_ENFORCE(nccl_id_var, "can't find %s nccl_id_var", var_name);
      auto nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
      flat_nccl_ids.push_back(nccl_id);
    }

    nccl_ctxs_.InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_,
                            bst.trainer_id_);

    if (bst.use_hierarchical_allreduce_) {
      std::string var_name = platform::GetHierarchicalInterNCCLVarName();
      auto nccl_id_var = scope->FindVar(var_name);
      auto inter_nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();

      std::vector<ncclUniqueId *> exter_nccl_ids;
      for (int i = 0; i < static_cast<int>(bst.nccl_comm_num_); i++) {
        std::string var_name = platform::GetHierarchicalExterNCCLVarName(i);
        auto nccl_id_var = scope->FindVar(var_name);
        PADDLE_ENFORCE(nccl_id_var, "can't find %s nccl_id_var", var_name);
        auto nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
        exter_nccl_ids.push_back(nccl_id);
      }
      nccl_ctxs_.InitHierarchicalCtxs(places_, inter_nccl_id, exter_nccl_ids,
                                      bst.num_trainers_, bst.trainer_id_,
                                      bst.hierarchical_allreduce_inter_nranks_,
                                      bst.hierarchical_allreduce_exter_nranks_);
    }
  }
#endif

D
dzhwinter 已提交
180
  BuildStrategy build_strategy_;
Y
Yu Yang 已提交
181 182
  std::vector<platform::Place> places_;
  std::vector<Scope *> local_scopes_;
183
  Scope *global_scope_;  // not owned
Y
Yu Yang 已提交
184
  std::unique_ptr<details::SSAGraphExecutor> executor_;
Y
Yu Yang 已提交
185

P
peizhilin 已提交
186
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
187
  platform::MultiNCCLContextMap nccl_ctxs_;
Y
Yu Yang 已提交
188
#endif
C
chengduoZH 已提交
189 190
  bool own_local_scope_;
  bool use_cuda_;
191
  bool use_all_reduce_;
192
  size_t nranks_;
S
sneaxiy 已提交
193

S
sneaxiy 已提交
194 195 196
  // global_ref_cnts_ is only initialized when ParallelExecutor constructs, and
  // then keeps unchanged
  // Before each iteration, runtime_ref_cnts_ is reset to global_ref_cnts_
197 198 199
  std::vector<ir::ReferenceCountMap> global_ref_cnts_;
  std::vector<ir::AtomicReferenceCountMap> runtime_ref_cnts_;
  ir::GarbageCollectorMap gcs_;
Y
Yu Yang 已提交
200 201
};

202 203
// Ensures every place in places_ has a garbage collector (GPU places get an
// unsafe-fast or stream collector depending on IsFastEagerDeletionModeEnabled,
// CPU places a CPUGarbageCollector; anything else throws). If any collector
// exists, runs reference_count_pass then eager_deletion_pass on `graph` and
// returns the graph they produce; otherwise returns `graph` unchanged.
ir::Graph *ParallelExecutorPrivate::PrepareGCAndRefCnts(
    ir::Graph *graph, size_t max_memory_size) {
  for (size_t idx = 0; idx < places_.size(); ++idx) {
    auto &place = places_[idx];
    if (gcs_.count(place) > 0) continue;  // already has a collector

    std::unique_ptr<GarbageCollector> gc;
#ifdef PADDLE_WITH_CUDA
    if (platform::is_gpu_place(place)) {
      const auto &cuda_place = boost::get<platform::CUDAPlace>(place);
      if (IsFastEagerDeletionModeEnabled()) {
        gc.reset(new UnsafeFastGPUGarbageCollector(cuda_place, max_memory_size));
      } else {
        gc.reset(new StreamGarbageCollector(cuda_place, max_memory_size));
      }
      VLOG(10) << "Created " << idx << "-th GarbageCollector at " << place;
    }
#endif
    // Not handled by the GPU branch above: only CPU places are supported.
    if (!gc) {
      if (!platform::is_cpu_place(place)) {
        PADDLE_THROW("Unsupported place for garbage collection");
      }
      gc.reset(new CPUGarbageCollector(boost::get<platform::CPUPlace>(place),
                                       max_memory_size));
      VLOG(10) << "Created GarbageCollector at " << place;
    }

    gcs_.emplace(place, std::move(gc));
  }

  if (gcs_.empty()) {
    return graph;
  }

  // Shared between the two passes; lives until this function returns.
  std::vector<ir::LastLiveOpsOfVars> last_live_ops_of_vars;

  auto &registry = ir::PassRegistry::Instance();

  auto ref_cnt_pass = registry.Get("reference_count_pass");
  ref_cnt_pass->SetNotOwned(ir::kGlobalReferenceCount, &global_ref_cnts_);
  ref_cnt_pass->SetNotOwned(ir::kLastLiveOpsOfVars, &last_live_ops_of_vars);
  graph = ref_cnt_pass->Apply(graph);
  VLOG(10) << "ReferenceCountPass Applied";

  auto eager_deletion_pass = registry.Get("eager_deletion_pass");
  eager_deletion_pass->SetNotOwned(ir::kRuntimeReferenceCount,
                                   &runtime_ref_cnts_);
  eager_deletion_pass->SetNotOwned(ir::kGarbageCollector, &gcs_);
  eager_deletion_pass->SetNotOwned(ir::kLastLiveOpsOfVars,
                                   &last_live_ops_of_vars);
  eager_deletion_pass->SetNotOwned(ir::kAllPlaces, &places_);
  graph = eager_deletion_pass->Apply(graph);
  VLOG(10) << "EagerDeletionPass Applied";

  return graph;
}

260 261 262 263
// Returns a mutable reference to the per-place local scopes (index i matches
// places_[i]).
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
  auto &scopes = member_->local_scopes_;
  return scopes;
}

264 265 266 267 268 269 270 271 272 273 274 275 276 277
// Forwards to ScopeBufferedSSAGraphExecutor::DropLocalExeScopes when the
// current executor is scope-buffered; otherwise does nothing.
void ParallelExecutor::DropLocalExeScopes() {
  auto *scope_buffered = dynamic_cast<details::ScopeBufferedSSAGraphExecutor *>(
      member_->executor_.get());
  if (scope_buffered == nullptr) return;
  scope_buffered->DropLocalExeScopes();
}

// Returns false unless the current executor is a ScopeBufferedSSAGraphExecutor,
// in which case its own NeedCreateLocalExeScope() answer is returned.
bool ParallelExecutor::NeedCreateLocalExeScope() {
  auto *scope_buffered = dynamic_cast<details::ScopeBufferedSSAGraphExecutor *>(
      member_->executor_.get());
  if (scope_buffered == nullptr) return false;
  return scope_buffered->NeedCreateLocalExeScope();
}

Y
Yan Xu 已提交
278 279 280 281 282 283 284 285
// Builds a ParallelExecutor over `places`:
//   1. creates (or derives from `local_scopes`) one local scope per place,
//      initializes NCCL when running on CUDA, and broadcasts `bcast_vars`;
//   2. applies the build strategy passes to `graph` (per place in async mode)
//      and, if eager deletion is enabled, the GC / reference-count passes;
//   3. records variable infos and selects the SSA graph executor
//      (Async / ParallelSSA / Threaded / FastThreaded, wrapped in a
//      ScopeBufferedSSAGraphExecutor unless in async mode).
ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
                                   const std::vector<std::string> &bcast_vars,
                                   const std::string &loss_var_name,
                                   Scope *scope,
                                   const std::vector<Scope *> &local_scopes,
                                   const ExecutionStrategy &exec_strategy,
                                   const BuildStrategy &build_strategy,
                                   ir::Graph *graph)
    : member_(new ParallelExecutorPrivate(places)) {
  // async_graphs[0] and local_scopes_[0] below require at least one place.
  PADDLE_ENFORCE(!places.empty(),
                 "ParallelExecutor requires at least one place.");
  member_->global_scope_ = scope;
  member_->use_cuda_ = exec_strategy.use_cuda_;
  member_->build_strategy_ = build_strategy;
  member_->use_all_reduce_ =
      build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;
  member_->nranks_ = build_strategy.num_trainers_ * places.size();
  if (!member_->use_all_reduce_) {
    PADDLE_ENFORCE(places.size() > 1,
                   "If you set build_strategy.reduce with 'Reduce', "
                   "the number of places must be greater than 1.");
  }

  // Step 1. Create local scopes, then bcast the bcast_vars to devices.
  if (local_scopes.empty()) {
    // The first local scope is the global scope itself; the others are new
    // kids of it, deleted again by ~ParallelExecutorPrivate.
    member_->own_local_scope_ = true;
    member_->local_scopes_.emplace_back(member_->global_scope_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&scope->NewScope());
    }
  } else {
    member_->own_local_scope_ = false;
    PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
    }
  }

  // In async mode every place gets its own graph built from the original
  // program; the extra graphs are stored in async_graphs_ (assumed to own
  // them).
  std::vector<ir::Graph *> graphs;
  if (build_strategy.async_mode_) {
    PADDLE_ENFORCE(!member_->use_cuda_,
                   "gpu mode does not support async_mode_ now!");
    graphs.push_back(graph);
    for (size_t i = 1; i < places.size(); ++i) {
      auto *tmp_graph = new ir::Graph(graph->OriginProgram());
      async_graphs_.emplace_back(tmp_graph);
      graphs.push_back(tmp_graph);
    }
  }

  // FIXME(Yancey1989): parallel graph mode get better performance
  // in GPU allreduce distributed training. Need an elegant way to
  // choice the execution strategy.
  build_strategy.enable_parallel_graph_ =
      EnableParallelGraphExecution(*graph, exec_strategy, build_strategy);
  if (build_strategy.enable_parallel_graph_) {
    VLOG(0) << "The Executor would execute the graph by ParallelGraph "
               "Execution which can get better performance, "
            << "you can force it off by env FLAGS_enable_parallel_graph=0";
  }

  if (member_->use_cuda_) {
// Bcast Parameters to all GPUs
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    member_->InitNCCLCtxs(scope, build_strategy);

    // Initialize device context's nccl comm, will be used by normal
    // Operators like sync_batch_norm, and collective ops.
    // NOTE: more than one ParallelExecutor with same place, the nccl comm will
    // be rewrite and there will be some problem.
    // NOTE: NCCL group-calls and non-group-calls can not use the same
    // NCCL communicator, so for ParallelGraph and Multi-Process mode, re-use
    // same communicators.
    for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) {
      platform::DeviceContextPool &pool =
          platform::DeviceContextPool::Instance();
      auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(
          pool.Get(member_->places_[dev_id]));
      auto &nccl_ctx =
          member_->nccl_ctxs_.DefaultFlatCtx()->at(member_->places_[dev_id]);
      dev_ctx->set_nccl_comm(nccl_ctx.comm());
    }
#else
    PADDLE_THROW("Not compiled with CUDA");
#endif
  }
  // broadcast parameters from the 0th device to others:
  auto need_broadcast = [&]() -> bool {
    if (build_strategy.num_trainers_ > 1) {
      // 1. num_tariners would be grater than 1 for nccl distributed training.
      return true;
    } else if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
      // 2. Only one trainer process, but ParallelExecutor hold multiple
      // devices.
      return true;
    }
    return false;
  };

  if (need_broadcast()) {
    BCastParamsToDevices(bcast_vars, build_strategy.trainer_id_);
  }
  // Startup Program has been run. All local scopes has correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  std::vector<ir::Graph *> async_graphs(places.size());
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  if (build_strategy.async_mode_) {
    VLOG(3) << "use local async mode";
    graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name,
                                 {member_->local_scopes_[0]}, 1,
                                 member_->use_cuda_, &member_->nccl_ctxs_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      graphs[i] =
          build_strategy.Apply(graphs[i], {member_->places_[i]}, loss_var_name,
                               {member_->local_scopes_[i]}, 1,
                               member_->use_cuda_, &member_->nccl_ctxs_);
      async_graphs[i] = graphs[i];
    }
  } else {
    graph = build_strategy.Apply(graph, member_->places_, loss_var_name,
                                 member_->local_scopes_, member_->nranks_,
                                 member_->use_cuda_, &member_->nccl_ctxs_);
  }
#else
  if (build_strategy.async_mode_) {
    VLOG(3) << "use local async mode";
    graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name,
                                 {member_->local_scopes_[0]}, 1,
                                 member_->use_cuda_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      graphs[i] = build_strategy.Apply(
          graphs[i], {member_->places_[i]}, loss_var_name,
          {member_->local_scopes_[i]}, 1, member_->use_cuda_);
      async_graphs[i] = graphs[i];
    }
  } else {
    graph = build_strategy.Apply(graph, member_->places_, loss_var_name,
                                 member_->local_scopes_, member_->nranks_,
                                 member_->use_cuda_);
  }
#endif

  // A negative threshold disables eager deletion.
  auto max_memory_size = GetEagerDeletionThreshold();
  VLOG(10) << "Eager Deletion Threshold "
           << static_cast<float>(max_memory_size) / (1 << 30);
  if (max_memory_size >= 0) {
    graph = member_->PrepareGCAndRefCnts(graph,
                                         static_cast<size_t>(max_memory_size));
  }

  async_graphs[0] = graph;

  // Step 3. Create vars in each scope. Passes may also create new vars.
  //         skip control vars and empty vars
  std::vector<details::VariableInfo> var_infos;
  for (auto &node : graph->Nodes()) {
    if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
      var_infos.emplace_back();
      var_infos.back().name_ = node->Var()->Name();
      var_infos.back().type_ = node->Var()->GetType();
      var_infos.back().persistable_ = node->Var()->Persistable();
    }
  }

  // If the loss_var_name is given, the number of graph should be only one.
  if (!loss_var_name.empty()) {
    // Computed once: GraphNum walks the whole graph.
    size_t graph_num = ir::GraphNum(*graph);
    if (graph_num > 1) {
      LOG(WARNING)
          << "The number of graph should be only one, "
             "but the current graph has "
          << graph_num
          << " sub_graphs. If you want to see the nodes of the "
             "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
             "to specify the output dir. NOTES: if you not do training, "
             "please don't pass loss_var_name.";
    }
  }

  if (build_strategy.async_mode_) {
    VLOG(3) << "use AsyncSSAGraphExecutor";
    member_->executor_.reset(new details::AsyncSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->places_, async_graphs));
  } else if (build_strategy.enable_parallel_graph_) {
    VLOG(3) << "use ParallelSSAGraphExecutor";
#ifdef PADDLE_WITH_CUDA
    // TODO(Yancey1989): Remove passing in the main_program when
    // allreduce_seq_pass doesn't need it as the attr.
    member_->executor_.reset(new details::ParallelSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->places_, graph));
#else
    PADDLE_THROW(
        "Paddle should be compiled with CUDA for ParallelGraph Execution.");
#endif
  } else {
    if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
      VLOG(3) << "use ThreadedSSAGraphExecutor";
      member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
          exec_strategy, member_->local_scopes_, member_->places_, graph));
    } else {
      VLOG(3) << "use FastThreadedSSAGraphExecutor";
      member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
          exec_strategy, member_->local_scopes_, member_->places_, graph));
    }
  }

  VLOG(3) << "use ScopeBufferedSSAGraphExecutor";
  if (!build_strategy.async_mode_) {
    member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, std::move(var_infos),
        member_->places_, std::move(member_->executor_)));
  }
}

Y
Yancey1989 已提交
492
void ParallelExecutor::BCastParamsToDevices(
Y
Yan Xu 已提交
493
    const std::vector<std::string> &vars, int trainer_id) const {
Q
Qiao Longfei 已提交
494
  VLOG(3) << "BCastParamsToDevices";
X
Xin Pan 已提交
495
  // the initializing bcast, all vars would be bcast from device(0).
496
  for (auto &var : vars) {
X
Xin Pan 已提交
497
    framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
J
JiayiFeng 已提交
498
    if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
499 500 501 502
      continue;
    }

    auto &main_tensor = main_var->Get<LoDTensor>();
503
    if (!main_tensor.IsInitialized()) {
M
minqiyang 已提交
504
      VLOG(3) << "one in var not inited, return!";
505 506
      continue;
    }
507 508
    auto &dims = main_tensor.dims();
    if (paddle::platform::is_gpu_place(main_tensor.place())) {
P
peizhilin 已提交
509
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
510
      std::vector<void *> buffers;
C
chengduo 已提交
511
      buffers.reserve(member_->places_.size());
512 513 514 515 516
      size_t numel = main_tensor.numel();
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;
517

Y
Yan Xu 已提交
518
        if (i == 0 && trainer_id == 0) {
519 520
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
Y
Yu Yang 已提交
521
          auto local_scope = member_->local_scopes_[i];
522
          auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
Y
Update  
Yu Yang 已提交
523
          t->Resize(dims);
524
          buffer = t->mutable_data(place, main_tensor.type());
Y
Update  
Yu Yang 已提交
525
        }
526
        buffers.push_back(buffer);
527
      }
528

529 530 531
      PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(),
                        "variables' buffer size to bcast NOT equal to places");
      {
532
        auto *nccl_ctxs = member_->nccl_ctxs_.DefaultFlatCtx();
533 534
        platform::NCCLGroupGuard guard;
        for (size_t i = 0; i < member_->places_.size(); ++i) {
535
          auto &nccl_ctx = nccl_ctxs->at(member_->places_[i]);
X
Xin Pan 已提交
536 537
          platform::dynload::ncclBcast(buffers[i], numel, data_type, 0,
                                       nccl_ctx.comm_, nccl_ctx.stream());
538
        }
539
        nccl_ctxs->WaitAll();
540
      }
C
chengduoZH 已提交
541 542 543
#else
      PADDLE_THROW("Not compiled with CUDA");
#endif
544 545
    } else {
      platform::CPUPlace cpu;
C
chengduo 已提交
546
      for (size_t i = 1; i < member_->places_.size(); ++i) {
547 548
        auto local_scope = member_->local_scopes_[i];
        auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
C
chengduo 已提交
549

Q
Qiao Longfei 已提交
550
        auto copy_memory = [&] {
551 552 553
          t->Resize(dims);
          t->mutable_data(cpu, main_tensor.type());
          paddle::framework::TensorCopy(main_tensor, cpu, t);
Q
can run  
Qiao Longfei 已提交
554 555
        };

Q
Qiao Longfei 已提交
556
        auto share_memory = [&] { t->ShareDataWith(main_tensor); };
Q
can run  
Qiao Longfei 已提交
557 558 559 560 561 562 563

        // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
        if (member_->build_strategy_.async_mode_) {
          share_memory();
        } else if (member_->use_all_reduce_ || member_->use_cuda_ ||
                   var == "@LR_DECAY_COUNTER@") {
          copy_memory();
564
        } else {
Q
can run  
Qiao Longfei 已提交
565
          share_memory();
566
        }
Y
Yu Yang 已提交
567
      }
Y
Stash  
Yu Yang 已提交
568 569
    }
  }
Y
Yu Yang 已提交
570
}
Y
Yu Yang 已提交
571

Y
Yu Yang 已提交
572 573
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
574
  VLOG(3) << "enter ParallelExecutor Run";
Y
Yu Yang 已提交
575 576 577
#ifdef WITH_GPERFTOOLS
  if (gProfileStarted) {
    ProfilerFlush();
S
sneaxiy 已提交
578 579
  }
#endif
Y
Yu Yang 已提交
580

X
Xin Pan 已提交
581
  platform::RecordBlock b(0);
S
sneaxiy 已提交
582 583
  if (member_->HasGarbageCollectors()) {
    member_->ResetRuntimeReferenceCount(fetch_tensors, fetched_var_name);
S
sneaxiy 已提交
584
  }
585 586

  VLOG(3) << "ParallelExecutor begin to run member_->executor_->Run";
S
sneaxiy 已提交
587 588 589
  auto fetch_data = member_->executor_->Run(fetch_tensors);
  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetch_data;
Y
Yu Yang 已提交
590
}
Y
Yu Yang 已提交
591

Y
Yu Yang 已提交
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
void ParallelExecutor::FeedTensorsIntoLocalScopes(
    const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
  PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());

  for (size_t i = 0; i < tensors.size(); ++i) {
    auto &map = tensors[i];
    auto *scope = member_->local_scopes_[i];
    for (auto &pair : map) {
      auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
      trg->ShareDataWith(pair.second);
      trg->set_lod(pair.second.lod());
    }
  }
}

void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
    const std::unordered_map<std::string, LoDTensor> &tensors) {
  for (auto pair : tensors) {
    auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
611 612 613 614 615
    PADDLE_ENFORCE_EQ(
        member_->places_.size(), lod_tensors.size(),
        "The number of samples of current batch is less than the count of "
        "devices, currently, it is not allowed. (%d vs %d)",
        member_->places_.size(), lod_tensors.size());
X
Xin Pan 已提交
616 617
    for (size_t j = 0; j < member_->places_.size(); ++j) {
      // TODO(panxy0718): Do I need to delete this var?
618
      auto t =
Y
Yu Yang 已提交
619
          member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
620 621
      t->ShareDataWith(lod_tensors[j]);
      t->set_lod(lod_tensors[j].lod());
X
Xin Pan 已提交
622 623 624 625
    }
  }
}

X
Xin Pan 已提交
626 627 628 629 630 631 632
// Blocks until the device context of every place has finished its pending
// work, then releases the private state.
ParallelExecutor::~ParallelExecutor() {
  auto &pool = platform::DeviceContextPool::Instance();
  for (const auto &place : member_->places_) {
    pool.Get(place)->Wait();
  }
  delete member_;
}

633
bool ParallelExecutor::EnableParallelGraphExecution(
X
Xin Pan 已提交
634
    const ir::Graph &graph, const ExecutionStrategy &exec_strategy,
635
    const BuildStrategy &build_strategy) const {
Y
Yancey1989 已提交
636
  if (!FLAGS_enable_parallel_graph) return false;
637

Y
Yancey1989 已提交
638
  bool enable_parallel_graph = true;
639

X
Xin Pan 已提交
640 641 642 643 644 645 646 647 648 649 650 651 652
  for (ir::Node *node : graph.Nodes()) {
    if (node->IsVar() && node->Var()) {
      // TODO(Yancey1989): support sparse update in ParallelGraph mode.
      if (node->Var()->GetType() == proto::VarType::SELECTED_ROWS) {
        enable_parallel_graph = false;
        break;
      }
    } else if (node->IsOp() && node->Op()) {
      // TODO(Yancey1989): support pserver mode
      if (node->Op()->Type() == "send" || node->Op()->Type() == "recv") {
        enable_parallel_graph = false;
        break;
      }
653 654 655 656 657
    }
  }

  if (!member_->use_all_reduce_ || !member_->use_cuda_)

Y
Yancey1989 已提交
658 659 660
    if (build_strategy.enable_sequential_execution_ ||
        exec_strategy.type_ == ExecutionStrategy::ExecutorType::kExperimental)
      enable_parallel_graph = false;
Y
Yancey1989 已提交
661
  return enable_parallel_graph;
662 663
}

Y
Yu Yang 已提交
664
}  // namespace framework
Y
Yang Yang 已提交
665
}  // namespace paddle
S
sneaxiy 已提交
666

S
sneaxiy 已提交
667
// Reference the registrations of the passes that
// ParallelExecutorPrivate::PrepareGCAndRefCnts fetches by name from
// ir::PassRegistry (presumably so the linker keeps them).
USE_PASS(reference_count_pass);
USE_PASS(eager_deletion_pass);