// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpretercore.h"

#include <unordered_set>

#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/os_info.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#include "paddle/fluid/platform/profiler/supplement_tracing.h"
#include "paddle/phi/core/kernel_context.h"
#ifdef PADDLE_WITH_MKLDNN
#include "paddle/fluid/platform/mkldnn_helper.h"
#endif

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace,
                            true,
                            "Use inplace in new executor");
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope,
                            true,
                            "Use local_scope in new executor(especially used "
                            "in UT), can turn off for better performance");

DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);
DECLARE_bool(fast_eager_deletion_mode);

constexpr const char* kExceptionCaught = "ExceptionCaught";
constexpr const char* kTaskCompletion = "TaskCompletion";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;
static constexpr size_t kDeviceNumThreads = 1;

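// Fast GC can be used only when the stream-safe CUDA allocator is in use and
// FLAGS_fast_eager_deletion_mode is enabled; otherwise the event-based
// garbage collector is used (see the InterpreterCore constructor).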
bool IsInterpretercoreFastGCEnabled() {
  return memory::allocation::AllocatorFacade::Instance()
             .IsStreamSafeCUDAAllocatorUsed() &&
         FLAGS_fast_eager_deletion_mode;
}

InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const BlockDesc& block,
                                 const std::set<std::string>& skip_gc_vars,
                                 framework::Scope* scope)
    : place_(place),
      block_(block),
      skip_gc_vars_(skip_gc_vars),
      var_scope_(scope),
      stream_analyzer_(place) {
  VLOG(4) << "InterpreterCore(): " << this << " on " << place_;

  is_build_ = false;

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
  if (IsInterpretercoreFastGCEnabled()) {
    gc_ = std::make_unique<InterpreterCoreFastGarbageCollector>();
  } else {
    gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
  }
#else
  gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
#endif

  exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
  completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);

  create_local_scope_ = FLAGS_new_executor_use_local_scope;
  VLOG(4) << "create_local_scope_ is " << create_local_scope_;

  if (create_local_scope_) {
    auto local_scope = &var_scope_.GetMutableScope()->NewScope();
    local_scope_ = local_scope;
  }
  var_scope_.SetLocalScope(local_scope_);

  // prune

  // optimize graph pass

  // convert to run graph
}

InterpreterCore::~InterpreterCore() {
  // cancel gc's thread
  gc_.reset(nullptr);

  async_work_queue_.reset();
  VLOG(4) << "~InterpreterCore(): " << this << " on " << place_;

#ifdef PADDLE_WITH_MKLDNN
  // Clear mkl-dnn cache,
  // this is needed to have mkl-dnn unit tests working
  platform::ClearMKLDNNCache(place_, this);
#endif
}

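// Run the program once with the given feeds and collect cost information
// through the profiler guard; unlike Run(), no fetch results are returned.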
interpreter::CostInfo InterpreterCore::DryRun(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  Prepare(feed_names, feed_tensors, true);
  interpreter::CostInfo cost_info;
  {
    // The guard must be named so that it lives for the whole scope and
    // actually fills cost_info on destruction.
    interpreter::ProfilerGuard profiler_guard(place_, &cost_info);

    // For a program that runs only once, there is no need to create a
    // work queue, so async_work_queue_ is not created until the second run.
    async_work_queue_ = GetWorkQueue();

    ExecuteInstructionList(vec_instruction_);
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  return cost_info;
}

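// Run the program with the given feed tensors. On the first call, Prepare()
// builds the variable scope and the instruction list; on later calls the
// cached instructions are executed through the async work queue. The content
// of the fetch variable, if present, is returned.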
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
  bool is_build = is_build_;
  Prepare(feed_names, feed_tensors, is_build);

  if (is_build) {
    // For a program that runs only once, there is no need to create a
    // work queue, so async_work_queue_ is not created until the second run.
    async_work_queue_ = GetWorkQueue();
    ExecuteInstructionList(vec_instruction_);
#ifdef PADDLE_WITH_ASCEND_CL
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
#endif
  }
  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  auto* fetch_var = local_scope_->FindVar(interpreter::kFetchVarName);
  if (fetch_var) {
    return std::move(*fetch_var->GetMutable<framework::FetchList>());
  } else {
    return {};
  }
}

paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names) {
#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
  if (!is_build_) {
    paddle::framework::interpreter::build_variable_scope(block_, &var_scope_);

    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(place_,
                                                       block_,
                                                       skip_gc_vars_,
                                                       &op_func_nodes,
                                                       &var_scope_,
                                                       create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);

  } else {
    // For a program that runs only once, there is no need to create a
    // work queue, so async_work_queue_ is not created until the second run.
    async_work_queue_ = GetWorkQueue();

    ExecuteInstructionList(vec_instruction_);
#ifdef PADDLE_WITH_ASCEND_CL
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
#endif
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }
  // return Fetch Tensors
  auto* fetch_var = local_scope_->FindVar(interpreter::kFetchVarName);
  if (fetch_var) {
    return std::move(*fetch_var->GetMutable<framework::FetchList>());
  } else {
    return {};
  }
}

void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
  copy_program_ = prog;
}

void InterpreterCore::ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src) {
  async_work_queue_ = src->GetWorkQueue();
  VLOG(8) << "Share AsyncWorkQueue from InterpreterCore(" << &src
          << ") to InterpreterCore(" << this << ")";
}

bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!var_scope_.VarDesc(var_index)) {
    return input_var2op_info_.at(var_index).size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_.at(var_index)) {
      OpInOutInfo info;
      info.Build(vec_instruction_.at(inst_id).OpBase());
      if (info.IsInArgBufferNeeded(var_scope_.VarDesc(var_index)->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

std::shared_ptr<interpreter::AsyncWorkQueue> InterpreterCore::GetWorkQueue() {
  if (async_work_queue_ == nullptr) {
    async_work_queue_ = std::make_shared<interpreter::AsyncWorkQueue>(
        kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_);
  }
  return async_work_queue_;
}

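// Resolve the instruction's input/output variable ids to Variable* in the
// local scope and cache them in the instruction's runtime and infershape
// contexts.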
void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->Inputs()) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(
          local_scope_->FindVar(var_scope_.GetNameById(id)));
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->Outputs()) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(local_scope_->FindVar(var_scope_.GetNameById(id)));
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }

  // set runtime_ctx and infershape_ctx_
  if (instr_node->OpBase()->Type() == "cinn_launch") {  // OP uses scope in
                                                        // kernel
    Scope* local_scope = create_local_scope_ ? var_scope_.GetMutableLocalScope()
                                             : var_scope_.GetMutableScope();
    instr_node->ResetContextWithScope(ins_map, outs_map, *local_scope);
  } else {
    instr_node->ResetContext(ins_map, outs_map);
  }
}

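// For every op that provides an inplace inference, map each eligible input
// var (non-persistable, not marked skip-inplace, and consumed only by this
// op) to the corresponding output var so their LoDTensors can share buffers.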
void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    auto& instr = vec_instruction_[i];
    auto* op_base = instr.OpBase();
    if (!op_base->Info().infer_inplace_) {
      continue;
    }

    auto in_to_outs = op_base->Info().infer_inplace_(
        platform::is_gpu_place(instr.DeviceContext().GetPlace()));

    auto& inputs = instr.Inputs();
    auto& outputs = instr.Outputs();
    for (auto& pair : in_to_outs) {
      auto iter = inputs.find(pair.first);
      if (iter != inputs.end() && !iter->second.empty()) {
        auto in_var_desc = var_scope_.VarDesc(iter->second[0]);
        if (in_var_desc && in_var_desc->Persistable()) {
          continue;
        }
        if (var_scope_.GetVarSikpInplace(iter->second[0])) {
          continue;
        }
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = outputs.find(pair.second);
          if (iterout != outputs.end() && !iterout->second.empty()) {
            auto invar =
                local_scope_->FindVar(var_scope_.GetNameById(iter->second[0]));
            auto outvar = local_scope_->FindVar(
                var_scope_.GetNameById(iterout->second[0]));
            if (invar && outvar && invar->IsType<LoDTensor>() &&
                outvar->IsType<LoDTensor>()) {
              instr.AddInplace(invar, outvar);
              VLOG(3) << "inplace " << vec_instruction_[i].OpBase()->Type()
                      << " " << var_scope_.GetNameById(iter->second[0])
                      << " -> " << var_scope_.GetNameById(iterout->second[0])
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

void InterpreterCore::BuildOperatorDependences() {
  // analyze the dependences between ops, set the dependecy_count_ and call
  // Schedule
  auto op_nums = vec_instruction_.size();
  dependecy_count_.resize(op_nums);
  auto op2downstream = interpreter::build_op_downstream_map(
      vec_instruction_, &op_happens_before_);
  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
    auto op_list = op2downstream[op];
    std::vector<size_t> downsteam_vector(op_list.begin(), op_list.end());
    stream_analyzer_.Schedule(downsteam_vector, &vec_instruction_, op);

    for (auto inst_id : op_list) {
      dependecy_count_[inst_id]++;
    }
  }
}

// At the end of each step, the holder of each Tensor in a LoDTensorArray is
// null. Clear these Tensors and leave the LoDTensorArray empty; otherwise an
// exception will occur in the next step.
void InterpreterCore::ClearLoDTensorArrayInLocalScope() {
  auto vars = local_scope_->LocalVars();
  for (auto var : vars) {
    if (var->IsType<LoDTensorArray>()) {
      auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
      lod_tensor_arr->clear();
    }
  }
}

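// Convert the OpFuncNode list into Instructions: create one Instruction per
// op with its parsed device context, build operator dependencies, compute the
// last ops that use each variable (for GC), optionally set up inplace buffer
// sharing, and pre-build the per-instruction contexts, events and atomic
// reference counters.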
void InterpreterCore::Convert(
    std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
  auto& vec_meta_info = var_scope_.MutableVecMetaInfo();
  auto var_nums = var_scope_.VarSize();
  input_var2op_info_.resize(var_nums);
  auto nodes = *op_func_nodes;

  auto op_nums = nodes.size();
  vec_instruction_.reserve(op_nums);
  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& op_func_node = nodes[op_idx];
    auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);
    vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
  }

  BuildOperatorDependences();

  // calculate last_live_ops_
  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& instr = vec_instruction_[op_idx];
    OpInOutInfo info;
    std::set<size_t> gc_check_inputs;

    for (auto& item : instr.Inputs()) {
      for (auto id : item.second) {
        if (id == kEmptyVarIndex) {
          continue;
        }
        input_var2op_info_.at(id).push_back(op_idx);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(instr.OpBase());
        }
        auto* var_desc = var_scope_.VarDesc(id);
        if (var_desc) {
          if (info.IsInArgBufferNeeded(var_desc->Name())) {
            gc_check_inputs.insert(id);
          }
        } else {
          gc_check_inputs.insert(id);
        }
      }
    }

    for (auto var_id : gc_check_inputs) {
      paddle::framework::Variable* var =
          local_scope_->FindVar(var_scope_.GetNameById(var_id));
      if (var->IsType<LoDTensor>() || var->IsType<phi::SelectedRows>() ||
          var->IsType<LoDTensorArray>()) {
        last_live_ops_[var_id].insert(op_idx);
      } else {
        VLOG(4) << "not clear " << var_scope_.GetNameById(var_id) << " after "
                << instr.OpBase()->Type() << " because its type is "
                << framework::ToTypeName(var->Type());
      }
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check outputs
    for (auto& item : vec_instruction_[i].Outputs()) {
      for (auto var_id : item.second) {
        if (input_var2op_info_.at(var_id).size() == 0) {
          last_live_ops_[var_id].insert(i);
        }
      }
    }
  }

  // clear the last_live_ops list for all vars in skip_gc_vars
  for (const std::string& skip_gc_var : skip_gc_vars_) {
    int var_id = var_scope_.GetIdByName(skip_gc_var);
    if (var_id != -1) {
      last_live_ops_[var_id].clear();
      VLOG(8) << "Skip gc for var: " << skip_gc_var;
    }
  }

  // shrink: keep only the ops in each last_live_ops set that do not happen
  // before any other op in the same set.
  // For example,
  //   b = op1(a)
  //   c = op2(a, b)
  // Here a is an input of both op1 and op2; we only need to check a after
  // op2, because op2 always uses a after op1.
  for (size_t i = 0; i < last_live_ops_.size(); ++i) {
    std::set<size_t> minumum_last_live_ops;
    for (size_t item : last_live_ops_[i]) {
      bool not_before_any = true;
      // find the op that is not executed before any other
      for (size_t other_item : last_live_ops_[i]) {
        if (op_happens_before_[item][other_item]) {
          VLOG(8) << "happens_before: " << item << "->" << other_item
                  << ", so skip " << item;
          not_before_any = false;
          break;
        }
      }
      if (not_before_any) {
        VLOG(8) << "last live op of var " << i << " "
                << var_scope_.GetNameById(i) << " : " << item << " "
                << vec_instruction_[item].OpBase()->Type();
        minumum_last_live_ops.insert(item);
        vec_instruction_[item].AddGCCheckVar(i);
      }
    }
    last_live_ops_[i] = minumum_last_live_ops;
    vec_meta_info[i].var_ref_count_ = last_live_ops_[i].size();
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].DeviceContext().GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }
  bool inplaced = false;
  for (auto inst : vec_instruction_) {
    if (inst.OpBase()->Type() == "share_buffer" ||
        inst.OpBase()->Type() == "share_data") {
      VLOG(4) << "Already inplaced, skip inplace now.";
      inplaced = true;
    }
  }

  if (FLAGS_new_executor_use_inplace && !inplaced) {
    BuildInplace();
  }

  // prepare for the first time.
  std::promise<std::unique_ptr<AtomicVectorSizeT>> deps_promise =
      std::promise<std::unique_ptr<AtomicVectorSizeT>>();
  atomic_deps_ = deps_promise.get_future();
  deps_promise.set_value(interpreter::PrepareAtomicDeps(dependecy_count_));

  std::promise<std::unique_ptr<AtomicVectorSizeT>> var_ref_promise =
      std::promise<std::unique_ptr<AtomicVectorSizeT>>();
  atomic_var_ref_ = var_ref_promise.get_future();
  var_ref_promise.set_value(
      interpreter::PrepareAtomicVarRef(var_scope_.VecMetaInfo()));
}

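// An instruction may skip sharing LoD in InferShape only when all of its
// inputs are LoDTensors without LoD information.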
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
  }
}

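// Run a single instruction: InferShape (for OperatorWithKernel), apply the
// cached inplace buffer sharing, launch the phi or fluid kernel, and restore
// any data-transferred inplace variables afterwards.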
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  auto* op = instr_node.OpBase();
  auto place = instr_node.DeviceContext().GetPlace();
  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(local_scope_);
  Scope* local_scope = create_local_scope_ ? var_scope_.GetMutableLocalScope()
                                           : var_scope_.GetMutableScope();

#ifdef PADDLE_WITH_ASCEND_CL
  // NOTE(wangxi): nan/inf cannot be detected on NPU by checking the variable
  // values; it can only be detected through the special `float_status`, which
  // records whether an operation overflowed. More about `float_status`, see:
  // https://gitee.com/ascend/modelzoo/issues/I3NF8V?from=project-issue
  if (FLAGS_check_nan_inf) {
    framework::details::NPUAllocAndClearFloatStatus(*op, *local_scope, place);
  }
#endif

  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
  {
    // If it is OperatorBase, InferShape does nothing.
    if (op_with_kernel != nullptr) {
      platform::RecordEvent infershape_event(
          "infer_shape",
          platform::TracerEventType::OperatorInner,
          1,
          platform::EventRole::kInnerOp);

      // see OperatorWithKernel::RunImpl in operator.cc for why
      if (!(op_with_kernel->HasAttr(kAllKernelsMustComputeRuntimeShape) &&
            op_with_kernel->Attr<bool>(kAllKernelsMustComputeRuntimeShape))) {
        op_with_kernel->Info().infer_shape_(
            instr_node.InnerInferShapeContext().get());
      }
      infershape_event.End();
      platform::RecordOpInfoSupplement(op->Type(),
                                       op->Attrs(),
                                       *(instr_node.InnerInferShapeContext()),
                                       *(instr_node.InnerRuntimeContext()));
    }
  }

  if (op_with_kernel != nullptr && FLAGS_new_executor_use_inplace) {
    // TODO(xiongkun03) Does operator base support inplace ?
    for (auto& pair : instr_node.InplaceInfo()) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }

  {
    platform::RecordEvent compute_event(
        "compute",
        platform::TracerEventType::OperatorInner,
        1,
        platform::EventRole::kInnerOp);
    if (op_with_kernel == nullptr) {
      instr_node.OpBase()->Run(*local_scope, place_);
    } else {
      // fit for phi
      if (instr_node.PhiKernel() && instr_node.PhiKernel()->IsValid()) {
        VLOG(4) << "Run phi kernel: " << op->Type();
        VLOG(4) << instr_node.InnerRuntimeContext().get() << " "
                << &instr_node.DeviceContext();
        phi::KernelContext pt_kernel_context;
        op_with_kernel->BuildPhiKernelContext(
            *instr_node.InnerRuntimeContext().get(),
            const_cast<platform::DeviceContext*>(&instr_node.DeviceContext()),
            &pt_kernel_context);

        (*instr_node.PhiKernel())(&pt_kernel_context);

      } else {
        instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
      }
    }
  }

  VLOG(4) << "End run " << place << " " << op->DebugStringEx(local_scope_);

  if (!instr_node.InplaceBackMap().empty()) {
    platform::RecordEvent inplaceback_event(
        "InplaceVarsBack", platform::TracerEventType::UserDefined, 10);
    auto& m = instr_node.InplaceBackMap();
    // NOTE(zhiqiu): same logic as TransferInplaceVarsBack() in operator.cc
    for (auto& p : m) {
      auto* transformed_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
          var_scope_.VarRef(p.first));
      auto* original_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
          var_scope_.VarRef(p.second));
      original_tensor->ShareDataWith(*transformed_tensor);
      VLOG(4) << "Transfer inplace variable back from "
              << var_scope_.GetNameById(p.first) << " to "
              << var_scope_.GetNameById(p.second);
    }
  }

  /*For profiling/benchmark only*/
  if (FLAGS_benchmark) {
    instr_node.DeviceContext().Wait();
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
    PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError());
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
  }

  // for debugging nan/inf
  if (op_with_kernel != nullptr && FLAGS_check_nan_inf) {
    VLOG(4) << "Check nan/inf";
    framework::details::CheckOpHasNanOrInf(
        *op,
        *local_scope_,
        place);  // TODO(xiongkun03) change it to inner scope.
  }
}

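// Dispatch every instruction whose dependency count is zero to the async work
// queue, then block the main thread until either all ops finish or an
// exception is caught and rethrown.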
void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  unfinished_op_numer_ = vec_instr.size();
  if (unfinished_op_numer_ == 0) {
    VLOG(4) << "No op to run, return";
    return;
  }

  platform::RecordEvent record_prepare(
      "PrepareAtomic", platform::TracerEventType::UserDefined, 1);
  // NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
  // those for the next step
  auto atomic_deps = atomic_deps_.get();
  auto atomic_var_ref = atomic_var_ref_.get();

  atomic_deps_ = async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  atomic_var_ref_ =
      async_work_queue_->PrepareAtomicVarRef(var_scope_.VecMetaInfo());
  record_prepare.End();

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_->AddTask(vec_instr.at(i).KernelType(),
                                 [this,
                                  i,
                                  atomic_deps = atomic_deps.get(),
                                  atomic_var_ref = atomic_var_ref.get()] {
                                   RunInstructionAsync(
                                       i, atomic_deps, atomic_var_ref);
                                 });
    }
  }

  auto event_name = main_thread_blocker_.WaitEvent();
  VLOG(1) << "main_thread_blocker_(" << &main_thread_blocker_
          << ") got event_name: " << event_name;

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(1) << "Exception caught " << exception_holder_.Type();
    // Graceful exit when the executor encountered a fatal error.
    // EOF is not a fatal error.
    if (exception_holder_.Type() != "EOF") {
      async_work_queue_->Cancel();
    }
    VLOG(4) << "Cancel ok";
    PADDLE_ENFORCE_EQ(
        main_thread_blocker_.Clear(),
        0,
        platform::errors::PreconditionNotMet(
            "main_thread_blocker_.Clear() return -1, clear failed"));
    VLOG(4) << "clear ok";
    exception_holder_.ReThrow();
  }
}

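// After an instruction finishes, decrement the dependency counters of its
// downstream ops; ready ops are either pushed to the current thread's ready
// queue or dispatched to the async work queue, depending on their kernel
// type.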
void InterpreterCore::RunNextInstructions(
    const Instruction& instr,
    std::queue<size_t>* reserved_next_ops,
    std::vector<std::atomic<size_t>>* atomic_deps,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  platform::RecordEvent record(
      "RunNextInstructions", platform::TracerEventType::UserDefined, 10);
  VLOG(4) << "atomic 1:" << atomic_deps;
  auto& next_instr = instr.NextInstructions();

  auto IsReady = [atomic_deps](size_t next_id) {
    VLOG(4) << "atomic:" << atomic_deps << " op_id: " << next_id
            << ", remain deps: " << (*atomic_deps)[next_id];
    return (*atomic_deps)[next_id].fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.KernelType() == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.SyncRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref]() {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.DirectRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref] {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    auto direct_run_ops = interpreter::merge_vector(next_instr.SyncRunIds(),
                                                    next_instr.DirectRunIds());
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move rest ops into other threads
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref] {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

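// Worker loop: pop ready instructions, wait for their input events, run them,
// record streams/events for GC, and schedule their successors. Exceptions are
// captured in exception_holder_ and the main thread is notified.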
void InterpreterCore::RunInstructionAsync(
    size_t instr_id,
    std::vector<std::atomic<size_t>>* atomic_deps,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_.at(instr_id);
    VLOG(5) << __func__ << " OP id:" << instr_node.Id()
            << " name:" << instr_node.OpBase()->Type() << " type:"
            << (instr_node.KernelType() == OpFuncType::kQueueSync
                    ? "kQueueSync"
                    : "kQueueAsync")
            << " runs on " << platform::GetCurrentThreadName();

    auto* op = instr_node.OpBase();
    platform::RecordEvent instruction_event(
        op->Type(), platform::TracerEventType::Operator, 1);

    try {
      interpreter::WaitEvent(instr_node, place_);

      RunInstruction(instr_node);

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      RecordStreamForGC(instr_node);
#endif
      CheckGC(instr_node, atomic_var_ref);

      interpreter::RecordEvent(instr_node, place_);
    } catch (platform::EnforceNotMet& ex) {
      framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
      exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
    } catch (platform::EOFException&) {
      exception_holder_.Catch(std::current_exception());
    } catch (std::exception& ex) {
      LOG(WARNING) << op->Type() << " raises an exception "
                   << platform::demangle(typeid(ex).name()) << ", "
                   << ex.what();
      exception_holder_.Catch(std::current_exception());
    } catch (...) {
      LOG(WARNING) << op->Type() << " raises an unknown exception";
      exception_holder_.Catch(std::current_exception());
    }

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }

    VLOG(4) << "unfinished_op_numer_: " << unfinished_op_numer_;
    if (UNLIKELY(unfinished_op_numer_.fetch_sub(1, std::memory_order_relaxed) ==
                 1)) {
      if (completion_notifier_ != nullptr) {
        completion_notifier_->NotifyEvent();
      }
    }

    RunNextInstructions(instr_node, &ready_ops, atomic_deps, atomic_var_ref);
  }
}

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
  if (!IsInterpretercoreFastGCEnabled() ||
      instr.KernelType() != OpFuncType::kQueueAsync) {
    return;
  }
  platform::RecordEvent record(
      "RecordStreamForGC", platform::TracerEventType::UserDefined, 10);

  gpuStream_t stream = reinterpret_cast<const platform::CUDADeviceContext&>(
                           instr.DeviceContext())
                           .stream();
  auto TensorRecordStream = [&stream](Tensor& tensor) {
    auto allocation = tensor.Holder();
    if (allocation == nullptr) {
      return;
    }

    const platform::Place& place = allocation->place();
    if (platform::is_gpu_place(place)) {
      memory::RecordStream(allocation, stream);
    } else if (platform::is_cuda_pinned_place(place)) {
      // TODO(Ruibiao): Here we should do something to make sure that the
      // tensor is not freed until the H2D copies are done. However, simply
      // launching a CUDA runtime callback on the H2D stream may introduce a
      // high performance overhead. As all the H2D cases we meet at present
      // are copies from CPUPlace, we just log a WARNING here. A better design
      // is required.
      LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous "
                      "manner may lead to data inconsistency";
    } else {
      // memory copies involving CPUPlace are always synchronous, so just do
      // nothing here
    }
  };

  /* NOTE(Ruibiao): Cross-stream tensor synchronization is required only when
   * all the following conditions are satisfied:
   * 1. The tensor will be GC'ed after running the instruction, i.e., it is in
   * instr.GCCheckVars.
   * 2. The stream which initializes this tensor is different from the stream
   * which the instruction runs in.
   * 3. The tensor is the instruction's input, because we assume that an
   * instruction initializes all of its output tensors on its running stream.
   * 4. In the OP function of this instruction, the tensor is an input of an
   * async CUDA kernel.
   *
   * Here we only process the first condition, because:
   * 1. Since RecordStream returns directly when the recorded stream is equal
   * to the owning stream, recording the same stream that initialized the
   * tensor has less time overhead. Conversely, it may take more time if we
   * try to extract those cross-stream input vars from instr.GCCheckVars.
   * 2. Currently the instruction has no idea of which vars are involved in
   * async execution in the OP function, so we cannot recognize condition 4.
   * It should be supported later.
   */
  for (int var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC sync " << var_scope_.GetNameById(var_id) << " "
            << var_scope_.VarDesc(var_id);

    // persistable vars will be ignored during GC
    if (var_scope_.VarDesc(var_id) &&
        var_scope_.VarDesc(var_id)->Persistable()) {
      continue;
    }

    paddle::framework::Variable* var = var_scope_.VarRef(var_id);
    if (var == nullptr) {
      continue;
    }

    if (var->IsType<LoDTensor>()) {
      TensorRecordStream(*(var->GetMutable<LoDTensor>()));
    } else if (var->IsType<
                   operators::reader::
                       OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
      // do nothing
    } else if (var->IsType<phi::SelectedRows>()) {
      TensorRecordStream(
          *(var->GetMutable<phi::SelectedRows>()->mutable_value()));
    } else if (var->IsType<LoDTensorArray>()) {
      auto* tensor_arr = var->GetMutable<LoDTensorArray>();
      for (auto& tensor : *tensor_arr) {
        TensorRecordStream(tensor);
      }
    } else if (var->IsType<std::vector<Scope*>>()) {
      // do nothing
    } else {
      PADDLE_THROW(platform::errors::Unimplemented(
          "The variable(%s) is not supported in eager deletion.",
          framework::ToTypeName(var->Type())));
    }
  }
}
#endif

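// Decrement the reference count of each variable checked by this instruction
// and hand non-persistable variables whose count reaches zero over to the
// garbage collector.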
void InterpreterCore::CheckGC(
    const Instruction& instr,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  platform::RecordEvent record(
      "CheckGC", platform::TracerEventType::UserDefined, 10);
  size_t instr_id = instr.Id();
  auto& var_scope = var_scope_;

  for (auto var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC " << var_scope_.GetNameById(var_id) << " "
            << var_scope.VarDesc(var_id);
    VLOG(4) << "atomic:" << atomic_var_ref << " " << &(*atomic_var_ref)[var_id]
            << " " << var_id;
    bool is_ready =
        (*atomic_var_ref)[var_id].fetch_sub(1, std::memory_order_relaxed) == 1;
    // ignore all persistable vars during GC
    if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
      continue;
    }
    if (is_ready) {
      VLOG(6) << "Async delete variable with name : "
              << var_scope.GetNameById(var_id);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      if (IsInterpretercoreFastGCEnabled()) {
        static_cast<InterpreterCoreFastGarbageCollector*>(gc_.get())->Add(
            var_scope_.VarRef(var_id));

      } else {
        static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
            var_scope_.VarRef(var_id),
            &gc_event_.at(instr_id),
            &instr.DeviceContext());
      }
#else
      static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
          var_scope_.VarRef(var_id),
          &gc_event_.at(instr_id),
          &instr.DeviceContext());
#endif
    }
  }
}

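// Feed the input tensors into the (local) scope. On the first call this also
// builds the variable scope, the op function list and the instruction graph;
// if prepare_feed is set, FeedInput is called again afterwards because the
// feed tensors may have been GC'ed during build_op_func_list.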
void InterpreterCore::Prepare(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors,
    bool prepare_feed) {
  PADDLE_ENFORCE_EQ(feed_names.size(),
                    feed_tensors.size(),
                    platform::errors::PreconditionNotMet(
                        "Required feed_names.size() == feed_tensors.size(), "
                        "but received %d != %d",
                        feed_names.size(),
                        feed_tensors.size()));

  auto FeedInput = [&] {
    VLOG(4) << "Feed inputs";
    for (size_t i = 0; i < feed_names.size(); ++i) {
      auto* feed_var = local_scope_->FindVar(feed_names[i]);
      PADDLE_ENFORCE_NOT_NULL(
          feed_var,
          platform::errors::NotFound("Variable %s should not be nullptr.",
                                     feed_names[i]));

      auto feed_tensor = feed_var->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
      feed_tensor->set_lod(feed_tensors[i].lod());
    }
  };

  if (!is_build_) {
    paddle::framework::interpreter::build_variable_scope(
        block_, &var_scope_, create_local_scope_);
    FeedInput();
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(place_,
                                                       block_,
                                                       skip_gc_vars_,
                                                       &op_func_nodes,
                                                       &var_scope_,
                                                       create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);
  }
  // NOTE: Because feed_tensor will be GC'ed after
  // paddle::framework::build_op_func_list, we should
  // call FeedInput again.
  if (prepare_feed) {
    FeedInput();
  }
}

void InterpreterCore::SetFeedVarsInplaceSkip(
    const std::vector<std::string>& feed_names) {
  for (auto& feed_name : feed_names) {
    var_scope_.SetVarSikpInplace(feed_name, true);
  }
}

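// A minimal usage sketch for the factory below (illustrative only; `prog`,
// `scope` and the feed data are assumed to be prepared by the caller, and
// "out" is a hypothetical fetch variable name):
//
//   platform::CPUPlace place;
//   framework::Scope scope;
//   auto core = CreateInterpreterCore(place, prog, &scope,
//                                     /*fetch_names=*/{"out"},
//                                     /*skip_gc_vars=*/{});
//   auto fetch_list = core->Run(feed_names, feed_tensors);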
std::shared_ptr<InterpreterCore> CreateInterpreterCore(
    const platform::Place& place,
    const ProgramDesc& prog,
    Scope* scope,
    const std::vector<std::string>& fetch_names,
    const std::set<std::string>& skip_gc_vars) {
  std::shared_ptr<InterpreterCore> core = nullptr;
  // NOTE(Aurelius84): `add_fetch` will modify BlockDesc, so we should copy
  // a new program.
  auto new_prog = std::make_shared<framework::ProgramDesc>(prog);
  auto* block = new_prog->MutableBlock(0);
  interpreter::add_fetch(fetch_names, block);

  core = std::make_shared<InterpreterCore>(place, *block, skip_gc_vars, scope);
  core->SetCopyProgram(new_prog);
  return core;
}

}  // namespace framework
}  // namespace paddle