parallel_executor.cc 23.8 KB
Newer Older
Y
Yang Yang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
Y
Yu Yang 已提交
16 17
#include "ThreadPool.h"
#include "executor.h"
Y
Yu Yang 已提交
18
#include "lod_tensor.h"
Y
Yu Yang 已提交
19
#include "lod_tensor_array.h"
Y
Yu Yang 已提交
20
#include "op_registry.h"
Y
Debug  
Yu Yang 已提交
21
#include "paddle/fluid/framework/feed_fetch_type.h"
Y
Yu Yang 已提交
22
#include "paddle/fluid/operators/math/concat.h"
Y
Yu Yang 已提交
23
#include "paddle/fluid/platform/nccl_helper.h"
Y
Yang Yang 已提交
24 25

namespace paddle {
Y
Yu Yang 已提交
26 27 28 29
namespace framework {

struct OpHandle;

Y
Yu Yang 已提交
30 31 32 33 34
struct VarHandleBase {
  virtual ~VarHandleBase() {}
  virtual std::string DebugString() const = 0;

  OpHandle *generated_op_;
Y
Yu Yang 已提交
35
  std::unordered_set<OpHandle *> pending_ops_;
Y
Yu Yang 已提交
36 37 38 39 40 41 42 43 44
};

struct VarHandle : public VarHandleBase {
  std::string DebugString() const override {
    std::stringstream ss;
    ss << name_ << ":" << place_;
    return ss.str();
  }

Y
Yu Yang 已提交
45 46
  // version field currently is not used, however, just store the version to
  // debug easily.
Y
Yu Yang 已提交
47 48 49
  size_t version_;
  std::string name_;
  platform::Place place_;
Y
Yu Yang 已提交
50
};
Y
Yu Yang 已提交
51

Y
Yu Yang 已提交
52 53 54 55
struct DummyVarHandle : public VarHandleBase {
  std::string DebugString() const override { return "dummy"; }
};

Y
Yu Yang 已提交
56
struct OpHandle {
Y
Yu Yang 已提交
57 58 59 60 61
  std::vector<VarHandleBase *> inputs_;
  std::vector<VarHandleBase *> outputs_;
  std::unordered_map<platform::Place, platform::DeviceContext *,
                     platform::PlaceHash>
      dev_ctx_;
Y
Yu Yang 已提交
62

Y
Yu Yang 已提交
63 64
  std::unordered_map<int, cudaEvent_t> events_;

Y
Yu Yang 已提交
65 66 67 68
  std::string DebugString() {
    std::stringstream ss;
    ss << "(";
    for (auto *var : inputs_) {
Y
Yu Yang 已提交
69
      ss << var->DebugString() << ", ";
Y
Yu Yang 已提交
70 71 72
    }
    ss << ") --> (";
    for (auto *var : outputs_) {
Y
Yu Yang 已提交
73
      ss << var->DebugString() << ", ";
Y
Yu Yang 已提交
74 75 76 77 78 79
    }
    ss << ")\n";
    return ss.str();
  }

  virtual ~OpHandle() {}
Y
Yu Yang 已提交
80

Y
Yu Yang 已提交
81 82
  void Run(bool use_event) {
    if (events_.empty() && use_event) {
Y
Yu Yang 已提交
83 84 85 86 87 88 89 90 91
      for (auto &p : dev_ctx_) {
        int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
        cudaSetDevice(dev_id);
        cudaEventCreateWithFlags(&events_[dev_id], cudaEventDisableTiming);
      }
    }

    RunImpl();

Y
Yu Yang 已提交
92 93 94 95 96 97 98
    if (use_event) {
      for (auto &p : dev_ctx_) {
        int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
        auto stream =
            static_cast<platform::CUDADeviceContext *>(p.second)->stream();
        cudaEventRecord(events_.at(dev_id), stream);
      }
Y
Yu Yang 已提交
99 100 101 102
    }
  }

  virtual void Wait(platform::DeviceContext *waited_dev) {
Y
Fix bug  
Yu Yang 已提交
103
    if (platform::is_cpu_place(waited_dev->GetPlace()) || events_.empty()) {
Y
Yu Yang 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117
      for (auto &dev_ctx : dev_ctx_) {
        dev_ctx.second->Wait();
      }
    } else {
      auto stream =
          static_cast<platform::CUDADeviceContext *>(waited_dev)->stream();
      for (auto &ev : events_) {
        PADDLE_ENFORCE(cudaStreamWaitEvent(stream, ev.second, 0));
      }
    }
  }

 protected:
  virtual void RunImpl() = 0;
Y
Yu Yang 已提交
118 119
};

Y
Yu Yang 已提交
120 121 122 123 124 125 126 127 128
struct ScaleLossGradOpHandle : public OpHandle {
  float coeff_;
  Scope *scope_;
  platform::Place place_;

  explicit ScaleLossGradOpHandle(size_t num_dev, Scope *scope,
                                 platform::Place place)
      : coeff_(static_cast<float>(1.0 / num_dev)),
        scope_(scope),
Y
Yu Yang 已提交
129
        place_(place) {}
Y
Yu Yang 已提交
130

Y
Yu Yang 已提交
131
  ~ScaleLossGradOpHandle() {}
Y
Yu Yang 已提交
132

Y
Yu Yang 已提交
133 134
 protected:
  void RunImpl() override {
Y
Yu Yang 已提交
135
    std::string var_name = static_cast<VarHandle *>(this->outputs_[0])->name_;
Y
Yu Yang 已提交
136

Y
Yu Yang 已提交
137 138 139 140 141 142 143
    float *tmp = scope_->FindVar(var_name)
                     ->GetMutable<framework::LoDTensor>()
                     ->mutable_data<float>(make_ddim({1}), place_);

    if (platform::is_cpu_place(place_)) {
      *tmp = coeff_;
    } else {
Y
Yu Yang 已提交
144
      auto stream =
Y
Yu Yang 已提交
145
          static_cast<platform::CUDADeviceContext *>(this->dev_ctx_[place_])
Y
Yu Yang 已提交
146 147 148 149
              ->stream();
      memory::Copy(boost::get<platform::CUDAPlace>(place_), tmp,
                   platform::CPUPlace(), &coeff_, sizeof(float), stream);
    }
Y
Yu Yang 已提交
150
  }
Y
Yu Yang 已提交
151 152
};

Y
Yu Yang 已提交
153
struct FetchOpHandle : public OpHandle {
Y
Debug  
Yu Yang 已提交
154
  FeedFetchList *data_;
Y
Yu Yang 已提交
155 156 157 158 159 160 161 162 163 164
  size_t offset_;
  std::vector<Scope *> *local_scopes_;
  std::vector<LoDTensor> tensors_;

  ~FetchOpHandle() {
    for (auto *input_var : inputs_) {
      input_var->pending_ops_.erase(this);
    }
  }

Y
Yu Yang 已提交
165 166 167 168
  void Wait(platform::DeviceContext *waited_dev) override {
    PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
  }

Y
Debug  
Yu Yang 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182
  void WaitAndMergeCPUTensors() const {
    // Wait fetch stream done.
    for (auto &ctx : dev_ctx_) {
      ctx.second->Wait();
    }

    std::vector<const LoDTensor *> tensors_ptr;
    tensors_ptr.reserve(tensors_.size());
    for (auto &t : tensors_) {
      tensors_ptr.emplace_back(&t);
    }
    data_->at(offset_).MergeLoDTensor(tensors_ptr, platform::CPUPlace());
  }

Y
Yu Yang 已提交
183 184
 protected:
  void RunImpl() override {
Y
Debug  
Yu Yang 已提交
185
    for (auto *input : inputs_) {
Y
Yu Yang 已提交
186 187
      auto *var = static_cast<VarHandle *>(input);
      var->generated_op_->Wait(this->dev_ctx_[var->place_]);
Y
Debug  
Yu Yang 已提交
188 189
    }

Y
Yu Yang 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
    tensors_.resize(inputs_.size());
    auto *var = static_cast<VarHandle *>(inputs_[0]);
    auto &var_name = var->name_;
    platform::CPUPlace cpu;
    auto &scopes = *local_scopes_;

    for (size_t i = 0; i < scopes.size(); ++i) {
      auto &scope = scopes[i];
      auto &t = scope->FindVar(var_name)->Get<framework::LoDTensor>();
      if (platform::is_gpu_place(var->place_)) {
        TensorCopy(t, cpu, *dev_ctx_[t.place()], &tensors_[i]);
      } else {
        tensors_[i].ShareDataWith(t);
        tensors_[i].set_lod(t.lod());
      }
    }
  }
};

Y
Yu Yang 已提交
209 210
class ParallelExecutorPrivate {
 public:
Y
Yu Yang 已提交
211
  explicit ParallelExecutorPrivate(size_t num_threads)
Y
Yu Yang 已提交
212
      : pool_(num_threads <= 1 ? nullptr : new ThreadPool(num_threads)) {}
Y
Yu Yang 已提交
213

Y
Stash  
Yu Yang 已提交
214 215
  std::vector<platform::Place> places_;

Y
Yu Yang 已提交
216
  std::vector<Scope *> local_scopes_;
Y
Yu Yang 已提交
217
  Scope *global_scope_;
Y
Yu Yang 已提交
218

Y
Yu Yang 已提交
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
#ifdef PADDLE_WITH_CUDA
  struct NCCLContext {
    std::unique_ptr<platform::CUDADeviceContext> ctx_;
    ncclComm_t comm;

    explicit NCCLContext(int dev_id) {
      ctx_.reset(new platform::CUDADeviceContext(platform::CUDAPlace(dev_id)));
    }

    cudaStream_t stream() const { return ctx_->stream(); }

    int device_id() const {
      return boost::get<platform::CUDAPlace>(ctx_->GetPlace()).device;
    }

Y
Update  
Yu Yang 已提交
234 235
    static void InitNCCLContext(std::unordered_map<int, NCCLContext> &contexts,
                                const std::vector<platform::Place> &places) {
Y
Yu Yang 已提交
236 237 238 239 240
      std::vector<ncclComm_t> comms;
      std::vector<int> devs;
      comms.resize(contexts.size());
      devs.reserve(contexts.size());

Y
Update  
Yu Yang 已提交
241 242
      for (auto &p : places) {
        devs.push_back(boost::get<platform::CUDAPlace>(p).device);
Y
Yu Yang 已提交
243 244
      }

Y
Yu Yang 已提交
245
      PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
Y
Yu Yang 已提交
246 247 248
          &comms[0], static_cast<int>(contexts.size()), &devs[0]));

      int i = 0;
Y
Update  
Yu Yang 已提交
249 250
      for (auto &dev_id : devs) {
        contexts.at(dev_id).comm = comms[i++];
Y
Yu Yang 已提交
251 252 253 254
      }
    }
  };

Y
Update  
Yu Yang 已提交
255
  std::unordered_map<int, NCCLContext> communication_streams_;
Y
Yu Yang 已提交
256 257 258 259 260 261 262 263

  NCCLContext &GetNCCLCtx(platform::Place p) {
    int dev_id = boost::get<platform::CUDAPlace>(p).device;
    return communication_streams_.at(dev_id);
  }

#endif

Y
Yu Yang 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276
  platform::DeviceContext *CommunicationDevCtx(const platform::Place &place) {
    if (platform::is_cpu_place(place) || local_scopes_.size() == 1) {
      return const_cast<platform::DeviceContext *>(
          platform::DeviceContextPool::Instance().Get(place));
    } else {
#ifdef PADDLE_WITH_CUDA
      return GetNCCLCtx(place).ctx_.get();
#else
      PADDLE_THROW("Not compiled with CUDA")
#endif
    }
  }

Y
Yu Yang 已提交
277 278 279 280 281 282
  platform::Place main_place_;

  std::unordered_map<platform::Place,
                     std::unordered_map<std::string, std::map<int, VarHandle>>,
                     platform::PlaceHash>
      vars_;
Y
Yu Yang 已提交
283 284
  std::unordered_set<std::unique_ptr<VarHandleBase>> dep_vars_;

Y
Yu Yang 已提交
285
  std::vector<std::unique_ptr<OpHandle>> ops_;
Y
Yu Yang 已提交
286

Y
Yu Yang 已提交
287
  // Use a simpler thread pool, might be faster.
Y
Yu Yang 已提交
288
  std::unique_ptr<ThreadPool> pool_;
Y
Yu Yang 已提交
289 290

  std::unique_ptr<platform::EnforceNotMet> exception_;
Y
Yu Yang 已提交
291 292
};

Y
Yu Yang 已提交
293 294 295 296
struct NCCLAllReduceOpHandle : public OpHandle {
  ParallelExecutorPrivate *member_;

  explicit NCCLAllReduceOpHandle(ParallelExecutorPrivate *member)
Y
Yu Yang 已提交
297
      : member_(member) {}
Y
Yu Yang 已提交
298

Y
Yu Yang 已提交
299 300 301 302
  void Wait(platform::DeviceContext *waited_dev) override {
    OpHandle::Wait(waited_dev);
  }

Y
Yu Yang 已提交
303 304
 protected:
  void RunImpl() override {
Y
Yu Yang 已提交
305 306 307
    if (this->inputs_.size() == 1) {
      return;  // No need to all reduce when GPU count = 1;
    } else {
Y
Yu Yang 已提交
308 309 310 311 312 313
      // Wait input done
      for (auto *in : inputs_) {
        auto &p = static_cast<VarHandle *>(in)->place_;
        in->generated_op_->Wait(dev_ctx_[p]);
      }

Y
Yu Yang 已提交
314 315 316 317
      auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;
      int dtype = -1;
      size_t numel = 0;

Y
Yu Yang 已提交
318
      platform::NCCLGroupGuard guard;
Y
Update  
Yu Yang 已提交
319

Y
Yu Yang 已提交
320 321 322
      for (size_t i = 0; i < member_->local_scopes_.size(); ++i) {
        auto &p = member_->places_[i];
        auto *s = member_->local_scopes_[i];
Y
Yu Yang 已提交
323 324 325 326
        int dev_id = boost::get<platform::CUDAPlace>(p).device;

        auto &lod_tensor = s->FindVar(var_name)->Get<framework::LoDTensor>();
        void *buffer = const_cast<void *>(lod_tensor.data<void>());
Y
Debug  
Yu Yang 已提交
327 328 329 330 331
        uintptr_t buf = reinterpret_cast<uintptr_t>(buffer);
        if (buf % sizeof(float) != 0) {
          VLOG(3) << "Buffer is not aligned " << buf;
        }

Y
Yu Yang 已提交
332
        if (dtype == -1) {
Y
Yu Yang 已提交
333
          dtype = platform::ToNCCLDataType(lod_tensor.type());
Y
Yu Yang 已提交
334 335 336 337 338 339
        }

        if (numel == 0) {
          numel = static_cast<size_t>(lod_tensor.numel());
        }
        auto &nccl_ctx = member_->communication_streams_.at(dev_id);
Y
Yu Yang 已提交
340
        PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
Y
Update  
Yu Yang 已提交
341
            buffer, buffer, numel, static_cast<ncclDataType_t>(dtype), ncclSum,
Y
Yu Yang 已提交
342
            nccl_ctx.comm, nccl_ctx.stream()));
Y
Yu Yang 已提交
343
      }
Y
Debug  
Yu Yang 已提交
344
    }
Y
Yu Yang 已提交
345
  }
Y
Yu Yang 已提交
346 347
};

Y
Yu Yang 已提交
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
struct ComputationOpHandle : public OpHandle {
  std::unique_ptr<OperatorBase> op_;
  Scope *scope_;
  platform::Place place_;

  explicit ComputationOpHandle(const OpDesc &op_desc, Scope *scope,
                               platform::Place place)
      : op_(framework::OpRegistry::CreateOp(op_desc)),
        scope_(scope),
        place_(place) {}

 protected:
  void RunImpl() override {
    auto *cur_ctx = dev_ctx_[place_];
    for (auto *in : inputs_) {
      bool need_wait =
          in->generated_op_ && in->generated_op_->dev_ctx_[place_] != cur_ctx;
      if (need_wait) {
        in->generated_op_->Wait(cur_ctx);
      }
    }

    op_->Run(*scope_, place_);
  }
};

Y
Yu Yang 已提交
374
ParallelExecutor::ParallelExecutor(
Y
Yu Yang 已提交
375
    size_t num_threads, const std::vector<platform::Place> &places,
Y
Yu Yang 已提交
376 377 378
    const std::unordered_set<std::string> &params,
    const ProgramDesc &startup_program, const ProgramDesc &main_program,
    const std::string &loss_var_name, Scope *scope)
Y
Yu Yang 已提交
379
    : member_(new ParallelExecutorPrivate(num_threads)) {
Y
Stash  
Yu Yang 已提交
380
  member_->places_ = places;
Y
Yu Yang 已提交
381
  member_->global_scope_ = scope;
Y
Yu Yang 已提交
382 383 384 385
  // Step 1. RunStartupProgram and Bcast the params to devs.
  Executor exe(places[0]);
  exe.Run(startup_program, scope, 0);
  // Create local scopes
Y
Yu Yang 已提交
386 387
  for (size_t i = 0; i < member_->places_.size(); ++i) {
    member_->local_scopes_.push_back(&scope->NewScope());
Y
Yu Yang 已提交
388 389 390 391
  }
  member_->main_place_ = places[0];

  // Bcast Parameters to all GPUs
Y
Yu Yang 已提交
392
  BuildNCCLCommunicator();
Y
Yu Yang 已提交
393 394 395
  if (platform::is_gpu_place(member_->main_place_) &&
      member_->local_scopes_.size() != 1) {  // Is CUDA
    BCastParamsToGPUs(startup_program);
Y
Yu Yang 已提交
396 397 398 399 400 401
  }
  // Startup Program has been run. All local scopes has correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  ConstructDependencyGraph(params, main_program, loss_var_name);
Y
Yu Yang 已提交
402 403

  // Step 3. Create vars in each scope;
Y
Yu Yang 已提交
404
  for (auto *scope : member_->local_scopes_) {
Y
Yu Yang 已提交
405 406 407 408 409 410 411 412
    for (auto *var : main_program.Block(0).AllVars()) {
      if (scope->FindVar(var->Name()) != nullptr) {
        continue;
      }

      InitializeVariable(scope->Var(var->Name()), var->GetType());
    }
  }
Y
Yu Yang 已提交
413 414 415 416 417
}

void ParallelExecutor::ConstructDependencyGraph(
    const std::unordered_set<std::string> &params,
    const ProgramDesc &main_program, const std::string &loss_var_name) const {
Y
Yu Yang 已提交
418
  std::unordered_set<std::string> grads;
Y
Yu Yang 已提交
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
  for (auto &each_param : params) {
    grads.insert(each_param + "@GRAD");
  }

  bool is_forwarding = true;
  for (auto *op : main_program.Block(0).AllOps()) {
    bool change_forward = false;
    if (!is_forwarding) {
      // FIXME(yy): Do not hard code like this
      if (op->OutputArgumentNames().size() == 1 &&
          op->OutputArgumentNames()[0] == loss_var_name + "@GRAD") {
        continue;  // Drop fill 1. for backward coeff;
      }
    }

Y
Yu Yang 已提交
434 435 436 437 438
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      auto &p = member_->places_[i];
      auto *s = member_->local_scopes_[i];

      member_->ops_.emplace_back(new ComputationOpHandle(*op, s, p));
Y
Yu Yang 已提交
439
      auto *op_handle = member_->ops_.back().get();
Y
Yu Yang 已提交
440 441
      op_handle->dev_ctx_[p] = const_cast<platform::DeviceContext *>(
          platform::DeviceContextPool::Instance().Get(p));
Y
Yu Yang 已提交
442 443 444 445

      auto var_names = op->InputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
446
        VarHandle *var = GetVarHandle(each_var_name, p);
Y
Yu Yang 已提交
447
        op_handle->inputs_.emplace_back(var);
Y
Yu Yang 已提交
448
        var->pending_ops_.emplace(op_handle);
Y
Yu Yang 已提交
449 450 451 452
      }
      var_names = op->OutputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
453
        GenerateVar(op_handle, each_var_name, p);
Y
Yu Yang 已提交
454 455 456 457 458
      }

      if (is_forwarding) {
        if (var_names.size() == 1 && var_names[0] == loss_var_name) {
          // Insert ScaleCost OpHandle
Y
Yu Yang 已提交
459
          member_->ops_.emplace_back(new ScaleLossGradOpHandle(
Y
Yu Yang 已提交
460
              this->member_->local_scopes_.size(), s, p));
Y
Yu Yang 已提交
461
          op_handle = member_->ops_.back().get();
Y
Yu Yang 已提交
462

Y
Yu Yang 已提交
463
          op_handle->dev_ctx_[p] = member_->CommunicationDevCtx(p);
Y
Yu Yang 已提交
464

Y
Yu Yang 已提交
465 466 467 468 469 470
          // FIXME: Currently ScaleLossGradOp only use device_count as scale
          // factor. So it does not depend on any other operators.
          // VarHandle *loss = GetVarHandle(loss_var_name, place);
          // loss->pending_ops_.emplace_back(op_handle);
          // op_handle->inputs_.emplace_back(loss);

Y
Yu Yang 已提交
471
          GenerateVar(op_handle, loss_var_name + "@GRAD", p);
Y
Yu Yang 已提交
472 473 474 475 476 477 478 479 480 481 482 483 484 485
          change_forward = true;
        }
      }
    }

    if (change_forward) {
      is_forwarding = false;
    }

    if (!is_forwarding) {
      auto var_names = op->OutputArgumentNames();
      for (auto &og : var_names) {
        if (grads.count(og) != 0) {  // is param grad
          // Insert NCCL AllReduce Op
Y
Yu Yang 已提交
486
          member_->ops_.emplace_back(new NCCLAllReduceOpHandle(member_));
Y
Yu Yang 已提交
487 488
          auto *op_handle = member_->ops_.back().get();

Y
Yu Yang 已提交
489 490 491
          for (size_t i = 0; i < member_->places_.size(); ++i) {
            auto &p = member_->places_[i];
            auto &vars = member_->vars_[p][og];
Y
Yu Yang 已提交
492 493 494 495 496 497

            if (vars.empty()) {  // This device has no data. continue.
              continue;
            }
            auto *prev_grad = &vars[vars.size() - 1];
            op_handle->inputs_.emplace_back(prev_grad);
Y
Yu Yang 已提交
498
            prev_grad->pending_ops_.emplace(op_handle);
Y
Yu Yang 已提交
499
            auto &var = vars[vars.size()];
Y
Yu Yang 已提交
500
            var.place_ = p;
Y
Yu Yang 已提交
501 502 503 504
            var.generated_op_ = op_handle;
            var.name_ = og;
            var.version_ = vars.size() - 1;
            op_handle->outputs_.emplace_back(&var);
Y
Yu Yang 已提交
505
            op_handle->dev_ctx_[p] = member_->CommunicationDevCtx(p);
Y
Yu Yang 已提交
506 507 508 509 510
          }
        }
      }
    }
  }
Y
Yu Yang 已提交
511

Y
Yu Yang 已提交
512 513 514
  /*
    Dependency graph has been constructed. However, there are still data
    harzaeds need to be handled.
Y
Yu Yang 已提交
515
   */
516
  PolishGraphToSupportDataHazards();
Y
Yu Yang 已提交
517
}
Y
Yu Yang 已提交
518

Y
Yu Yang 已提交
519 520 521 522 523 524 525
/**
 * We only handle write after read(WAR), since it should not have a write
 * after write in program. If there are write after write operators, we need
 * prune them.
 *
 * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR)
 */
526
void ParallelExecutor::PolishGraphToSupportDataHazards() const {
Y
Yu Yang 已提交
527 528 529 530 531 532 533 534 535 536 537
  for (auto &place_pair : member_->vars_) {
    for (auto &name_pair : place_pair.second) {
      if (name_pair.second.size() <= 1) {
        return;
      }
      auto it_new = name_pair.second.rbegin();
      auto it_old = name_pair.second.rbegin();
      ++it_old;
      for (; it_old != name_pair.second.rend(); it_new = it_old, ++it_old) {
        auto *write_op = it_new->second.generated_op_;
        auto &read_ops = it_old->second.pending_ops_;
Y
Yu Yang 已提交
538 539 540 541 542 543
        auto *ex_write_op = it_old->second.generated_op_;

        if (ex_write_op == nullptr) {  // Nobody write this var.
          continue;
        }

Y
Yu Yang 已提交
544 545
        for (auto *read_op : read_ops) {
          // Manually add a dependency var from read_op to write_op;
Y
Yu Yang 已提交
546 547 548 549
          if (read_op == write_op) {
            // Read Write is the same op.
            continue;
          }
Y
Yu Yang 已提交
550

Y
Yu Yang 已提交
551
          auto *dep_var = new DummyVarHandle();
Y
Yu Yang 已提交
552

Y
Yu Yang 已提交
553 554 555
          dep_var->generated_op_ = read_op;
          read_op->outputs_.emplace_back(dep_var);

Y
Yu Yang 已提交
556
          dep_var->pending_ops_.emplace(write_op);
Y
Yu Yang 已提交
557 558 559 560 561 562
          write_op->inputs_.emplace_back(dep_var);
          member_->dep_vars_.emplace(dep_var);
        }
      }
    }
  }
Y
Yu Yang 已提交
563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597
}

void ParallelExecutor::GenerateVar(OpHandle *op_handle,
                                   const std::string &each_var_name,
                                   const platform::Place &place) const {
  auto &vars = member_->vars_[place][each_var_name];
  size_t version = vars.size();
  auto &var = vars[version];
  var.version_ = version;
  var.generated_op_ = op_handle;
  var.name_ = each_var_name;
  var.place_ = place;
  op_handle->outputs_.emplace_back(&var);
}

VarHandle *ParallelExecutor::GetVarHandle(const std::string &each_var_name,
                                          const platform::Place &place) const {
  auto &var_holders = member_->vars_[place];
  auto &var_holder = var_holders[each_var_name];
  VarHandle *var = nullptr;
  if (var_holder.empty()) {
    auto &init_var = var_holder[0];
    init_var.place_ = place;
    init_var.name_ = each_var_name;
    init_var.generated_op_ = nullptr;
    init_var.version_ = 0;
    var = &init_var;
  } else {
    var = &var_holder.rbegin()->second;
  }
  return var;
}

void ParallelExecutor::BCastParamsToGPUs(
    const ProgramDesc &startup_program) const {
Y
Yu Yang 已提交
598
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
599
  auto *main_scope = member_->local_scopes_[0];
Y
Yu Yang 已提交
600

Y
Yu Yang 已提交
601 602 603 604
  for (auto *var_desc : startup_program.Block(0).AllVars()) {
    if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
      auto &main_tensor =
          main_scope->FindVar(var_desc->Name())->Get<LoDTensor>();
Y
Yu Yang 已提交
605
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
Y
Yu Yang 已提交
606 607 608
      auto &dims = main_tensor.dims();
      size_t numel = main_tensor.numel();

Y
Stash  
Yu Yang 已提交
609
      platform::dynload::ncclGroupStart();
Y
Yu Yang 已提交
610

Y
Update  
Yu Yang 已提交
611 612 613 614 615 616
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;
        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
Y
Yu Yang 已提交
617
          auto local_scope = member_->local_scopes_[i];
Y
Update  
Yu Yang 已提交
618 619 620 621 622
          auto *t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }

Y
Stash  
Yu Yang 已提交
623
        auto &nccl_ctx = member_->GetNCCLCtx(place);
Y
Update  
Yu Yang 已提交
624
        platform::dynload::ncclBcast(buffer, numel, data_type, 0, nccl_ctx.comm,
Y
Stash  
Yu Yang 已提交
625
                                     nccl_ctx.stream());
Y
Yu Yang 已提交
626
      }
Y
Stash  
Yu Yang 已提交
627 628
      platform::dynload::ncclGroupEnd();
    }
Y
Yu Yang 已提交
629 630 631 632

    for (auto &stream : member_->communication_streams_) {
      stream.second.ctx_->Wait();
    }
Y
Stash  
Yu Yang 已提交
633
  }
Y
Yu Yang 已提交
634 635 636 637
#else
  PADDLE_THROW("Not compiled with CUDA");
#endif
}
Y
Yu Yang 已提交
638

Y
Yu Yang 已提交
639 640
void ParallelExecutor::BuildNCCLCommunicator() const {
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
641
  for (auto &place : member_->places_) {
Y
Yu Yang 已提交
642
    int dev_id = boost::get<platform::CUDAPlace>(place).device;
Y
Yu Yang 已提交
643

Y
Yu Yang 已提交
644 645
    member_->communication_streams_.emplace(
        dev_id, ParallelExecutorPrivate::NCCLContext(dev_id));
Y
Yu Yang 已提交
646
  }
Y
Yu Yang 已提交
647 648

  ParallelExecutorPrivate::NCCLContext::InitNCCLContext(
Y
Update  
Yu Yang 已提交
649
      member_->communication_streams_, member_->places_);
Y
Yu Yang 已提交
650
#endif
Y
Yu Yang 已提交
651 652
}

Y
Yu Yang 已提交
653 654
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
Y
Yu Yang 已提交
655
  bool use_event = true;
Y
Debug  
Yu Yang 已提交
656
  FeedFetchList fetched_data(fetch_tensors.size());
Y
Yu Yang 已提交
657
  // Version --> VarHandle
Y
Yu Yang 已提交
658
  member_->exception_.reset();
Y
Yu Yang 已提交
659
  std::unordered_map<VarHandleBase *, std::atomic<bool>> pending_vars;
Y
Yu Yang 已提交
660
  std::unordered_map<OpHandle *, size_t> pending_ops;
Y
Yu Yang 已提交
661
  std::vector<DummyVarHandle> dummy_vars;
Y
Yu Yang 已提交
662 663 664 665

  for (auto &place_pair : member_->vars_) {
    for (auto &name_pair : place_pair.second) {
      for (auto &version_pair : name_pair.second) {
Y
Yu Yang 已提交
666 667
        pending_vars[&version_pair.second] =
            version_pair.second.generated_op_ == nullptr;
Y
Yu Yang 已提交
668 669 670 671
      }
    }
  }

Y
Yu Yang 已提交
672
  for (auto &var : member_->dep_vars_) {
Y
Yu Yang 已提交
673
    pending_vars[var.get()] = var->generated_op_ == nullptr;
Y
Yu Yang 已提交
674 675
  }

Y
Yu Yang 已提交
676 677
  std::vector<OpHandle *> to_run;

Y
Yu Yang 已提交
678
  for (auto &op : member_->ops_) {
Y
Yu Yang 已提交
679 680 681 682 683 684 685
    if (op->inputs_.empty()) {  // Special case, Op has no input.
      to_run.emplace_back(op.get());
    } else {
      pending_ops.insert({op.get(), op->inputs_.size()});
    }
  }

Y
Yu Yang 已提交
686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703
  std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;

  for (auto &fetch_var_name : fetch_tensors) {
    for (auto &pair : member_->vars_) {
      auto it = pair.second.find(fetch_var_name);
      if (it != pair.second.end()) {
        fetched_vars[fetch_var_name].push_back(&it->second.rbegin()->second);
      }
    }
  }

  std::vector<FetchOpHandle> fetch_ops;

  for (size_t i = 0; i < fetch_tensors.size(); ++i) {
    auto &var_name = fetch_tensors[i];
    auto &vars = fetched_vars[var_name];
    fetch_ops.emplace_back();
    FetchOpHandle *op = &fetch_ops.back();
Y
Debug  
Yu Yang 已提交
704
    op->data_ = &fetched_data;
Y
Yu Yang 已提交
705 706 707
    op->offset_ = i;
    op->local_scopes_ = &member_->local_scopes_;
    for (auto &p : member_->places_) {
Y
Yu Yang 已提交
708
      op->dev_ctx_[p] = member_->GetNCCLCtx(p).ctx_.get();
Y
Yu Yang 已提交
709 710 711 712 713 714
    }

    for (auto *var : vars) {
      var->pending_ops_.emplace(op);
      op->inputs_.emplace_back(var);
    }
Y
Yu Yang 已提交
715 716 717 718 719 720 721

    dummy_vars.emplace_back();
    auto *var = &dummy_vars.back();
    op->outputs_.emplace_back(var);
    var->generated_op_ = op;
    pending_vars[var] = false;

Y
Yu Yang 已提交
722 723 724
    pending_ops.insert({op, op->inputs_.size()});
  }

Y
Yu Yang 已提交
725
  for (auto *op : to_run) {
Y
Yu Yang 已提交
726
    RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
727 728
  }

Y
Yu Yang 已提交
729
  while (!pending_vars.empty()) {
Y
Yu Yang 已提交
730
    VarHandleBase *ready_var = nullptr;
Y
Yu Yang 已提交
731
    for (auto &pair : pending_vars) {
Y
Yu Yang 已提交
732
      if (pair.second.load(std::memory_order_acquire)) {
Y
Yu Yang 已提交
733
        ready_var = pair.first;
Y
Yu Yang 已提交
734 735
      }
    }
Y
Yu Yang 已提交
736
    if (ready_var == nullptr) {
Y
Yu Yang 已提交
737 738 739 740
      // FIXME use conditional var instead of busy wait.
      if (member_->exception_) {
        throw * member_->exception_;
      }
Y
Yu Yang 已提交
741
      continue;
Y
Yu Yang 已提交
742
    }
Y
Yu Yang 已提交
743
    pending_vars.erase(ready_var);
Y
Yu Yang 已提交
744
    to_run.clear();
Y
Yu Yang 已提交
745 746 747 748 749
    for (auto *op : ready_var->pending_ops_) {
      auto &deps = pending_ops[op];
      --deps;
      if (deps == 0) {
        to_run.emplace_back(op);
Y
Yu Yang 已提交
750 751 752 753
      }
    }
    for (auto *op : to_run) {
      pending_ops.erase(op);
Y
Yu Yang 已提交
754
      RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
755 756
    }
  }
Y
Yu Yang 已提交
757

Y
Debug  
Yu Yang 已提交
758 759 760 761 762 763
  for (auto &fetch_op : fetch_ops) {
    fetch_op.WaitAndMergeCPUTensors();
  }

  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetched_data;
Y
Yu Yang 已提交
764
}
Y
Yu Yang 已提交
765

Y
Yu Yang 已提交
766
void ParallelExecutor::RunOp(
Y
Yu Yang 已提交
767
    bool use_event,
Y
Yu Yang 已提交
768
    std::unordered_map<VarHandleBase *, std::atomic<bool>> &pending_vars,
Y
Yu Yang 已提交
769
    OpHandle *op) const {
Y
Yu Yang 已提交
770 771
  std::vector<std::atomic<bool> *> *ready_buffer =
      new std::vector<std::atomic<bool> *>();
Y
Yu Yang 已提交
772
  for (auto *var : op->outputs_) {
Y
Debug  
Yu Yang 已提交
773
    ready_buffer->emplace_back(&pending_vars[var]);
Y
Yu Yang 已提交
774 775
  }

Y
Yu Yang 已提交
776
  auto op_run = [ready_buffer, op, this, use_event] {
Y
Yu Yang 已提交
777
    try {
Y
Add log  
Yu Yang 已提交
778
      VLOG(10) << op->DebugString();
Y
Yu Yang 已提交
779
      op->Run(use_event);
Y
Debug  
Yu Yang 已提交
780
      for (auto *ready : *ready_buffer) {
Y
Yu Yang 已提交
781
        ready->store(true, std::memory_order_release);
Y
Yu Yang 已提交
782
      }
Y
Debug  
Yu Yang 已提交
783
      delete ready_buffer;
Y
Yu Yang 已提交
784 785 786 787 788 789
    } catch (platform::EnforceNotMet ex) {
      member_->exception_.reset(new platform::EnforceNotMet(ex));
    } catch (...) {
      LOG(FATAL) << "Unknown exception catched";
    }
  };
Y
Yu Yang 已提交
790 791 792 793 794
  if (member_->pool_) {
    member_->pool_->enqueue(op_run);
  } else {
    op_run();
  }
Y
Yu Yang 已提交
795
}
Y
Yu Yang 已提交
796
}  // namespace framework
Y
Yang Yang 已提交
797
}  // namespace paddle