interpretercore.cc 52.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14

15
#include "paddle/fluid/framework/new_executor/interpretercore.h"
16

17
#include <unordered_set>
#include <utility>
18

19 20
#include "gflags/gflags.h"

21
#include "paddle/fluid/framework/details/nan_inf_utils.h"
22
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
23
#include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h"
24
#include "paddle/fluid/framework/operator.h"
25
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
L
liutiexing 已提交
26 27
#include "paddle/fluid/platform/os_info.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
28
#include "paddle/fluid/platform/profiler/supplement_tracing.h"
29
#include "paddle/phi/common/place.h"
30
#include "paddle/phi/core/kernel_context.h"
L
Leo Chen 已提交
31 32 33
#ifdef PADDLE_WITH_MKLDNN
#include "paddle/fluid/platform/mkldnn_helper.h"
#endif
34
#include "paddle/fluid/platform/cuda_graph_with_memory_pool.h"
L
Leo Chen 已提交
35
#include "paddle/phi/backends/device_manager.h"
36

37 38 39 40
PADDLE_DEFINE_EXPORTED_bool(
    new_executor_serial_run,
    false,
    "Enable serial execution for standalone executor, used for debug.");
41
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace,
42
                            false,
43
                            "Use inplace in new executor");
44 45
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope,
                            true,
46 47
                            "Use local_scope in new executor(especially used "
                            "in UT), can turn off for better performance");
48
PADDLE_DEFINE_EXPORTED_bool(control_flow_use_new_executor,
49
                            true,
50
                            "Use new executor in control flow op");
51

52
DECLARE_bool(check_nan_inf);
53
DECLARE_bool(benchmark);
54 55 56 57
DECLARE_bool(new_executor_use_cuda_graph);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
DECLARE_bool(sync_nccl_allreduce);
#endif
58

59
constexpr const char* kExceptionCaught = "ExceptionCaught";
60
constexpr const char* kTaskCompletion = "TaskCompletion";
61

62 63 64
namespace paddle {
namespace framework {

L
Leo Chen 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
// Bind the calling thread to the device described by `place` (GPU / XPU /
// NPU / custom device). Throws Unavailable if Paddle was built without
// support for the requested device type; CPU places are a no-op.
inline void SetDeviceId(const platform::Place& place) {
  // TODO(zhiqiu): reduce the cost
  if (platform::is_gpu_place(place)) {
#if !defined(PADDLE_WITH_CUDA) && !defined(PADDLE_WITH_HIP)
    PADDLE_THROW(platform::errors::Unavailable(
        "Cannot run operator on place %s, please recompile paddle or "
        "reinstall Paddle with CUDA support.",
        place));
#else
    auto dev_id = place.device;
    platform::SetDeviceId(dev_id);
#endif
  } else if (platform::is_xpu_place(place)) {
#ifndef PADDLE_WITH_XPU
    PADDLE_THROW(platform::errors::Unavailable(
        "Cannot run operator on place %s, please recompile paddle or "
        "reinstall Paddle with XPU support.",
        place));
#else
    auto dev_id = place.device;
    platform::SetXPUDeviceId(dev_id);
#endif
  } else if (platform::is_npu_place(place)) {
#ifndef PADDLE_WITH_ASCEND_CL
    PADDLE_THROW(platform::errors::Unavailable(
        "Cannot run operator on place %s, please recompile paddle or "
        "reinstall Paddle with NPU support.",
        place));
#else
    auto dev_id = place.device;
    platform::SetNPUDeviceId(dev_id);
#endif
  } else if (platform::is_custom_place(place)) {
#ifndef PADDLE_WITH_CUSTOM_DEVICE
    PADDLE_THROW(platform::errors::Unavailable(
        "Cannot run operator on place %s, please recompile paddle or "
        "reinstall Paddle with CustomDevice support.",
        place));
#else
    phi::DeviceManager::SetDevice(place);
#endif
  }
}

109 110
// Construct an interpreter for one BlockDesc on one device.
// Registers the exception/completion events used to unblock the main thread,
// sizes the thread pools from the op count, optionally creates a local scope,
// and installs the instruction scheduling comparator.
InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const BlockDesc& block,
                                 framework::Scope* scope,
                                 const ExecutionConfig& execution_config)
    : place_(place),
      block_(block),
      stream_analyzer_(place),
      execution_config_(execution_config),
      var_scope_(scope) {
  VLOG(4) << "InterpreterCore(): " << this << " on " << place_;

  // Events that worker threads signal to wake the blocked main thread.
  exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
  completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);

  if (!FLAGS_new_executor_use_local_scope) {
    execution_config_.create_local_scope = false;
  }
  execution_config_.AnalyzeThreadPoolConfig(place, block.OpSize());
  execution_config_.Log(/*log_level=*/8);

  if (execution_config_.create_local_scope) {
    auto local_scope = &var_scope_.GetMutableScope()->NewScope();
    local_scope_ = local_scope;
  }
  var_scope_.SetLocalScope(local_scope_);

  // Higher scheduling priority runs first; ties fall back to the lower
  // instruction id so the ordering is a strict weak ordering.
  instruction_scheduling_priority_less = [this](size_t lhs, size_t rhs) {
    SchedulingPriority lhs_scheduling_priority =
        vec_instruction_[lhs].GetSchedulingPriority();
    SchedulingPriority rhs_scheduling_priority =
        vec_instruction_[rhs].GetSchedulingPriority();
    if (lhs_scheduling_priority == rhs_scheduling_priority) {
      return lhs < rhs;
    }
    return lhs_scheduling_priority > rhs_scheduling_priority;
  };

  PrepareForCUDAGraphCapture();
}

149 150 151
// Tear down in dependency order: stop the GC thread before the work queue
// so no collection task can run against a destroyed queue.
InterpreterCore::~InterpreterCore() {
  // cancel gc's thread
  gc_.reset(nullptr);
  async_work_queue_.reset();
  VLOG(4) << "~InterpreterCore(): " << this << " on " << place_;

#ifdef PADDLE_WITH_MKLDNN
  // Clear mkl-dnn cache,
  // this is needed to have mkl-dnn unit tests working
  platform::ClearMKLDNNCache(place_, this);
#endif
}

162 163
// Execute the program once purely to measure cost (see interpreter::CostInfo);
// the device is synchronized before returning so timings are complete.
interpreter::CostInfo InterpreterCore::DryRun(
    const std::vector<std::string>& feed_names,
    const std::vector<phi::DenseTensor>& feed_tensors) {
  SetDeviceId(place_);
  CheckCUDAGraphBeforeRun(feed_names);

  Prepare(feed_names, feed_tensors, true);
  interpreter::CostInfo cost_info;
  {
    // BUGFIX: this guard must be a *named* local. The previous code wrote
    // `interpreter::ProfilerGuard(place_, &cost_info);`, which creates an
    // unnamed temporary that is destroyed at the end of that statement, so
    // the guard never covered ExecuteInstructionList and nothing was
    // actually profiled into cost_info.
    interpreter::ProfilerGuard profiler_guard(place_, &cost_info);

    // For the program that only run once, it is no need to
    // create work_queue, so the async_work_queue_ is created
    // until the second step run.
    async_work_queue_ = GetWorkQueue();

    // lazy initialization of gc, do not create gc if the program only runs
    // once
    if (!gc_) {
      gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_);
    }

    ExecuteInstructionList(vec_instruction_);
    // Wait for all device work so the measured cost includes async kernels.
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  if (HasLocalScope()) {
    ClearLoDTensorArrayInLocalScope();
  }

  return cost_info;
}

194 195 196 197 198 199
// Core execution step shared by both Run() overloads. Chooses between the
// fast single-thread "tracing" executor (only safe when there are no sync
// ops) and the multi-threaded work-queue executor.
void InterpreterCore::RunImpl() {
  // lazy initialization of gc, do not create gc if the program only runs once
  if (!gc_) {
    gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_);
  }

  if ((execution_config_.used_for_jit || execution_config_.used_for_cinn) &&
      (sync_op_num_ == 0)) {
    VLOG(4) << "Tracing Instruction List";
    TraceInstructionList(vec_instruction_);
  } else {
    VLOG(4) << "Non-tracing";
    // For the program that only run once, it is no need to
    // create work_queue, so the async_work_queue_ is created
    // until the second step run.
    async_work_queue_ = GetWorkQueue();
    ExecuteInstructionList(vec_instruction_);
  }
  // NPU/custom devices are synchronized every step here.
#ifdef PADDLE_WITH_ASCEND_CL
  if (platform::is_npu_place(place_)) {
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
  if (platform::is_custom_place(place_)) {
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }
#endif
}

W
wanghuancoder 已提交
224
// Run the program with explicit feed tensors and return the fetch results.
// On the first call (is_build_ == false) Prepare() builds the op list — and,
// per the build path, already executes the ops — so RunImpl() is only called
// on subsequent, already-built steps.
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names,
    const std::vector<phi::DenseTensor>& feed_tensors) {
  SetDeviceId(place_);
  CheckCUDAGraphBeforeRun(feed_names);

#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif

  // Snapshot before Prepare(), which may flip is_build_.
  bool is_build = is_build_;
  Prepare(feed_names, feed_tensors, is_build);

  if (is_build) {
    RunImpl();
  }

  if (HasLocalScope()) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  auto* fetch_var = local_scope_->FindVar(interpreter::kFetchVarName);
  if (fetch_var) {
    // Move the list out so the scope-held copy is left empty.
    auto fetch_list = std::move(*fetch_var->GetMutable<framework::FetchList>());
#ifdef PADDLE_WITH_CUDA
    if (platform::IsCUDAGraphCapturing()) {
      PADDLE_ENFORCE_EQ(fetch_list.empty(),
                        true,
                        platform::errors::InvalidArgument(
                            "Cannot fetch data when using CUDA Graph."));
    }
#endif
    return fetch_list;
  } else {
    return {};
  }
}

263
// Run the program without explicit feeds. The first call lazily builds the
// variable scope, the op-func list and the instruction graph; later calls go
// straight to RunImpl(). Fetch results are returned only when `need_fetch`.
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names, bool need_fetch) {
  SetDeviceId(place_);
  CheckCUDAGraphBeforeRun(feed_names);

#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif

  if (!is_build_) {
    LOG_FIRST_N(INFO, 1) << "New Executor is Running.";
    paddle::framework::interpreter::BuildVariableScope(
        block_, execution_config_, &var_scope_);

    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    // BuildOpFuncList may already execute ops while building; `skip_run`
    // reports whether that execution was skipped and must still happen.
    auto skip_run = paddle::framework::interpreter::BuildOpFuncList(
        place_,
        block_,
        execution_config_.skip_gc_vars,
        &op_func_nodes,
        &var_scope_,
        execution_config_,
        HasLocalScope());
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);
    is_build_ = true;
    UpdateSyncOpNum();
    if (skip_run) {
      VLOG(4) << "RUN impl";
      RunImpl();
    }
  } else {
    RunImpl();
  }

  if (HasLocalScope()) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  Scope* inner_scope =
      HasLocalScope() ? local_scope_ : var_scope_.GetMutableScope();
  auto* fetch_var = inner_scope->FindVar(interpreter::kFetchVarName);
  if (fetch_var && need_fetch) {
    // Move the list out so the scope-held copy is left empty.
    auto fetch_list = std::move(*fetch_var->GetMutable<framework::FetchList>());
#ifdef PADDLE_WITH_CUDA
    if (platform::IsCUDAGraphCapturing()) {
      PADDLE_ENFORCE_EQ(fetch_list.empty(),
                        true,
                        platform::errors::InvalidArgument(
                            "Cannot fetch data when using CUDA Graph."));
    }
#endif
    return fetch_list;
  } else {
    return {};
  }
}

323 324 325 326
// Keep a shared copy of the ProgramDesc alive for this core's lifetime.
void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
  // `prog` is a by-value sink parameter: move it into the member instead of
  // copy-assigning, avoiding an extra atomic ref-count increment/decrement.
  copy_program_ = std::move(prog);
}

327 328
// Set the variables that must never be garbage-collected between steps.
// May only be called once, before any skip-gc set has been installed.
void InterpreterCore::SetSkipGcVars(const std::set<std::string>& skip_gc_vars) {
  PADDLE_ENFORCE_EQ(
      execution_config_.skip_gc_vars.empty(),
      true,
      platform::errors::PreconditionNotMet(
          "execution_config_.skip_gc_vars can only be initialized once, now "
          "execution_config_.skip_gc_vars is "
          "not empty, do not call SetSkipGcVars method repeatedly."));
  execution_config_.skip_gc_vars = skip_gc_vars;
}

338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
// Set the JIT input variable names (used by Convert() to add input events
// for async H2D copies). May only be called once, mirroring SetSkipGcVars.
void InterpreterCore::SetJitInputVars(
    const std::set<std::string>& jit_input_vars) {
  PADDLE_ENFORCE_EQ(
      execution_config_.jit_input_vars.empty(),
      true,
      platform::errors::PreconditionNotMet(
          "execution_config_.jit_input_vars can only be initialized once, now "
          "execution_config_.jit_input_vars is "
          "not empty, do not call SetJitInputVars method repeatedly."));
  execution_config_.jit_input_vars = jit_input_vars;
}

// Read-only accessor for the JIT input variable names.
const std::set<std::string>& InterpreterCore::JitInputVars() const {
  return execution_config_.jit_input_vars;
}

354 355 356 357 358 359 360 361
// Read-only accessor for the interpreter's variable scope.
const VariableScope* InterpreterCore::GetVariableScope() const {
  return &var_scope_;
}

// Re-point this core at `new_scope`: re-resolve every cached Variable*,
// refresh the GC ref-count bookkeeping, and rebuild all instruction contexts.
void InterpreterCore::reset_scope(Scope* new_scope) {
  var_scope_.SetScope(new_scope);
  auto& var_list = var_scope_.MutableVarList();
  for (size_t i = 0; i < var_list.size(); i++) {
    const auto& var_name = var_scope_.GetNameById(i);
    var_list[i] = new_scope->FindVar(var_name);
  }
  // The index must be kept valid, because the InterpreterCore may not be fully
  // built, but was still cached and used. For example, see unit test
  // `test_assert.py`, it may exit before `InterpreterCore::Convert`, but still
  // was cached and used by later tests — hence the std::min bound.
  for (size_t i = 0; i < std::min(refs_.size(), var_list.size()); i++) {
    refs_[i]->ResetVariable(var_list[i]);
  }

  for (size_t i = 0; i < vec_instruction_.size(); i++) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }
}

378 379
// Share another interpreter's async work queue instead of creating our own
// (used when several cores run in the same thread-pool budget).
void InterpreterCore::ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src) {
  async_work_queue_ = src->GetWorkQueue();
  VLOG(8) << "Share AsyncWorkQueue from InterpreterCore(" << src.get()
          << ") to InterpreterCore(" << this << ")";
}

384 385
// Return true if the variable at `var_index` is consumed (with its buffer
// actually needed) by exactly one instruction — the safety condition for
// reusing its buffer in-place. Vars without a VarDesc fall back to a plain
// consumer count.
bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(
    const std::vector<std::vector<size_t>>& input_var2op, size_t var_index) {
  if (!var_scope_.VarDesc(var_index)) {
    return input_var2op.at(var_index).size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op.at(var_index)) {
      OpInOutInfo info;
      info.Build(vec_instruction_.at(inst_id).OpBase());
      // Only count consumers that really need the input's buffer
      // (no_need_buffer inputs do not pin the memory).
      if (info.IsInArgBufferNeeded(var_scope_.VarDesc(var_index)->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

// Lazily create (on first use) and return the async work queue, sized by the
// host/device thread counts from execution_config_.
std::shared_ptr<interpreter::AsyncWorkQueue> InterpreterCore::GetWorkQueue() {
  if (async_work_queue_ == nullptr) {
    async_work_queue_ = std::make_shared<interpreter::AsyncWorkQueue>(
        execution_config_.host_num_threads,
        execution_config_.device_num_threads,
        nullptr);
  }
  return async_work_queue_;
}

// Resolve the instruction's input/output variable ids to live Variable*
// in the current scope and cache them in its runtime/infershape contexts.
void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
  Scope* inner_scope =
      HasLocalScope() ? local_scope_ : var_scope_.GetMutableScope();
  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->Inputs()) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(inner_scope->FindVar(var_scope_.GetNameById(id)));
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->Outputs()) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(inner_scope->FindVar(var_scope_.GetNameById(id)));
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }

  // set runtime_ctx and infershape_ctx_
  if (instr_node->OpBase()->Type() == "cinn_launch" ||
      instr_node->OpBase()->Type() == "cinn_instruction_run") {  // OP use scope
                                                                 // in kernel
    Scope* local_scope = HasLocalScope() ? var_scope_.GetMutableLocalScope()
                                         : var_scope_.GetMutableScope();
    instr_node->ResetContextWithScope(ins_map, outs_map, *local_scope);
  } else {
    instr_node->ResetContext(ins_map, outs_map);
  }
}

// Register in-place (input -> output buffer reuse) pairs on each instruction
// whose op declares an infer_inplace_ rule, subject to safety checks:
// non-persistable input, not explicitly skipped, single real consumer, both
// vars are DenseTensors, and the output is not produced by coalesce_tensor.
void InterpreterCore::BuildInplace() {
  // NOTE(Ruibiao): coalesce_tensor_op outputs a FusedOutput phi::DenseTensor
  // and a list of Output Tensors which are sliced from the FusedOutput. These
  // outputs should not be the outvar of the in-place var-pair since memory
  // reuse between FusedOutput and Output Tensors is assumed. For the following
  // example:
  // fused_var, var1, var2, var3 = coalesce_tensor(var1, var2, var3)
  // var1 = sum(var4, var5)
  // ...
  //
  // After running coalesce_tensor_op, var1 is assumed to share the buffer
  // slices from fused_var. However, if sum_op is in-place, then var1 would
  // re-share the buffer with var4 instead of fused_var.
  std::set<std::string> skip_inplace_outvars;
  for (Instruction& instr : vec_instruction_) {
    OperatorBase* op = instr.OpBase();
    if (op->Type() == kCoalesceTensor) {
      const std::vector<std::string>& outputs =
          op->OutputVars(/*has_intermediate=*/false);
      skip_inplace_outvars.insert(outputs.begin(), outputs.end());
    }
  }

  Scope* local_scope = HasLocalScope() ? var_scope_.GetMutableLocalScope()
                                       : var_scope_.GetMutableScope();
  // input_var2op[var_id] lists the instructions that consume var_id.
  std::vector<std::vector<size_t>> input_var2op(var_scope_.VarSize());
  for (Instruction& instr : vec_instruction_) {
    for (auto& item : instr.Inputs()) {
      for (int var_id : item.second) {
        if (var_id != kEmptyVarIndex) {
          input_var2op.at(var_id).push_back(instr.Id());
        }
      }
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    auto& instr = vec_instruction_[i];
    auto* op_base = instr.OpBase();
    if (!op_base->Info().infer_inplace_) {
      continue;
    }

    auto in_to_outs = op_base->Info().infer_inplace_(
        platform::is_gpu_place(instr.DeviceContext().GetPlace()));

    auto& inputs = instr.Inputs();
    auto& outputs = instr.Outputs();
    for (auto& pair : in_to_outs) {
      auto iter = inputs.find(pair.first);
      if (iter != inputs.end() && !iter->second.empty()) {
        auto in_var_desc = var_scope_.VarDesc(iter->second[0]);
        // Persistable inputs (e.g. parameters) must keep their own buffer.
        if (in_var_desc && in_var_desc->Persistable()) {
          continue;
        }
        if (var_scope_.GetVarSikpInplace(iter->second[0])) {
          continue;
        }
        if (BuildInplaceCheckVarIsOnlyInput(input_var2op, iter->second[0])) {
          auto iterout = outputs.find(pair.second);
          if (iterout != outputs.end() && !iterout->second.empty()) {
            const std::string& invar_name =
                var_scope_.GetNameById(iter->second[0]);
            const std::string& outvar_name =
                var_scope_.GetNameById(iterout->second[0]);
            auto invar = local_scope->FindVar(invar_name);
            auto outvar = local_scope->FindVar(outvar_name);

            if (invar && outvar && invar->IsType<phi::DenseTensor>() &&
                outvar->IsType<phi::DenseTensor>() &&
                skip_inplace_outvars.find(outvar_name) ==
                    skip_inplace_outvars.end()) {
              instr.AddInplace(invar, outvar);
              VLOG(3) << "inplace " << op_base->Type() << " " << invar_name
                      << " -> " << outvar_name;
            }
          }
        }
      }
    }
  }
}

531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
// Validate and prepare the program for CUDA Graph capture (no-op unless
// FLAGS_new_executor_use_cuda_graph). Requires a GPU place and
// FLAGS_sync_nccl_allreduce == false; marks coalesce_tensor outputs
// persistable so they survive executor reconstruction during capture.
void InterpreterCore::PrepareForCUDAGraphCapture() {
  if (!FLAGS_new_executor_use_cuda_graph) return;
#ifdef PADDLE_WITH_CUDA
  PADDLE_ENFORCE_EQ(platform::is_gpu_place(place_),
                    true,
                    platform::errors::InvalidArgument(
                        "CUDA Graph is only supported on NVIDIA GPU device."));
  // If set true, will call `cudaStreamSynchronize(nccl_stream)`after allreduce.
  // which may cause error in cuda graph. This behavior is consistent with PE.
  PADDLE_ENFORCE_EQ(FLAGS_sync_nccl_allreduce,
                    false,
                    platform::errors::InvalidArgument(
                        "FLAGS_sync_nccl_allreduce must be False to support "
                        "CUDA Graph capturing."));

  // All output vars of coalesce_tensor op should be persistable.
  // If fused output var of coalesce_tensor is gc, it will cause accuracy
  // problem. The specific reasons need to be analyzed.
  for (auto& op_desc : block_.AllOps()) {
    if (op_desc->Type() == kCoalesceTensor) {
      for (auto& out_var_name : op_desc->OutputArgumentNames()) {
        // The fused var needs to be set to persistable, not just added to
        // skip_gc_vars.
        // In the case where the feed fetch var is changed, StandaloneExecutor
        // will be newly constructed. If the fused var is not persistable,
        // these vars will be recreated and initialized, resulting in
        // precision problems.
        auto* out_var = op_desc->Block()->FindVarRecursive(out_var_name);
        if (out_var) {
          out_var->SetPersistable(true);
          VLOG(4) << "Mark Var(" << out_var_name << ") as Persistable.";
        }
      }
    }
  }
#else
  PADDLE_THROW(platform::errors::Unimplemented(
      "CUDA Graph is only supported on NVIDIA GPU device."));
#endif
}

// Enforce CUDA-Graph-capture preconditions before each run: no feeds, the
// cuda-graph flag enabled, and the capture place matching this core's place.
// No-op when not capturing or when built without CUDA.
void InterpreterCore::CheckCUDAGraphBeforeRun(
    const std::vector<std::string>& feed_names) {
#ifdef PADDLE_WITH_CUDA
  if (platform::IsCUDAGraphCapturing()) {
    PADDLE_ENFORCE_EQ(
        feed_names.empty(),
        true,
        platform::errors::InvalidArgument(
            "Feeding data is not permitted when capturing CUDA Graph."));
    PADDLE_ENFORCE_EQ(
        FLAGS_new_executor_use_cuda_graph,
        true,
        platform::errors::InvalidArgument(
            "You must turn on FLAGS_new_executor_use_cuda_graph to True "
            "to enable CUDA Graph capturing."));
    PADDLE_ENFORCE_EQ(
        place_,
        platform::CUDAGraphCapturingPlace(),
        platform::errors::InvalidArgument("The place to capture CUDAGraph is "
                                          "not the same as the place to run."));
  }
#endif
}

X
xiongkun 已提交
596
// Build the instruction dependency graph: for each instruction, record its
// downstream instructions (same-thread vs cross-thread dispatch) and count
// how many upstream instructions each one waits on (dependecy_count_).
void InterpreterCore::BuildOperatorDependences() {
  // analyse the dependencies between ops, add next_instr_list to each instr,
  // and set the dependecy_count_
  size_t instr_num = vec_instruction_.size();
  dependecy_count_.resize(instr_num);
  auto downstream_map = dependency_builder_.Build(vec_instruction_);

  for (size_t instr_id = 0; instr_id < instr_num; ++instr_id) {
    Instruction& cur_instr = vec_instruction_[instr_id];
    const std::set<size_t>& next_instr_ids = downstream_map[instr_id];

    if (FLAGS_new_executor_serial_run) {
      // Debug mode: run everything on the current thread.
      for (size_t next_instr_id : next_instr_ids) {
        cur_instr.AddNextInstrInSameThread(next_instr_id);
      }
    } else {
      if (cur_instr.KernelType() == OpFuncType::kGpuAsync) {
        // Async-after-async stays on the same thread; anything else hops.
        for (size_t next_instr_id : next_instr_ids) {
          if (vec_instruction_[next_instr_id].KernelType() ==
              OpFuncType::kGpuAsync) {
            cur_instr.AddNextInstrInSameThread(next_instr_id);
          } else {
            cur_instr.AddNextInstrInDifferentThread(next_instr_id);
          }
        }
      } else {
        // Keep at most one non-async successor on the current thread.
        bool has_instr_in_same_thread = false;
        for (size_t next_instr_id : next_instr_ids) {
          if (!has_instr_in_same_thread &&
              vec_instruction_[next_instr_id].KernelType() !=
                  OpFuncType::kGpuAsync) {
            cur_instr.AddNextInstrInSameThread(next_instr_id);
            has_instr_in_same_thread = true;
          } else {
            cur_instr.AddNextInstrInDifferentThread(next_instr_id);
          }
        }
      }
    }

    for (size_t next_instr_id : next_instr_ids) {
      ++dependecy_count_[next_instr_id];
    }
  }
}

642 643 644
// At the end of each step, the holder of phi::DenseTensor in LoDTensorArray is
// null. Clear these Tensors and leave LoDTensorArray empty, otherwise an
// exception will occur in the next step
void InterpreterCore::ClearLoDTensorArrayInLocalScope() {
  auto vars = local_scope_->LocalVars();
  for (auto var : vars) {
    if (var->IsType<LoDTensorArray>()) {
      auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
      lod_tensor_arr->clear();
    }
  }
}

L
Leo Chen 已提交
655 656
// Convert the built OpFuncNode list into the executable instruction graph:
// create Instructions, build dependencies and stream events, compute the
// last-live-op set per variable for GC, cache instruction contexts, set up
// in-place pairs, and initialize the per-run dep/ref bookkeeping.
void InterpreterCore::Convert(
    std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
  auto& vec_meta_info = var_scope_.MutableVecMetaInfo();
  // Note: a copy of the node list; nodes are moved out of the copy below,
  // so the caller's vector is left untouched.
  auto nodes = *op_func_nodes;
  auto op_nums = nodes.size();
  vec_instruction_.reserve(op_nums);
  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& op_func_node = nodes[op_idx];
    auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);
    vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
  }

  BuildOperatorDependences();

  // NOTE(Ruibiao): For cross-step stream synchronization, an event may be
  // recorded in the first step and waited in the second step. So, in the first
  // step, the WaitEvent may be called without RecordEvent. Considering that
  // before the first call to RecordEvent, an Event represents an empty set of
  // work and WaitEvent always return succeed immediately, we omit the
  // prelude-record for the first step here.
  stream_analyzer_.ConstructEvents(&vec_instruction_);

  // add event for the input var of jit program, since there are async copied
  // from gpu_pinned place to gpu place on compute stream.
  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      auto& inst = vec_instruction_[i];
      if (inst.OpBase()->Type() == interpreter::kMemcpyD2H &&
          platform::is_gpu_place(place_)) {
        for (auto& item : inst.Inputs()) {
          for (auto var_id : item.second) {
            auto name = var_scope_.GetNameById(var_id);
            if (JitInputVars().count(name)) {
              auto device_event = std::make_shared<platform::DeviceEvent>(
                  place_, platform::GenerateDeviceEventFlag());
              VLOG(4) << "Add input event for input: " << name << " of "
                      << inst.OpBase()->Type();
              inst.AddEventToWait(
                  i, device_event, stream_analyzer_.GetWaiterType(inst));
            }
          }
        }
      }
    }
  }

  // calculate last_live_ops_
  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    Instruction& instr = vec_instruction_[op_idx];
    OpInOutInfo info;
    info.Build(instr.OpBase());

    std::set<size_t> gc_check_vars;

    const std::map<std::string, std::vector<int>>& ins = instr.Inputs();
    const std::map<std::string, std::vector<int>>& outs = instr.Outputs();
    // multimap: input and output slots may share the same slot name.
    std::multimap<std::string, std::vector<int>> ins_and_outs{ins.begin(),
                                                              ins.end()};
    ins_and_outs.insert(outs.begin(), outs.end());

    for (auto& item : ins_and_outs) {
      for (auto id : item.second) {
        if (id == kEmptyVarIndex) {
          continue;
        }
        auto* var_desc = var_scope_.VarDesc(id);
        // skip no_need_buffer input vars
        if (var_desc && ins.count(item.first) &&
            !info.IsInArgBufferNeeded(var_desc->Name())) {
          continue;
        }
        // skip when this var is not in block and not a data_transferred var,
        // which means this var is managed by other block
        const auto& var_name = var_scope_.GetNameById(id);
        bool not_owned = !block_.HasVar(var_name);
        const auto& transferred_vars = var_scope_.DataTransferAddedVars();
        bool not_transferred =
            std::all_of(transferred_vars.begin(),
                        transferred_vars.end(),
                        [&](const std::pair<std::string, int>& elem) {
                          return elem.first != var_name;
                        });
        if (not_owned && not_transferred) {
          VLOG(10) << "[gc_check_inputs] skip gc: " << var_name;
          continue;
        }
        gc_check_vars.insert(id);
      }
    }

    for (auto var_id : gc_check_vars) {
      Scope* inner_scope =
          HasLocalScope() ? local_scope_ : var_scope_.GetMutableScope();
      paddle::framework::Variable* var =
          inner_scope->FindVar(var_scope_.GetNameById(var_id));
      // Only tensor-like types participate in GC; others are never cleared.
      if (var->IsType<phi::DenseTensor>() || var->IsType<phi::SelectedRows>() ||
          var->IsType<LoDTensorArray>()) {
        last_live_ops_[var_id].insert(op_idx);
      } else {
        VLOG(4) << "not clear " << var_scope_.GetNameById(var_id) << " after "
                << instr.OpBase()->Type() << " because its type is "
                << framework::ToTypeName(var->Type());
      }
    }
  }

  // clear the last_live_ops list for all vars in skip_gc_vars
  for (const std::string& skip_gc_var : execution_config_.skip_gc_vars) {
    int var_id = var_scope_.GetIdByName(skip_gc_var);
    if (var_id != -1) {
      last_live_ops_[var_id].clear();
      VLOG(8) << "Skip gc for var: " << skip_gc_var;
    }
  }

  // shrink, find the downstream op that has no other op in the
  // downstream list happens before it
  // For example,
  // b = op1(a)
  // c = op2(a, b)
  // in this case, a is the input of op1 and op2, we only need to check
  // a after op2, because op2 always uses a after op1.
  for (size_t i = 0; i < last_live_ops_.size(); ++i) {
    std::set<size_t> minumum_last_live_ops;
    for (size_t item : last_live_ops_[i]) {
      bool not_before_any = true;
      // find the op that is not executed before any
      for (size_t other_item : last_live_ops_[i]) {
        if (dependency_builder_.OpHappensBefore(item, other_item)) {
          VLOG(8) << "happens_before: " << item << "->" << other_item
                  << ", so skip " << item;
          not_before_any = false;
          break;
        }
      }
      if (not_before_any) {
        VLOG(8) << "last live op of var " << i << " "
                << var_scope_.GetNameById(i) << " : " << item << " "
                << vec_instruction_[item].OpBase()->Type();
        minumum_last_live_ops.insert(item);
        vec_instruction_[item].AddGCCheckVar(i);
      }
    }
    last_live_ops_[i] = minumum_last_live_ops;
    vec_meta_info[i].var_ref_count_ = last_live_ops_[i].size();
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }

  BuildSkipShareLoDInfo();

  // If the program already contains explicit sharing ops, do not apply the
  // automatic in-place pass on top of it.
  bool inplaced = false;
  for (const Instruction& inst : vec_instruction_) {
    if (inst.OpBase()->Type() == "share_buffer" ||
        inst.OpBase()->Type() == "share_data") {
      VLOG(4) << "Already inplaced, skip inplace now.";
      inplaced = true;
    }
  }

  if (FLAGS_new_executor_use_inplace && !inplaced) {
    BuildInplace();
  }

  for (auto& dep : dependecy_count_) {
    deps_.emplace_back(std::make_shared<interpreter::OpDepInfo>(dep));
  }
  for (size_t i = 0; i < vec_meta_info.size(); ++i) {
    refs_.emplace_back(std::make_shared<interpreter::VarRefInfo>(
        vec_meta_info[i].var_ref_count_, var_scope_.VarRef(i)));
  }

  AnalyseExecuteOrderForTrace();
}

832 833 834
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
835
    for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
836
      for (auto& var : input.second) {
837 838
        if (var->IsType<phi::DenseTensor>()) {
          if (var->Get<phi::DenseTensor>().lod().size() != 0) {
839 840 841 842 843 844 845 846 847
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
848
    vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
849 850 851
  }
}

852
void InterpreterCore::RunOperator(const Instruction& instr_node) {
853 854
  auto* op = instr_node.OpBase();
  auto place = instr_node.DeviceContext().GetPlace();
L
Leo Chen 已提交
855 856
  Scope* local_scope = HasLocalScope() ? var_scope_.GetMutableLocalScope()
                                       : var_scope_.GetMutableScope();
857
  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(local_scope);
L
Leo Chen 已提交
858

859
#ifdef PADDLE_WITH_ASCEND_CL
860
  if (platform::is_npu_place(place)) {
861 862 863
    // NOTE(wangxi): nan/inf cannot be detected on NPU by checking the
    // variable values, but only through special `float_status` to checks
    // whether the operation is overflow. More about `float_status`, see:
864 865 866 867
    // https://gitee.com/ascend/modelzoo/issues/I3NF8V?from=project-issue
    if (FLAGS_check_nan_inf) {
      framework::details::NPUAllocAndClearFloatStatus(*op, *local_scope, place);
    }
868 869 870
  }
#endif

871
  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
872
  {
873
    // If it is OperatorBase, InferShape do nothing.
874 875
    if (op_with_kernel != nullptr) {
      platform::RecordEvent infershape_event(
876 877 878
          "infer_shape",
          platform::TracerEventType::OperatorInner,
          1,
879
          platform::EventRole::kInnerOp);
880 881 882 883 884 885 886

      // see OperatorWithKernel::RunImpl in operator.cc for why
      if (!(op_with_kernel->HasAttr(kAllKernelsMustComputeRuntimeShape) &&
            op_with_kernel->Attr<bool>(kAllKernelsMustComputeRuntimeShape))) {
        op_with_kernel->Info().infer_shape_(
            instr_node.InnerInferShapeContext().get());
      }
887 888 889 890
      infershape_event.End();
      platform::RecordOpInfoSupplement(op->Type(),
                                       op->Attrs(),
                                       *(instr_node.InnerInferShapeContext()),
891 892
                                       *(instr_node.InnerRuntimeContext()),
                                       op->Id());
893
    }
894
  }
895 896
  if (op_with_kernel != nullptr && FLAGS_new_executor_use_inplace) {
    // TODO(xiongkun03) Does operator base support inplace ?
897
    for (auto& pair : instr_node.InplaceInfo()) {
898 899 900 901 902 903 904 905
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }
906

907
  {
908
    platform::RecordEvent compute_event(
909 910 911
        "compute",
        platform::TracerEventType::OperatorInner,
        1,
912
        platform::EventRole::kInnerOp);
913 914 915
    if (op_with_kernel == nullptr) {
      instr_node.OpBase()->Run(*local_scope, place_);
    } else {
916 917 918
      // fit for phi
      if (instr_node.PhiKernel() && instr_node.PhiKernel()->IsValid()) {
        VLOG(4) << "Run phi kernel: " << op->Type();
919 920
        VLOG(4) << instr_node.InnerRuntimeContext().get() << " "
                << &instr_node.DeviceContext();
921
        phi::KernelContext phi_kernel_context;
922
        op_with_kernel->BuildPhiKernelContext(
923
            *instr_node.InnerRuntimeContext().get(),
924
            const_cast<platform::DeviceContext*>(&instr_node.DeviceContext()),
925
            &phi_kernel_context);
926

927
        (*instr_node.PhiKernel())(&phi_kernel_context);
928 929 930 931

      } else {
        instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
      }
932
    }
933
  }
934

935
  VLOG(4) << "End run " << place << " " << op->DebugStringEx(local_scope);
936

937
  if (!instr_node.InplaceBackMap().empty()) {
L
liutiexing 已提交
938 939
    platform::RecordEvent inplaceback_event(
        "InplaceVarsBack", platform::TracerEventType::UserDefined, 10);
940 941 942 943
    auto& m = instr_node.InplaceBackMap();
    // NOTE(zhiqiu): same logic as TransferInplaceVarsBack() in operator.cc
    for (auto& p : m) {
      auto* transformed_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
944
          var_scope_.VarRef(p.first));
945
      auto* original_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
946
          var_scope_.VarRef(p.second));
947 948
      original_tensor->ShareDataWith(*transformed_tensor);
      VLOG(4) << "Transfer inplace variable back form "
949 950
              << var_scope_.GetNameById(p.first) << " to "
              << var_scope_.GetNameById(p.second);
951 952 953
    }
  }

954 955 956
  /*For profiling/benchmark only*/
  if (FLAGS_benchmark) {
    instr_node.DeviceContext().Wait();
957 958
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
    PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError());
959 960 961 962 963
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
  }

964
  // for debug nan/inf
965
  if (op_with_kernel != nullptr && FLAGS_check_nan_inf) {
966
    VLOG(4) << "Check nan/inf";
967
    framework::details::CheckOpHasNanOrInf(
968
        *op,
969
        *local_scope,
970
        place);  // TODO(xiongkun03) change it to inner scope.
971
  }
972 973
}

974 975 976 977 978 979 980 981 982 983 984 985 986 987
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  VLOG(5) << __func__ << " OP id:" << instr_node.Id()
          << " name:" << instr_node.OpBase()->Type() << " type:"
          << (instr_node.KernelType() == OpFuncType::kCpuSync
                  ? "kCpuSync"
                  : (instr_node.KernelType() == OpFuncType::kGpuSync
                         ? "kGpuSync"
                         : "kGpuAsync"))
          << " runs on " << platform::GetCurrentThreadName();

  auto* op = instr_node.OpBase();
  platform::RecordEvent instruction_event(
      op->Type(), platform::TracerEventType::Operator, 1);

988 989
  SetDeviceId(instr_node.DeviceContext().GetPlace());

990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014
  try {
    instr_node.WaitEvent(place_);

    if (!instr_node.IsArtificial()) {
      RunOperator(instr_node);
      CheckGC(instr_node);
      interpreter::LogDeviceMemoryStats(place_);
    }

    instr_node.RecordEvent(place_);
  } catch (platform::EnforceNotMet& ex) {
    framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
    exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
  } catch (platform::EOFException&) {
    exception_holder_.Catch(std::current_exception());
  } catch (std::exception& ex) {
    LOG(WARNING) << op->Type() << " raises an exception "
                 << platform::demangle(typeid(ex).name()) << ", " << ex.what();
    exception_holder_.Catch(std::current_exception());
  } catch (...) {
    LOG(WARNING) << op->Type() << " raises an unknown exception";
    exception_holder_.Catch(std::current_exception());
  }
}

1015
void InterpreterCore::ExecuteInstructionList(
1016
    const std::vector<Instruction>& vec_instr) {
1017
  interpreter::ResetAtomicGuard guard(&deps_, &refs_);
L
Leo Chen 已提交
1018 1019
  unfinished_op_number_ = vec_instr.size();
  if (unfinished_op_number_ == 0) {
1020 1021 1022 1023
    VLOG(4) << "No op to run, return";
    return;
  }

1024 1025
  exception_holder_.Clear();

1026 1027
  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
1028
      // NOTE(zhiqiu): hot fix for jit input var
1029
      RecordMemcpyD2H(vec_instr.at(i));
1030 1031 1032 1033 1034 1035
      if (FLAGS_new_executor_serial_run) {
        RunInstructionAsync(i);
      } else {
        async_work_queue_->AddTask(vec_instr.at(i).KernelType(),
                                   [this, i] { RunInstructionAsync(i); });
      }
1036 1037 1038
    }
  }

1039
  auto event_name = main_thread_blocker_.WaitEvent();
1040 1041
  VLOG(1) << "main_thread_blocker_(" << &main_thread_blocker_
          << ") got event_name: " << event_name;
1042

1043
  if (UNLIKELY(exception_holder_.IsCaught())) {
1044
    VLOG(1) << "Exception caught " << exception_holder_.Type();
1045 1046 1047 1048 1049
    // Graceful exit when the executor encountered a fatal error.
    // EOF is not a fatal error.
    if (exception_holder_.Type() != "EOF") {
      async_work_queue_->Cancel();
    }
1050
    VLOG(4) << "Cancel ok";
1051
    PADDLE_ENFORCE_EQ(
1052 1053
        main_thread_blocker_.Clear(),
        0,
1054 1055
        platform::errors::PreconditionNotMet(
            "main_thread_blocker_.Clear() return -1, clear failed"));
1056
    VLOG(4) << "clear ok";
1057 1058
    exception_holder_.ReThrow();
  }
1059
}
1060

1061 1062
void InterpreterCore::RunNextInstructions(const Instruction& instr,
                                          SchedulingQueue* reserved_next_ops) {
1063 1064
  platform::RecordEvent record(
      "RunNextInstructions", platform::TracerEventType::UserDefined, 10);
1065

1066 1067 1068 1069
  auto IsReady = [this](size_t next_id) {
    VLOG(4) << "op_id: " << next_id
            << ", remain deps: " << deps_[next_id]->DynamicDep();
    return deps_[next_id]->CheckAndDecrease();
1070 1071
  };

R
Ruibiao Chen 已提交
1072 1073 1074 1075 1076
  for (size_t next_instr_id : instr.NextInstrsInDifferenceThread()) {
    if (IsReady(next_instr_id)) {
      async_work_queue_->AddTask(
          vec_instruction_[next_instr_id].KernelType(),
          [this, next_instr_id]() { RunInstructionAsync(next_instr_id); });
1077
    }
R
Ruibiao Chen 已提交
1078
  }
L
Leo Chen 已提交
1079

R
Ruibiao Chen 已提交
1080 1081
  for (size_t next_instr_id : instr.NextInstrsInSameThread()) {
    if (IsReady(next_instr_id)) {
1082
      reserved_next_ops->push(next_instr_id);
1083 1084 1085 1086
    }
  }
}

1087
void InterpreterCore::RunInstructionAsync(size_t instr_id) {
1088 1089 1090 1091
  // NOTE(Ruibiao): Due to the uncertain order in multi-threading asynchronous
  // scheduling, the priority order involved cross-thread scheduling is not
  // guaranteed. Only Ops scheduled by the same AddTask call have the guarantee
  // of priority order.
1092
  SchedulingQueue ready_ops(instruction_scheduling_priority_less);
1093
  ready_ops.push(instr_id);
L
liutiexing 已提交
1094
  while (!ready_ops.empty()) {
1095 1096
    instr_id = ready_ops.top();
    ready_ops.pop();
1097
    auto& instr_node = vec_instruction_.at(instr_id);
1098

1099
    RunInstruction(instr_node);
1100 1101 1102 1103 1104 1105 1106 1107

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }
1108

L
Leo Chen 已提交
1109 1110 1111
    VLOG(4) << "unfinished_op_number_: " << unfinished_op_number_;
    if (UNLIKELY(unfinished_op_number_.fetch_sub(
                     1, std::memory_order_relaxed) == 1)) {
1112 1113 1114 1115 1116
      if (completion_notifier_ != nullptr) {
        completion_notifier_->NotifyEvent();
      }
    }

1117
    RunNextInstructions(instr_node, &ready_ops);
L
liutiexing 已提交
1118
  }
1119 1120
}

1121
void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
L
Leo Chen 已提交
1122 1123 1124 1125
#if !defined(PADDLE_WITH_CUDA) && !defined(PADDLE_WITH_HIP)
  PADDLE_THROW(platform::errors::Unimplemented(
      "RecordStreamForGC is only implemented when compiled with GPU."));
#else
1126
  if (!IsInterpretercoreFastGCEnabled() ||
R
Ruibiao Chen 已提交
1127
      instr.KernelType() != OpFuncType::kGpuAsync) {
1128 1129
    return;
  }
1130 1131
  platform::RecordEvent record(
      "RecordStreamForGC", platform::TracerEventType::UserDefined, 10);
1132

L
Leo Chen 已提交
1133 1134
  gpuStream_t stream =
      reinterpret_cast<const phi::GPUContext&>(instr.DeviceContext()).stream();
1135
  auto TensorRecordStream = [&stream](phi::DenseTensor& tensor) {
1136 1137 1138 1139 1140 1141 1142 1143 1144
    auto allocation = tensor.Holder();
    if (allocation == nullptr) {
      return;
    }

    const platform::Place& place = allocation->place();
    if (platform::is_gpu_place(place)) {
      memory::RecordStream(allocation, stream);
    } else if (platform::is_cuda_pinned_place(place)) {
1145 1146 1147
      // TODO(Ruibiao): Here should do something to make sure that the tensor
      // is not freed until the H2D copies done. However, simplely launch a
      // CUDA runtime callback to the H2D stream may lead a high performance
1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163
      // overhead. As all the cases we meet in H2D are copies from CPUPlace at
      // present, we just log a WARNING here. A better design is required.
      LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous "
                      "manner may lead a data inconsistent";
    } else {
      // memory copies involve CPUPlace are always synchronous, so just do
      // nothing here
    }
  };

  /* NOTE(Ruibiao):Cross-stream tensor synchronization is required only when
   * all the following conditions are satisfied:
   * 1. The tensor will be GC after running the instruction, i.e., in
   * instr.GCCheckVars.
   * 2. The stream which initializes this tensor is different from the stream
   * which the instruction run in.
1164 1165
   * 3. The tensor is the instruction's input, cause we assume that
   * instruction will initialize all output tensors with its running stream.
1166 1167 1168 1169 1170 1171 1172 1173 1174
   * 4. In the OP function of this instruction, the tensor is an input of a
   * async CUDA kernel.
   *
   * Here we only process the first condition, because:
   * 1. Since the RecordStream function will directly return when the recored
   * stream is equal to the owning stream, recording a stream same as which
   * initialized this tensor has less time overhead. Conversely, it may take
   * more time if we try to extract those cross-stream input vars from
   * instr.GCCheckVars.
1175 1176
   * 2. Now the instruction has no idea of which vars involving async running
   * in OP function, and thus we can not recognize condition 4. It should be
1177 1178 1179
   * supported later.
   */
  for (int var_id : instr.GCCheckVars()) {
1180 1181
    VLOG(4) << "GC sync " << var_scope_.GetNameById(var_id) << " "
            << var_scope_.VarDesc(var_id);
1182 1183

    // persistable var will be ignore while GC
1184 1185
    if (var_scope_.VarDesc(var_id) &&
        var_scope_.VarDesc(var_id)->Persistable()) {
1186 1187 1188
      continue;
    }

1189
    paddle::framework::Variable* var = var_scope_.VarRef(var_id);
1190 1191 1192 1193
    if (var == nullptr) {
      continue;
    }

1194 1195
    if (var->IsType<phi::DenseTensor>()) {
      TensorRecordStream(*(var->GetMutable<phi::DenseTensor>()));
1196 1197 1198 1199
    } else if (var->IsType<
                   operators::reader::
                       OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
      // do nothing
1200
    } else if (var->IsType<phi::SelectedRows>()) {
1201
      TensorRecordStream(
1202
          *(var->GetMutable<phi::SelectedRows>()->mutable_value()));
1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216
    } else if (var->IsType<LoDTensorArray>()) {
      auto* tensor_arr = var->GetMutable<LoDTensorArray>();
      for (auto& tensor : *tensor_arr) {
        TensorRecordStream(tensor);
      }
    } else if (var->IsType<std::vector<Scope*>>()) {
      // do nothing
    } else {
      PADDLE_THROW(platform::errors::Unimplemented(
          "The variable(%s) is not supported in eager deletion.",
          framework::ToTypeName(var->Type())));
    }
  }
#endif
L
Leo Chen 已提交
1217
}
1218

1219
void InterpreterCore::CheckGC(const Instruction& instr) {
1220 1221
  platform::RecordEvent record(
      "CheckGC", platform::TracerEventType::UserDefined, 10);
1222 1223 1224
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
  RecordStreamForGC(instr);
#endif
1225
  auto& var_scope = var_scope_;
1226

1227
  for (auto var_id : instr.GCCheckVars()) {
1228 1229 1230
    VLOG(4) << "GC:" << var_scope_.GetNameById(var_id) << ", id:" << var_id
            << ", ref:" << refs_[var_id]->DynamicRef();
    bool is_ready = refs_[var_id]->CheckAndDecrease();
1231 1232 1233 1234 1235
    // ignore all persistable var while GC
    if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
      continue;
    }
    if (is_ready) {
X
xiongkun 已提交
1236 1237
      VLOG(6) << "Async delete variable with name : "
              << var_scope.GetNameById(var_id);
1238
      gc_->Add(refs_[var_id]->Var(), instr);
W
wanghuancoder 已提交
1239 1240 1241 1242
    }
  }
}

1243 1244 1245
// Prepares the core for a run driven by feed tensors: shares each feed tensor
// into its scope variable, and on the first call builds the variable scope,
// the op function list, and the instruction graph. `prepare_feed` re-shares
// the feeds after building, since they may have been GC-ed during build.
void InterpreterCore::Prepare(const std::vector<std::string>& feed_names,
                              const std::vector<phi::DenseTensor>& feed_tensors,
                              bool prepare_feed) {
  PADDLE_ENFORCE_EQ(feed_names.size(),
                    feed_tensors.size(),
                    platform::errors::PreconditionNotMet(
                        "Required feed_names.size() == feed_tensors.size(), "
                        "but received %d != %d",
                        feed_names.size(),
                        feed_tensors.size()));
  // Shares each feed tensor's buffer and LoD into the matching scope var.
  auto FeedInput = [&] {
    VLOG(4) << "Feed inputs";
    for (size_t i = 0; i < feed_names.size(); ++i) {
      auto* feed_var = local_scope_->FindVar(feed_names[i]);
      PADDLE_ENFORCE_NOT_NULL(
          feed_var,
          platform::errors::NotFound("Variable %s should not be nullptr.",
                                     feed_names[i]));

      auto feed_tensor = feed_var->GetMutable<phi::DenseTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
      feed_tensor->set_lod(feed_tensors[i].lod());
    }
  };

  if (!is_build_) {
    paddle::framework::interpreter::BuildVariableScope(
        block_, execution_config_, &var_scope_);
    FeedInput();
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    auto skip_run = paddle::framework::interpreter::BuildOpFuncList(
        place_,
        block_,
        execution_config_.skip_gc_vars,
        &op_func_nodes,
        &var_scope_,
        execution_config_,
        HasLocalScope());
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);
    UpdateSyncOpNum();
    is_build_ = true;
    if (skip_run) {
      VLOG(4) << "RUN impl";
      RunImpl();
    }
  }
  // NOTE: Because feed_tensor may be GC-ed during
  // paddle::framework::BuildOpFuncList, we should
  // call FeedInput again.
  if (prepare_feed) {
    FeedInput();
  }
}

1299 1300 1301
void InterpreterCore::SetFeedVarsInplaceSkip(
    const std::vector<std::string>& feed_names) {
  for (auto& feed_name : feed_names) {
1302
    var_scope_.SetVarSikpInplace(feed_name, true);
1303 1304 1305
  }
}

L
Leo Chen 已提交
1306 1307
// Returns true when this core runs in its own local scope instead of the
// outer scope passed by the caller.
bool InterpreterCore::HasLocalScope() const { return local_scope_ != nullptr; }

1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411
// Note(zhangbo):
// (1) What is "Trace"?
// The OP execute scheduling rule adopted by Interpretercore by default is a
// multi-threaded scheduling mode(see ExecuteInstructionList). By maintaining a
// high-performance thread pool, the OP's execute scheduling is distributed to
// the sub threads maintained by the thread pool, but the main thread does not
// have any tasks. In Trace mode, the executor will execute directly in the main
// thread according to the pre provided OP sequence(trace_execute_order_),
// instead of being distributed to the thread pool.
// (2) When we use "Trace"?
// In dygraph-to-static, this scheduling means that the execution of the
// forward and backward OPs and the execution of the dygraph optimizer cannot
// happen in the same thread, and the thread switching may cause CPU cache
// misses. Moreover, when a model consists entirely of KQueueAsync-type OPs,
// all OPs are dispatched to the DeviceThread for execution and the
// multithreaded scheduling brings no benefit. Therefore, in dygraph-to-static,
// when the number of KQueueAsync OPs is 0, we choose Trace mode.
void InterpreterCore::TraceInstructionList(
    const std::vector<Instruction>& vec_instr) {
  unfinished_op_number_ = vec_instr.size();
  if (unfinished_op_number_ == 0) {
    VLOG(4) << "No op to run, return";
    return;
  }

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      // NOTE(zhiqiu): hot fix for jit input var
      RecordMemcpyD2H(vec_instr.at(i));
    }
  }

  for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) {
    auto instr_id = trace_execute_order_[idx];
    auto& instr_node = vec_instruction_.at(instr_id);

    RunInstruction(instr_node);

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      break;
    }
  }

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(1) << "Exception caught " << exception_holder_.Type();
    PADDLE_ENFORCE_EQ(
        main_thread_blocker_.Clear(),
        0,
        platform::errors::PreconditionNotMet(
            "main_thread_blocker_.Clear() return -1, clear failed"));
    VLOG(4) << "clear ok";
    exception_holder_.ReThrow();
  }
}

// For memcpy_d2h instructions fed by jit input vars, records the events the
// instruction waits on onto the default device context of place_.
void InterpreterCore::RecordMemcpyD2H(const Instruction& instr_node) {
  // NOTE(zhiqiu): hot fix for jit input var
  if (instr_node.OpBase()->Type() != interpreter::kMemcpyD2H) {
    return;
  }
  auto* default_dev_ctx = platform::DeviceContextPool::Instance().Get(place_);
  for (auto& event : instr_node.EventsToWait()) {
    platform::RecordEvent record(
        "RecordStreamEvent", platform::TracerEventType::UserDefined, 10);
    VLOG(3) << "Record event on default stream in jit_input_var at op: "
            << instr_node.OpBase()->Type();
    event.event_->Record(default_dev_ctx);
  }
}

void InterpreterCore::UpdateSyncOpNum() {
  int64_t sync_op_num = 0;
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    if (vec_instruction_[i].KernelType() == OpFuncType::kCpuSync ||
        vec_instruction_[i].KernelType() == OpFuncType::kGpuSync) {
      sync_op_num = sync_op_num + 1;
    }
  }
  sync_op_num_ = sync_op_num;
  VLOG(4) << "Update sync op num, sync op num is: " << sync_op_num_;
}

// Note(zhangbo):
// When there is a KQueueSync type OP in the model, breadth traversal is better
// than depth traversal. For example: OP(O) ->(direct_run)-> OP(A)
// ->(sync_run)-> OP(B) OP(O) ->(direct_run)-> OP(C) ->(direct_run)-> OP(D) If B
// is run before C, B may always block to wait for A to finish executing, but in
// fact, C can be executed first during this time.
void InterpreterCore::AnalyseExecuteOrderForTrace() {
  VLOG(4) << "Analyze the execution order of Trace scheduling mode.";
  interpreter::ResetAtomicGuard guard(&deps_, &refs_);

  auto op_downstream_map = dependency_builder_.OpDownstreamMap();

  auto IsReady = [this](size_t next_id) {
    VLOG(4) << "op_id: " << next_id
            << ", remain deps: " << deps_[next_id]->DynamicDep();
    return deps_[next_id]->CheckAndDecrease();
  };

  std::vector<size_t> trace_order;
1412
  SchedulingQueue ready_ops(instruction_scheduling_priority_less);
1413 1414 1415

  for (size_t instr_id = 0; instr_id < dependecy_count_.size(); ++instr_id) {
    if (dependecy_count_[instr_id] == 0) {
1416
      ready_ops.push(instr_id);
1417 1418 1419 1420
    }
  }

  while (!ready_ops.empty()) {
1421 1422
    size_t now_id = ready_ops.top();
    ready_ops.pop();
1423 1424 1425 1426 1427 1428
    trace_order.push_back(now_id);

    auto next_op_set = op_downstream_map[now_id];

    for (size_t next_op_id : next_op_set) {
      if (IsReady(next_op_id)) {
1429
        ready_ops.push(next_op_id);
1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442
      }
    }
  }

  PADDLE_ENFORCE_EQ(
      trace_order.size(),
      dependecy_count_.size(),
      platform::errors::PreconditionNotMet(
          "trace_order size should be equal to dependecy_count_."));

  trace_execute_order_ = trace_order;
}

1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
// Builds an InterpreterCore over a private copy of `prog` whose global block
// has fetch ops appended for `fetch_names`.
std::shared_ptr<InterpreterCore> CreateInterpreterCore(
    const platform::Place& place,
    const ProgramDesc& prog,
    Scope* scope,
    const std::vector<std::string>& fetch_names,
    const interpreter::ExecutionConfig& execution_config) {
  // NOTE(Aurelius84): `AddFetch` will modify BlockDesc, so we should copy
  // a new program.
  auto prog_copy = std::make_shared<framework::ProgramDesc>(prog);
  auto* global_block = prog_copy->MutableBlock(0);
  interpreter::AddFetch(fetch_names, global_block);

  auto core = std::make_shared<InterpreterCore>(
      place, *global_block, scope, execution_config);
  core->SetCopyProgram(prog_copy);
  return core;
}

1462 1463
}  // namespace framework
}  // namespace paddle