parallel_executor.cc 22.7 KB
Newer Older
Y
Yang Yang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
Y
Yu Yang 已提交
16 17
#include "ThreadPool.h"
#include "executor.h"
Y
Yu Yang 已提交
18
#include "lod_tensor.h"
Y
Yu Yang 已提交
19
#include "lod_tensor_array.h"
Y
Yu Yang 已提交
20
#include "op_registry.h"
Y
Yu Yang 已提交
21
#include "paddle/fluid/framework/details/var_handle.h"
Y
Debug  
Yu Yang 已提交
22
#include "paddle/fluid/framework/feed_fetch_type.h"
Y
Yu Yang 已提交
23
#include "paddle/fluid/operators/math/concat.h"
Y
Yu Yang 已提交
24
#include "paddle/fluid/platform/nccl_helper.h"
Y
Yang Yang 已提交
25 26

namespace paddle {
Y
Yu Yang 已提交
27 28
namespace framework {

Y
Yu Yang 已提交
29 30 31
using details::DummyVarHandle;
using details::VarHandle;
using details::VarHandleBase;
Y
Yu Yang 已提交
32

Y
Yu Yang 已提交
33
struct OpHandleBase {
Y
Yu Yang 已提交
34 35 36 37 38
  std::vector<VarHandleBase *> inputs_;
  std::vector<VarHandleBase *> outputs_;
  std::unordered_map<platform::Place, platform::DeviceContext *,
                     platform::PlaceHash>
      dev_ctx_;
Y
Yu Yang 已提交
39

Y
Yu Yang 已提交
40 41
  std::unordered_map<int, cudaEvent_t> events_;

Y
Yu Yang 已提交
42 43 44 45
  std::string DebugString() {
    std::stringstream ss;
    ss << "(";
    for (auto *var : inputs_) {
Y
Yu Yang 已提交
46
      ss << var->DebugString() << ", ";
Y
Yu Yang 已提交
47 48 49
    }
    ss << ") --> (";
    for (auto *var : outputs_) {
Y
Yu Yang 已提交
50
      ss << var->DebugString() << ", ";
Y
Yu Yang 已提交
51 52 53 54 55
    }
    ss << ")\n";
    return ss.str();
  }

Y
Yu Yang 已提交
56
  virtual ~OpHandleBase() {}
Y
Yu Yang 已提交
57

Y
Yu Yang 已提交
58 59
  void Run(bool use_event) {
    if (events_.empty() && use_event) {
Y
Yu Yang 已提交
60 61 62 63 64 65 66 67 68
      for (auto &p : dev_ctx_) {
        int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
        cudaSetDevice(dev_id);
        cudaEventCreateWithFlags(&events_[dev_id], cudaEventDisableTiming);
      }
    }

    RunImpl();

Y
Yu Yang 已提交
69 70 71 72 73 74 75
    if (use_event) {
      for (auto &p : dev_ctx_) {
        int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
        auto stream =
            static_cast<platform::CUDADeviceContext *>(p.second)->stream();
        cudaEventRecord(events_.at(dev_id), stream);
      }
Y
Yu Yang 已提交
76 77 78 79
    }
  }

  virtual void Wait(platform::DeviceContext *waited_dev) {
Y
Fix bug  
Yu Yang 已提交
80
    if (platform::is_cpu_place(waited_dev->GetPlace()) || events_.empty()) {
Y
Yu Yang 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94
      for (auto &dev_ctx : dev_ctx_) {
        dev_ctx.second->Wait();
      }
    } else {
      auto stream =
          static_cast<platform::CUDADeviceContext *>(waited_dev)->stream();
      for (auto &ev : events_) {
        PADDLE_ENFORCE(cudaStreamWaitEvent(stream, ev.second, 0));
      }
    }
  }

 protected:
  virtual void RunImpl() = 0;
Y
Yu Yang 已提交
95 96
};

Y
Yu Yang 已提交
97
struct ScaleLossGradOpHandle : public OpHandleBase {
Y
Yu Yang 已提交
98 99 100 101 102 103 104 105
  float coeff_;
  Scope *scope_;
  platform::Place place_;

  explicit ScaleLossGradOpHandle(size_t num_dev, Scope *scope,
                                 platform::Place place)
      : coeff_(static_cast<float>(1.0 / num_dev)),
        scope_(scope),
Y
Yu Yang 已提交
106
        place_(place) {}
Y
Yu Yang 已提交
107

Y
Yu Yang 已提交
108
  ~ScaleLossGradOpHandle() {}
Y
Yu Yang 已提交
109

Y
Yu Yang 已提交
110 111
 protected:
  void RunImpl() override {
Y
Yu Yang 已提交
112
    std::string var_name = static_cast<VarHandle *>(this->outputs_[0])->name_;
Y
Yu Yang 已提交
113

Y
Yu Yang 已提交
114 115 116 117 118 119 120
    float *tmp = scope_->FindVar(var_name)
                     ->GetMutable<framework::LoDTensor>()
                     ->mutable_data<float>(make_ddim({1}), place_);

    if (platform::is_cpu_place(place_)) {
      *tmp = coeff_;
    } else {
Y
Yu Yang 已提交
121
      auto stream =
Y
Yu Yang 已提交
122
          static_cast<platform::CUDADeviceContext *>(this->dev_ctx_[place_])
Y
Yu Yang 已提交
123 124 125 126
              ->stream();
      memory::Copy(boost::get<platform::CUDAPlace>(place_), tmp,
                   platform::CPUPlace(), &coeff_, sizeof(float), stream);
    }
Y
Yu Yang 已提交
127
  }
Y
Yu Yang 已提交
128 129
};

Y
Yu Yang 已提交
130
struct FetchOpHandle : public OpHandleBase {
Y
Debug  
Yu Yang 已提交
131
  FeedFetchList *data_;
Y
Yu Yang 已提交
132 133 134 135 136 137 138 139 140 141
  size_t offset_;
  std::vector<Scope *> *local_scopes_;
  std::vector<LoDTensor> tensors_;

  ~FetchOpHandle() {
    for (auto *input_var : inputs_) {
      input_var->pending_ops_.erase(this);
    }
  }

Y
Yu Yang 已提交
142 143 144 145
  void Wait(platform::DeviceContext *waited_dev) override {
    PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
  }

Y
Debug  
Yu Yang 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158 159
  void WaitAndMergeCPUTensors() const {
    // Wait fetch stream done.
    for (auto &ctx : dev_ctx_) {
      ctx.second->Wait();
    }

    std::vector<const LoDTensor *> tensors_ptr;
    tensors_ptr.reserve(tensors_.size());
    for (auto &t : tensors_) {
      tensors_ptr.emplace_back(&t);
    }
    data_->at(offset_).MergeLoDTensor(tensors_ptr, platform::CPUPlace());
  }

Y
Yu Yang 已提交
160 161
 protected:
  void RunImpl() override {
Y
Debug  
Yu Yang 已提交
162
    for (auto *input : inputs_) {
Y
Yu Yang 已提交
163 164
      auto *var = static_cast<VarHandle *>(input);
      var->generated_op_->Wait(this->dev_ctx_[var->place_]);
Y
Debug  
Yu Yang 已提交
165 166
    }

Y
Yu Yang 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
    tensors_.resize(inputs_.size());
    auto *var = static_cast<VarHandle *>(inputs_[0]);
    auto &var_name = var->name_;
    platform::CPUPlace cpu;
    auto &scopes = *local_scopes_;

    for (size_t i = 0; i < scopes.size(); ++i) {
      auto &scope = scopes[i];
      auto &t = scope->FindVar(var_name)->Get<framework::LoDTensor>();
      if (platform::is_gpu_place(var->place_)) {
        TensorCopy(t, cpu, *dev_ctx_[t.place()], &tensors_[i]);
      } else {
        tensors_[i].ShareDataWith(t);
        tensors_[i].set_lod(t.lod());
      }
    }
  }
};

Y
Yu Yang 已提交
186 187
class ParallelExecutorPrivate {
 public:
Y
Yu Yang 已提交
188
  explicit ParallelExecutorPrivate(size_t num_threads)
Y
Yu Yang 已提交
189
      : pool_(num_threads <= 1 ? nullptr : new ThreadPool(num_threads)) {}
Y
Yu Yang 已提交
190

Y
Stash  
Yu Yang 已提交
191 192
  std::vector<platform::Place> places_;

Y
Yu Yang 已提交
193
  std::vector<Scope *> local_scopes_;
Y
Yu Yang 已提交
194
  Scope *global_scope_;
Y
Yu Yang 已提交
195

Y
Yu Yang 已提交
196
  std::unordered_map<int, platform::NCCLContext> communication_streams_;
Y
Yu Yang 已提交
197

Y
Yu Yang 已提交
198
  platform::NCCLContext &GetNCCLCtx(platform::Place p) {
Y
Yu Yang 已提交
199 200 201 202
    int dev_id = boost::get<platform::CUDAPlace>(p).device;
    return communication_streams_.at(dev_id);
  }

Y
Yu Yang 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215
  platform::DeviceContext *CommunicationDevCtx(const platform::Place &place) {
    if (platform::is_cpu_place(place) || local_scopes_.size() == 1) {
      return const_cast<platform::DeviceContext *>(
          platform::DeviceContextPool::Instance().Get(place));
    } else {
#ifdef PADDLE_WITH_CUDA
      return GetNCCLCtx(place).ctx_.get();
#else
      PADDLE_THROW("Not compiled with CUDA")
#endif
    }
  }

Y
Yu Yang 已提交
216 217 218 219 220 221
  platform::Place main_place_;

  std::unordered_map<platform::Place,
                     std::unordered_map<std::string, std::map<int, VarHandle>>,
                     platform::PlaceHash>
      vars_;
Y
Yu Yang 已提交
222 223
  std::unordered_set<std::unique_ptr<VarHandleBase>> dep_vars_;

Y
Yu Yang 已提交
224
  std::vector<std::unique_ptr<OpHandleBase>> ops_;
Y
Yu Yang 已提交
225

Y
Yu Yang 已提交
226
  // Use a simpler thread pool, might be faster.
Y
Yu Yang 已提交
227
  std::unique_ptr<ThreadPool> pool_;
Y
Yu Yang 已提交
228 229

  std::unique_ptr<platform::EnforceNotMet> exception_;
Y
Yu Yang 已提交
230

Y
Yu Yang 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
  VarHandle *GetVarHandle(const std::string &each_var_name,
                          const platform::Place &place) {
    auto &var_holders = vars_[place];
    auto &var_holder = var_holders[each_var_name];
    VarHandle *var = nullptr;
    if (var_holder.empty()) {
      auto &init_var = var_holder[0];
      init_var.place_ = place;
      init_var.name_ = each_var_name;
      init_var.generated_op_ = nullptr;
      init_var.version_ = 0;
      var = &init_var;
    } else {
      var = &var_holder.rbegin()->second;
    }
    return var;
  }
Y
Yu Yang 已提交
248

Y
Yu Yang 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
  void RunOp(
      bool use_event,
      std::unordered_map<VarHandleBase *, std::atomic<bool>> &pending_vars,
      OpHandleBase *op) {
    std::vector<std::atomic<bool> *> *ready_buffer =
        new std::vector<std::atomic<bool> *>();
    for (auto *var : op->outputs_) {
      ready_buffer->emplace_back(&pending_vars[var]);
    }

    auto op_run = [ready_buffer, op, this, use_event] {
      try {
        VLOG(10) << op->DebugString();
        op->Run(use_event);
        for (auto *ready : *ready_buffer) {
          ready->store(true, std::memory_order_release);
        }
        delete ready_buffer;
      } catch (platform::EnforceNotMet ex) {
        exception_.reset(new platform::EnforceNotMet(ex));
      } catch (...) {
        LOG(FATAL) << "Unknown exception catched";
      }
    };
    if (pool_) {
      pool_->enqueue(op_run);
    } else {
      op_run();
    }
  }

  void GenerateVar(OpHandleBase *op_handle, const std::string &each_var_name,
                   const platform::Place &place) {
    auto &vars = vars_[place][each_var_name];
    size_t version = vars.size();
    auto &var = vars[version];
    var.version_ = version;
    var.generated_op_ = op_handle;
    var.name_ = each_var_name;
    var.place_ = place;
    op_handle->outputs_.emplace_back(&var);
  }
};  // namespace framework

struct NCCLAllReduceOpHandle : public OpHandleBase {
  const std::vector<Scope *> &local_scopes_;
  const std::vector<platform::Place> &places_;
  const std::unordered_map<int, platform::NCCLContext> &communication_ctxs_;

  explicit NCCLAllReduceOpHandle(
      const std::vector<Scope *> &local_scopes,
      const std::vector<platform::Place> &places,
      const std::unordered_map<int, platform::NCCLContext> &ctxs)
      : local_scopes_(local_scopes),
        places_(places),
        communication_ctxs_(ctxs) {}
Y
Yu Yang 已提交
305

Y
Yu Yang 已提交
306
  void Wait(platform::DeviceContext *waited_dev) override {
Y
Yu Yang 已提交
307
    OpHandleBase::Wait(waited_dev);
Y
Yu Yang 已提交
308 309
  }

Y
Yu Yang 已提交
310 311
 protected:
  void RunImpl() override {
Y
Yu Yang 已提交
312
    if (inputs_.size() == 1) {
Y
Yu Yang 已提交
313 314
      return;  // No need to all reduce when GPU count = 1;
    } else {
Y
Yu Yang 已提交
315 316 317 318 319 320
      // Wait input done
      for (auto *in : inputs_) {
        auto &p = static_cast<VarHandle *>(in)->place_;
        in->generated_op_->Wait(dev_ctx_[p]);
      }

Y
Yu Yang 已提交
321 322 323 324
      auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;
      int dtype = -1;
      size_t numel = 0;

Y
Yu Yang 已提交
325
      platform::NCCLGroupGuard guard;
Y
Update  
Yu Yang 已提交
326

Y
Yu Yang 已提交
327 328 329
      for (size_t i = 0; i < local_scopes_.size(); ++i) {
        auto &p = places_[i];
        auto *s = local_scopes_[i];
Y
Yu Yang 已提交
330 331 332 333
        int dev_id = boost::get<platform::CUDAPlace>(p).device;

        auto &lod_tensor = s->FindVar(var_name)->Get<framework::LoDTensor>();
        void *buffer = const_cast<void *>(lod_tensor.data<void>());
Y
Debug  
Yu Yang 已提交
334 335 336 337 338
        uintptr_t buf = reinterpret_cast<uintptr_t>(buffer);
        if (buf % sizeof(float) != 0) {
          VLOG(3) << "Buffer is not aligned " << buf;
        }

Y
Yu Yang 已提交
339
        if (dtype == -1) {
Y
Yu Yang 已提交
340
          dtype = platform::ToNCCLDataType(lod_tensor.type());
Y
Yu Yang 已提交
341 342 343 344 345
        }

        if (numel == 0) {
          numel = static_cast<size_t>(lod_tensor.numel());
        }
Y
Yu Yang 已提交
346
        auto &nccl_ctx = communication_ctxs_.at(dev_id);
Y
Yu Yang 已提交
347
        PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
Y
Update  
Yu Yang 已提交
348
            buffer, buffer, numel, static_cast<ncclDataType_t>(dtype), ncclSum,
Y
Yu Yang 已提交
349
            nccl_ctx.comm_, nccl_ctx.stream()));
Y
Yu Yang 已提交
350
      }
Y
Debug  
Yu Yang 已提交
351
    }
Y
Yu Yang 已提交
352
  }
Y
Yu Yang 已提交
353 354
};

Y
Yu Yang 已提交
355
struct ComputationOpHandle : public OpHandleBase {
Y
Yu Yang 已提交
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
  std::unique_ptr<OperatorBase> op_;
  Scope *scope_;
  platform::Place place_;

  explicit ComputationOpHandle(const OpDesc &op_desc, Scope *scope,
                               platform::Place place)
      : op_(framework::OpRegistry::CreateOp(op_desc)),
        scope_(scope),
        place_(place) {}

 protected:
  void RunImpl() override {
    auto *cur_ctx = dev_ctx_[place_];
    for (auto *in : inputs_) {
      bool need_wait =
          in->generated_op_ && in->generated_op_->dev_ctx_[place_] != cur_ctx;
      if (need_wait) {
        in->generated_op_->Wait(cur_ctx);
      }
    }

    op_->Run(*scope_, place_);
  }
};

Y
Yu Yang 已提交
381
ParallelExecutor::ParallelExecutor(
Y
Yu Yang 已提交
382
    size_t num_threads, const std::vector<platform::Place> &places,
Y
Yu Yang 已提交
383 384 385
    const std::unordered_set<std::string> &params,
    const ProgramDesc &startup_program, const ProgramDesc &main_program,
    const std::string &loss_var_name, Scope *scope)
Y
Yu Yang 已提交
386
    : member_(new ParallelExecutorPrivate(num_threads)) {
Y
Stash  
Yu Yang 已提交
387
  member_->places_ = places;
Y
Yu Yang 已提交
388
  member_->global_scope_ = scope;
Y
Yu Yang 已提交
389 390 391 392
  // Step 1. RunStartupProgram and Bcast the params to devs.
  Executor exe(places[0]);
  exe.Run(startup_program, scope, 0);
  // Create local scopes
Y
Yu Yang 已提交
393 394
  for (size_t i = 0; i < member_->places_.size(); ++i) {
    member_->local_scopes_.push_back(&scope->NewScope());
Y
Yu Yang 已提交
395 396 397 398
  }
  member_->main_place_ = places[0];

  // Bcast Parameters to all GPUs
Y
Yu Yang 已提交
399
  BuildNCCLCommunicator();
Y
Yu Yang 已提交
400 401 402
  if (platform::is_gpu_place(member_->main_place_) &&
      member_->local_scopes_.size() != 1) {  // Is CUDA
    BCastParamsToGPUs(startup_program);
Y
Yu Yang 已提交
403 404 405 406 407 408
  }
  // Startup Program has been run. All local scopes has correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  ConstructDependencyGraph(params, main_program, loss_var_name);
Y
Yu Yang 已提交
409 410

  // Step 3. Create vars in each scope;
Y
Yu Yang 已提交
411
  for (auto *scope : member_->local_scopes_) {
Y
Yu Yang 已提交
412 413 414 415 416 417 418 419
    for (auto *var : main_program.Block(0).AllVars()) {
      if (scope->FindVar(var->Name()) != nullptr) {
        continue;
      }

      InitializeVariable(scope->Var(var->Name()), var->GetType());
    }
  }
Y
Yu Yang 已提交
420 421 422 423 424
}

void ParallelExecutor::ConstructDependencyGraph(
    const std::unordered_set<std::string> &params,
    const ProgramDesc &main_program, const std::string &loss_var_name) const {
Y
Yu Yang 已提交
425
  std::unordered_set<std::string> grads;
Y
Yu Yang 已提交
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
  for (auto &each_param : params) {
    grads.insert(each_param + "@GRAD");
  }

  bool is_forwarding = true;
  for (auto *op : main_program.Block(0).AllOps()) {
    bool change_forward = false;
    if (!is_forwarding) {
      // FIXME(yy): Do not hard code like this
      if (op->OutputArgumentNames().size() == 1 &&
          op->OutputArgumentNames()[0] == loss_var_name + "@GRAD") {
        continue;  // Drop fill 1. for backward coeff;
      }
    }

Y
Yu Yang 已提交
441 442 443 444 445
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      auto &p = member_->places_[i];
      auto *s = member_->local_scopes_[i];

      member_->ops_.emplace_back(new ComputationOpHandle(*op, s, p));
Y
Yu Yang 已提交
446
      auto *op_handle = member_->ops_.back().get();
Y
Yu Yang 已提交
447 448
      op_handle->dev_ctx_[p] = const_cast<platform::DeviceContext *>(
          platform::DeviceContextPool::Instance().Get(p));
Y
Yu Yang 已提交
449 450 451 452

      auto var_names = op->InputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
453
        VarHandle *var = member_->GetVarHandle(each_var_name, p);
Y
Yu Yang 已提交
454
        op_handle->inputs_.emplace_back(var);
Y
Yu Yang 已提交
455
        var->pending_ops_.emplace(op_handle);
Y
Yu Yang 已提交
456 457 458 459
      }
      var_names = op->OutputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
460
        member_->GenerateVar(op_handle, each_var_name, p);
Y
Yu Yang 已提交
461 462 463 464 465
      }

      if (is_forwarding) {
        if (var_names.size() == 1 && var_names[0] == loss_var_name) {
          // Insert ScaleCost OpHandle
Y
Yu Yang 已提交
466
          member_->ops_.emplace_back(new ScaleLossGradOpHandle(
Y
Yu Yang 已提交
467
              this->member_->local_scopes_.size(), s, p));
Y
Yu Yang 已提交
468
          op_handle = member_->ops_.back().get();
Y
Yu Yang 已提交
469

Y
Yu Yang 已提交
470
          op_handle->dev_ctx_[p] = member_->CommunicationDevCtx(p);
Y
Yu Yang 已提交
471

Y
Yu Yang 已提交
472 473 474 475 476 477
          // FIXME: Currently ScaleLossGradOp only use device_count as scale
          // factor. So it does not depend on any other operators.
          // VarHandle *loss = GetVarHandle(loss_var_name, place);
          // loss->pending_ops_.emplace_back(op_handle);
          // op_handle->inputs_.emplace_back(loss);

Y
Yu Yang 已提交
478
          member_->GenerateVar(op_handle, loss_var_name + "@GRAD", p);
Y
Yu Yang 已提交
479 480 481 482 483 484 485 486 487 488 489 490 491 492
          change_forward = true;
        }
      }
    }

    if (change_forward) {
      is_forwarding = false;
    }

    if (!is_forwarding) {
      auto var_names = op->OutputArgumentNames();
      for (auto &og : var_names) {
        if (grads.count(og) != 0) {  // is param grad
          // Insert NCCL AllReduce Op
Y
Yu Yang 已提交
493 494 495
          member_->ops_.emplace_back(new NCCLAllReduceOpHandle(
              member_->local_scopes_, member_->places_,
              member_->communication_streams_));
Y
Yu Yang 已提交
496 497
          auto *op_handle = member_->ops_.back().get();

Y
Yu Yang 已提交
498 499 500
          for (size_t i = 0; i < member_->places_.size(); ++i) {
            auto &p = member_->places_[i];
            auto &vars = member_->vars_[p][og];
Y
Yu Yang 已提交
501 502 503 504 505 506

            if (vars.empty()) {  // This device has no data. continue.
              continue;
            }
            auto *prev_grad = &vars[vars.size() - 1];
            op_handle->inputs_.emplace_back(prev_grad);
Y
Yu Yang 已提交
507
            prev_grad->pending_ops_.emplace(op_handle);
Y
Yu Yang 已提交
508
            auto &var = vars[vars.size()];
Y
Yu Yang 已提交
509
            var.place_ = p;
Y
Yu Yang 已提交
510 511 512 513
            var.generated_op_ = op_handle;
            var.name_ = og;
            var.version_ = vars.size() - 1;
            op_handle->outputs_.emplace_back(&var);
Y
Yu Yang 已提交
514
            op_handle->dev_ctx_[p] = member_->CommunicationDevCtx(p);
Y
Yu Yang 已提交
515 516 517 518 519
          }
        }
      }
    }
  }
Y
Yu Yang 已提交
520

Y
Yu Yang 已提交
521 522 523
  /*
    Dependency graph has been constructed. However, there are still data
    harzaeds need to be handled.
Y
Yu Yang 已提交
524
   */
525
  PolishGraphToSupportDataHazards();
Y
Yu Yang 已提交
526
}
Y
Yu Yang 已提交
527

Y
Yu Yang 已提交
528 529 530 531 532 533 534
/**
 * We only handle write after read(WAR), since it should not have a write
 * after write in program. If there are write after write operators, we need
 * prune them.
 *
 * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR)
 */
535
void ParallelExecutor::PolishGraphToSupportDataHazards() const {
Y
Yu Yang 已提交
536 537 538 539 540 541 542 543 544 545 546
  for (auto &place_pair : member_->vars_) {
    for (auto &name_pair : place_pair.second) {
      if (name_pair.second.size() <= 1) {
        return;
      }
      auto it_new = name_pair.second.rbegin();
      auto it_old = name_pair.second.rbegin();
      ++it_old;
      for (; it_old != name_pair.second.rend(); it_new = it_old, ++it_old) {
        auto *write_op = it_new->second.generated_op_;
        auto &read_ops = it_old->second.pending_ops_;
Y
Yu Yang 已提交
547 548 549 550 551 552
        auto *ex_write_op = it_old->second.generated_op_;

        if (ex_write_op == nullptr) {  // Nobody write this var.
          continue;
        }

Y
Yu Yang 已提交
553 554
        for (auto *read_op : read_ops) {
          // Manually add a dependency var from read_op to write_op;
Y
Yu Yang 已提交
555 556 557 558
          if (read_op == write_op) {
            // Read Write is the same op.
            continue;
          }
Y
Yu Yang 已提交
559

Y
Yu Yang 已提交
560
          auto *dep_var = new DummyVarHandle();
Y
Yu Yang 已提交
561

Y
Yu Yang 已提交
562 563 564
          dep_var->generated_op_ = read_op;
          read_op->outputs_.emplace_back(dep_var);

Y
Yu Yang 已提交
565
          dep_var->pending_ops_.emplace(write_op);
Y
Yu Yang 已提交
566 567 568 569 570 571
          write_op->inputs_.emplace_back(dep_var);
          member_->dep_vars_.emplace(dep_var);
        }
      }
    }
  }
Y
Yu Yang 已提交
572 573 574 575
}

void ParallelExecutor::BCastParamsToGPUs(
    const ProgramDesc &startup_program) const {
Y
Yu Yang 已提交
576
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
577
  auto *main_scope = member_->local_scopes_[0];
Y
Yu Yang 已提交
578

Y
Yu Yang 已提交
579 580 581 582
  for (auto *var_desc : startup_program.Block(0).AllVars()) {
    if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
      auto &main_tensor =
          main_scope->FindVar(var_desc->Name())->Get<LoDTensor>();
Y
Yu Yang 已提交
583
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
Y
Yu Yang 已提交
584 585 586
      auto &dims = main_tensor.dims();
      size_t numel = main_tensor.numel();

Y
Yu Yang 已提交
587
      platform::NCCLGroupGuard guard;
Y
Yu Yang 已提交
588

Y
Update  
Yu Yang 已提交
589 590 591 592 593 594
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;
        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
Y
Yu Yang 已提交
595
          auto local_scope = member_->local_scopes_[i];
Y
Update  
Yu Yang 已提交
596 597 598 599 600
          auto *t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }

Y
Stash  
Yu Yang 已提交
601
        auto &nccl_ctx = member_->GetNCCLCtx(place);
Y
Yu Yang 已提交
602 603
        platform::dynload::ncclBcast(buffer, numel, data_type, 0,
                                     nccl_ctx.comm_, nccl_ctx.stream());
Y
Yu Yang 已提交
604
      }
Y
Stash  
Yu Yang 已提交
605
    }
Y
Yu Yang 已提交
606 607 608 609

    for (auto &stream : member_->communication_streams_) {
      stream.second.ctx_->Wait();
    }
Y
Stash  
Yu Yang 已提交
610
  }
Y
Yu Yang 已提交
611 612 613 614
#else
  PADDLE_THROW("Not compiled with CUDA");
#endif
}
Y
Yu Yang 已提交
615

Y
Yu Yang 已提交
616 617
void ParallelExecutor::BuildNCCLCommunicator() const {
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
618
  for (auto &place : member_->places_) {
Y
Yu Yang 已提交
619
    int dev_id = boost::get<platform::CUDAPlace>(place).device;
Y
Yu Yang 已提交
620

Y
Yu Yang 已提交
621 622
    member_->communication_streams_.emplace(dev_id,
                                            platform::NCCLContext(dev_id));
Y
Yu Yang 已提交
623
  }
Y
Yu Yang 已提交
624

Y
Yu Yang 已提交
625 626
  platform::NCCLContext::InitNCCLContext(member_->communication_streams_,
                                         member_->places_);
Y
Yu Yang 已提交
627
#endif
Y
Yu Yang 已提交
628 629
}

Y
Yu Yang 已提交
630 631
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
Y
Yu Yang 已提交
632
  bool use_event = true;
Y
Debug  
Yu Yang 已提交
633
  FeedFetchList fetched_data(fetch_tensors.size());
Y
Yu Yang 已提交
634
  // Version --> VarHandle
Y
Yu Yang 已提交
635
  member_->exception_.reset();
Y
Yu Yang 已提交
636
  std::unordered_map<VarHandleBase *, std::atomic<bool>> pending_vars;
Y
Yu Yang 已提交
637
  std::unordered_map<OpHandleBase *, size_t> pending_ops;
Y
Yu Yang 已提交
638
  std::vector<DummyVarHandle> dummy_vars;
Y
Yu Yang 已提交
639 640 641 642

  for (auto &place_pair : member_->vars_) {
    for (auto &name_pair : place_pair.second) {
      for (auto &version_pair : name_pair.second) {
Y
Yu Yang 已提交
643 644
        pending_vars[&version_pair.second] =
            version_pair.second.generated_op_ == nullptr;
Y
Yu Yang 已提交
645 646 647 648
      }
    }
  }

Y
Yu Yang 已提交
649
  for (auto &var : member_->dep_vars_) {
Y
Yu Yang 已提交
650
    pending_vars[var.get()] = var->generated_op_ == nullptr;
Y
Yu Yang 已提交
651 652
  }

Y
Yu Yang 已提交
653
  std::vector<OpHandleBase *> to_run;
Y
Yu Yang 已提交
654

Y
Yu Yang 已提交
655
  for (auto &op : member_->ops_) {
Y
Yu Yang 已提交
656 657 658 659 660 661 662
    if (op->inputs_.empty()) {  // Special case, Op has no input.
      to_run.emplace_back(op.get());
    } else {
      pending_ops.insert({op.get(), op->inputs_.size()});
    }
  }

Y
Yu Yang 已提交
663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680
  std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;

  for (auto &fetch_var_name : fetch_tensors) {
    for (auto &pair : member_->vars_) {
      auto it = pair.second.find(fetch_var_name);
      if (it != pair.second.end()) {
        fetched_vars[fetch_var_name].push_back(&it->second.rbegin()->second);
      }
    }
  }

  std::vector<FetchOpHandle> fetch_ops;

  for (size_t i = 0; i < fetch_tensors.size(); ++i) {
    auto &var_name = fetch_tensors[i];
    auto &vars = fetched_vars[var_name];
    fetch_ops.emplace_back();
    FetchOpHandle *op = &fetch_ops.back();
Y
Debug  
Yu Yang 已提交
681
    op->data_ = &fetched_data;
Y
Yu Yang 已提交
682 683 684
    op->offset_ = i;
    op->local_scopes_ = &member_->local_scopes_;
    for (auto &p : member_->places_) {
Y
Yu Yang 已提交
685
      op->dev_ctx_[p] = member_->GetNCCLCtx(p).ctx_.get();
Y
Yu Yang 已提交
686 687 688 689 690 691
    }

    for (auto *var : vars) {
      var->pending_ops_.emplace(op);
      op->inputs_.emplace_back(var);
    }
Y
Yu Yang 已提交
692 693 694 695 696 697 698

    dummy_vars.emplace_back();
    auto *var = &dummy_vars.back();
    op->outputs_.emplace_back(var);
    var->generated_op_ = op;
    pending_vars[var] = false;

Y
Yu Yang 已提交
699 700 701
    pending_ops.insert({op, op->inputs_.size()});
  }

Y
Yu Yang 已提交
702
  for (auto *op : to_run) {
Y
Yu Yang 已提交
703
    member_->RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
704 705
  }

Y
Yu Yang 已提交
706
  while (!pending_vars.empty()) {
Y
Yu Yang 已提交
707
    VarHandleBase *ready_var = nullptr;
Y
Yu Yang 已提交
708
    for (auto &pair : pending_vars) {
Y
Yu Yang 已提交
709
      if (pair.second.load(std::memory_order_acquire)) {
Y
Yu Yang 已提交
710
        ready_var = pair.first;
Y
Yu Yang 已提交
711 712
      }
    }
Y
Yu Yang 已提交
713
    if (ready_var == nullptr) {
Y
Yu Yang 已提交
714 715 716 717
      // FIXME use conditional var instead of busy wait.
      if (member_->exception_) {
        throw * member_->exception_;
      }
Y
Yu Yang 已提交
718
      continue;
Y
Yu Yang 已提交
719
    }
Y
Yu Yang 已提交
720
    pending_vars.erase(ready_var);
Y
Yu Yang 已提交
721
    to_run.clear();
Y
Yu Yang 已提交
722 723 724 725 726
    for (auto *op : ready_var->pending_ops_) {
      auto &deps = pending_ops[op];
      --deps;
      if (deps == 0) {
        to_run.emplace_back(op);
Y
Yu Yang 已提交
727 728 729 730
      }
    }
    for (auto *op : to_run) {
      pending_ops.erase(op);
Y
Yu Yang 已提交
731
      member_->RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
732 733
    }
  }
Y
Yu Yang 已提交
734

Y
Debug  
Yu Yang 已提交
735 736 737 738 739 740
  for (auto &fetch_op : fetch_ops) {
    fetch_op.WaitAndMergeCPUTensors();
  }

  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetched_data;
Y
Yu Yang 已提交
741
}
Y
Yu Yang 已提交
742

Y
Yu Yang 已提交
743
}  // namespace framework
Y
Yang Yang 已提交
744
}  // namespace paddle