parallel_executor.cc 19.6 KB
Newer Older
Y
Yang Yang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
Y
Yu Yang 已提交
16
#include "ThreadPool.h"
Y
Yu Yang 已提交
17
#include "lod_tensor.h"
Y
Yu Yang 已提交
18
#include "lod_tensor_array.h"
Y
Yu Yang 已提交
19
#include "op_registry.h"
Y
Yu Yang 已提交
20
#include "paddle/fluid/framework/details/op_handle_base.h"
Y
Yu Yang 已提交
21
#include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h"
Y
Yu Yang 已提交
22
#include "paddle/fluid/framework/details/var_handle.h"
Y
Debug  
Yu Yang 已提交
23
#include "paddle/fluid/framework/feed_fetch_type.h"
Y
Yu Yang 已提交
24
#include "paddle/fluid/platform/nccl_helper.h"
Y
Yang Yang 已提交
25 26

namespace paddle {
Y
Yu Yang 已提交
27 28
namespace framework {

Y
Yu Yang 已提交
29
using details::DummyVarHandle;
Y
Yu Yang 已提交
30
using details::OpHandleBase;
Y
Yu Yang 已提交
31
using details::ScaleLossGradOpHandle;
Y
Yu Yang 已提交
32 33
using details::VarHandle;
using details::VarHandleBase;
Y
Yu Yang 已提交
34

Y
Yu Yang 已提交
35
struct FetchOpHandle : public OpHandleBase {
Y
Debug  
Yu Yang 已提交
36
  FeedFetchList *data_;
Y
Yu Yang 已提交
37 38 39 40 41 42 43 44 45 46
  size_t offset_;
  std::vector<Scope *> *local_scopes_;
  std::vector<LoDTensor> tensors_;

  ~FetchOpHandle() {
    for (auto *input_var : inputs_) {
      input_var->pending_ops_.erase(this);
    }
  }

Y
Yu Yang 已提交
47 48 49 50
  void Wait(platform::DeviceContext *waited_dev) override {
    PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
  }

Y
Debug  
Yu Yang 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64
  void WaitAndMergeCPUTensors() const {
    // Wait fetch stream done.
    for (auto &ctx : dev_ctx_) {
      ctx.second->Wait();
    }

    std::vector<const LoDTensor *> tensors_ptr;
    tensors_ptr.reserve(tensors_.size());
    for (auto &t : tensors_) {
      tensors_ptr.emplace_back(&t);
    }
    data_->at(offset_).MergeLoDTensor(tensors_ptr, platform::CPUPlace());
  }

Y
Yu Yang 已提交
65 66
 protected:
  void RunImpl() override {
Y
Debug  
Yu Yang 已提交
67
    for (auto *input : inputs_) {
Y
Yu Yang 已提交
68 69
      auto *var = static_cast<VarHandle *>(input);
      var->generated_op_->Wait(this->dev_ctx_[var->place_]);
Y
Debug  
Yu Yang 已提交
70 71
    }

Y
Yu Yang 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
    tensors_.resize(inputs_.size());
    auto *var = static_cast<VarHandle *>(inputs_[0]);
    auto &var_name = var->name_;
    platform::CPUPlace cpu;
    auto &scopes = *local_scopes_;

    for (size_t i = 0; i < scopes.size(); ++i) {
      auto &scope = scopes[i];
      auto &t = scope->FindVar(var_name)->Get<framework::LoDTensor>();
      if (platform::is_gpu_place(var->place_)) {
        TensorCopy(t, cpu, *dev_ctx_[t.place()], &tensors_[i]);
      } else {
        tensors_[i].ShareDataWith(t);
        tensors_[i].set_lod(t.lod());
      }
    }
  }
};

Y
Yu Yang 已提交
91 92
class ParallelExecutorPrivate {
 public:
Y
Yu Yang 已提交
93
  explicit ParallelExecutorPrivate(size_t num_threads)
Y
Yu Yang 已提交
94
      : pool_(num_threads <= 1 ? nullptr : new ThreadPool(num_threads)) {}
Y
Yu Yang 已提交
95

Y
Stash  
Yu Yang 已提交
96 97
  std::vector<platform::Place> places_;

Y
Yu Yang 已提交
98
  std::vector<Scope *> local_scopes_;
Y
Yu Yang 已提交
99
  Scope *global_scope_;
Y
Yu Yang 已提交
100

Y
Yu Yang 已提交
101
  std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
Y
Yu Yang 已提交
102

Y
Yu Yang 已提交
103 104 105 106 107 108
  platform::DeviceContext *CommunicationDevCtx(const platform::Place &place) {
    if (platform::is_cpu_place(place) || local_scopes_.size() == 1) {
      return const_cast<platform::DeviceContext *>(
          platform::DeviceContextPool::Instance().Get(place));
    } else {
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
109
      return nccl_ctxs_->DevCtx(place);
Y
Yu Yang 已提交
110 111 112 113 114 115
#else
      PADDLE_THROW("Not compiled with CUDA")
#endif
    }
  }

Y
Yu Yang 已提交
116 117 118 119 120 121
  platform::Place main_place_;

  std::unordered_map<platform::Place,
                     std::unordered_map<std::string, std::map<int, VarHandle>>,
                     platform::PlaceHash>
      vars_;
Y
Yu Yang 已提交
122 123
  std::unordered_set<std::unique_ptr<VarHandleBase>> dep_vars_;

Y
Yu Yang 已提交
124
  std::vector<std::unique_ptr<OpHandleBase>> ops_;
Y
Yu Yang 已提交
125

Y
Yu Yang 已提交
126
  // Use a simpler thread pool, might be faster.
Y
Yu Yang 已提交
127
  std::unique_ptr<ThreadPool> pool_;
Y
Yu Yang 已提交
128 129

  std::unique_ptr<platform::EnforceNotMet> exception_;
Y
Yu Yang 已提交
130

Y
Yu Yang 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
  VarHandle *GetVarHandle(const std::string &each_var_name,
                          const platform::Place &place) {
    auto &var_holders = vars_[place];
    auto &var_holder = var_holders[each_var_name];
    VarHandle *var = nullptr;
    if (var_holder.empty()) {
      auto &init_var = var_holder[0];
      init_var.place_ = place;
      init_var.name_ = each_var_name;
      init_var.generated_op_ = nullptr;
      init_var.version_ = 0;
      var = &init_var;
    } else {
      var = &var_holder.rbegin()->second;
    }
    return var;
  }
Y
Yu Yang 已提交
148

Y
Yu Yang 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
  void RunOp(
      bool use_event,
      std::unordered_map<VarHandleBase *, std::atomic<bool>> &pending_vars,
      OpHandleBase *op) {
    std::vector<std::atomic<bool> *> *ready_buffer =
        new std::vector<std::atomic<bool> *>();
    for (auto *var : op->outputs_) {
      ready_buffer->emplace_back(&pending_vars[var]);
    }

    auto op_run = [ready_buffer, op, this, use_event] {
      try {
        VLOG(10) << op->DebugString();
        op->Run(use_event);
        for (auto *ready : *ready_buffer) {
          ready->store(true, std::memory_order_release);
        }
        delete ready_buffer;
      } catch (platform::EnforceNotMet ex) {
        exception_.reset(new platform::EnforceNotMet(ex));
      } catch (...) {
        LOG(FATAL) << "Unknown exception catched";
      }
    };
    if (pool_) {
      pool_->enqueue(op_run);
    } else {
      op_run();
    }
  }

  void GenerateVar(OpHandleBase *op_handle, const std::string &each_var_name,
                   const platform::Place &place) {
    auto &vars = vars_[place][each_var_name];
    size_t version = vars.size();
    auto &var = vars[version];
    var.version_ = version;
    var.generated_op_ = op_handle;
    var.name_ = each_var_name;
    var.place_ = place;
    op_handle->outputs_.emplace_back(&var);
  }
};  // namespace framework

struct NCCLAllReduceOpHandle : public OpHandleBase {
  const std::vector<Scope *> &local_scopes_;
  const std::vector<platform::Place> &places_;
Y
Yu Yang 已提交
196
  const platform::NCCLContextMap &nccl_ctxs_;
Y
Yu Yang 已提交
197

Y
Yu Yang 已提交
198 199 200 201
  explicit NCCLAllReduceOpHandle(const std::vector<Scope *> &local_scopes,
                                 const std::vector<platform::Place> &places,
                                 const platform::NCCLContextMap &ctxs)
      : local_scopes_(local_scopes), places_(places), nccl_ctxs_(ctxs) {}
Y
Yu Yang 已提交
202

Y
Yu Yang 已提交
203
  void Wait(platform::DeviceContext *waited_dev) override {
Y
Yu Yang 已提交
204
    OpHandleBase::Wait(waited_dev);
Y
Yu Yang 已提交
205 206
  }

Y
Yu Yang 已提交
207 208
 protected:
  void RunImpl() override {
Y
Yu Yang 已提交
209
    if (inputs_.size() == 1) {
Y
Yu Yang 已提交
210 211
      return;  // No need to all reduce when GPU count = 1;
    } else {
Y
Yu Yang 已提交
212 213 214 215 216 217
      // Wait input done
      for (auto *in : inputs_) {
        auto &p = static_cast<VarHandle *>(in)->place_;
        in->generated_op_->Wait(dev_ctx_[p]);
      }

Y
Yu Yang 已提交
218 219 220 221
      auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;
      int dtype = -1;
      size_t numel = 0;

Y
Yu Yang 已提交
222
      platform::NCCLGroupGuard guard;
Y
Update  
Yu Yang 已提交
223

Y
Yu Yang 已提交
224 225 226
      for (size_t i = 0; i < local_scopes_.size(); ++i) {
        auto &p = places_[i];
        auto *s = local_scopes_[i];
Y
Yu Yang 已提交
227 228 229 230
        int dev_id = boost::get<platform::CUDAPlace>(p).device;

        auto &lod_tensor = s->FindVar(var_name)->Get<framework::LoDTensor>();
        void *buffer = const_cast<void *>(lod_tensor.data<void>());
Y
Debug  
Yu Yang 已提交
231 232 233 234 235
        uintptr_t buf = reinterpret_cast<uintptr_t>(buffer);
        if (buf % sizeof(float) != 0) {
          VLOG(3) << "Buffer is not aligned " << buf;
        }

Y
Yu Yang 已提交
236
        if (dtype == -1) {
Y
Yu Yang 已提交
237
          dtype = platform::ToNCCLDataType(lod_tensor.type());
Y
Yu Yang 已提交
238 239 240 241 242
        }

        if (numel == 0) {
          numel = static_cast<size_t>(lod_tensor.numel());
        }
Y
Yu Yang 已提交
243
        auto &nccl_ctx = nccl_ctxs_.at(dev_id);
Y
Yu Yang 已提交
244
        PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
Y
Update  
Yu Yang 已提交
245
            buffer, buffer, numel, static_cast<ncclDataType_t>(dtype), ncclSum,
Y
Yu Yang 已提交
246
            nccl_ctx.comm_, nccl_ctx.stream()));
Y
Yu Yang 已提交
247
      }
Y
Debug  
Yu Yang 已提交
248
    }
Y
Yu Yang 已提交
249
  }
Y
Yu Yang 已提交
250 251
};

Y
Yu Yang 已提交
252
struct ComputationOpHandle : public OpHandleBase {
Y
Yu Yang 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
  std::unique_ptr<OperatorBase> op_;
  Scope *scope_;
  platform::Place place_;

  explicit ComputationOpHandle(const OpDesc &op_desc, Scope *scope,
                               platform::Place place)
      : op_(framework::OpRegistry::CreateOp(op_desc)),
        scope_(scope),
        place_(place) {}

 protected:
  void RunImpl() override {
    auto *cur_ctx = dev_ctx_[place_];
    for (auto *in : inputs_) {
      bool need_wait =
          in->generated_op_ && in->generated_op_->dev_ctx_[place_] != cur_ctx;
      if (need_wait) {
        in->generated_op_->Wait(cur_ctx);
      }
    }

    op_->Run(*scope_, place_);
  }
};

Y
Yu Yang 已提交
278
ParallelExecutor::ParallelExecutor(
Y
Yu Yang 已提交
279
    size_t num_threads, const std::vector<platform::Place> &places,
Y
Yu Yang 已提交
280 281 282
    const std::unordered_set<std::string> &params,
    const ProgramDesc &startup_program, const ProgramDesc &main_program,
    const std::string &loss_var_name, Scope *scope)
Y
Yu Yang 已提交
283
    : member_(new ParallelExecutorPrivate(num_threads)) {
Y
Stash  
Yu Yang 已提交
284
  member_->places_ = places;
Y
Yu Yang 已提交
285
  member_->global_scope_ = scope;
Y
Yu Yang 已提交
286 287 288 289
  // Step 1. RunStartupProgram and Bcast the params to devs.
  Executor exe(places[0]);
  exe.Run(startup_program, scope, 0);
  // Create local scopes
Y
Yu Yang 已提交
290 291
  for (size_t i = 0; i < member_->places_.size(); ++i) {
    member_->local_scopes_.push_back(&scope->NewScope());
Y
Yu Yang 已提交
292 293 294 295
  }
  member_->main_place_ = places[0];

  // Bcast Parameters to all GPUs
Y
Yu Yang 已提交
296
  BuildNCCLCommunicator();
Y
Yu Yang 已提交
297 298 299
  if (platform::is_gpu_place(member_->main_place_) &&
      member_->local_scopes_.size() != 1) {  // Is CUDA
    BCastParamsToGPUs(startup_program);
Y
Yu Yang 已提交
300 301 302 303 304 305
  }
  // Startup Program has been run. All local scopes has correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  ConstructDependencyGraph(params, main_program, loss_var_name);
Y
Yu Yang 已提交
306 307

  // Step 3. Create vars in each scope;
Y
Yu Yang 已提交
308
  for (auto *scope : member_->local_scopes_) {
Y
Yu Yang 已提交
309 310 311 312 313 314 315 316
    for (auto *var : main_program.Block(0).AllVars()) {
      if (scope->FindVar(var->Name()) != nullptr) {
        continue;
      }

      InitializeVariable(scope->Var(var->Name()), var->GetType());
    }
  }
Y
Yu Yang 已提交
317 318 319 320 321
}

void ParallelExecutor::ConstructDependencyGraph(
    const std::unordered_set<std::string> &params,
    const ProgramDesc &main_program, const std::string &loss_var_name) const {
Y
Yu Yang 已提交
322
  std::unordered_set<std::string> grads;
Y
Yu Yang 已提交
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
  for (auto &each_param : params) {
    grads.insert(each_param + "@GRAD");
  }

  bool is_forwarding = true;
  for (auto *op : main_program.Block(0).AllOps()) {
    bool change_forward = false;
    if (!is_forwarding) {
      // FIXME(yy): Do not hard code like this
      if (op->OutputArgumentNames().size() == 1 &&
          op->OutputArgumentNames()[0] == loss_var_name + "@GRAD") {
        continue;  // Drop fill 1. for backward coeff;
      }
    }

Y
Yu Yang 已提交
338 339 340 341 342
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      auto &p = member_->places_[i];
      auto *s = member_->local_scopes_[i];

      member_->ops_.emplace_back(new ComputationOpHandle(*op, s, p));
Y
Yu Yang 已提交
343
      auto *op_handle = member_->ops_.back().get();
Y
Yu Yang 已提交
344 345
      op_handle->dev_ctx_[p] = const_cast<platform::DeviceContext *>(
          platform::DeviceContextPool::Instance().Get(p));
Y
Yu Yang 已提交
346 347 348 349

      auto var_names = op->InputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
350
        VarHandle *var = member_->GetVarHandle(each_var_name, p);
Y
Yu Yang 已提交
351
        op_handle->inputs_.emplace_back(var);
Y
Yu Yang 已提交
352
        var->pending_ops_.emplace(op_handle);
Y
Yu Yang 已提交
353 354 355 356
      }
      var_names = op->OutputArgumentNames();

      for (auto &each_var_name : var_names) {
Y
Yu Yang 已提交
357
        member_->GenerateVar(op_handle, each_var_name, p);
Y
Yu Yang 已提交
358 359 360 361 362
      }

      if (is_forwarding) {
        if (var_names.size() == 1 && var_names[0] == loss_var_name) {
          // Insert ScaleCost OpHandle
Y
Yu Yang 已提交
363
          member_->ops_.emplace_back(new ScaleLossGradOpHandle(
Y
Yu Yang 已提交
364
              this->member_->local_scopes_.size(), s, p));
Y
Yu Yang 已提交
365
          op_handle = member_->ops_.back().get();
Y
Yu Yang 已提交
366

Y
Yu Yang 已提交
367
          op_handle->dev_ctx_[p] = member_->CommunicationDevCtx(p);
Y
Yu Yang 已提交
368

Y
Yu Yang 已提交
369 370 371 372 373 374
          // FIXME: Currently ScaleLossGradOp only use device_count as scale
          // factor. So it does not depend on any other operators.
          // VarHandle *loss = GetVarHandle(loss_var_name, place);
          // loss->pending_ops_.emplace_back(op_handle);
          // op_handle->inputs_.emplace_back(loss);

Y
Yu Yang 已提交
375
          member_->GenerateVar(op_handle, loss_var_name + "@GRAD", p);
Y
Yu Yang 已提交
376 377 378 379 380 381 382 383 384 385 386 387 388 389
          change_forward = true;
        }
      }
    }

    if (change_forward) {
      is_forwarding = false;
    }

    if (!is_forwarding) {
      auto var_names = op->OutputArgumentNames();
      for (auto &og : var_names) {
        if (grads.count(og) != 0) {  // is param grad
          // Insert NCCL AllReduce Op
Y
Yu Yang 已提交
390
          member_->ops_.emplace_back(new NCCLAllReduceOpHandle(
Y
Yu Yang 已提交
391
              member_->local_scopes_, member_->places_, *member_->nccl_ctxs_));
Y
Yu Yang 已提交
392 393
          auto *op_handle = member_->ops_.back().get();

Y
Yu Yang 已提交
394 395 396
          for (size_t i = 0; i < member_->places_.size(); ++i) {
            auto &p = member_->places_[i];
            auto &vars = member_->vars_[p][og];
Y
Yu Yang 已提交
397 398 399 400 401 402

            if (vars.empty()) {  // This device has no data. continue.
              continue;
            }
            auto *prev_grad = &vars[vars.size() - 1];
            op_handle->inputs_.emplace_back(prev_grad);
Y
Yu Yang 已提交
403
            prev_grad->pending_ops_.emplace(op_handle);
Y
Yu Yang 已提交
404
            auto &var = vars[vars.size()];
Y
Yu Yang 已提交
405
            var.place_ = p;
Y
Yu Yang 已提交
406 407 408 409
            var.generated_op_ = op_handle;
            var.name_ = og;
            var.version_ = vars.size() - 1;
            op_handle->outputs_.emplace_back(&var);
Y
Yu Yang 已提交
410
            op_handle->dev_ctx_[p] = member_->CommunicationDevCtx(p);
Y
Yu Yang 已提交
411 412 413 414 415
          }
        }
      }
    }
  }
Y
Yu Yang 已提交
416

Y
Yu Yang 已提交
417 418 419
  /*
    Dependency graph has been constructed. However, there are still data
    harzaeds need to be handled.
Y
Yu Yang 已提交
420
   */
421
  PolishGraphToSupportDataHazards();
Y
Yu Yang 已提交
422
}
Y
Yu Yang 已提交
423

Y
Yu Yang 已提交
424 425 426 427 428 429 430
/**
 * We only handle write after read(WAR), since it should not have a write
 * after write in program. If there are write after write operators, we need
 * prune them.
 *
 * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR)
 */
431
void ParallelExecutor::PolishGraphToSupportDataHazards() const {
Y
Yu Yang 已提交
432 433 434 435 436 437 438 439 440 441 442
  for (auto &place_pair : member_->vars_) {
    for (auto &name_pair : place_pair.second) {
      if (name_pair.second.size() <= 1) {
        return;
      }
      auto it_new = name_pair.second.rbegin();
      auto it_old = name_pair.second.rbegin();
      ++it_old;
      for (; it_old != name_pair.second.rend(); it_new = it_old, ++it_old) {
        auto *write_op = it_new->second.generated_op_;
        auto &read_ops = it_old->second.pending_ops_;
Y
Yu Yang 已提交
443 444 445 446 447 448
        auto *ex_write_op = it_old->second.generated_op_;

        if (ex_write_op == nullptr) {  // Nobody write this var.
          continue;
        }

Y
Yu Yang 已提交
449 450
        for (auto *read_op : read_ops) {
          // Manually add a dependency var from read_op to write_op;
Y
Yu Yang 已提交
451 452 453 454
          if (read_op == write_op) {
            // Read Write is the same op.
            continue;
          }
Y
Yu Yang 已提交
455

Y
Yu Yang 已提交
456
          auto *dep_var = new DummyVarHandle();
Y
Yu Yang 已提交
457

Y
Yu Yang 已提交
458 459 460
          dep_var->generated_op_ = read_op;
          read_op->outputs_.emplace_back(dep_var);

Y
Yu Yang 已提交
461
          dep_var->pending_ops_.emplace(write_op);
Y
Yu Yang 已提交
462 463 464 465 466 467
          write_op->inputs_.emplace_back(dep_var);
          member_->dep_vars_.emplace(dep_var);
        }
      }
    }
  }
Y
Yu Yang 已提交
468 469 470 471
}

void ParallelExecutor::BCastParamsToGPUs(
    const ProgramDesc &startup_program) const {
Y
Yu Yang 已提交
472
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
473
  auto *main_scope = member_->local_scopes_[0];
Y
Yu Yang 已提交
474

Y
Yu Yang 已提交
475 476 477 478
  for (auto *var_desc : startup_program.Block(0).AllVars()) {
    if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
      auto &main_tensor =
          main_scope->FindVar(var_desc->Name())->Get<LoDTensor>();
Y
Yu Yang 已提交
479
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
Y
Yu Yang 已提交
480 481 482
      auto &dims = main_tensor.dims();
      size_t numel = main_tensor.numel();

Y
Yu Yang 已提交
483
      platform::NCCLGroupGuard guard;
Y
Yu Yang 已提交
484

Y
Update  
Yu Yang 已提交
485 486 487 488 489 490
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;
        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
Y
Yu Yang 已提交
491
          auto local_scope = member_->local_scopes_[i];
Y
Update  
Yu Yang 已提交
492 493 494 495 496
          auto *t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }

Y
Yu Yang 已提交
497
        auto &nccl_ctx = member_->nccl_ctxs_->at(place);
Y
Yu Yang 已提交
498 499
        platform::dynload::ncclBcast(buffer, numel, data_type, 0,
                                     nccl_ctx.comm_, nccl_ctx.stream());
Y
Yu Yang 已提交
500
      }
Y
Stash  
Yu Yang 已提交
501
    }
Y
Yu Yang 已提交
502
    member_->nccl_ctxs_->WaitAll();
Y
Stash  
Yu Yang 已提交
503
  }
Y
Yu Yang 已提交
504 505 506 507
#else
  PADDLE_THROW("Not compiled with CUDA");
#endif
}
Y
Yu Yang 已提交
508

Y
Yu Yang 已提交
509 510
void ParallelExecutor::BuildNCCLCommunicator() const {
#ifdef PADDLE_WITH_CUDA
Y
Yu Yang 已提交
511
  member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_));
Y
Yu Yang 已提交
512
#endif
Y
Yu Yang 已提交
513 514
}

Y
Yu Yang 已提交
515 516
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
Y
Yu Yang 已提交
517
  bool use_event = true;
Y
Debug  
Yu Yang 已提交
518
  FeedFetchList fetched_data(fetch_tensors.size());
Y
Yu Yang 已提交
519
  // Version --> VarHandle
Y
Yu Yang 已提交
520
  member_->exception_.reset();
Y
Yu Yang 已提交
521
  std::unordered_map<VarHandleBase *, std::atomic<bool>> pending_vars;
Y
Yu Yang 已提交
522
  std::unordered_map<OpHandleBase *, size_t> pending_ops;
Y
Yu Yang 已提交
523
  std::vector<DummyVarHandle> dummy_vars;
Y
Yu Yang 已提交
524 525 526 527

  for (auto &place_pair : member_->vars_) {
    for (auto &name_pair : place_pair.second) {
      for (auto &version_pair : name_pair.second) {
Y
Yu Yang 已提交
528 529
        pending_vars[&version_pair.second] =
            version_pair.second.generated_op_ == nullptr;
Y
Yu Yang 已提交
530 531 532 533
      }
    }
  }

Y
Yu Yang 已提交
534
  for (auto &var : member_->dep_vars_) {
Y
Yu Yang 已提交
535
    pending_vars[var.get()] = var->generated_op_ == nullptr;
Y
Yu Yang 已提交
536 537
  }

Y
Yu Yang 已提交
538
  std::vector<OpHandleBase *> to_run;
Y
Yu Yang 已提交
539

Y
Yu Yang 已提交
540
  for (auto &op : member_->ops_) {
Y
Yu Yang 已提交
541 542 543 544 545 546 547
    if (op->inputs_.empty()) {  // Special case, Op has no input.
      to_run.emplace_back(op.get());
    } else {
      pending_ops.insert({op.get(), op->inputs_.size()});
    }
  }

Y
Yu Yang 已提交
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565
  std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;

  for (auto &fetch_var_name : fetch_tensors) {
    for (auto &pair : member_->vars_) {
      auto it = pair.second.find(fetch_var_name);
      if (it != pair.second.end()) {
        fetched_vars[fetch_var_name].push_back(&it->second.rbegin()->second);
      }
    }
  }

  std::vector<FetchOpHandle> fetch_ops;

  for (size_t i = 0; i < fetch_tensors.size(); ++i) {
    auto &var_name = fetch_tensors[i];
    auto &vars = fetched_vars[var_name];
    fetch_ops.emplace_back();
    FetchOpHandle *op = &fetch_ops.back();
Y
Debug  
Yu Yang 已提交
566
    op->data_ = &fetched_data;
Y
Yu Yang 已提交
567 568 569
    op->offset_ = i;
    op->local_scopes_ = &member_->local_scopes_;
    for (auto &p : member_->places_) {
Y
Yu Yang 已提交
570
      op->dev_ctx_[p] = member_->nccl_ctxs_->DevCtx(p);
Y
Yu Yang 已提交
571 572 573 574 575 576
    }

    for (auto *var : vars) {
      var->pending_ops_.emplace(op);
      op->inputs_.emplace_back(var);
    }
Y
Yu Yang 已提交
577 578 579 580 581 582 583

    dummy_vars.emplace_back();
    auto *var = &dummy_vars.back();
    op->outputs_.emplace_back(var);
    var->generated_op_ = op;
    pending_vars[var] = false;

Y
Yu Yang 已提交
584 585 586
    pending_ops.insert({op, op->inputs_.size()});
  }

Y
Yu Yang 已提交
587
  for (auto *op : to_run) {
Y
Yu Yang 已提交
588
    member_->RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
589 590
  }

Y
Yu Yang 已提交
591
  while (!pending_vars.empty()) {
Y
Yu Yang 已提交
592
    VarHandleBase *ready_var = nullptr;
Y
Yu Yang 已提交
593
    for (auto &pair : pending_vars) {
Y
Yu Yang 已提交
594
      if (pair.second.load(std::memory_order_acquire)) {
Y
Yu Yang 已提交
595
        ready_var = pair.first;
Y
Yu Yang 已提交
596 597
      }
    }
Y
Yu Yang 已提交
598
    if (ready_var == nullptr) {
Y
Yu Yang 已提交
599 600 601 602
      // FIXME use conditional var instead of busy wait.
      if (member_->exception_) {
        throw * member_->exception_;
      }
Y
Yu Yang 已提交
603
      continue;
Y
Yu Yang 已提交
604
    }
Y
Yu Yang 已提交
605
    pending_vars.erase(ready_var);
Y
Yu Yang 已提交
606
    to_run.clear();
Y
Yu Yang 已提交
607 608 609 610 611
    for (auto *op : ready_var->pending_ops_) {
      auto &deps = pending_ops[op];
      --deps;
      if (deps == 0) {
        to_run.emplace_back(op);
Y
Yu Yang 已提交
612 613 614 615
      }
    }
    for (auto *op : to_run) {
      pending_ops.erase(op);
Y
Yu Yang 已提交
616
      member_->RunOp(use_event, pending_vars, op);
Y
Yu Yang 已提交
617 618
    }
  }
Y
Yu Yang 已提交
619

Y
Debug  
Yu Yang 已提交
620 621 622 623 624 625
  for (auto &fetch_op : fetch_ops) {
    fetch_op.WaitAndMergeCPUTensors();
  }

  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetched_data;
Y
Yu Yang 已提交
626
}
Y
Yu Yang 已提交
627

Y
Yu Yang 已提交
628
}  // namespace framework
Y
Yang Yang 已提交
629
}  // namespace paddle