parallel_executor.cc 13.3 KB
Newer Older
Y
Yang Yang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
Q
qiaolongfei 已提交
16

C
chengduoZH 已提交
17
#include <string>
18
#include <tuple>
Q
qiaolongfei 已提交
19
#include <vector>
Y
Yu Yang 已提交
20

X
clean  
Xin Pan 已提交
21
#include "paddle/fluid/framework/ir/graph.h"
X
Xin Pan 已提交
22
#include "paddle/fluid/framework/ir/graph_viz_pass.h"
X
Xin Pan 已提交
23

Y
Yu Yang 已提交
24
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
25
#include "paddle/fluid/platform/nccl_helper.h"
Y
Yu Yang 已提交
26
#endif
Y
Yang Yang 已提交
27

Y
yuyang18 已提交
28
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
X
Xin Pan 已提交
29 30
#include "paddle/fluid/framework/details/ssa_graph_checker.h"
#include "paddle/fluid/framework/details/ssa_graph_printer.h"
Y
Yu Yang 已提交
31
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
32
#include "paddle/fluid/platform/profiler.h"
Y
Yu Yang 已提交
33

Y
Yang Yang 已提交
34
namespace paddle {
Y
Yu Yang 已提交
35 36
namespace framework {

X
Xin Pan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
// Builds an ir::Graph from `main_program` and runs it through the
// ParallelExecutor pass pipeline:
//   1. multi_device_pass
//   2. multi_device_print_pass (only when a graphviz debug path is set)
//   3. multi_device_check_pass
// When strategy.debug_graphviz_path_ is non-empty, graph_viz_pass is also
// applied before step 1 and after step 3, writing to that path suffixed with
// "_original_graph" and "_before_exec" respectively.
//
// `places`, `loss_var_name`, `param_names`, `local_scopes` and `strategy` are
// registered on multi_device_pass as not-owned attributes. In CUDA builds
// `nccl_ctxs` is forwarded only when `use_cuda` is true; otherwise nullptr is
// registered instead.
//
// Returns the graph produced by the last applied pass.
std::unique_ptr<ir::Graph> ApplyParallelExecutorPass(
    const ProgramDesc &main_program, const std::vector<platform::Place> &places,
    const std::string &loss_var_name,
    const std::unordered_set<std::string> &param_names,
    const std::vector<Scope *> &local_scopes, const bool use_cuda,
#ifdef PADDLE_WITH_CUDA
    const BuildStrategy &strategy, platform::NCCLContextMap *nccl_ctxs) {
#else
    const BuildStrategy &strategy) {
#endif
  std::unique_ptr<ir::Graph> graph(new ir::Graph(main_program));
  const bool dump_graphviz = !strategy.debug_graphviz_path_.empty();

  // Runs graph_viz_pass over the current graph, writing to the debug path
  // with `suffix` appended.
  auto apply_graph_viz = [&graph, &strategy](const char *suffix) {
    auto viz_pass = ir::PassRegistry::Instance().Get("graph_viz_pass");
    const std::string out_path = string::Sprintf(
        "%s%s", strategy.debug_graphviz_path_.c_str(), suffix);
    viz_pass->Set<std::string>("graph_viz_path", new std::string(out_path));
    graph = viz_pass->Apply(std::move(graph));
  };

  if (dump_graphviz) {
    apply_graph_viz("_original_graph");
  }

  auto device_pass = ir::PassRegistry::Instance().Get("multi_device_pass");
  device_pass->SetNotOwned<const std::vector<platform::Place>>("places",
                                                               &places);
  device_pass->SetNotOwned<const std::string>("loss_var_name", &loss_var_name);
  device_pass->SetNotOwned<const std::unordered_set<std::string>>(
      "params", &param_names);
  device_pass->SetNotOwned<const std::vector<Scope *>>("local_scopes",
                                                       &local_scopes);
  device_pass->SetNotOwned<const BuildStrategy>("strategy", &strategy);

#ifdef PADDLE_WITH_CUDA
  platform::NCCLContextMap *nctx = nullptr;
  if (use_cuda) {
    nctx = nccl_ctxs;
  }
  device_pass->SetNotOwned<platform::NCCLContextMap>("nccl_ctxs", nctx);
#endif
  graph = device_pass->Apply(std::move(graph));

  if (dump_graphviz) {
    auto print_pass =
        ir::PassRegistry::Instance().Get("multi_device_print_pass");
    print_pass->SetNotOwned<const std::string>("debug_graphviz_path",
                                               &strategy.debug_graphviz_path_);
    print_pass->Set<details::GraphvizSSAGraphPrinter>(
        "graph_printer", new details::GraphvizSSAGraphPrinter);
    graph = print_pass->Apply(std::move(graph));
  }

  auto check_pass = ir::PassRegistry::Instance().Get("multi_device_check_pass");
  graph = check_pass->Apply(std::move(graph));

  if (dump_graphviz) {
    apply_graph_viz("_before_exec");
  }
  return graph;
}

Y
Yu Yang 已提交
98 99 100
class ParallelExecutorPrivate {
 public:
  explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
Y
Yu Yang 已提交
101
      : places_(places) {}
Y
Yu Yang 已提交
102 103 104 105

  std::vector<platform::Place> places_;
  std::vector<Scope *> local_scopes_;
  Scope *global_scope_;
Y
Yu Yang 已提交
106
  std::unique_ptr<details::SSAGraphExecutor> executor_;
Y
Yu Yang 已提交
107

Y
Yu Yang 已提交
108
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
109
  std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
Y
Yu Yang 已提交
110
#endif
C
chengduoZH 已提交
111 112
  bool own_local_scope_;
  bool use_cuda_;
113
  bool use_all_reduce_;
Y
Yu Yang 已提交
114 115
};

116 117 118 119
// Returns a mutable reference to the local scopes held by this executor.
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
  auto &scopes = member_->local_scopes_;
  return scopes;
}

Y
Yu Yang 已提交
120
// Sets up everything needed to run `main_program` on `places`.
//
//   places         devices to run on (copied into the private state).
//   params         parameter names, forwarded to the multi-device pass.
//   bcast_vars     names handed to BCastParamsToDevices for the initial
//                  broadcast.
//   main_program   program converted into the execution graph.
//   loss_var_name  forwarded to the multi-device pass.
//   scope          global scope; also used as local scope 0 when
//                  `local_scopes` is empty.
//   local_scopes   optional caller-provided scopes; when non-empty it must
//                  have one entry per place, and a new child scope of each
//                  entry is used.
//   exec_strategy / build_strategy
//                  execution and graph-building options.
//   num_trainers / trainer_id
//                  forwarded to platform::NCCLContextMap (CUDA builds only).
//
// Fails a PADDLE_ENFORCE when the reduce strategy is not kAllReduce and only
// one place is given, or when `local_scopes` is non-empty but its size differs
// from `places`. Throws via PADDLE_THROW when use_cuda_ is requested in a
// build without CUDA.
ParallelExecutor::ParallelExecutor(
    const std::vector<platform::Place> &places,
    const std::unordered_set<std::string> &params,
    const std::unordered_set<std::string> &bcast_vars,
    const ProgramDesc &main_program, const std::string &loss_var_name,
    Scope *scope, const std::vector<Scope *> &local_scopes,
    const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
    size_t num_trainers, size_t trainer_id)
    : member_(new ParallelExecutorPrivate(places)) {
  member_->global_scope_ = scope;
  member_->use_cuda_ = exec_strategy.use_cuda_;
  member_->use_all_reduce_ =
      build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;

  if (!member_->use_all_reduce_) {
    // The two literals are concatenated by the compiler, so the first one
    // must end with a space to keep the message readable.
    PADDLE_ENFORCE(places.size() > 1,
                   "If you set build_strategy.reduce with 'Reduce', "
                   "the number of places must be greater than 1.");
  }

  // Step 1. Create the local scopes, then bcast the params to all devices.
  if (local_scopes.empty()) {
    // No scopes supplied: the global scope serves place 0 and a fresh child
    // of it is created for every other place. These children are released in
    // the destructor.
    member_->own_local_scope_ = true;
    member_->local_scopes_.emplace_back(member_->global_scope_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&scope->NewScope());
    }
  } else {
    // Scopes supplied by the caller: use a new child of each one.
    member_->own_local_scope_ = false;
    PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
    }
  }

  if (member_->use_cuda_) {
    // Create the NCCL contexts used to bcast parameters to all GPUs.
#ifdef PADDLE_WITH_CUDA
    // A ncclUniqueId stored in the global scope is passed on when present;
    // otherwise nullptr is passed.
    auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
    ncclUniqueId *nccl_id = nullptr;
    if (nccl_id_var != nullptr) {
      nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
    }
    member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
        member_->places_, nccl_id, num_trainers, trainer_id));
#else
    PADDLE_THROW("Not compiled with CUDA");
#endif
  }

  // The initial broadcast is only done when there is more than one scope and
  // the scopes were created here rather than supplied by the caller.
  if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
    BCastParamsToDevices(bcast_vars);
  }
  // The startup program has been run. All local scopes have the correct
  // parameters.

  // Step 2. Collect name, type and persistable flag of every variable in
  // block 0; the list is handed to the scope-buffered executor below.
  std::vector<details::VariableInfo> var_infos;
  for (auto *var : main_program.Block(0).AllVars()) {
    var_infos.emplace_back();
    var_infos.back().name_ = var->Name();
    var_infos.back().type_ = var->GetType();
    var_infos.back().persistable_ = var->Persistable();
  }

  // Step 3. Convert main_program to SSA form and a dependency graph. Also
  // insert the NCCL ops.
#ifdef PADDLE_WITH_CUDA
  std::unique_ptr<ir::Graph> graph = ApplyParallelExecutorPass(
      main_program, member_->places_, loss_var_name, params,
      member_->local_scopes_, member_->use_cuda_, build_strategy,
      member_->nccl_ctxs_.get());
#else
  std::unique_ptr<ir::Graph> graph = ApplyParallelExecutorPass(
      main_program, member_->places_, loss_var_name, params,
      member_->local_scopes_, member_->use_cuda_, build_strategy);
#endif

  // Build the threaded executor first, then hand it over to the
  // scope-buffered wrapper, which replaces it in executor_.
  member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
      exec_strategy, member_->local_scopes_, places, std::move(graph)));
  member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
      exec_strategy, member_->local_scopes_, std::move(var_infos),
      member_->places_, std::move(member_->executor_)));
}

Y
Yancey1989 已提交
205
void ParallelExecutor::BCastParamsToDevices(
206
    const std::unordered_set<std::string> &vars) const {
207
  // the initializing bcast, all vars would be bcast from device(0),
Y
yi.wu 已提交
208
  // otherwise
209
  // bcast from the specified device.
X
Xin Pan 已提交
210
  bool initializing = member_->executor_ ? false : true;
211
  for (auto &var : vars) {
X
Xin Pan 已提交
212 213 214 215 216 217 218 219 220 221
    int var_dev_id = -1;
    if (member_->executor_) {
      auto &sharded_var_device =
          member_->executor_->Graph().Get<details::ShardedVarDevice>(
              "sharded_var_device");
      if (sharded_var_device.find(var) != sharded_var_device.end()) {
        var_dev_id = sharded_var_device.at(var);
      }
    }

Y
yi.wu 已提交
222
    if (!initializing && var_dev_id == -1) continue;
223 224

    framework::Variable *main_var = nullptr;
Y
yi.wu 已提交
225
    if (initializing) {
226 227 228 229 230
      main_var = member_->local_scopes_[0]->FindVar(var);
    } else {
      main_var = member_->local_scopes_[var_dev_id]->FindVar(var);
    }

J
JiayiFeng 已提交
231
    if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
232 233 234 235 236 237
      continue;
    }

    auto &main_tensor = main_var->Get<LoDTensor>();
    auto &dims = main_tensor.dims();
    if (paddle::platform::is_gpu_place(main_tensor.place())) {
C
chengduoZH 已提交
238
#ifdef PADDLE_WITH_CUDA
239
      std::vector<void *> buffers;
240 241 242 243 244
      size_t numel = main_tensor.numel();
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;
245

Y
yi.wu 已提交
246
        if ((initializing && i == 0) ||
Y
update  
yi.wu 已提交
247
            (!initializing && static_cast<int>(i) == var_dev_id)) {
248 249
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
Y
Yu Yang 已提交
250
          auto local_scope = member_->local_scopes_[i];
251
          auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
Y
Update  
Yu Yang 已提交
252
          t->Resize(dims);
253
          buffer = t->mutable_data(place, main_tensor.type());
Y
Update  
Yu Yang 已提交
254
        }
255
        buffers.push_back(buffer);
256
      }
257

258 259 260 261 262 263
      PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(),
                        "variables' buffer size to bcast NOT equal to places");
      {
        platform::NCCLGroupGuard guard;
        for (size_t i = 0; i < member_->places_.size(); ++i) {
          auto &nccl_ctx = member_->nccl_ctxs_->at(member_->places_[i]);
Y
yi.wu 已提交
264 265 266 267
          if (initializing) {
            platform::dynload::ncclBcast(buffers[i], numel, data_type, 0,
                                         nccl_ctx.comm_, nccl_ctx.stream());
          } else {
Y
update  
yi.wu 已提交
268
            if (var_dev_id >= 0) {
Y
yi.wu 已提交
269 270 271 272 273
              platform::dynload::ncclBcast(buffers[i], numel, data_type,
                                           var_dev_id, nccl_ctx.comm_,
                                           nccl_ctx.stream());
            }
          }
274
        }
275
        member_->nccl_ctxs_->WaitAll();
276
      }
277

C
chengduoZH 已提交
278 279 280
#else
      PADDLE_THROW("Not compiled with CUDA");
#endif
281 282
    } else {
      platform::CPUPlace cpu;
Y
Yancey1989 已提交
283 284 285 286 287
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        if ((initializing && i == 0) ||
            (!initializing && static_cast<int>(i) == var_dev_id))
          continue;

288 289
        auto local_scope = member_->local_scopes_[i];
        auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
C
chengduo 已提交
290 291 292 293

        // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
        if (member_->use_all_reduce_ || member_->use_cuda_ ||
            var == "@LR_DECAY_COUNTER@") {
294 295 296 297 298 299
          t->Resize(dims);
          t->mutable_data(cpu, main_tensor.type());
          paddle::framework::TensorCopy(main_tensor, cpu, t);
        } else {
          t->ShareDataWith(main_tensor);
        }
Y
Yu Yang 已提交
300
      }
Y
Stash  
Yu Yang 已提交
301 302
    }
  }
Y
Yu Yang 已提交
303
}
Y
Yu Yang 已提交
304

Y
Yu Yang 已提交
305 306
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
X
Xin Pan 已提交
307
  platform::RecordBlock b(0);
Y
Yu Yang 已提交
308 309 310
  auto fetch_data = member_->executor_->Run(fetch_tensors);
  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetch_data;
Y
Yu Yang 已提交
311
}
Y
Yu Yang 已提交
312

Y
Yu Yang 已提交
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
void ParallelExecutor::FeedTensorsIntoLocalScopes(
    const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
  PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());

  for (size_t i = 0; i < tensors.size(); ++i) {
    auto &map = tensors[i];
    auto *scope = member_->local_scopes_[i];
    for (auto &pair : map) {
      auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
      trg->ShareDataWith(pair.second);
      trg->set_lod(pair.second.lod());
    }
  }
}

void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
    const std::unordered_map<std::string, LoDTensor> &tensors) {
  for (auto pair : tensors) {
    auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
332 333 334 335 336
    PADDLE_ENFORCE_EQ(
        member_->places_.size(), lod_tensors.size(),
        "The number of samples of current batch is less than the count of "
        "devices, currently, it is not allowed. (%d vs %d)",
        member_->places_.size(), lod_tensors.size());
X
Xin Pan 已提交
337 338
    for (size_t j = 0; j < member_->places_.size(); ++j) {
      // TODO(panxy0718): Do I need to delete this var?
339
      auto t =
Y
Yu Yang 已提交
340
          member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
341 342
      t->ShareDataWith(lod_tensors[j]);
      t->set_lod(lod_tensors[j].lod());
X
Xin Pan 已提交
343 344 345 346
    }
  }
}

347
// Releases the local scopes this executor created from the global scope.
// Nothing is released when the local scopes were derived from scopes supplied
// by the caller.
ParallelExecutor::~ParallelExecutor() {
  if (!member_->own_local_scope_) {
    return;
  }

  // Entry 0 is the global scope itself, so deletion starts at index 1.
  auto &scopes = member_->local_scopes_;
  for (size_t idx = 1; idx < scopes.size(); ++idx) {
    member_->global_scope_->DeleteScope(scopes[idx]);
  }
}

Y
Yu Yang 已提交
355
}  // namespace framework
Y
Yang Yang 已提交
356
}  // namespace paddle
X
Xin Pan 已提交
357 358

// graph_viz_pass is looked up by name through ir::PassRegistry in
// ApplyParallelExecutorPass.
// NOTE(review): USE_PASS presumably makes sure the pass registration is
// linked into the binary -- confirm against the macro definition.
USE_PASS(graph_viz_pass);
X
Xin Pan 已提交
359 360 361
// The multi-device passes below are looked up by name through
// ir::PassRegistry in ApplyParallelExecutorPass.
// NOTE(review): USE_PASS presumably makes sure each pass registration is
// linked into the binary -- confirm against the macro definition.
USE_PASS(multi_device_pass);
USE_PASS(multi_device_check_pass);
USE_PASS(multi_device_print_pass);