/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
#include <string>
#include <tuple>
#include <vector>
#include "paddle/fluid/framework/ir/graph_helper.h"

#include "paddle/fluid/framework/ir/graph.h"

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/nccl_helper.h"
#endif

#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/platform/profiler.h"

namespace paddle {
namespace framework {

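// Holds the internal state of ParallelExecutor: the devices and their local
// scopes, the underlying SSA graph executor, the NCCL contexts (when built
// with CUDA), and the reference-count/garbage-collector bookkeeping used for
// eager memory deletion.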
class ParallelExecutorPrivate {
 public:
  explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
      : places_(places) {}

  ~ParallelExecutorPrivate() {
    if (own_local_scope_) {
      for (size_t i = 1; i < local_scopes_.size(); ++i) {
        // Skip the first scope, since it is the global scope.
        Scope *local_scope = local_scopes_[i];
        if (global_scope_->HasKid(local_scope)) {
          global_scope_->DeleteScope(local_scope);
        }
      }
    }
  }

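  // Restores rt_cur_ref_cnts_ from rt_ref_cnts_ so that each iteration of
  // eager deletion starts from the static reference counts.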
  void ResetRuntimeReferenceCount() {
    for (size_t i = 0; i < rt_ref_cnts_.size(); ++i) {
      for (auto &pair : rt_ref_cnts_[i]) {
        rt_cur_ref_cnts_[i][pair.first] = pair.second;
      }
    }
  }

  std::vector<platform::Place> places_;
  std::vector<Scope *> local_scopes_;
  Scope *global_scope_;  // not owned
  std::unique_ptr<details::SSAGraphExecutor> executor_;

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
  bool own_local_scope_;
  bool use_cuda_;
  bool use_all_reduce_;

  // rt_ref_cnts_ is initialized once when the ParallelExecutor is constructed
  // and is then kept unchanged.
  // Before each iteration, rt_cur_ref_cnts_ is reset to rt_ref_cnts_.
  std::vector<details::ReferenceCountMap> rt_ref_cnts_;
  std::vector<details::AtomicReferenceCountMap> rt_cur_ref_cnts_;
  details::GarbageCollectorList gcs_;
};

std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
  return member_->local_scopes_;
}

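// Builds the executor: creates (or adopts) per-device local scopes, broadcasts
// parameters, converts main_program into an SSA graph according to
// build_strategy, and selects an SSA graph executor according to
// exec_strategy.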
ParallelExecutor::ParallelExecutor(
    const std::vector<platform::Place> &places,
    const std::unordered_set<std::string> &params,
    const std::unordered_set<std::string> &bcast_vars,
    const ProgramDesc &main_program, const std::string &loss_var_name,
    Scope *scope, const std::vector<Scope *> &local_scopes,
    const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
    size_t num_trainers, size_t trainer_id)
    : member_(new ParallelExecutorPrivate(places)) {
  member_->global_scope_ = scope;
  member_->use_cuda_ = exec_strategy.use_cuda_;
  member_->use_all_reduce_ =
      build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;

  if (!member_->use_all_reduce_) {
    PADDLE_ENFORCE(places.size() > 1,
                   "If you set build_strategy.reduce with 'Reduce',"
                   "the number of places must be greater than 1.");
  }

  // Step 1. Bcast the params to devs.
  // Create local scopes
  if (local_scopes.empty()) {
    member_->own_local_scope_ = true;
    member_->local_scopes_.emplace_back(member_->global_scope_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&scope->NewScope());
    }
  } else {
    member_->own_local_scope_ = false;
    PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
    }
  }

  if (member_->use_cuda_) {
// Bcast Parameters to all GPUs
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
    ncclUniqueId *nccl_id = nullptr;
    if (nccl_id_var != nullptr) {
      nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
    }
    member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
        member_->places_, nccl_id, num_trainers, trainer_id));
#else
    PADDLE_THROW("Not compiled with CUDA");
#endif
  }

  if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
    BCastParamsToDevices(bcast_vars);
  }
// The startup program has been run. All local scopes have correct parameters.

// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
      main_program, member_->places_, loss_var_name, params,
      member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
#else
  std::unique_ptr<ir::Graph> graph =
      build_strategy.Apply(main_program, member_->places_, loss_var_name,
                           params, member_->local_scopes_, member_->use_cuda_);
#endif

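  // If eager deletion is enabled (a non-negative threshold), create one
  // garbage collector per place so variables can be freed during execution.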
  auto max_memory_size = GetEagerDeletionThreshold();
  if (max_memory_size >= 0) {
    size_t place_num = member_->places_.size();
    for (size_t i = 0; i < place_num; ++i) {
      auto &place = member_->places_[i];
#ifdef PADDLE_WITH_CUDA
      if (platform::is_gpu_place(place)) {
        member_->gcs_.emplace_back(new StreamGarbageCollector<Tensor>(
            boost::get<platform::CUDAPlace>(place), max_memory_size));
        VLOG(10) << "Created " << i << "-th GarbageCollector at " << place;
      } else if (platform::is_cpu_place(place)) {
#endif
        member_->gcs_.emplace_back(new CPUGarbageCollector<Tensor>(
            boost::get<platform::CPUPlace>(place), max_memory_size));
        VLOG(10) << "Created " << i << "-th GarbageCollector at " << place;
#ifdef PADDLE_WITH_CUDA
      }
#endif
    }
  }

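  // With garbage collectors available, apply the reference_count_pass to
  // compute how many ops consume each variable, then the eager_deletion_pass
  // to insert ops that release a variable right after its last use.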
  if (!member_->gcs_.empty()) {
    std::vector<details::LastLiveOpsOfVars> last_live_ops_of_vars;

    auto ref_cnt_pass =
        ir::PassRegistry::Instance().Get("reference_count_pass");
    ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount,
                              &(member_->rt_ref_cnts_));
    ref_cnt_pass->SetNotOwned(details::kLastLiveOpsOfVars,
                              &last_live_ops_of_vars);
    VLOG(10) << "ReferenceCountPass Applied";
    graph = ref_cnt_pass->Apply(std::move(graph));

    auto eager_deletion_pass =
        ir::PassRegistry::Instance().Get("eager_deletion_pass");
    eager_deletion_pass->SetNotOwned(details::kCurReferenceCount,
                                     &(member_->rt_cur_ref_cnts_));
    eager_deletion_pass->SetNotOwned(details::kGarbageCollector,
                                     &(member_->gcs_));
    eager_deletion_pass->SetNotOwned(details::kLastLiveOpsOfVars,
                                     &last_live_ops_of_vars);
    graph = eager_deletion_pass->Apply(std::move(graph));
    VLOG(10) << "EagerDeletionPass Applied";
  }

  // Step 3. Create vars in each scope. Passes may also create new vars.
  //         skip control vars and empty vars
  std::vector<details::VariableInfo> var_infos;
  for (auto &node : graph->Nodes()) {
    if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
      var_infos.emplace_back();
      var_infos.back().name_ = node->Var()->Name();
      var_infos.back().type_ = node->Var()->GetType();
      var_infos.back().persistable_ = node->Var()->Persistable();
    }
  }
  // If loss_var_name is given, the graph should contain only one sub-graph.
  if (loss_var_name.size()) {
    size_t graph_num = ir::GraphNum(*graph);
    if (graph_num > 1) {
      LOG(WARNING)
          << "The graph should contain only one sub-graph, "
             "but the current graph has "
          << ir::GraphNum(*graph)
          << " sub_graphs. If you want to see the nodes of the "
             "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
             "to specify the output dir. NOTE: if you are not doing "
             "training, please do not pass loss_var_name.";
    }
  }

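  // Pick the SSA graph executor requested by exec_strategy, then wrap it in a
  // ScopeBufferedSSAGraphExecutor, which manages the local execution scopes
  // across iterations.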
  if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
    member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, places, std::move(graph)));
  } else {
    member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, places, std::move(graph)));
  }

  member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
      exec_strategy, member_->local_scopes_, std::move(var_infos),
      member_->places_, std::move(member_->executor_)));
}

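// Broadcasts the given variables from device 0 to all other devices: via NCCL
// for GPU places, and by copying (or sharing) the tensor for CPU places.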
void ParallelExecutor::BCastParamsToDevices(
    const std::unordered_set<std::string> &vars) const {
  // The initial broadcast: all vars are broadcast from device(0).
  for (auto &var : vars) {
    framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
    if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
      continue;
    }

    auto &main_tensor = main_var->Get<LoDTensor>();
    if (!main_tensor.IsInitialized()) {
      VLOG(3) << "one input var is not initialized, skip!";
      continue;
    }
    auto &dims = main_tensor.dims();
    if (paddle::platform::is_gpu_place(main_tensor.place())) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
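      // Collect one buffer per place: device 0 supplies the source data and
      // every other device allocates a destination tensor of the same shape.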
      std::vector<void *> buffers;
      size_t numel = main_tensor.numel();
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;

        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
          auto local_scope = member_->local_scopes_[i];
          auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }
        buffers.push_back(buffer);
      }

      PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(),
                        "The number of buffers to bcast is not equal to the "
                        "number of places.");
      {
        platform::NCCLGroupGuard guard;
        for (size_t i = 0; i < member_->places_.size(); ++i) {
          auto &nccl_ctx = member_->nccl_ctxs_->at(member_->places_[i]);
          platform::dynload::ncclBcast(buffers[i], numel, data_type, 0,
                                       nccl_ctx.comm_, nccl_ctx.stream());
        }
        member_->nccl_ctxs_->WaitAll();
      }
#else
      PADDLE_THROW("Not compiled with CUDA");
#endif
    } else {
      platform::CPUPlace cpu;
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        if (i == 0) continue;

        auto local_scope = member_->local_scopes_[i];
        auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();

        // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
        if (member_->use_all_reduce_ || member_->use_cuda_ ||
            var == "@LR_DECAY_COUNTER@") {
          t->Resize(dims);
          t->mutable_data(cpu, main_tensor.type());
          paddle::framework::TensorCopy(main_tensor, cpu, t);
        } else {
          t->ShareDataWith(main_tensor);
        }
      }
    }
  }
}

void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
  platform::RecordBlock b(0);
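  // Reset the runtime reference counts for this iteration and exclude the
  // fetched variables so they are not eagerly deleted before being returned.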
  if (!member_->gcs_.empty()) {
    member_->ResetRuntimeReferenceCount();
    size_t n = member_->rt_ref_cnts_.size();
    for (size_t i = 0; i < n; ++i) {
      for (auto &fetch_name : fetch_tensors) {
        member_->rt_cur_ref_cnts_[i].erase(fetch_name);
      }
      member_->rt_cur_ref_cnts_[i].erase(fetched_var_name);
    }
  }
  auto fetch_data = member_->executor_->Run(fetch_tensors);
  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetch_data;
}

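// Feeds one tensor map per device; each tensor is shared (not copied) into the
// corresponding local scope.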
void ParallelExecutor::FeedTensorsIntoLocalScopes(
    const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
  PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());

  for (size_t i = 0; i < tensors.size(); ++i) {
    auto &map = tensors[i];
    auto *scope = member_->local_scopes_[i];
    for (auto &pair : map) {
      auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
      trg->ShareDataWith(pair.second);
      trg->set_lod(pair.second.lod());
    }
  }
}

void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
    const std::unordered_map<std::string, LoDTensor> &tensors) {
  for (auto pair : tensors) {
    auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
    PADDLE_ENFORCE_EQ(
        member_->places_.size(), lod_tensors.size(),
        "The number of samples of current batch is less than the count of "
        "devices, currently, it is not allowed. (%d vs %d)",
        member_->places_.size(), lod_tensors.size());
    for (size_t j = 0; j < member_->places_.size(); ++j) {
      // TODO(panxy0718): Do I need to delete this var?
      auto t =
          member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
      t->ShareDataWith(lod_tensors[j]);
      t->set_lod(lod_tensors[j].lod());
    }
  }
}

ParallelExecutor::~ParallelExecutor() {
  for (auto &p : member_->places_) {
    platform::DeviceContextPool::Instance().Get(p)->Wait();
  }
  delete member_;
}

}  // namespace framework
}  // namespace paddle

USE_PASS(reference_count_pass);
USE_PASS(eager_deletion_pass);