executor.cc 16.8 KB
Newer Older
1
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

Y
Yi Wang 已提交
15
#include "paddle/fluid/framework/executor.h"
Y
Yang Yang 已提交
16

Y
Yi Wang 已提交
17 18 19 20 21
#include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/lod_rank_table.h"
#include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/reader.h"
22
#include "paddle/fluid/framework/transfer_scope_cache.h"
W
Wang Guibao 已提交
23
#include "paddle/fluid/framework/variable_helper.h"
G
gongweibao 已提交
24
#include "paddle/fluid/operators/detail/macros.h"
Y
Yi Wang 已提交
25
#include "paddle/fluid/platform/place.h"
X
Xin Pan 已提交
26
#include "paddle/fluid/platform/profiler.h"
Y
Yang Yu 已提交
27

28 29 30 31
#ifdef PADDLE_WITH_NGRAPH
#include "paddle/fluid/framework/ngraph_operator.h"
#endif

D
dzhwinter 已提交
32
DECLARE_bool(benchmark);
33
DEFINE_bool(use_mkldnn, false, "Use MKLDNN to run");
B
baojun-nervana 已提交
34
DEFINE_bool(use_ngraph, false, "Use NGRAPH to run");
Q
qijun 已提交
35 36 37

namespace paddle {
namespace framework {
X
Xin Pan 已提交
38 39 40 41 42
namespace {
// Block ids start from 0, so -1 is reserved to identify the code block that
// wraps block 0. It is the id handed to platform::RecordBlock by the
// feed/fetch overload of Executor::Run. The value is never modified, hence
// it is a compile-time constant rather than a mutable global.
constexpr int kProgramId = -1;
}  // namespace
Q
qijun 已提交
43

Q
Qiao Longfei 已提交
44 45
// Remembers the program and the block that will be executed. The reference
// counts of the block's non-persistable variables are only collected when the
// eager deletion threshold is non-negative.
ExecutorPrepareContext::ExecutorPrepareContext(
    const framework::ProgramDesc& prog, size_t block_id)
    : prog_(prog), block_id_(block_id) {
  const bool collect_ref_counts = GetEagerDeletionThreshold() >= 0;
  if (collect_ref_counts) {
    ref_cnts_ = GetNonPersistableReferenceCount<int>(prog_, block_id_);
  }
}
Y
Yu Yang 已提交
51

Q
Qiao Longfei 已提交
52
// Nothing to release by hand; the destructor only leaves a trace in the
// verbose log.
ExecutorPrepareContext::~ExecutorPrepareContext() {
  VLOG(5) << "destroy ExecutorPrepareContext";
}
Y
Yu Yang 已提交
55

S
sneaxiy 已提交
56 57 58 59 60 61 62 63 64 65 66 67 68 69
template <typename RefCntMap>
static void DeleteUnusedTensors(const Scope& scope, const OperatorBase* op,
                                GarbageCollector<Tensor>* gc,
                                RefCntMap* ref_cnts) {
  std::unordered_set<Tensor*> erase_tensors;

  auto handler = [&](const VariableNameMap& name_map) {
    for (auto& name_pair : name_map) {
      for (auto& name : name_pair.second) {
        auto it = ref_cnts->find(name);
        if (it == ref_cnts->end()) continue;
        if ((it->second)-- == 1) {
          auto* var = scope.FindVar(name);
          if (var != nullptr) {
M
minqiyang 已提交
70
            VLOG(10) << "Erase tensor \'" << name << "\'";
S
sneaxiy 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
            if (var->IsType<LoDTensor>()) {
              erase_tensors.insert(var->GetMutable<LoDTensor>());
            } else if (var->IsType<SelectedRows>()) {
              erase_tensors.insert(
                  var->GetMutable<SelectedRows>()->mutable_value());
            }
          }
        }
      }
    }
  };

  handler(op->Inputs());
  handler(op->Outputs());

  if (!erase_tensors.empty()) {
    gc->Add(erase_tensors);
  }
}

B
baojun-nervana 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
// With nGraph support compiled in, replaces every interval reported by
// FusedOperator::FusedOpIntervals with a single FusedOperator. Without it,
// only a warning is logged.
static void EnableFusedOp(ExecutorPrepareContext* ctx) {
#ifdef PADDLE_WITH_NGRAPH
  VLOG(3) << "use_ngraph=True";
  auto intervals = FusedOperator::FusedOpIntervals(&ctx->ops_);

  // Pass 1: put the fused operator into the first slot of each interval.
  for (auto& range : intervals) {
    *range[0] = std::unique_ptr<OperatorBase>(new FusedOperator(
        ctx->prog_, ctx->block_id_, range.at(0), range.at(1)));
  }

  // Pass 2: drop the remaining slots of each interval, walking the intervals
  // from last to first.
  // NOTE(review): presumably back-to-front so that erasing does not disturb
  // the iterators of intervals still to be processed; confirm that
  // FusedOpIntervals returns them in ascending order.
  for (size_t i = intervals.size(); i > 0; --i) {
    auto& range = intervals[i - 1];
    ctx->ops_.erase(range.at(0) + 1, range.at(1));
  }
#else
  LOG(WARNING)
      << "'NGRAPH' is not supported, Please re-compile with WITH_NGRAPH option";
#endif
}

D
dzhwinter 已提交
109
// Stores the place that is later passed to every operator's Run call.
Executor::Executor(const platform::Place& place) : place_(place) {}
Q
qijun 已提交
110

Y
Yancey1989 已提交
111
// Calls SendComplete() on the GRPC client instance obtained with id 0. This
// is a no-op unless the build defines PADDLE_WITH_DISTRIBUTE.
void Executor::Close() {
#ifdef PADDLE_WITH_DISTRIBUTE
  namespace rpc = ::paddle::operators::distributed;
  // TODO(typhoonzero): complete message will need to use real trainer_id,
  // except 0.
  auto client = rpc::RPCClient::GetInstance<rpc::GRPCClient>(0);
  client->SendComplete();
#endif
}
W
Wu Yi 已提交
120

L
Liu Yiqun 已提交
121 122 123
void Executor::CreateVariables(const ProgramDesc& pdesc, Scope* scope,
                               int block_id) {
  auto& global_block = pdesc.Block(block_id);
124 125 126 127 128 129 130 131 132 133 134 135 136 137

  const Scope* ancestor_scope = scope;
  while (ancestor_scope->parent()) {
    ancestor_scope = ancestor_scope->parent();
  }

  if (ancestor_scope != scope) {
    for (auto& var : global_block.AllVars()) {
      if (var->Name() == framework::kEmptyVarName) {
        continue;
      }

      if (var->Persistable()) {
        auto* ptr = const_cast<Scope*>(ancestor_scope)->Var(var->Name());
138
        InitializeVariable(ptr, var->GetType());
M
minqiyang 已提交
139 140
        VLOG(3) << "Create Variable " << var->Name()
                << " global, which pointer is " << ptr;
141 142
      } else {
        auto* ptr = scope->Var(var->Name());
143
        InitializeVariable(ptr, var->GetType());
M
minqiyang 已提交
144 145
        VLOG(3) << "Create Variable " << var->Name()
                << " locally, which pointer is " << ptr;
146 147 148 149 150
      }
    }
  } else {
    for (auto& var : global_block.AllVars()) {
      auto* ptr = scope->Var(var->Name());
151
      InitializeVariable(ptr, var->GetType());
M
minqiyang 已提交
152 153
      VLOG(3) << "Create variable " << var->Name() << ", which pointer is "
              << ptr;
154 155 156 157
    }
  }
}

Y
Yu Yang 已提交
158
void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
T
typhoonzero 已提交
159
                   bool create_local_scope, bool create_vars) {
X
Xin Pan 已提交
160
  platform::RecordBlock b(block_id);
161
  if (FLAGS_use_mkldnn) EnableMKLDNN(pdesc);
Q
Qiao Longfei 已提交
162 163
  auto ctx = Prepare(pdesc, block_id);
  RunPreparedContext(ctx.get(), scope, create_local_scope, create_vars);
Q
qijun 已提交
164 165
}

166 167 168 169 170 171 172
// Check whether the block already has feed operators and feed_holder.
// Return false if the block does not have any feed operators.
// If some feed operators have been prepended to the block, check that
// the info contained in these feed operators matches the feed_targets
// and feed_holder_name. Raise exception when any mismatch is found.
// Return true if the block has feed operators and holder of matching info.
static bool has_feed_operators(
173
    const BlockDesc& block,
L
Liu Yiqun 已提交
174
    const std::map<std::string, const LoDTensor*>& feed_targets,
175 176
    const std::string& feed_holder_name) {
  size_t feed_count = 0;
177
  for (auto* op : block.AllOps()) {
178 179
    if (op->Type() == kFeedOpType) {
      feed_count++;
L
Liu Yiqun 已提交
180
      // The input variable's name of feed_op should be feed_holder_name.
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
      PADDLE_ENFORCE_EQ(op->Input("X")[0], feed_holder_name,
                        "Input to feed op should be '%s'", feed_holder_name);
      std::string feed_target_name = op->Output("Out")[0];
      PADDLE_ENFORCE(
          feed_targets.find(feed_target_name) != feed_targets.end(),
          "Feed operator output name '%s' cannot be found in 'feed_targets'",
          feed_target_name);
    }
  }

  if (feed_count > 0) {
    PADDLE_ENFORCE_EQ(
        feed_count, feed_targets.size(),
        "The number of feed operators should match 'feed_targets'");

196
    if (!feed_holder_name.empty()) {
L
Liu Yiqun 已提交
197
      // When feed operator are present, so should be feed_holder.
198 199 200 201 202 203 204
      auto var = block.FindVar(feed_holder_name);
      PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
                              feed_holder_name);
      PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FEED_MINIBATCH,
                        "'%s' variable should be 'FEED_MINIBATCH' type",
                        feed_holder_name);
    }
205 206 207 208 209 210 211 212 213 214 215 216
  }

  return feed_count > 0;
}

// Check whether the block already has fetch operators and fetch_holder.
// Return false if the block does not have any fetch operators.
// If some fetch operators have been appended to the block, check that
// the info contained in these fetch operators matches the fetch_targets
// and fetch_holder_name. Raise exception when any mismatch is found.
// Return true if the block has fetch operators and holder of matching info.
static bool has_fetch_operators(
L
Liu Yiqun 已提交
217 218
    const BlockDesc& block,
    const std::map<std::string, LoDTensor*>& fetch_targets,
219 220
    const std::string& fetch_holder_name) {
  size_t fetch_count = 0;
221
  for (auto* op : block.AllOps()) {
222 223
    if (op->Type() == kFetchOpType) {
      fetch_count++;
L
Liu Yiqun 已提交
224
      // The output variable's name of fetch_op should be fetch_holder_name.
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
      PADDLE_ENFORCE_EQ(op->Output("Out")[0], fetch_holder_name,
                        "Output of fetch op should be '%s'", fetch_holder_name);
      std::string fetch_target_name = op->Input("X")[0];
      PADDLE_ENFORCE(
          fetch_targets.find(fetch_target_name) != fetch_targets.end(),
          "Fetch operator input name '%s' cannot be found in 'fetch_targets'",
          fetch_target_name);
    }
  }

  if (fetch_count > 0) {
    PADDLE_ENFORCE_EQ(
        fetch_count, fetch_targets.size(),
        "The number of fetch operators should match 'fetch_targets'");

240
    if (!fetch_holder_name.empty()) {
L
Liu Yiqun 已提交
241
      // When fetch operator are present, so should be fetch_holder.
242 243 244 245 246 247 248
      auto var = block.FindVar(fetch_holder_name);
      PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
                              fetch_holder_name);
      PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FETCH_LIST,
                        "'%s' variable should be 'FETCH_LIST' type",
                        fetch_holder_name);
    }
249 250 251 252 253 254
  }

  return fetch_count > 0;
}

void Executor::Run(const ProgramDesc& program, Scope* scope,
255 256
                   std::map<std::string, const LoDTensor*>* feed_targets,
                   std::map<std::string, LoDTensor*>* fetch_targets,
W
Wu Yi 已提交
257 258
                   bool create_local_scope, bool create_vars,
                   const std::string& feed_holder_name,
259
                   const std::string& fetch_holder_name) {
X
Xin Pan 已提交
260
  platform::RecordBlock b(kProgramId);
261
  if (FLAGS_use_mkldnn) EnableMKLDNN(program);
262
  bool has_feed_ops =
263
      has_feed_operators(program.Block(0), *feed_targets, feed_holder_name);
264
  bool has_fetch_ops =
265
      has_fetch_operators(program.Block(0), *fetch_targets, fetch_holder_name);
266 267

  ProgramDesc* copy_program = const_cast<ProgramDesc*>(&program);
S
sneaxiy 已提交
268
  std::unique_ptr<ProgramDesc> unique_ptr_of_copy_program;
269
  if (!has_feed_ops || !has_fetch_ops) {
S
sneaxiy 已提交
270 271
    unique_ptr_of_copy_program.reset(new ProgramDesc(program));
    copy_program = unique_ptr_of_copy_program.get();
272
  }
273 274
  auto* global_block = copy_program->MutableBlock(0);

275
  if (!has_feed_ops) {
276 277
    // create feed_holder variable
    auto* feed_holder = global_block->Var(feed_holder_name);
278
    feed_holder->SetType(proto::VarType::FEED_MINIBATCH);
279 280 281
    feed_holder->SetPersistable(true);

    int i = 0;
282
    for (auto& feed_target : (*feed_targets)) {
283
      std::string var_name = feed_target.first;
M
minqiyang 已提交
284
      VLOG(3) << "feed target's name: " << var_name;
285 286 287 288 289 290 291 292 293 294 295 296 297

      // prepend feed op
      auto* op = global_block->PrependOp();
      op->SetType(kFeedOpType);
      op->SetInput("X", {feed_holder_name});
      op->SetOutput("Out", {var_name});
      op->SetAttr("col", {static_cast<int>(i)});
      op->CheckAttrs();

      i++;
    }
  }

298
  if (!has_fetch_ops) {
299 300
    // create fetch_holder variable
    auto* fetch_holder = global_block->Var(fetch_holder_name);
301
    fetch_holder->SetType(proto::VarType::FETCH_LIST);
302 303 304
    fetch_holder->SetPersistable(true);

    int i = 0;
305
    for (auto& fetch_target : (*fetch_targets)) {
306
      std::string var_name = fetch_target.first;
M
minqiyang 已提交
307
      VLOG(3) << "fetch target's name: " << var_name;
308 309 310 311 312 313 314 315 316 317 318 319 320

      // append fetch op
      auto* op = global_block->AppendOp();
      op->SetType(kFetchOpType);
      op->SetInput("X", {var_name});
      op->SetOutput("Out", {fetch_holder_name});
      op->SetAttr("col", {static_cast<int>(i)});
      op->CheckAttrs();

      i++;
    }
  }

321
  auto ctx = Prepare(*copy_program, 0);
W
Wu Yi 已提交
322 323 324
  RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets,
                     create_local_scope, create_vars, feed_holder_name,
                     fetch_holder_name);
325 326
}

Q
Qiao Longfei 已提交
327 328
std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
    const ProgramDesc& program, int block_id) {
Q
Qiyang Min 已提交
329 330
  std::unique_ptr<ExecutorPrepareContext> ctx(
      new ExecutorPrepareContext(program, block_id));
Y
Yu Yang 已提交
331 332 333 334 335
  PADDLE_ENFORCE_LT(static_cast<size_t>(block_id), program.Size());
  auto& block = program.Block(block_id);
  for (auto& op_desc : block.AllOps()) {
    ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
  }
B
baojun-nervana 已提交
336
  if (FLAGS_use_ngraph) EnableFusedOp(ctx.get());
Q
Qiyang Min 已提交
337
  return ctx;
Y
Yu Yang 已提交
338 339
}

T
refine  
typhoonzero 已提交
340
std::vector<std::shared_ptr<ExecutorPrepareContext>> Executor::Prepare(
T
typhoonzero 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354
    const ProgramDesc& program, const std::vector<int>& block_ids) {
  std::vector<std::shared_ptr<ExecutorPrepareContext>> result;
  for (auto& bid : block_ids) {
    auto* ctx = new ExecutorPrepareContext(program, bid);
    PADDLE_ENFORCE_LT(static_cast<size_t>(bid), program.Size());
    auto& block = program.Block(bid);
    for (auto& op_desc : block.AllOps()) {
      ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
    }
    result.push_back(std::shared_ptr<ExecutorPrepareContext>(ctx));
  }
  return result;
}

Y
Yu Yang 已提交
355
// Runs every operator of the prepared context `ctx` on `place_`.
//
// - `create_vars`: create the block's variables before running; when
//   `create_local_scope` is also set they are created in a fresh child of
//   `scope`, which is deleted again after the run.
// - `keep_kids`: when no child scope was created here, leave the kid scopes of
//   `scope` alive instead of dropping them; it also switches off eager
//   deletion for this run.
//
// Eager deletion is active when the eager deletion threshold is non-negative
// and `keep_kids` is false; tensors are then released through a garbage
// collector after each operator.
void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
                                  bool create_local_scope, bool create_vars,
                                  bool keep_kids) {
  PADDLE_ENFORCE_NOT_NULL(scope);

  Scope* local_scope = scope;
  if (create_vars) {
    if (create_local_scope) {
      local_scope = &scope->NewScope();
    }
    CreateVariables(ctx->prog_, local_scope, ctx->block_id_);
  }

  int64_t max_memory_size = GetEagerDeletionThreshold();
  std::unique_ptr<GarbageCollector<Tensor>> gc;
  // WhileOp sets keep_kids to true because WhileGradOp needs the scopes that
  // WhileOp created. Eager deletion is therefore skipped in that case; those
  // scopes and variables would be deleted in WhileGradOp.
  if (max_memory_size >= 0 && !keep_kids) {
    ctx->ResetReferenceCount();
#ifdef PADDLE_WITH_CUDA
    if (platform::is_gpu_place(place_)) {
      gc.reset(new DefaultStreamGarbageCollector<Tensor>(
          boost::get<platform::CUDAPlace>(place_), max_memory_size));
    }
#endif
    // Anything that did not get a GPU collector above uses the CPU one.
    if (gc == nullptr) {
      gc.reset(new CPUGarbageCollector<Tensor>(
          boost::get<platform::CPUPlace>(place_), max_memory_size));
    }
  }

  const bool eager_deletion = (gc != nullptr);

  for (auto& op : ctx->ops_) {
    op->Run(*local_scope, place_);
    if (eager_deletion) {
      DeleteUnusedTensors(*local_scope, op.get(), gc.get(),
                          &(ctx->cur_ref_cnts_));
    }
  }

  // Wait on the garbage collector if there is one, otherwise on the device
  // context of `place_`.
  if (eager_deletion) {
    gc->Wait();
  } else {
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  if (local_scope != scope) {
    scope->DeleteScope(local_scope);
    return;
  }

  // By default all kid scopes are dropped after the run, because operators
  // such as while_op may create local scopes while running. When while_op
  // itself uses a local executor for its sub block, the scopes it created must
  // survive: while_grad_op still uses variables created during the while_op
  // run, so the outer executor drops them later.
  if (!keep_kids) {
    scope->DropKids();
  }
}

419 420
void Executor::RunPreparedContext(
    ExecutorPrepareContext* ctx, Scope* scope,
421
    std::map<std::string, const LoDTensor*>* feed_targets,
W
Wu Yi 已提交
422 423 424
    std::map<std::string, LoDTensor*>* fetch_targets, bool create_local_scope,
    bool create_vars, const std::string& feed_holder_name,
    const std::string& fetch_holder_name) {
425 426
  auto& global_block = ctx->prog_.Block(ctx->block_id_);

427
  PADDLE_ENFORCE(
428
      has_feed_operators(global_block, *feed_targets, feed_holder_name),
429 430
      "Program in ExecutorPrepareContext should has feed_ops.");
  PADDLE_ENFORCE(
431
      has_fetch_operators(global_block, *fetch_targets, fetch_holder_name),
432 433
      "Program in the prepared context should has fetch_ops.");

434 435 436 437 438
  // map the data of feed_targets to feed_holder
  for (auto* op : global_block.AllOps()) {
    if (op->Type() == kFeedOpType) {
      std::string feed_target_name = op->Output("Out")[0];
      int idx = boost::get<int>(op->GetAttr("col"));
439 440
      SetFeedVariable(scope, *(*feed_targets)[feed_target_name],
                      feed_holder_name, idx);
441 442 443
    }
  }

W
Wu Yi 已提交
444
  RunPreparedContext(ctx, scope, create_local_scope, create_vars);
445 446 447 448 449 450

  // obtain the data of fetch_targets from fetch_holder
  for (auto* op : global_block.AllOps()) {
    if (op->Type() == kFetchOpType) {
      std::string fetch_target_name = op->Input("X")[0];
      int idx = boost::get<int>(op->GetAttr("col"));
451
      *(*fetch_targets)[fetch_target_name] =
452 453 454 455 456
          GetFetchVariable(*scope, fetch_holder_name, idx);
    }
  }
}

457 458
// With MKLDNN support compiled in, sets the "use_mkldnn" attribute to true on
// every operator, in every block of `program`, that already has such an
// attribute. Note that this modifies `program` despite the const reference.
// Without MKLDNN support only a warning is logged.
void Executor::EnableMKLDNN(const ProgramDesc& program) {
#ifdef PADDLE_WITH_MKLDNN
  VLOG(3) << "use_mkldnn=True";
  auto& mutable_program = const_cast<ProgramDesc&>(program);
  const size_t num_blocks = program.Size();
  for (size_t bid = 0; bid != num_blocks; ++bid) {
    for (auto* op : mutable_program.MutableBlock(bid)->AllOps()) {
      if (!op->HasAttr("use_mkldnn")) {
        continue;
      }
      op->SetAttr("use_mkldnn", true);
    }
  }
#else
  LOG(WARNING)
      << "'MKLDNN' is not supported, Please re-compile with WITH_MKLDNN option";
#endif
}
Q
qijun 已提交
473 474
}  // namespace framework
}  // namespace paddle