parallel_executor.cc 15.1 KB
Newer Older
Y
Yang Yang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
Y
Yu Yang 已提交
16
#include "ThreadPool.h"
Y
Yu Yang 已提交
17
#include "lod_tensor.h"
Y
Yu Yang 已提交
18
#include "lod_tensor_array.h"
Y
Yu Yang 已提交
19
#include "op_registry.h"
Y
Yu Yang 已提交
20
#include "paddle/fluid/framework/details/computation_op_handle.h"
Y
Yu Yang 已提交
21
#include "paddle/fluid/framework/details/fetch_op_handle.h"
Y
Yu Yang 已提交
22
#include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h"
Y
Yu Yang 已提交
23
#include "paddle/fluid/framework/details/op_handle_base.h"
Y
Yu Yang 已提交
24
#include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h"
Y
Yu Yang 已提交
25
#include "paddle/fluid/framework/details/var_handle.h"
Y
Yu Yang 已提交
26
#include "paddle/fluid/platform/nccl_helper.h"
Y
Yang Yang 已提交
27 28

namespace paddle {
Y
Yu Yang 已提交
29 30
namespace framework {

Y
Yu Yang 已提交
31
using details::ComputationOpHandle;
Y
Yu Yang 已提交
32
using details::DummyVarHandle;
Y
Yu Yang 已提交
33
using details::FetchOpHandle;
Y
Yu Yang 已提交
34
using details::NCCLAllReduceOpHandle;
Y
Yu Yang 已提交
35
using details::OpHandleBase;
Y
Yu Yang 已提交
36
using details::ScaleLossGradOpHandle;
Y
Yu Yang 已提交
37 38
using details::VarHandle;
using details::VarHandleBase;
Y
Yu Yang 已提交
39

Y
Yu Yang 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
struct SSAGraph {
  std::vector<std::unordered_map<std::string, std::map<int, VarHandle>>> vars_;
  std::unordered_set<std::unique_ptr<VarHandleBase>> dep_vars_;
  std::vector<std::unique_ptr<OpHandleBase>> ops_;
};

/**
 * We only handle write after read(WAR), since it should not have a write
 * after write in program. If there are write after write operators, we need
 * prune them.
 *
 * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR)
 */
static void PolishGraphToSupportDataHazards(SSAGraph *graph) {
  for (auto &var_map : graph->vars_) {
    for (auto &name_pair : var_map) {
      if (name_pair.second.size() <= 1) {
        return;
      }
      auto it_new = name_pair.second.rbegin();
      auto it_old = name_pair.second.rbegin();
      ++it_old;
      for (; it_old != name_pair.second.rend(); it_new = it_old, ++it_old) {
        auto *write_op = it_new->second.generated_op_;
        auto &read_ops = it_old->second.pending_ops_;
        auto *ex_write_op = it_old->second.generated_op_;

        if (ex_write_op == nullptr) {  // Nobody write this var.
          continue;
        }

        for (auto *read_op : read_ops) {
          // Manually add a dependency var from read_op to write_op;
          if (read_op == write_op) {
            // Read Write is the same op.
            continue;
          }

          auto *dep_var = new DummyVarHandle();
          read_op->AddOutput(dep_var);
          write_op->AddInput(dep_var);
          graph->dep_vars_.emplace(dep_var);
        }
      }
    }
  }
}

static VarHandle *CreateOrGetLatestVarHandle(SSAGraph *graph,
                                             const std::string &each_var_name,
                                             const platform::Place &place,
                                             size_t place_offset) {
  auto &var_holders = graph->vars_[place_offset];
  auto &var_holder = var_holders[each_var_name];
  VarHandle *var = nullptr;
  if (var_holder.empty()) {
    auto &init_var = var_holder[0];
    init_var.place_ = place;
    init_var.name_ = each_var_name;
    init_var.generated_op_ = nullptr;
    init_var.version_ = 0;
    var = &init_var;
  } else {
    var = &var_holder.rbegin()->second;
  }
  return var;
}

static void CreateOpOutput(SSAGraph *graph, OpHandleBase *op_handle,
                           const std::string &each_var_name,
                           const platform::Place &place, size_t place_offset) {
  auto &vars = graph->vars_[place_offset][each_var_name];
  size_t version = vars.size();
  auto &var = vars[version];
  var.version_ = version;
  var.name_ = each_var_name;
  var.place_ = place;
  op_handle->AddOutput(&var);
}

Y
Yu Yang 已提交
120 121
class ParallelExecutorPrivate {
 public:
Y
Yu Yang 已提交
122 123 124 125
  explicit ParallelExecutorPrivate(size_t num_threads,
                                   const std::vector<platform::Place> &places)
      : places_(places),
        fetch_dev_ctxs_(places),
Y
Yu Yang 已提交
126
        pool_(num_threads <= 1 ? nullptr : new ThreadPool(num_threads)) {
Y
Yu Yang 已提交
127
    graph_.vars_.resize(places.size());
Y
Yu Yang 已提交
128
  }
Y
Yu Yang 已提交
129

Y
Stash  
Yu Yang 已提交
130
  std::vector<platform::Place> places_;
Y
Yu Yang 已提交
131
  platform::DeviceContextPool fetch_dev_ctxs_;
Y
Yu Yang 已提交
132
  std::vector<Scope *> local_scopes_;
Y
Yu Yang 已提交
133
  Scope *global_scope_;
Y
Yu Yang 已提交
134

Y
Yu Yang 已提交
135
  std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
Y
Yu Yang 已提交
136

Y
Yu Yang 已提交
137
  SSAGraph graph_;
Y
Yu Yang 已提交
138

Y
Yu Yang 已提交
139
  // Use a simpler thread pool, might be faster.
Y
Yu Yang 已提交
140
  std::unique_ptr<ThreadPool> pool_;
Y
Yu Yang 已提交
141 142

  std::unique_ptr<platform::EnforceNotMet> exception_;
Y
Yu Yang 已提交
143

Y
Yu Yang 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
  void RunOp(
      bool use_event,
      std::unordered_map<VarHandleBase *, std::atomic<bool>> &pending_vars,
      OpHandleBase *op) {
    std::vector<std::atomic<bool> *> *ready_buffer =
        new std::vector<std::atomic<bool> *>();
    for (auto *var : op->outputs_) {
      ready_buffer->emplace_back(&pending_vars[var]);
    }

    auto op_run = [ready_buffer, op, this, use_event] {
      try {
        VLOG(10) << op->DebugString();
        op->Run(use_event);
        for (auto *ready : *ready_buffer) {
          ready->store(true, std::memory_order_release);
        }
        delete ready_buffer;
      } catch (platform::EnforceNotMet ex) {
        exception_.reset(new platform::EnforceNotMet(ex));
      } catch (...) {
        LOG(FATAL) << "Unknown exception catched";
      }
    };
    if (pool_) {
      pool_->enqueue(op_run);
    } else {
      op_run();
    }
  }
Y
Yu Yang 已提交
174 175
};

Y
Yu Yang 已提交
176
ParallelExecutor::ParallelExecutor(
Y
Yu Yang 已提交
177
    size_t num_threads, const std::vector<platform::Place> &places,
Y
Yu Yang 已提交
178 179 180
    const std::unordered_set<std::string> &params,
    const ProgramDesc &startup_program, const ProgramDesc &main_program,
    const std::string &loss_var_name, Scope *scope)
Y
Yu Yang 已提交
181
    : member_(new ParallelExecutorPrivate(num_threads, places)) {
Y
Yu Yang 已提交
182
  member_->global_scope_ = scope;
Y
Yu Yang 已提交
183

Y
Yu Yang 已提交
184 185 186 187
  // Step 1. RunStartupProgram and Bcast the params to devs.
  Executor exe(places[0]);
  exe.Run(startup_program, scope, 0);
  // Create local scopes
Y
Yu Yang 已提交
188 189
  for (size_t i = 0; i < member_->places_.size(); ++i) {
    member_->local_scopes_.push_back(&scope->NewScope());
Y
Yu Yang 已提交
190 191 192
  }

  // Bcast Parameters to all GPUs
Y
Yu Yang 已提交
193
  BuildNCCLCommunicator();
Y
Yu Yang 已提交
194
  if (platform::is_gpu_place(places[0]) &&
Y
Yu Yang 已提交
195 196
      member_->local_scopes_.size() != 1) {  // Is CUDA
    BCastParamsToGPUs(startup_program);
Y
Yu Yang 已提交
197 198 199 200 201 202
  }
  // Startup Program has been run. All local scopes has correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  ConstructDependencyGraph(params, main_program, loss_var_name);
Y
Yu Yang 已提交
203 204

  // Step 3. Create vars in each scope;
Y
Yu Yang 已提交
205
  for (auto *scope : member_->local_scopes_) {
Y
Yu Yang 已提交
206 207 208 209 210 211 212 213
    for (auto *var : main_program.Block(0).AllVars()) {
      if (scope->FindVar(var->Name()) != nullptr) {
        continue;
      }

      InitializeVariable(scope->Var(var->Name()), var->GetType());
    }
  }
Y
Yu Yang 已提交
214 215 216 217 218
}

void ParallelExecutor::ConstructDependencyGraph(
    const std::unordered_set<std::string> &params,
    const ProgramDesc &main_program, const std::string &loss_var_name) const {
Y
Yu Yang 已提交
219
  std::unordered_set<std::string> grads;
Y
Yu Yang 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
  for (auto &each_param : params) {
    grads.insert(each_param + "@GRAD");
  }

  bool is_forwarding = true;
  for (auto *op : main_program.Block(0).AllOps()) {
    bool change_forward = false;
    if (!is_forwarding) {
      // FIXME(yy): Do not hard code like this
      if (op->OutputArgumentNames().size() == 1 &&
          op->OutputArgumentNames()[0] == loss_var_name + "@GRAD") {
        continue;  // Drop fill 1. for backward coeff;
      }
    }

Y
Yu Yang 已提交
235 236 237 238
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      auto &p = member_->places_[i];
      auto *s = member_->local_scopes_[i];

Y
Yu Yang 已提交
239 240
      member_->graph_.ops_.emplace_back(new ComputationOpHandle(*op, s, p));
      auto *op_handle = member_->graph_.ops_.back().get();
Y
Yu Yang 已提交
241 242
      op_handle->dev_ctx_[p] = const_cast<platform::DeviceContext *>(
          platform::DeviceContextPool::Instance().Get(p));
Y
Yu Yang 已提交
243 244 245 246

      auto var_names = op->InputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
247 248
        VarHandle *var =
            CreateOrGetLatestVarHandle(&member_->graph_, each_var_name, p, i);
Y
Yu Yang 已提交
249
        op_handle->AddInput(var);
Y
Yu Yang 已提交
250 251 252 253
      }
      var_names = op->OutputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
254
        CreateOpOutput(&member_->graph_, op_handle, each_var_name, p, i);
Y
Yu Yang 已提交
255 256 257 258 259
      }

      if (is_forwarding) {
        if (var_names.size() == 1 && var_names[0] == loss_var_name) {
          // Insert ScaleCost OpHandle
Y
Yu Yang 已提交
260 261 262
          op_handle =
              new ScaleLossGradOpHandle(this->member_->local_scopes_.size(), s,
                                        p, member_->nccl_ctxs_->DevCtx(p));
Y
Yu Yang 已提交
263
          member_->graph_.ops_.emplace_back(op_handle);
Y
Yu Yang 已提交
264

Y
Yu Yang 已提交
265 266 267 268 269 270
          // FIXME: Currently ScaleLossGradOp only use device_count as scale
          // factor. So it does not depend on any other operators.
          // VarHandle *loss = GetVarHandle(loss_var_name, place);
          // loss->pending_ops_.emplace_back(op_handle);
          // op_handle->inputs_.emplace_back(loss);

Y
Yu Yang 已提交
271 272
          CreateOpOutput(&member_->graph_, op_handle, loss_var_name + "@GRAD",
                         p, i);
Y
Yu Yang 已提交
273 274 275 276 277 278 279 280 281 282 283 284 285 286
          change_forward = true;
        }
      }
    }

    if (change_forward) {
      is_forwarding = false;
    }

    if (!is_forwarding) {
      auto var_names = op->OutputArgumentNames();
      for (auto &og : var_names) {
        if (grads.count(og) != 0) {  // is param grad
          // Insert NCCL AllReduce Op
Y
Yu Yang 已提交
287
          member_->graph_.ops_.emplace_back(new NCCLAllReduceOpHandle(
Y
Yu Yang 已提交
288
              member_->local_scopes_, member_->places_, *member_->nccl_ctxs_));
Y
Yu Yang 已提交
289
          auto *op_handle = member_->graph_.ops_.back().get();
Y
Yu Yang 已提交
290

Y
Yu Yang 已提交
291 292
          for (size_t i = 0; i < member_->places_.size(); ++i) {
            auto &p = member_->places_[i];
Y
Yu Yang 已提交
293
            auto &vars = member_->graph_.vars_[i][og];
Y
Yu Yang 已提交
294 295 296 297 298

            if (vars.empty()) {  // This device has no data. continue.
              continue;
            }
            auto *prev_grad = &vars[vars.size() - 1];
Y
Yu Yang 已提交
299 300
            op_handle->AddInput(prev_grad);

Y
Yu Yang 已提交
301
            auto &var = vars[vars.size()];
Y
Yu Yang 已提交
302
            var.place_ = p;
Y
Yu Yang 已提交
303 304
            var.name_ = og;
            var.version_ = vars.size() - 1;
Y
Yu Yang 已提交
305 306

            op_handle->AddOutput(&var);
Y
Yu Yang 已提交
307 308 309 310 311
          }
        }
      }
    }
  }
Y
Yu Yang 已提交
312

Y
Yu Yang 已提交
313 314 315
  /*
    Dependency graph has been constructed. However, there are still data
    harzaeds need to be handled.
Y
Yu Yang 已提交
316
   */
Y
Yu Yang 已提交
317
  PolishGraphToSupportDataHazards(&member_->graph_);
Y
Yu Yang 已提交
318 319 320 321
}

void ParallelExecutor::BCastParamsToGPUs(
    const ProgramDesc &startup_program) const {
Y
Yu Yang 已提交
322
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
323
  auto *main_scope = member_->local_scopes_[0];
Y
Yu Yang 已提交
324

Y
Yu Yang 已提交
325 326 327 328
  for (auto *var_desc : startup_program.Block(0).AllVars()) {
    if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
      auto &main_tensor =
          main_scope->FindVar(var_desc->Name())->Get<LoDTensor>();
Y
Yu Yang 已提交
329
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
Y
Yu Yang 已提交
330 331 332
      auto &dims = main_tensor.dims();
      size_t numel = main_tensor.numel();

Y
Yu Yang 已提交
333
      platform::NCCLGroupGuard guard;
Y
Yu Yang 已提交
334

Y
Update  
Yu Yang 已提交
335 336 337 338 339 340
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;
        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
Y
Yu Yang 已提交
341
          auto local_scope = member_->local_scopes_[i];
Y
Update  
Yu Yang 已提交
342 343 344 345 346
          auto *t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }

Y
Yu Yang 已提交
347
        auto &nccl_ctx = member_->nccl_ctxs_->at(place);
Y
Yu Yang 已提交
348 349
        platform::dynload::ncclBcast(buffer, numel, data_type, 0,
                                     nccl_ctx.comm_, nccl_ctx.stream());
Y
Yu Yang 已提交
350
      }
Y
Stash  
Yu Yang 已提交
351
    }
Y
Yu Yang 已提交
352
    member_->nccl_ctxs_->WaitAll();
Y
Stash  
Yu Yang 已提交
353
  }
Y
Yu Yang 已提交
354 355 356 357
#else
  PADDLE_THROW("Not compiled with CUDA");
#endif
}
Y
Yu Yang 已提交
358

Y
Yu Yang 已提交
359 360
void ParallelExecutor::BuildNCCLCommunicator() const {
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
361
  member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_));
Y
Yu Yang 已提交
362
#endif
Y
Yu Yang 已提交
363 364
}

Y
Yu Yang 已提交
365 366
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
Y
Yu Yang 已提交
367
  bool use_event = true;
Y
Debug  
Yu Yang 已提交
368
  FeedFetchList fetched_data(fetch_tensors.size());
Y
Yu Yang 已提交
369
  // Version --> VarHandle
Y
Yu Yang 已提交
370
  member_->exception_.reset();
Y
Yu Yang 已提交
371
  std::unordered_map<VarHandleBase *, std::atomic<bool>> pending_vars;
Y
Yu Yang 已提交
372
  std::unordered_map<OpHandleBase *, size_t> pending_ops;
Y
Yu Yang 已提交
373
  std::vector<DummyVarHandle> dummy_vars;
Y
Yu Yang 已提交
374

Y
Yu Yang 已提交
375
  for (auto &var_map : member_->graph_.vars_) {
Y
Yu Yang 已提交
376
    for (auto &name_pair : var_map) {
Y
Yu Yang 已提交
377
      for (auto &version_pair : name_pair.second) {
Y
Yu Yang 已提交
378 379
        pending_vars[&version_pair.second] =
            version_pair.second.generated_op_ == nullptr;
Y
Yu Yang 已提交
380 381 382 383
      }
    }
  }

Y
Yu Yang 已提交
384
  for (auto &var : member_->graph_.dep_vars_) {
Y
Yu Yang 已提交
385
    pending_vars[var.get()] = var->generated_op_ == nullptr;
Y
Yu Yang 已提交
386 387
  }

Y
Yu Yang 已提交
388
  std::vector<OpHandleBase *> to_run;
Y
Yu Yang 已提交
389

Y
Yu Yang 已提交
390
  for (auto &op : member_->graph_.ops_) {
Y
Yu Yang 已提交
391 392 393 394 395 396 397
    if (op->inputs_.empty()) {  // Special case, Op has no input.
      to_run.emplace_back(op.get());
    } else {
      pending_ops.insert({op.get(), op->inputs_.size()});
    }
  }

Y
Yu Yang 已提交
398 399 400
  std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;

  for (auto &fetch_var_name : fetch_tensors) {
Y
Yu Yang 已提交
401
    for (auto &var_map : member_->graph_.vars_) {
Y
Yu Yang 已提交
402 403
      auto it = var_map.find(fetch_var_name);
      if (it != var_map.end()) {
Y
Yu Yang 已提交
404 405 406 407 408 409 410 411 412 413
        fetched_vars[fetch_var_name].push_back(&it->second.rbegin()->second);
      }
    }
  }

  std::vector<FetchOpHandle> fetch_ops;

  for (size_t i = 0; i < fetch_tensors.size(); ++i) {
    auto &var_name = fetch_tensors[i];
    auto &vars = fetched_vars[var_name];
Y
Yu Yang 已提交
414
    fetch_ops.emplace_back(&fetched_data, i, &member_->local_scopes_);
Y
Yu Yang 已提交
415
    FetchOpHandle *op = &fetch_ops.back();
Y
Yu Yang 已提交
416 417

    // FIXME: Use new device context
Y
Yu Yang 已提交
418
    for (auto &p : member_->places_) {
Y
Yu Yang 已提交
419
      op->dev_ctx_[p] = member_->fetch_dev_ctxs_.Get(p);
Y
Yu Yang 已提交
420 421 422
    }

    for (auto *var : vars) {
Y
Yu Yang 已提交
423
      op->AddInput(var);
Y
Yu Yang 已提交
424
    }
Y
Yu Yang 已提交
425 426 427

    dummy_vars.emplace_back();
    auto *var = &dummy_vars.back();
Y
Yu Yang 已提交
428
    op->AddOutput(var);
Y
Yu Yang 已提交
429 430
    pending_vars[var] = false;

Y
Yu Yang 已提交
431 432 433
    pending_ops.insert({op, op->inputs_.size()});
  }

Y
Yu Yang 已提交
434
  for (auto *op : to_run) {
Y
Yu Yang 已提交
435
    member_->RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
436 437
  }

Y
Yu Yang 已提交
438
  while (!pending_vars.empty()) {
Y
Yu Yang 已提交
439
    VarHandleBase *ready_var = nullptr;
Y
Yu Yang 已提交
440
    for (auto &pair : pending_vars) {
Y
Yu Yang 已提交
441
      if (pair.second.load(std::memory_order_acquire)) {
Y
Yu Yang 已提交
442
        ready_var = pair.first;
Y
Yu Yang 已提交
443 444
      }
    }
Y
Yu Yang 已提交
445
    if (ready_var == nullptr) {
Y
Yu Yang 已提交
446 447 448 449
      // FIXME use conditional var instead of busy wait.
      if (member_->exception_) {
        throw * member_->exception_;
      }
Y
Yu Yang 已提交
450
      continue;
Y
Yu Yang 已提交
451
    }
Y
Yu Yang 已提交
452
    pending_vars.erase(ready_var);
Y
Yu Yang 已提交
453
    to_run.clear();
Y
Yu Yang 已提交
454 455 456 457 458
    for (auto *op : ready_var->pending_ops_) {
      auto &deps = pending_ops[op];
      --deps;
      if (deps == 0) {
        to_run.emplace_back(op);
Y
Yu Yang 已提交
459 460 461 462
      }
    }
    for (auto *op : to_run) {
      pending_ops.erase(op);
Y
Yu Yang 已提交
463
      member_->RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
464 465
    }
  }
Y
Yu Yang 已提交
466

Y
Debug  
Yu Yang 已提交
467 468 469 470 471 472
  for (auto &fetch_op : fetch_ops) {
    fetch_op.WaitAndMergeCPUTensors();
  }

  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetched_data;
Y
Yu Yang 已提交
473
}
Y
Yu Yang 已提交
474

Y
Yu Yang 已提交
475
}  // namespace framework
Y
Yang Yang 已提交
476
}  // namespace paddle