/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"

#include <string>
#include <vector>

#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/nccl_helper.h"
#endif

#include "paddle/fluid/framework/details/multi_devices_graph_builder.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/platform/profiler.h"

namespace paddle {
namespace framework {

class ParallelExecutorPrivate {
 public:
  explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
Y
Yu Yang 已提交
34
      : places_(places) {}
Y
Yu Yang 已提交
35 36 37 38

  std::vector<platform::Place> places_;
  std::vector<Scope *> local_scopes_;
  Scope *global_scope_;
Y
Yu Yang 已提交
39
  std::unique_ptr<details::SSAGraphExecutor> executor_;
Y
Yu Yang 已提交
40

Y
Yu Yang 已提交
41
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
42
  std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
Y
Yu Yang 已提交
43
#endif
Y
Yu Yang 已提交
44 45
};

46 47 48 49
// Returns a mutable reference to the executor's list of local scopes, so
// callers observe (and may modify) the same vector the executor uses.
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
  auto &scopes = member_->local_scopes_;
  return scopes;
}

Y
Yu Yang 已提交
50
// Builds a ParallelExecutor for `main_program` over `places`.
//
// num_threads, use_event, allow_op_delay:
//     forwarded unchanged to details::ThreadedSSAGraphExecutor.
// places:        places to execute on; must not be empty.
// params, loss_var_name:
//     forwarded unchanged to details::MultiDevSSAGraphBuilder.
// bcast_vars:    names handed to BCastParamsToGPUs() when the first place is
//                a GPU, more than one local scope exists and the local scopes
//                were created here rather than supplied by the caller.
// main_program:  program the graph is built from; every variable of block 0
//                is created in each local scope that lacks it.
// scope:         stored as the global scope; also the source of new local
//                scopes (scope->NewScope()) when `local_scopes` is empty.
// local_scopes:  when non-empty it must hold exactly one scope per place and
//                is used as-is; when empty one new scope per place is made.
//
// Enforce failures are raised when `places` is empty or when a non-empty
// `local_scopes` differs in size from `places`.
ParallelExecutor::ParallelExecutor(
    size_t num_threads, bool use_event,
    const std::vector<platform::Place> &places,
    const std::unordered_set<std::string> &params,
    const std::unordered_set<std::string> &bcast_vars,
    const ProgramDesc &main_program, const std::string &loss_var_name,
    Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay)
    : member_(new ParallelExecutorPrivate(places)) {
  // places[0] is inspected below; reject an empty list up front instead of
  // indexing out of bounds.
  PADDLE_ENFORCE_EQ(places.empty(), false,
                    "ParallelExecutor requires at least one place.");

  member_->global_scope_ = scope;

  // Step 1. Bcast the params to devs.
  // Create local scopes
  if (local_scopes.empty()) {
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      member_->local_scopes_.push_back(&scope->NewScope());
    }
  } else {
    PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      member_->local_scopes_.push_back(local_scopes[i]);
    }
  }

  // Bcast Parameters to all GPUs
#ifdef PADDLE_WITH_CUDA
  member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_));
#endif
  // Caller-provided local scopes are not broadcast into.
  if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 &&
      local_scopes.empty()) {  // Is CUDA
    BCastParamsToGPUs(bcast_vars);
  }
  // Startup Program has been run. All local scopes have correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also,
  // insert ncclOp
#ifdef PADDLE_WITH_CUDA
  details::MultiDevSSAGraphBuilder builder(member_->places_, loss_var_name,
                                           params, member_->local_scopes_,
                                           member_->nccl_ctxs_.get());
#else
  details::MultiDevSSAGraphBuilder builder(member_->places_, loss_var_name,
                                           params, member_->local_scopes_);
#endif
  auto graph = builder.Build(main_program);

  member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
      num_threads, use_event, member_->local_scopes_, places, std::move(graph),
      allow_op_delay));

  // Step 3. Create vars in each scope;
  // (loop variable deliberately not called `scope`, which would shadow the
  // constructor parameter)
  for (auto *local_scope : member_->local_scopes_) {
    for (auto *var : main_program.Block(0).AllVars()) {
      // Leave variables alone when FindVar already resolves them.
      if (local_scope->FindVar(var->Name()) != nullptr) {
        continue;
      }

      InitializeVariable(local_scope->Var(var->Name()), var->GetType());
    }
  }
}

void ParallelExecutor::BCastParamsToGPUs(
112
    const std::unordered_set<std::string> &vars) const {
Y
Yu Yang 已提交
113
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
114
  auto *main_scope = member_->local_scopes_[0];
Y
Yu Yang 已提交
115

116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
  for (auto &var : vars) {
    auto *main_var = main_scope->FindVar(var);
    if (!main_var->IsType<LoDTensor>()) {
      continue;
    }

    auto &main_tensor = main_var->Get<LoDTensor>();

    auto &dims = main_tensor.dims();

    if (paddle::platform::is_gpu_place(main_tensor.place())) {
      size_t numel = main_tensor.numel();
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
      platform::NCCLGroupGuard guard;
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;
        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
Y
Yu Yang 已提交
136
          auto local_scope = member_->local_scopes_[i];
137
          auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
Y
Update  
Yu Yang 已提交
138
          t->Resize(dims);
139
          buffer = t->mutable_data(place, main_tensor.type());
Y
Update  
Yu Yang 已提交
140
        }
141 142 143 144 145 146 147 148 149 150 151 152
        auto &nccl_ctx = member_->nccl_ctxs_->at(place);
        platform::dynload::ncclBcast(buffer, numel, data_type, 0,
                                     nccl_ctx.comm_, nccl_ctx.stream());
      }
    } else {
      platform::CPUPlace cpu;
      for (size_t i = 1; i < member_->places_.size(); ++i) {
        auto local_scope = member_->local_scopes_[i];
        auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
        t->Resize(dims);
        t->mutable_data(cpu, main_tensor.type());
        paddle::framework::TensorCopy(main_tensor, cpu, t);
Y
Yu Yang 已提交
153
      }
Y
Stash  
Yu Yang 已提交
154
    }
Y
Yu Yang 已提交
155
    member_->nccl_ctxs_->WaitAll();
Y
Stash  
Yu Yang 已提交
156
  }
Y
Yu Yang 已提交
157 158 159 160
#else
  PADDLE_THROW("Not compiled with CUDA");
#endif
}
Y
Yu Yang 已提交
161

X
Xin Pan 已提交
162 163 164 165
void ParallelExecutor::Run(
    const std::vector<std::string> &fetch_tensors,
    const std::string &fetched_var_name,
    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
X
Xin Pan 已提交
166
  platform::RecordBlock b(0);
X
Xin Pan 已提交
167
  SplitTensorToPlaces(feed_tensors);
Y
Yu Yang 已提交
168 169 170
  auto fetch_data = member_->executor_->Run(fetch_tensors);
  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetch_data;
Y
Yu Yang 已提交
171
}
Y
Yu Yang 已提交
172

X
Xin Pan 已提交
173 174 175 176
void ParallelExecutor::SplitTensorToPlaces(
    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
  for (auto it : feed_tensors) {
    auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
177 178 179 180 181
    PADDLE_ENFORCE_EQ(
        member_->places_.size(), lod_tensors.size(),
        "The number of samples of current batch is less than the count of "
        "devices, currently, it is not allowed. (%d vs %d)",
        member_->places_.size(), lod_tensors.size());
X
Xin Pan 已提交
182 183
    for (size_t j = 0; j < member_->places_.size(); ++j) {
      // TODO(panxy0718): Do I need to delete this var?
184 185 186 187
      auto t =
          member_->local_scopes_[j]->Var(it.first)->GetMutable<LoDTensor>();
      t->ShareDataWith(lod_tensors[j]);
      t->set_lod(lod_tensors[j].lod());
X
Xin Pan 已提交
188 189 190 191
    }
  }
}

}  // namespace framework
}  // namespace paddle