// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include <unordered_set>
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/os_info.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#include "paddle/phi/core/kernel_context.h"
#ifdef PADDLE_WITH_MKLDNN
#include "paddle/fluid/platform/mkldnn_helper.h"
#endif

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true,
                            "Use local_scope in new executor(especially used "
                            "in UT), can turn off for better performance");

DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);
DECLARE_bool(fast_eager_deletion_mode);

constexpr const char* kExceptionCaught = "ExceptionCaught";
constexpr const char* kTaskCompletion = "TaskCompletion";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;
static constexpr size_t kDeviceNumThreads = 1;

bool IsInterpretercoreFastGCEnabled() {
  return memory::allocation::AllocatorFacade::Instance()
             .IsStreamSafeCUDAAllocatorUsed() &&
         FLAGS_fast_eager_deletion_mode;
}

InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const BlockDesc& block,
                                 VariableScope* global_scope)
    : place_(place),
      block_(block),
      global_scope_(global_scope),
      stream_analyzer_(place) {
  VLOG(4) << "InterpreterCore(): " << this << " on " << place_;
  is_build_ = false;
  async_work_queue_.reset(new interpreter::AsyncWorkQueue(
      kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_));

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
  if (IsInterpretercoreFastGCEnabled()) {
    gc_ = std::make_unique<InterpreterCoreFastGarbageCollector>();
  } else {
    gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
  }
#else
  gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
#endif

  exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
  completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);

  create_local_scope_ = FLAGS_new_executor_use_local_scope;
  if (FLAGS_new_executor_use_local_scope) {
    auto local_scope = &global_scope->GetMutableScope()->NewScope();
    local_scope->AddListener(global_scope->Listener());
    local_scope_ = local_scope;
  }
  VLOG(4) << "create_local_scope_ is " << create_local_scope_;

  // prune

  // optimize graph pass

  // convert to run graph
}

InterpreterCore::~InterpreterCore() {
  // cancel gc's thread
  gc_.reset(nullptr);

  async_work_queue_.reset(nullptr);
  VLOG(4) << "~InterpreterCore(): " << this << " on " << place_;

#ifdef PADDLE_WITH_MKLDNN
  // Clear mkl-dnn cache,
  // this is needed to have mkl-dnn unit tests working
  platform::ClearMKLDNNCache(place_, this);
#endif
}

void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
  copy_program_ = prog;
}

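// Typical call pattern (a hedged sketch; variable names are illustrative):
//   InterpreterCore core(place, block, &global_scope);
//   auto fetch_list = core.Run(feed_names, feed_tensors);
// The first Run() builds the variable scope and instruction list from the
// block; subsequent calls reuse the cached instructions.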
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
  bool is_build = is_build_;
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, is_build);

  if (is_build) {
    // add listener before run when is_build_ is true
    global_scope_->ResetListener();

    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // clear the listener after run
  global_scope_->ClearListener();

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names) {
#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
  if (!is_build_) {
    if (create_local_scope_ &&
        global_scope_->GetMutableLocalScope() !=
            global_scope_->GetMutableScope() &&
        global_scope_->GetMutableLocalScope()) {
      VLOG(4) << "Clear previous local scope before run";
      VLOG(4) << global_scope_->GetMutableScope() << " "
              << global_scope_->GetMutableLocalScope();
      platform::DeviceContextPool::Instance().Get(place_)->Wait();
      // TODO(zhiqiu): clear the tensor holder of all vars in previous local
      // scope?
    }
    global_scope_->SetLocalScope(local_scope_);
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);

  } else {
    // add listener before run when is_build_ is true
    global_scope_->ResetListener();

    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // clear the listener after run
  global_scope_->ClearListener();

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

// At the end of each step, the holder of Tensor in LoDTensorArray is null.
// Clear these Tensors and leave LoDTensorArray empty, otherwise an exception
// will occur in the next step
void InterpreterCore::ClearLoDTensorArrayInLocalScope() {
  auto vars = local_scope_->LocalVars();
  for (auto var : vars) {
    if (var->IsType<LoDTensorArray>()) {
      auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
      lod_tensor_arr->clear();
    }
  }
}

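// NOTE: after BuildOperatorDependences() below, dependecy_count_[i] holds the
// number of direct upstream instructions that must finish before instruction
// i can be scheduled.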
void InterpreterCore::BuildOperatorDependences() {
  // analyze the dependencies between ops: fill dependecy_count_ and call
  // stream_analyzer_.Schedule for each op's downstream list
  auto op_nums = vec_instruction_.size();
  dependecy_count_.resize(op_nums);
  auto op2downstream = interpreter::build_op_downstream_map(
      vec_instruction_, &op_happens_before_);
  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
    auto op_list = op2downstream[op];
    std::vector<size_t> downstream_vector(op_list.begin(), op_list.end());
    stream_analyzer_.Schedule(downstream_vector, &vec_instruction_, op);

    for (auto inst_id : op_list) {
      dependecy_count_[inst_id]++;
    }
  }
}

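// Convert() turns the OpFuncNode list produced by build_op_func_list into
// Instructions and precomputes everything needed for asynchronous execution:
// op dependencies, the last ops that reference each variable (for GC),
// cached runtime/infershape contexts, and optional inplace pairs.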
void InterpreterCore::Convert(
    std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
  auto& vec_meta_info = global_scope_->MutableVecMetaInfo();
  auto var_nums = global_scope_->VarSize();
  input_var2op_info_.resize(var_nums);
  auto nodes = *op_func_nodes;

  auto op_nums = nodes.size();
  vec_instruction_.reserve(op_nums);
  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& op_func_node = nodes[op_idx];
    auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);
    vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
  }

  BuildOperatorDependences();

  // calculate last_live_ops_: for each var, the ops after which it may be
  // released by the GC
  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& instr = vec_instruction_[op_idx];
    OpInOutInfo info;
    std::set<size_t> gc_check_inputs;

    for (auto& item : instr.Inputs()) {
      for (auto id : item.second) {
        if (id == kEmptyVarIndex) {
          continue;
        }
        input_var2op_info_.at(id).push_back(op_idx);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(instr.OpBase());
        }
        auto* var_desc = global_scope_->VarDesc(id);
        if (var_desc) {
          if (info.IsInArgBufferNeeded(var_desc->Name())) {
            gc_check_inputs.insert(id);
          }
        } else {
          gc_check_inputs.insert(id);
        }
      }
    }

    for (auto var_id : gc_check_inputs) {
      paddle::framework::Variable* var = global_scope_->Var(var_id);
      if (var->IsType<LoDTensor>() || var->IsType<phi::SelectedRows>() ||
          var->IsType<LoDTensorArray>()) {
        last_live_ops_[var_id].insert(op_idx);
      } else {
        VLOG(4) << "not clear " << global_scope_->GetNameById(var_id)
                << " after " << instr.OpBase()->Type()
                << " because its type is "
                << framework::ToTypeName(var->Type());
      }
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check outputs: a var that is never used as an input is last-live at the
    // op that produces it
    for (auto& item : vec_instruction_[i].Outputs()) {
      for (auto var_id : item.second) {
        if (input_var2op_info_.at(var_id).size() == 0) {
          last_live_ops_[var_id].insert(i);
        }
      }
    }
  }

  // shrink: for each var, keep only those last-live ops that do not happen
  // before any other op in the set.
  // For example,
  // b = op1(a)
  // c = op2(a, b)
  // Here a is an input of both op1 and op2; we only need to check a after
  // op2, because op2 always runs after op1.
  for (size_t i = 0; i < last_live_ops_.size(); ++i) {
    std::set<size_t> minimum_last_live_ops;
    for (size_t item : last_live_ops_[i]) {
      bool not_before_any = true;
      // keep the op only if it does not happen before any other op in the set
      for (size_t other_item : last_live_ops_[i]) {
        if (op_happens_before_[item][other_item]) {
          VLOG(8) << "happens_before: " << item << "->" << other_item
                  << ", so skip " << item;
          not_before_any = false;
          break;
        }
      }
      if (not_before_any) {
        VLOG(8) << "last live op of var " << i << " "
                << global_scope_->GetNameById(i) << " : " << item << " "
                << vec_instruction_[item].OpBase()->Type();
        minimum_last_live_ops.insert(item);
        vec_instruction_[item].AddGCCheckVar(i);
      }
    }
    last_live_ops_[i] = minimum_last_live_ops;
    vec_meta_info[i].var_ref_count_ = last_live_ops_[i].size();
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].DeviceContext().GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }
  bool inplaced = false;
  for (auto inst : vec_instruction_) {
    if (inst.OpBase()->Type() == "share_buffer" ||
        inst.OpBase()->Type() == "share_data") {
      VLOG(4) << "Already inplaced, skip inplace now.";
      inplaced = true;
    }
  }

  if (FLAGS_new_executor_use_inplace && !inplaced) {
    BuildInplace();
  }

  // prepare for the first time.
  async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  async_work_queue_->PrepareAtomicVarRef(vec_meta_info);
}

bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!global_scope_->VarDesc(var_index)) {
    return input_var2op_info_.at(var_index).size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_.at(var_index)) {
      OpInOutInfo info;
      info.Build(vec_instruction_.at(inst_id).OpBase());
      if (info.IsInArgBufferNeeded(global_scope_->VarDesc(var_index)->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

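// An in->out pair is rewritten inplace only when the op declares
// infer_inplace_, the input var is neither persistable nor marked
// skip-inplace, the var feeds only this op (see the check above), and both
// vars hold LoDTensor.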
void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    auto& instr = vec_instruction_[i];
    auto* op_base = instr.OpBase();
    if (!op_base->Info().infer_inplace_) {
      continue;
    }

    auto in_to_outs = op_base->Info().infer_inplace_(
        platform::is_gpu_place(instr.DeviceContext().GetPlace()));

    auto& inputs = instr.Inputs();
    auto& outputs = instr.Outputs();
    for (auto& pair : in_to_outs) {
      auto iter = inputs.find(pair.first);
      if (iter != inputs.end() && !iter->second.empty()) {
        auto in_var_desc = global_scope_->VarDesc(iter->second[0]);
        if (in_var_desc && in_var_desc->Persistable()) {
          continue;
        }
        if (global_scope_->GetVarSikpInplace(iter->second[0])) {
          continue;
        }
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = outputs.find(pair.second);
          if (iterout != outputs.end() && !iterout->second.empty()) {
            auto invar = global_scope_->Var(iter->second[0]);
            auto outvar = global_scope_->Var(iterout->second[0]);
            if (invar && outvar && invar->IsType<LoDTensor>() &&
                outvar->IsType<LoDTensor>()) {
              instr.AddInplace(invar, outvar);
              VLOG(3) << "inplace " << vec_instruction_[i].OpBase()->Type()
                      << " " << global_scope_->GetNameById(iter->second[0])
                      << " -> "
                      << global_scope_->GetNameById(iterout->second[0])
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

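// Cache each instruction's input/output Variable pointers in its
// RuntimeContext so that name-to-variable resolution happens once at build
// time rather than on every step.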
void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->Inputs()) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(global_scope_->Var(id));
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->Outputs()) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(global_scope_->Var(id));
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }

  // set runtime_ctx and infershape_ctx_
  if (instr_node->OpBase()->Type() == "cinn_launch") {  // OP uses scope in
                                                        // kernel
    Scope* local_scope = create_local_scope_
                             ? global_scope_->GetMutableLocalScope()
                             : global_scope_->GetMutableScope();
    instr_node->ResetContextWithScope(ins_map, outs_map, *local_scope);
  } else {
    instr_node->ResetContext(ins_map, outs_map);
  }
}

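// If every LoDTensor input of an op carries an empty LoD, InferShape can skip
// sharing LoD from inputs to outputs; cache that per-instruction decision.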
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
  }
}

void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  auto* op = instr_node.OpBase();
  auto place = instr_node.DeviceContext().GetPlace();
  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(global_scope_);
  Scope* local_scope = create_local_scope_
                           ? global_scope_->GetMutableLocalScope()
                           : global_scope_->GetMutableScope();
  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
  {
    // If it is OperatorBase, InferShape does nothing.
    if (op_with_kernel != nullptr) {
      platform::RecordEvent infershape_event(
          "infer_shape", platform::TracerEventType::OperatorInner, 1,
          platform::EventRole::kInnerOp);

      // see OperatorWithKernel::RunImpl in operator.cc for why
      if (!(op_with_kernel->HasAttr(kAllKernelsMustComputeRuntimeShape) &&
            op_with_kernel->Attr<bool>(kAllKernelsMustComputeRuntimeShape))) {
        op_with_kernel->Info().infer_shape_(
            instr_node.InnerInferShapeContext().get());
      }
    }
  }

  if (op_with_kernel != nullptr && FLAGS_new_executor_use_inplace) {
    // TODO(xiongkun03) Does operator base support inplace ?
    for (auto& pair : instr_node.InplaceInfo()) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }

  {
    platform::RecordEvent compute_event(
        "compute", platform::TracerEventType::OperatorInner, 1,
        platform::EventRole::kInnerOp);
    if (op_with_kernel == nullptr) {
      instr_node.OpBase()->Run(*local_scope, place_);
    } else {
      // fit for phi
      if (instr_node.PhiKernel() && instr_node.PhiKernel()->IsValid()) {
        VLOG(4) << "Run phi kernel: " << op->Type();
        VLOG(4) << instr_node.InnerRuntimeContext().get() << " "
                << &instr_node.DeviceContext();
        phi::KernelContext pt_kernel_context;
        op_with_kernel->BuildPhiKernelContext(
            *instr_node.InnerRuntimeContext().get(),
            const_cast<platform::DeviceContext*>(&instr_node.DeviceContext()),
            &pt_kernel_context);

        (*instr_node.PhiKernel())(&pt_kernel_context);

      } else {
        instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
      }
    }
  }

  VLOG(4) << "End run " << place << " " << op->DebugStringEx(global_scope_);

  if (!instr_node.InplaceBackMap().empty()) {
    platform::RecordEvent inplaceback_event(
        "InplaceVarsBack", platform::TracerEventType::UserDefined, 10);
    auto& m = instr_node.InplaceBackMap();
    // NOTE(zhiqiu): same logic as TransferInplaceVarsBack() in operator.cc
    for (auto& p : m) {
      auto* transformed_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
          global_scope_->Var(p.first));
      auto* original_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
          global_scope_->Var(p.second));
      original_tensor->ShareDataWith(*transformed_tensor);
      VLOG(4) << "Transfer inplace variable back from "
              << global_scope_->GetNameById(p.first) << " to "
              << global_scope_->GetNameById(p.second);
    }
  }

  /*For profiling/benchmark only*/
  if (FLAGS_benchmark) {
    instr_node.DeviceContext().Wait();
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
    PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError());
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
  }

  // for debug nan/inf
  if (op_with_kernel != nullptr && FLAGS_check_nan_inf) {
    VLOG(4) << "Check nan/inf";
    framework::details::CheckOpHasNanOrInf(
        *op, *global_scope_,
        place);  // TODO(xiongkun03) change it to inner scope.
  }
}

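// Scheduling model: instructions whose dependency count is zero are submitted
// to the async work queue; the main thread then blocks on main_thread_blocker_
// until either all ops finish (kTaskCompletion) or an exception is caught
// (kExceptionCaught).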
void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  unfinished_op_numer_ = vec_instr.size();
  if (unfinished_op_numer_ == 0) {
    VLOG(4) << "No op to run, return";
    return;
  }

  platform::RecordEvent record_prepare(
      "PrepareAtomic", platform::TracerEventType::UserDefined, 1);
  // NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
  // those for the next step
  auto atomic_deps = async_work_queue_->AtomicDeps();
  auto atomic_var_ref = async_work_queue_->AtomicVarRef();

  async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
  record_prepare.End();

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_->AddTask(vec_instr.at(i).KernelType(), [
        this, i, atomic_deps = atomic_deps.get(),
        atomic_var_ref = atomic_var_ref.get()
      ] { RunInstructionAsync(i, atomic_deps, atomic_var_ref); });
    }
  }

  auto event_name = main_thread_blocker_.WaitEvent();
  VLOG(1) << "event_name: " << event_name;

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(1) << "Exception caught " << exception_holder_.Type();
    // Graceful exit when the executor encountered a fatal error.
    // EOF is not a fatal error.
    if (exception_holder_.Type() != "EOF") {
      async_work_queue_->Cancel();
    }
    VLOG(4) << "Cancel ok";
    PADDLE_ENFORCE_EQ(
        main_thread_blocker_.Clear(), 0,
        platform::errors::PreconditionNotMet(
            "main_thread_blocker_.Clear() return -1, clear failed"));
    VLOG(4) << "clear ok";
    exception_holder_.ReThrow();
  }
}

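// RunNextInstructions decrements the pending-dependency counter of each
// downstream instruction; an instruction whose counter reaches zero is either
// pushed to reserved_next_ops (to run on the current thread) or submitted to
// the async work queue, depending on the kernel types involved.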
void InterpreterCore::RunNextInstructions(
    const Instruction& instr, std::queue<size_t>* reserved_next_ops,
    std::vector<std::atomic<size_t>>* atomic_deps,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  platform::RecordEvent record("RunNextInstructions",
                               platform::TracerEventType::UserDefined, 10);
  VLOG(4) << "atomic 1:" << atomic_deps;
  auto& next_instr = instr.NextInstructions();

  auto IsReady = [atomic_deps](size_t next_id) {
    VLOG(4) << "atomic:" << atomic_deps << " op_id: " << next_id
            << ", remain deps: " << (*atomic_deps)[next_id];
    return (*atomic_deps)[next_id].fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.KernelType() == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.SyncRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref]() {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.DirectRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref] {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    auto direct_run_ops = interpreter::merge_vector(next_instr.SyncRunIds(),
                                                    next_instr.DirectRunIds());
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move rest ops into other threads
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref] {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

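// RunInstructionAsync drains a thread-local ready_ops queue, so a chain of
// ops that become ready one after another keeps running on the same worker
// thread instead of being re-submitted to the work queue.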
void InterpreterCore::RunInstructionAsync(
    size_t instr_id, std::vector<std::atomic<size_t>>* atomic_deps,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_.at(instr_id);
    VLOG(5) << __func__ << " OP id:" << instr_node.Id()
            << " name:" << instr_node.OpBase()->Type()
            << " type:" << (instr_node.KernelType() == OpFuncType::kQueueSync
                                ? "kQueueSync"
                                : "kQueueAsync")
            << " runs on " << platform::GetCurrentThreadName();

    auto* op = instr_node.OpBase();
    platform::RecordEvent instruction_event(
        op->Type(), platform::TracerEventType::Operator, 1);

    try {
      interpreter::WaitEvent(instr_node, place_);

      RunInstruction(instr_node);

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      RecordStreamForGC(instr_node);
#endif
      CheckGC(instr_node, atomic_var_ref);

      interpreter::RecordEvent(instr_node, place_);
    } catch (platform::EnforceNotMet& ex) {
      framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
      exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
    } catch (platform::EOFException&) {
      exception_holder_.Catch(std::current_exception());
    } catch (std::exception& ex) {
      LOG(WARNING) << op->Type() << " raises an exception "
                   << platform::demangle(typeid(ex).name()) << ", "
                   << ex.what();
      exception_holder_.Catch(std::current_exception());
    } catch (...) {
      LOG(WARNING) << op->Type() << " raises an unknown exception";
      exception_holder_.Catch(std::current_exception());
    }

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }

    VLOG(4) << "unfinished_op_numer_: " << unfinished_op_numer_;
    if (UNLIKELY(unfinished_op_numer_.fetch_sub(1, std::memory_order_relaxed) ==
                 1)) {
      if (completion_notifier_ != nullptr) {
        completion_notifier_->NotifyEvent();
      }
    }

    RunNextInstructions(instr_node, &ready_ops, atomic_deps, atomic_var_ref);
  }
}

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
  if (!IsInterpretercoreFastGCEnabled() ||
      instr.KernelType() != OpFuncType::kQueueAsync) {
    return;
  }
  platform::RecordEvent record("RecordStreamForGC",
                               platform::TracerEventType::UserDefined, 10);

  gpuStream_t stream = reinterpret_cast<const platform::CUDADeviceContext&>(
                           instr.DeviceContext())
                           .stream();
  auto TensorRecordStream = [&stream](Tensor& tensor) {
    auto allocation = tensor.Holder();
    if (allocation == nullptr) {
      return;
    }

    const platform::Place& place = allocation->place();
    if (platform::is_gpu_place(place)) {
      memory::RecordStream(allocation, stream);
    } else if (platform::is_cuda_pinned_place(place)) {
      // TODO(Ruibiao): Here we should do something to make sure that the
      // tensor is not freed until the H2D copies are done. However, simply
      // launching a CUDA runtime callback to the H2D stream may lead to high
      // performance overhead. As all the H2D cases we meet at present are
      // copies from CPUPlace, we just log a WARNING here. A better design is
      // required.
      LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous "
                      "manner may lead to data inconsistency";
    } else {
      // memory copies involving CPUPlace are always synchronous, so just do
      // nothing here
    }
  };

  /* NOTE(Ruibiao):Cross-stream tensor synchronization is required only when
   * all the following conditions are satisfied:
   * 1. The tensor will be GC after running the instruction, i.e., in
   * instr.GCCheckVars.
   * 2. The stream which initializes this tensor is different from the stream
   * which the instruction run in.
   * 3. The tensor is the instruction's input, because we assume that the
   * instruction will initialize all output tensors with its running stream.
   * 4. In the OP function of this instruction, the tensor is an input of a
   * async CUDA kernel.
   *
   * Here we only process the first condition, because:
   * 1. Since the RecordStream function will directly return when the recorded
   * stream is equal to the owning stream, recording a stream same as which
   * initialized this tensor has less time overhead. Conversely, it may take
   * more time if we try to extract those cross-stream input vars from
   * instr.GCCheckVars.
   * 2. Now the instruction has no idea of which vars involve async running in
   * OP function, and thus we can not recognize condition 4. It should be
   * supported later.
   */
  for (int var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC sync " << global_scope_->GetNameById(var_id) << " "
            << global_scope_->VarDesc(var_id);

    // persistable vars will be ignored while GC
    if (global_scope_->VarDesc(var_id) &&
        global_scope_->VarDesc(var_id)->Persistable()) {
      continue;
    }

    paddle::framework::Variable* var = global_scope_->Var(var_id);
    if (var == nullptr) {
      continue;
    }

    if (var->IsType<LoDTensor>()) {
      TensorRecordStream(*(var->GetMutable<LoDTensor>()));
    } else if (var->IsType<
                   operators::reader::
                       OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
      // do nothing
    } else if (var->IsType<phi::SelectedRows>()) {
      TensorRecordStream(
          *(var->GetMutable<phi::SelectedRows>()->mutable_value()));
    } else if (var->IsType<LoDTensorArray>()) {
      auto* tensor_arr = var->GetMutable<LoDTensorArray>();
      for (auto& tensor : *tensor_arr) {
        TensorRecordStream(tensor);
      }
    } else if (var->IsType<std::vector<Scope*>>()) {
      // do nothing
    } else {
      PADDLE_THROW(platform::errors::Unimplemented(
          "The variable(%s) is not supported in eager deletion.",
          framework::ToTypeName(var->Type())));
    }
  }
}
#endif
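
// CheckGC decrements the reference counter of every variable for which this
// instruction is a "last live" op; when the counter reaches zero and the
// variable is not persistable, it is handed to the garbage collector.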

847 848 849
void InterpreterCore::CheckGC(
    const Instruction& instr,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
L
liutiexing 已提交
850 851
  platform::RecordEvent record("CheckGC",
                               platform::TracerEventType::UserDefined, 10);
852
  size_t instr_id = instr.Id();
853 854
  auto& var_scope = *global_scope_;

855
  for (auto var_id : instr.GCCheckVars()) {
L
Leo Chen 已提交
856 857
    VLOG(4) << "GC " << global_scope_->GetNameById(var_id) << " "
            << var_scope.VarDesc(var_id);
858 859
    VLOG(4) << "atomic:" << atomic_var_ref << " " << &(*atomic_var_ref)[var_id]
            << " " << var_id;
860
    bool is_ready =
861
        (*atomic_var_ref)[var_id].fetch_sub(1, std::memory_order_relaxed) == 1;
862 863 864 865 866
    // ignore all persistable var while GC
    if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
      continue;
    }
    if (is_ready) {
X
xiongkun 已提交
867 868
      VLOG(6) << "Async delete variable with name : "
              << var_scope.GetNameById(var_id);
869 870 871 872 873 874 875
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      if (IsInterpretercoreFastGCEnabled()) {
        static_cast<InterpreterCoreFastGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id));

      } else {
        static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
876
            var_scope.Var(var_id), &gc_event_.at(instr_id),
877 878 879 880
            &instr.DeviceContext());
      }
#else
      static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
881
          var_scope.Var(var_id), &gc_event_.at(instr_id),
882 883
          &instr.DeviceContext());
#endif
W
wanghuancoder 已提交
884 885 886 887
    }
  }
}

void InterpreterCore::Prepare(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors, bool prepare_feed) {
  PADDLE_ENFORCE_EQ(feed_names.size(), feed_tensors.size(),
                    platform::errors::PreconditionNotMet(
                        "Required feed_names.size() == feed_tensors.size(), "
                        "but received %d != %d",
                        feed_names.size(), feed_tensors.size()));

  auto FeedInput = [&] {
    VLOG(4) << "Feed inputs";
    for (size_t i = 0; i < feed_names.size(); ++i) {
      auto* feed_var = global_scope_->FindVar(feed_names[i]);
      PADDLE_ENFORCE_NOT_NULL(
          feed_var, platform::errors::NotFound(
                        "Variable %s should not be nullptr.", feed_names[i]));

      auto feed_tensor = feed_var->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
      feed_tensor->set_lod(feed_tensors[i].lod());
    }
  };

  if (!is_build_) {
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    FeedInput();
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);
  }
  // NOTE: Because feed_tensor may be GC'ed during
  // paddle::framework::build_op_func_list, we should call FeedInput again.
  if (prepare_feed) {
    FeedInput();
  }
}

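// DryRun executes the instruction list once under a ProfilerGuard and returns
// the collected cost information.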
interpreter::CostInfo InterpreterCore::DryRun(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, true);
  interpreter::CostInfo cost_info;
  {
    // keep the guard alive for the whole block; a nameless temporary would be
    // destroyed immediately and profile nothing
    interpreter::ProfilerGuard profiler_guard(place_, &cost_info);
    ExecuteInstructionList(vec_instruction_);
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  return cost_info;
}

void InterpreterCore::SetFeedVarsInplaceSkip(
    const std::vector<std::string>& feed_names) {
  for (auto& feed_name : feed_names) {
    global_scope_->SetVarSikpInplace(feed_name, true);
  }
}

}  // namespace framework
}  // namespace paddle