parallel_executor.cc 14.9 KB
Newer Older
Y
Yang Yang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
Y
Yu Yang 已提交
16
#include "ThreadPool.h"
Y
Yu Yang 已提交
17
#include "lod_tensor.h"
Y
Yu Yang 已提交
18
#include "lod_tensor_array.h"
Y
Yu Yang 已提交
19
#include "op_registry.h"
Y
Yu Yang 已提交
20
#include "paddle/fluid/framework/details/computation_op_handle.h"
Y
Yu Yang 已提交
21
#include "paddle/fluid/framework/details/fetch_op_handle.h"
Y
Yu Yang 已提交
22
#include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h"
Y
Yu Yang 已提交
23
#include "paddle/fluid/framework/details/op_handle_base.h"
Y
Yu Yang 已提交
24
#include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h"
Y
Yu Yang 已提交
25
#include "paddle/fluid/framework/details/var_handle.h"
Y
Yu Yang 已提交
26
#include "paddle/fluid/platform/nccl_helper.h"
Y
Yang Yang 已提交
27 28

namespace paddle {
Y
Yu Yang 已提交
29 30
namespace framework {

Y
Yu Yang 已提交
31
using details::DummyVarHandle;
Y
Yu Yang 已提交
32
using details::FetchOpHandle;
Y
Yu Yang 已提交
33
using details::NCCLAllReduceOpHandle;
Y
Yu Yang 已提交
34
using details::OpHandleBase;
Y
Yu Yang 已提交
35
using details::ScaleLossGradOpHandle;
Y
Yu Yang 已提交
36 37
using details::VarHandle;
using details::VarHandleBase;
Y
Yu Yang 已提交
38
using details::ComputationOpHandle;
Y
Yu Yang 已提交
39 40 41

class ParallelExecutorPrivate {
 public:
Y
Yu Yang 已提交
42 43 44 45 46
  explicit ParallelExecutorPrivate(size_t num_threads,
                                   const std::vector<platform::Place> &places)
      : places_(places),
        fetch_dev_ctxs_(places),
        pool_(num_threads <= 1 ? nullptr : new ThreadPool(num_threads)) {}
Y
Yu Yang 已提交
47

Y
Stash  
Yu Yang 已提交
48
  std::vector<platform::Place> places_;
Y
Yu Yang 已提交
49
  platform::DeviceContextPool fetch_dev_ctxs_;
Y
Yu Yang 已提交
50
  std::vector<Scope *> local_scopes_;
Y
Yu Yang 已提交
51
  Scope *global_scope_;
Y
Yu Yang 已提交
52

Y
Yu Yang 已提交
53
  std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
Y
Yu Yang 已提交
54

Y
Yu Yang 已提交
55 56 57 58 59 60
  platform::Place main_place_;

  std::unordered_map<platform::Place,
                     std::unordered_map<std::string, std::map<int, VarHandle>>,
                     platform::PlaceHash>
      vars_;
Y
Yu Yang 已提交
61

Y
Yu Yang 已提交
62 63
  std::unordered_set<std::unique_ptr<VarHandleBase>> dep_vars_;

Y
Yu Yang 已提交
64
  std::vector<std::unique_ptr<OpHandleBase>> ops_;
Y
Yu Yang 已提交
65

Y
Yu Yang 已提交
66
  // Use a simpler thread pool, might be faster.
Y
Yu Yang 已提交
67
  std::unique_ptr<ThreadPool> pool_;
Y
Yu Yang 已提交
68 69

  std::unique_ptr<platform::EnforceNotMet> exception_;
Y
Yu Yang 已提交
70

Y
Yu Yang 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
  VarHandle *GetVarHandle(const std::string &each_var_name,
                          const platform::Place &place) {
    auto &var_holders = vars_[place];
    auto &var_holder = var_holders[each_var_name];
    VarHandle *var = nullptr;
    if (var_holder.empty()) {
      auto &init_var = var_holder[0];
      init_var.place_ = place;
      init_var.name_ = each_var_name;
      init_var.generated_op_ = nullptr;
      init_var.version_ = 0;
      var = &init_var;
    } else {
      var = &var_holder.rbegin()->second;
    }
    return var;
  }
Y
Yu Yang 已提交
88

Y
Yu Yang 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
  void RunOp(
      bool use_event,
      std::unordered_map<VarHandleBase *, std::atomic<bool>> &pending_vars,
      OpHandleBase *op) {
    std::vector<std::atomic<bool> *> *ready_buffer =
        new std::vector<std::atomic<bool> *>();
    for (auto *var : op->outputs_) {
      ready_buffer->emplace_back(&pending_vars[var]);
    }

    auto op_run = [ready_buffer, op, this, use_event] {
      try {
        VLOG(10) << op->DebugString();
        op->Run(use_event);
        for (auto *ready : *ready_buffer) {
          ready->store(true, std::memory_order_release);
        }
        delete ready_buffer;
      } catch (platform::EnforceNotMet ex) {
        exception_.reset(new platform::EnforceNotMet(ex));
      } catch (...) {
        LOG(FATAL) << "Unknown exception catched";
      }
    };
    if (pool_) {
      pool_->enqueue(op_run);
    } else {
      op_run();
    }
  }

  void GenerateVar(OpHandleBase *op_handle, const std::string &each_var_name,
                   const platform::Place &place) {
    auto &vars = vars_[place][each_var_name];
    size_t version = vars.size();
    auto &var = vars[version];
    var.version_ = version;
    var.name_ = each_var_name;
    var.place_ = place;
Y
Yu Yang 已提交
128
    op_handle->AddOutput(&var);
Y
Yu Yang 已提交
129
  }
Y
Yu Yang 已提交
130 131
};

Y
Yu Yang 已提交
132
ParallelExecutor::ParallelExecutor(
Y
Yu Yang 已提交
133
    size_t num_threads, const std::vector<platform::Place> &places,
Y
Yu Yang 已提交
134 135 136
    const std::unordered_set<std::string> &params,
    const ProgramDesc &startup_program, const ProgramDesc &main_program,
    const std::string &loss_var_name, Scope *scope)
Y
Yu Yang 已提交
137
    : member_(new ParallelExecutorPrivate(num_threads, places)) {
Y
Yu Yang 已提交
138
  member_->global_scope_ = scope;
Y
Yu Yang 已提交
139

Y
Yu Yang 已提交
140 141 142 143
  // Step 1. RunStartupProgram and Bcast the params to devs.
  Executor exe(places[0]);
  exe.Run(startup_program, scope, 0);
  // Create local scopes
Y
Yu Yang 已提交
144 145
  for (size_t i = 0; i < member_->places_.size(); ++i) {
    member_->local_scopes_.push_back(&scope->NewScope());
Y
Yu Yang 已提交
146 147 148 149
  }
  member_->main_place_ = places[0];

  // Bcast Parameters to all GPUs
Y
Yu Yang 已提交
150
  BuildNCCLCommunicator();
Y
Yu Yang 已提交
151 152 153
  if (platform::is_gpu_place(member_->main_place_) &&
      member_->local_scopes_.size() != 1) {  // Is CUDA
    BCastParamsToGPUs(startup_program);
Y
Yu Yang 已提交
154 155 156 157 158 159
  }
  // Startup Program has been run. All local scopes has correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  ConstructDependencyGraph(params, main_program, loss_var_name);
Y
Yu Yang 已提交
160 161

  // Step 3. Create vars in each scope;
Y
Yu Yang 已提交
162
  for (auto *scope : member_->local_scopes_) {
Y
Yu Yang 已提交
163 164 165 166 167 168 169 170
    for (auto *var : main_program.Block(0).AllVars()) {
      if (scope->FindVar(var->Name()) != nullptr) {
        continue;
      }

      InitializeVariable(scope->Var(var->Name()), var->GetType());
    }
  }
Y
Yu Yang 已提交
171 172 173 174 175
}

void ParallelExecutor::ConstructDependencyGraph(
    const std::unordered_set<std::string> &params,
    const ProgramDesc &main_program, const std::string &loss_var_name) const {
Y
Yu Yang 已提交
176
  std::unordered_set<std::string> grads;
Y
Yu Yang 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
  for (auto &each_param : params) {
    grads.insert(each_param + "@GRAD");
  }

  bool is_forwarding = true;
  for (auto *op : main_program.Block(0).AllOps()) {
    bool change_forward = false;
    if (!is_forwarding) {
      // FIXME(yy): Do not hard code like this
      if (op->OutputArgumentNames().size() == 1 &&
          op->OutputArgumentNames()[0] == loss_var_name + "@GRAD") {
        continue;  // Drop fill 1. for backward coeff;
      }
    }

Y
Yu Yang 已提交
192 193 194 195 196
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      auto &p = member_->places_[i];
      auto *s = member_->local_scopes_[i];

      member_->ops_.emplace_back(new ComputationOpHandle(*op, s, p));
Y
Yu Yang 已提交
197
      auto *op_handle = member_->ops_.back().get();
Y
Yu Yang 已提交
198 199
      op_handle->dev_ctx_[p] = const_cast<platform::DeviceContext *>(
          platform::DeviceContextPool::Instance().Get(p));
Y
Yu Yang 已提交
200 201 202 203

      auto var_names = op->InputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
204
        VarHandle *var = member_->GetVarHandle(each_var_name, p);
Y
Yu Yang 已提交
205
        op_handle->AddInput(var);
Y
Yu Yang 已提交
206 207 208 209
      }
      var_names = op->OutputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
210
        member_->GenerateVar(op_handle, each_var_name, p);
Y
Yu Yang 已提交
211 212 213 214 215
      }

      if (is_forwarding) {
        if (var_names.size() == 1 && var_names[0] == loss_var_name) {
          // Insert ScaleCost OpHandle
Y
Yu Yang 已提交
216 217 218 219
          op_handle =
              new ScaleLossGradOpHandle(this->member_->local_scopes_.size(), s,
                                        p, member_->nccl_ctxs_->DevCtx(p));
          member_->ops_.emplace_back(op_handle);
Y
Yu Yang 已提交
220

Y
Yu Yang 已提交
221 222 223 224 225 226
          // FIXME: Currently ScaleLossGradOp only use device_count as scale
          // factor. So it does not depend on any other operators.
          // VarHandle *loss = GetVarHandle(loss_var_name, place);
          // loss->pending_ops_.emplace_back(op_handle);
          // op_handle->inputs_.emplace_back(loss);

Y
Yu Yang 已提交
227
          member_->GenerateVar(op_handle, loss_var_name + "@GRAD", p);
Y
Yu Yang 已提交
228 229 230 231 232 233 234 235 236 237 238 239 240 241
          change_forward = true;
        }
      }
    }

    if (change_forward) {
      is_forwarding = false;
    }

    if (!is_forwarding) {
      auto var_names = op->OutputArgumentNames();
      for (auto &og : var_names) {
        if (grads.count(og) != 0) {  // is param grad
          // Insert NCCL AllReduce Op
Y
Yu Yang 已提交
242
          member_->ops_.emplace_back(new NCCLAllReduceOpHandle(
Y
Yu Yang 已提交
243
              member_->local_scopes_, member_->places_, *member_->nccl_ctxs_));
Y
Yu Yang 已提交
244 245
          auto *op_handle = member_->ops_.back().get();

Y
Yu Yang 已提交
246 247 248
          for (size_t i = 0; i < member_->places_.size(); ++i) {
            auto &p = member_->places_[i];
            auto &vars = member_->vars_[p][og];
Y
Yu Yang 已提交
249 250 251 252 253

            if (vars.empty()) {  // This device has no data. continue.
              continue;
            }
            auto *prev_grad = &vars[vars.size() - 1];
Y
Yu Yang 已提交
254 255
            op_handle->AddInput(prev_grad);

Y
Yu Yang 已提交
256
            auto &var = vars[vars.size()];
Y
Yu Yang 已提交
257
            var.place_ = p;
Y
Yu Yang 已提交
258 259
            var.name_ = og;
            var.version_ = vars.size() - 1;
Y
Yu Yang 已提交
260 261

            op_handle->AddOutput(&var);
Y
Yu Yang 已提交
262 263 264 265 266
          }
        }
      }
    }
  }
Y
Yu Yang 已提交
267

Y
Yu Yang 已提交
268 269 270
  /*
    Dependency graph has been constructed. However, there are still data
    harzaeds need to be handled.
Y
Yu Yang 已提交
271
   */
272
  PolishGraphToSupportDataHazards();
Y
Yu Yang 已提交
273
}
Y
Yu Yang 已提交
274

Y
Yu Yang 已提交
275 276 277 278 279 280 281
/**
 * We only handle write after read(WAR), since it should not have a write
 * after write in program. If there are write after write operators, we need
 * prune them.
 *
 * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR)
 */
282
void ParallelExecutor::PolishGraphToSupportDataHazards() const {
Y
Yu Yang 已提交
283 284 285 286 287 288 289 290 291 292 293
  for (auto &place_pair : member_->vars_) {
    for (auto &name_pair : place_pair.second) {
      if (name_pair.second.size() <= 1) {
        return;
      }
      auto it_new = name_pair.second.rbegin();
      auto it_old = name_pair.second.rbegin();
      ++it_old;
      for (; it_old != name_pair.second.rend(); it_new = it_old, ++it_old) {
        auto *write_op = it_new->second.generated_op_;
        auto &read_ops = it_old->second.pending_ops_;
Y
Yu Yang 已提交
294 295 296 297 298 299
        auto *ex_write_op = it_old->second.generated_op_;

        if (ex_write_op == nullptr) {  // Nobody write this var.
          continue;
        }

Y
Yu Yang 已提交
300 301
        for (auto *read_op : read_ops) {
          // Manually add a dependency var from read_op to write_op;
Y
Yu Yang 已提交
302 303 304 305
          if (read_op == write_op) {
            // Read Write is the same op.
            continue;
          }
Y
Yu Yang 已提交
306

Y
Yu Yang 已提交
307
          auto *dep_var = new DummyVarHandle();
Y
Yu Yang 已提交
308 309
          read_op->AddOutput(dep_var);
          write_op->AddInput(dep_var);
Y
Yu Yang 已提交
310 311 312 313 314
          member_->dep_vars_.emplace(dep_var);
        }
      }
    }
  }
Y
Yu Yang 已提交
315 316 317 318
}

void ParallelExecutor::BCastParamsToGPUs(
    const ProgramDesc &startup_program) const {
Y
Yu Yang 已提交
319
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
320
  auto *main_scope = member_->local_scopes_[0];
Y
Yu Yang 已提交
321

Y
Yu Yang 已提交
322 323 324 325
  for (auto *var_desc : startup_program.Block(0).AllVars()) {
    if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
      auto &main_tensor =
          main_scope->FindVar(var_desc->Name())->Get<LoDTensor>();
Y
Yu Yang 已提交
326
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
Y
Yu Yang 已提交
327 328 329
      auto &dims = main_tensor.dims();
      size_t numel = main_tensor.numel();

Y
Yu Yang 已提交
330
      platform::NCCLGroupGuard guard;
Y
Yu Yang 已提交
331

Y
Update  
Yu Yang 已提交
332 333 334 335 336 337
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;
        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
Y
Yu Yang 已提交
338
          auto local_scope = member_->local_scopes_[i];
Y
Update  
Yu Yang 已提交
339 340 341 342 343
          auto *t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }

Y
Yu Yang 已提交
344
        auto &nccl_ctx = member_->nccl_ctxs_->at(place);
Y
Yu Yang 已提交
345 346
        platform::dynload::ncclBcast(buffer, numel, data_type, 0,
                                     nccl_ctx.comm_, nccl_ctx.stream());
Y
Yu Yang 已提交
347
      }
Y
Stash  
Yu Yang 已提交
348
    }
Y
Yu Yang 已提交
349
    member_->nccl_ctxs_->WaitAll();
Y
Stash  
Yu Yang 已提交
350
  }
Y
Yu Yang 已提交
351 352 353 354
#else
  PADDLE_THROW("Not compiled with CUDA");
#endif
}
Y
Yu Yang 已提交
355

Y
Yu Yang 已提交
356 357
void ParallelExecutor::BuildNCCLCommunicator() const {
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
358
  member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_));
Y
Yu Yang 已提交
359
#endif
Y
Yu Yang 已提交
360 361
}

Y
Yu Yang 已提交
362 363
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
Y
Yu Yang 已提交
364
  bool use_event = true;
Y
Debug  
Yu Yang 已提交
365
  FeedFetchList fetched_data(fetch_tensors.size());
Y
Yu Yang 已提交
366
  // Version --> VarHandle
Y
Yu Yang 已提交
367
  member_->exception_.reset();
Y
Yu Yang 已提交
368
  std::unordered_map<VarHandleBase *, std::atomic<bool>> pending_vars;
Y
Yu Yang 已提交
369
  std::unordered_map<OpHandleBase *, size_t> pending_ops;
Y
Yu Yang 已提交
370
  std::vector<DummyVarHandle> dummy_vars;
Y
Yu Yang 已提交
371 372 373 374

  for (auto &place_pair : member_->vars_) {
    for (auto &name_pair : place_pair.second) {
      for (auto &version_pair : name_pair.second) {
Y
Yu Yang 已提交
375 376
        pending_vars[&version_pair.second] =
            version_pair.second.generated_op_ == nullptr;
Y
Yu Yang 已提交
377 378 379 380
      }
    }
  }

Y
Yu Yang 已提交
381
  for (auto &var : member_->dep_vars_) {
Y
Yu Yang 已提交
382
    pending_vars[var.get()] = var->generated_op_ == nullptr;
Y
Yu Yang 已提交
383 384
  }

Y
Yu Yang 已提交
385
  std::vector<OpHandleBase *> to_run;
Y
Yu Yang 已提交
386

Y
Yu Yang 已提交
387
  for (auto &op : member_->ops_) {
Y
Yu Yang 已提交
388 389 390 391 392 393 394
    if (op->inputs_.empty()) {  // Special case, Op has no input.
      to_run.emplace_back(op.get());
    } else {
      pending_ops.insert({op.get(), op->inputs_.size()});
    }
  }

Y
Yu Yang 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
  std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;

  for (auto &fetch_var_name : fetch_tensors) {
    for (auto &pair : member_->vars_) {
      auto it = pair.second.find(fetch_var_name);
      if (it != pair.second.end()) {
        fetched_vars[fetch_var_name].push_back(&it->second.rbegin()->second);
      }
    }
  }

  std::vector<FetchOpHandle> fetch_ops;

  for (size_t i = 0; i < fetch_tensors.size(); ++i) {
    auto &var_name = fetch_tensors[i];
    auto &vars = fetched_vars[var_name];
Y
Yu Yang 已提交
411
    fetch_ops.emplace_back(&fetched_data, i, &member_->local_scopes_);
Y
Yu Yang 已提交
412
    FetchOpHandle *op = &fetch_ops.back();
Y
Yu Yang 已提交
413 414

    // FIXME: Use new device context
Y
Yu Yang 已提交
415
    for (auto &p : member_->places_) {
Y
Yu Yang 已提交
416
      op->dev_ctx_[p] = member_->fetch_dev_ctxs_.Get(p);
Y
Yu Yang 已提交
417 418 419
    }

    for (auto *var : vars) {
Y
Yu Yang 已提交
420
      op->AddInput(var);
Y
Yu Yang 已提交
421
    }
Y
Yu Yang 已提交
422 423 424

    dummy_vars.emplace_back();
    auto *var = &dummy_vars.back();
Y
Yu Yang 已提交
425
    op->AddOutput(var);
Y
Yu Yang 已提交
426 427
    pending_vars[var] = false;

Y
Yu Yang 已提交
428 429 430
    pending_ops.insert({op, op->inputs_.size()});
  }

Y
Yu Yang 已提交
431
  for (auto *op : to_run) {
Y
Yu Yang 已提交
432
    member_->RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
433 434
  }

Y
Yu Yang 已提交
435
  while (!pending_vars.empty()) {
Y
Yu Yang 已提交
436
    VarHandleBase *ready_var = nullptr;
Y
Yu Yang 已提交
437
    for (auto &pair : pending_vars) {
Y
Yu Yang 已提交
438
      if (pair.second.load(std::memory_order_acquire)) {
Y
Yu Yang 已提交
439
        ready_var = pair.first;
Y
Yu Yang 已提交
440 441
      }
    }
Y
Yu Yang 已提交
442
    if (ready_var == nullptr) {
Y
Yu Yang 已提交
443 444 445 446
      // FIXME use conditional var instead of busy wait.
      if (member_->exception_) {
        throw * member_->exception_;
      }
Y
Yu Yang 已提交
447
      continue;
Y
Yu Yang 已提交
448
    }
Y
Yu Yang 已提交
449
    pending_vars.erase(ready_var);
Y
Yu Yang 已提交
450
    to_run.clear();
Y
Yu Yang 已提交
451 452 453 454 455
    for (auto *op : ready_var->pending_ops_) {
      auto &deps = pending_ops[op];
      --deps;
      if (deps == 0) {
        to_run.emplace_back(op);
Y
Yu Yang 已提交
456 457 458 459
      }
    }
    for (auto *op : to_run) {
      pending_ops.erase(op);
Y
Yu Yang 已提交
460
      member_->RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
461 462
    }
  }
Y
Yu Yang 已提交
463

Y
Debug  
Yu Yang 已提交
464 465 466 467 468 469
  for (auto &fetch_op : fetch_ops) {
    fetch_op.WaitAndMergeCPUTensors();
  }

  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetched_data;
Y
Yu Yang 已提交
470
}
Y
Yu Yang 已提交
471

Y
Yu Yang 已提交
472
}  // namespace framework
Y
Yang Yang 已提交
473
}  // namespace paddle