/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
#include <string>
#include <tuple>
#include <vector>
#include "paddle/fluid/framework/ir/graph_helper.h"

#include "paddle/fluid/framework/ir/graph.h"

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/nccl_helper.h"
#endif

#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/platform/profiler.h"

namespace paddle {
namespace framework {

class ParallelExecutorPrivate {
 public:
  explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
      : places_(places) {}

  ~ParallelExecutorPrivate() {
    if (own_local_scope_) {
      for (size_t i = 1; i < local_scopes_.size(); ++i) {
        // Skip the first scope, since it is the global scope.
        Scope *local_scope = local_scopes_[i];
        if (global_scope_->HasKid(local_scope)) {
          global_scope_->DeleteScope(local_scope);
        }
      }
    }
  }

  // Restore rt_cur_ref_cnts_ from the immutable rt_ref_cnts_ snapshot so that
  // reference counting starts from a clean state before the next iteration.
  void ResetRuntimeReferenceCount() {
    for (size_t i = 0; i < rt_ref_cnts_.size(); ++i) {
      for (auto &pair : rt_ref_cnts_[i]) {
        rt_cur_ref_cnts_[i][pair.first] = pair.second;
      }
    }
  }

  std::vector<platform::Place> places_;
  std::vector<Scope *> local_scopes_;
  Scope *global_scope_;  // not owned
  std::unique_ptr<details::SSAGraphExecutor> executor_;

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
  bool own_local_scope_;
  bool use_cuda_;
  bool use_all_reduce_;

  // rt_ref_cnts_ is initialized only once, when the ParallelExecutor is
  // constructed, and is never modified afterwards.
  // Before each iteration, rt_cur_ref_cnts_ is reset to rt_ref_cnts_.
  std::vector<details::ReferenceCountMap> rt_ref_cnts_;
  std::vector<details::AtomicReferenceCountMap> rt_cur_ref_cnts_;
  details::GarbageCollectorList gcs_;
};

std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
  return member_->local_scopes_;
}

ParallelExecutor::ParallelExecutor(
    const std::vector<platform::Place> &places,
    const std::unordered_set<std::string> &params,
    const std::unordered_set<std::string> &bcast_vars,
    const ProgramDesc &main_program, const std::string &loss_var_name,
    Scope *scope, const std::vector<Scope *> &local_scopes,
    const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
    size_t num_trainers, size_t trainer_id)
    : member_(new ParallelExecutorPrivate(places)) {
  member_->global_scope_ = scope;
  member_->use_cuda_ = exec_strategy.use_cuda_;
  member_->use_all_reduce_ =
      build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;

  if (!member_->use_all_reduce_) {
    PADDLE_ENFORCE(places.size() > 1,
                   "If you set build_strategy.reduce with 'Reduce',"
                   "the number of places must be greater than 1.");
  }

  // Step 1. Broadcast the parameters to all devices.
  // Create local scopes
  if (local_scopes.empty()) {
    member_->own_local_scope_ = true;
    member_->local_scopes_.emplace_back(member_->global_scope_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&scope->NewScope());
    }
  } else {
    member_->own_local_scope_ = false;
    PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
    }
  }

  if (member_->use_cuda_) {
// Create the NCCL contexts that are later used to broadcast parameters to all GPUs
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
    ncclUniqueId *nccl_id = nullptr;
    if (nccl_id_var != nullptr) {
      nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
    }
    member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
        member_->places_, nccl_id, num_trainers, trainer_id));
#else
    PADDLE_THROW("Not compiled with CUDA");
#endif
  }

  if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
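    // Broadcast parameters only when this executor created more than one
    // local scope itself; externally provided scopes are left untouched here.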
    BCastParamsToDevices(bcast_vars);
  }
// The startup program has been run, so all local scopes hold correct parameters.

// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
      main_program, member_->places_, loss_var_name, params,
      member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
#else
  std::unique_ptr<ir::Graph> graph =
      build_strategy.Apply(main_program, member_->places_, loss_var_name,
                           params, member_->local_scopes_, member_->use_cuda_);
#endif

  auto max_memory_size = GetEagerDeletionThreshold();
  if (max_memory_size >= 0) {
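    // Eager deletion is enabled: create one garbage collector per place.
    // GPU places get an unsafe fast collector or a stream-synchronized one
    // depending on IsFastEagerDeletionModeEnabled(); CPU places get a
    // CPUGarbageCollector.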
    size_t place_num = member_->places_.size();
    for (size_t i = 0; i < place_num; ++i) {
      auto &place = member_->places_[i];
#ifdef PADDLE_WITH_CUDA
      if (platform::is_gpu_place(place)) {
        if (IsFastEagerDeletionModeEnabled()) {
          member_->gcs_.emplace_back(new UnsafeFastGPUGarbageCollector<Tensor>(
              boost::get<platform::CUDAPlace>(place), max_memory_size));
        } else {
          member_->gcs_.emplace_back(new StreamGarbageCollector<Tensor>(
              boost::get<platform::CUDAPlace>(place), max_memory_size));
        }
        VLOG(10) << "Created " << i << "-th GarbageCollector at " << place;
      } else if (platform::is_cpu_place(place)) {
#endif
        member_->gcs_.emplace_back(new CPUGarbageCollector<Tensor>(
            boost::get<platform::CPUPlace>(place), max_memory_size));
        VLOG(10) << "Created " << i << "-th GarbageCollector at " << place;
#ifdef PADDLE_WITH_CUDA
      }
#endif
    }
  }

  if (!member_->gcs_.empty()) {
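    // Garbage collectors were created, so wire the reference_count_pass and
    // eager_deletion_pass into the graph; together they allow variables to be
    // freed as soon as their last user ops have finished.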
    std::vector<details::LastLiveOpsOfVars> last_live_ops_of_vars;

    auto ref_cnt_pass =
        ir::PassRegistry::Instance().Get("reference_count_pass");
    ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount,
                              &(member_->rt_ref_cnts_));
    ref_cnt_pass->SetNotOwned(details::kLastLiveOpsOfVars,
                              &last_live_ops_of_vars);
    graph = ref_cnt_pass->Apply(std::move(graph));
    VLOG(10) << "ReferenceCountPass Applied";

    auto eager_deletion_pass =
        ir::PassRegistry::Instance().Get("eager_deletion_pass");
    eager_deletion_pass->SetNotOwned(details::kCurReferenceCount,
                                     &(member_->rt_cur_ref_cnts_));
    eager_deletion_pass->SetNotOwned(details::kGarbageCollector,
                                     &(member_->gcs_));
    eager_deletion_pass->SetNotOwned(details::kLastLiveOpsOfVars,
                                     &last_live_ops_of_vars);
    graph = eager_deletion_pass->Apply(std::move(graph));
    VLOG(10) << "EagerDeletionPass Applied";

    graph->SetNotOwned(details::kGarbageCollector, &(member_->gcs_));
  }

  // Step 3. Create vars in each scope. Passes may also create new vars.
  //         skip control vars and empty vars
  std::vector<details::VariableInfo> var_infos;
  for (auto &node : graph->Nodes()) {
    if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
      var_infos.emplace_back();
      var_infos.back().name_ = node->Var()->Name();
      var_infos.back().type_ = node->Var()->GetType();
      var_infos.back().persistable_ = node->Var()->Persistable();
    }
  }
  // If loss_var_name is given, there should be exactly one graph, i.e. no
  // disconnected sub-graphs.
  if (loss_var_name.size()) {
    size_t graph_num = ir::GraphNum(*graph);
    if (graph_num > 1) {
      LOG(WARNING)
          << "The number of graphs should be only one, "
             "but the current graph has "
          << graph_num
          << " sub-graphs. If you want to see the nodes of the "
             "sub-graphs, you can use 'FLAGS_print_sub_graph_dir' "
             "to specify the output dir. NOTE: if you are not doing training, "
             "please don't pass loss_var_name.";
    }
  }

  if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
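    // The default execution strategy uses ThreadedSSAGraphExecutor; any other
    // strategy uses FastThreadedSSAGraphExecutor (see the else branch below).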
    member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, places, std::move(graph)));
  } else {
    member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, places, std::move(graph)));
  }

  member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
      exec_strategy, member_->local_scopes_, std::move(var_infos),
      member_->places_, std::move(member_->executor_)));
}

void ParallelExecutor::BCastParamsToDevices(
    const std::unordered_set<std::string> &vars) const {
  // The initializing broadcast: all variables are broadcast from device(0).
  for (auto &var : vars) {
    framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
    if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
      continue;
    }

    auto &main_tensor = main_var->Get<LoDTensor>();
    if (!main_tensor.IsInitialized()) {
      VLOG(3) << "one of the variables is not initialized, skip it";
      continue;
    }
    auto &dims = main_tensor.dims();
    if (paddle::platform::is_gpu_place(main_tensor.place())) {
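      // GPU path: collect one buffer per place (device 0 reuses its existing
      // data, the other devices allocate a replica), then broadcast from
      // device 0 with ncclBcast.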
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
      std::vector<void *> buffers;
      size_t numel = main_tensor.numel();
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;

        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
          auto local_scope = member_->local_scopes_[i];
          auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }
        buffers.push_back(buffer);
      }

      PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(),
                        "The number of buffers to bcast does not equal the "
                        "number of places");
      {
        platform::NCCLGroupGuard guard;
        for (size_t i = 0; i < member_->places_.size(); ++i) {
          auto &nccl_ctx = member_->nccl_ctxs_->at(member_->places_[i]);
          platform::dynload::ncclBcast(buffers[i], numel, data_type, 0,
                                       nccl_ctx.comm_, nccl_ctx.stream());
        }
        member_->nccl_ctxs_->WaitAll();
      }
#else
      PADDLE_THROW("Not compiled with CUDA");
#endif
    } else {
      platform::CPUPlace cpu;
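      // CPU path: every place other than device 0 either receives its own
      // copy of the tensor or shares device 0's data, depending on the reduce
      // strategy (see the FIXME below).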
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        if (i == 0) continue;

        auto local_scope = member_->local_scopes_[i];
        auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();

        // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
        if (member_->use_all_reduce_ || member_->use_cuda_ ||
            var == "@LR_DECAY_COUNTER@") {
          t->Resize(dims);
          t->mutable_data(cpu, main_tensor.type());
          paddle::framework::TensorCopy(main_tensor, cpu, t);
        } else {
          t->ShareDataWith(main_tensor);
        }
      }
    }
  }
}

void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
  platform::RecordBlock b(0);
  if (!member_->gcs_.empty()) {
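    // Reset the per-iteration reference counts and make sure fetched
    // variables are never garbage-collected during this run.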
    member_->ResetRuntimeReferenceCount();
    size_t n = member_->rt_ref_cnts_.size();
    for (size_t i = 0; i < n; ++i) {
      for (auto &fetch_name : fetch_tensors) {
        member_->rt_cur_ref_cnts_[i].erase(fetch_name);
      }
      member_->rt_cur_ref_cnts_[i].erase(fetched_var_name);
    }
  }
  auto fetch_data = member_->executor_->Run(fetch_tensors);
  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetch_data;
}

void ParallelExecutor::FeedTensorsIntoLocalScopes(
    const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
  PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());

  for (size_t i = 0; i < tensors.size(); ++i) {
    auto &map = tensors[i];
    auto *scope = member_->local_scopes_[i];
    for (auto &pair : map) {
      auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
      trg->ShareDataWith(pair.second);
      trg->set_lod(pair.second.lod());
    }
  }
}

void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
    const std::unordered_map<std::string, LoDTensor> &tensors) {
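  // Split each fed LoDTensor across the places (one shard per device) and
  // share the shards into the corresponding local scopes.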
  for (auto pair : tensors) {
    auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
    PADDLE_ENFORCE_EQ(
        member_->places_.size(), lod_tensors.size(),
        "The number of samples of current batch is less than the count of "
        "devices, currently, it is not allowed. (%d vs %d)",
        member_->places_.size(), lod_tensors.size());
    for (size_t j = 0; j < member_->places_.size(); ++j) {
      // TODO(panxy0718): Do I need to delete this var?
      auto t =
          member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
      t->ShareDataWith(lod_tensors[j]);
      t->set_lod(lod_tensors[j].lod());
    }
  }
}

ParallelExecutor::~ParallelExecutor() {
  for (auto &p : member_->places_) {
    platform::DeviceContextPool::Instance().Get(p)->Wait();
  }
  delete member_;
}

}  // namespace framework
}  // namespace paddle

USE_PASS(reference_count_pass);
USE_PASS(eager_deletion_pass);