// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"

#include <unordered_set>

#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/os_info.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#include "paddle/phi/core/kernel_context.h"
#ifdef PADDLE_WITH_MKLDNN
#include "paddle/fluid/platform/mkldnn_helper.h"
#endif

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true,
                            "Use local_scope in new executor (especially used "
                            "in UT); can be turned off for better performance");

DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);
DECLARE_bool(fast_eager_deletion_mode);

constexpr const char* kExceptionCaught = "ExceptionCaught";
constexpr const char* kTaskCompletion = "TaskCompletion";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;
static constexpr size_t kDeviceNumThreads = 1;

bool IsInterpretercoreFastGCEnabled() {
  return memory::allocation::AllocatorFacade::Instance()
             .IsStreamSafeCUDAAllocatorUsed() &&
         FLAGS_fast_eager_deletion_mode;
}

InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const BlockDesc& block,
                                 VariableScope* global_scope)
    : place_(place),
      block_(block),
      global_scope_(global_scope),
      stream_analyzer_(place) {
  VLOG(4) << "InterpreterCore(): " << this << " on " << place_;
  is_build_ = false;

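  // Choose the GC strategy: when the stream-safe CUDA allocator is in use and
  // FLAGS_fast_eager_deletion_mode is on, variables can be released directly
  // by the fast GC; otherwise an event-based GC waits on device events before
  // freeing memory.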
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
  if (IsInterpretercoreFastGCEnabled()) {
    gc_ = std::make_unique<InterpreterCoreFastGarbageCollector>();
  } else {
    gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
  }
#else
  gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
#endif

  exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
  completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);

  create_local_scope_ = FLAGS_new_executor_use_local_scope;
  if (FLAGS_new_executor_use_local_scope) {
    auto local_scope = &global_scope->GetMutableScope()->NewScope();
    local_scope->AddListener(global_scope->Listener());
    local_scope_ = local_scope;
  }
  VLOG(4) << "create_local_scope_ is " << create_local_scope_;

  // prune

  // optimize graph pass

  // convert to run graph
}

InterpreterCore::~InterpreterCore() {
  // cancel the GC thread
  gc_.reset(nullptr);

  async_work_queue_.reset(nullptr);
  VLOG(4) << "~InterpreterCore(): " << this;
  VLOG(4) << " on " << place_;

#ifdef PADDLE_WITH_MKLDNN
  // Clear the mkl-dnn cache;
  // this is needed to keep the mkl-dnn unit tests working.
  platform::ClearMKLDNNCache(place_, this);
#endif
}

void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
  copy_program_ = prog;
}

paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
  bool is_build = is_build_;
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, is_build);

  if (is_build) {
    // add the listener before running; at this point is_build is true
    global_scope_->ResetListener();

    // For a program that runs only once, there is no need to create the
    // work queue, so async_work_queue_ is not created until the second run.
    if (async_work_queue_ == nullptr) {
      async_work_queue_ = std::make_unique<interpreter::AsyncWorkQueue>(
          kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_);
      // prepare for the first time.
      async_work_queue_->PrepareAtomicDeps(dependecy_count_);
      async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
    }

    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // clear the listener after the run
  global_scope_->ClearListener();

  // return the fetch tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names) {
#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
  if (!is_build_) {
    if (create_local_scope_ &&
        global_scope_->GetMutableLocalScope() !=
            global_scope_->GetMutableScope() &&
        global_scope_->GetMutableLocalScope()) {
      VLOG(4) << "Clear previous local scope before run";
      VLOG(4) << global_scope_->GetMutableScope() << " "
              << global_scope_->GetMutableLocalScope();
      platform::DeviceContextPool::Instance().Get(place_)->Wait();
      // TODO(zhiqiu): clear the tensor holder of all vars in previous local
      // scope?
    }
    global_scope_->SetLocalScope(local_scope_);
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert the op_func_node list to the instruction graph
    Convert(&op_func_nodes);

  } else {
    // add the listener before running; at this point is_build_ is true
    global_scope_->ResetListener();

    // For a program that runs only once, there is no need to create the
    // work queue, so async_work_queue_ is not created until the second run.
    if (async_work_queue_ == nullptr) {
      async_work_queue_ = std::make_unique<interpreter::AsyncWorkQueue>(
          kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_);
      // prepare for the first time.
      async_work_queue_->PrepareAtomicDeps(dependecy_count_);
      async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
    }

    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // clear the listener after the run
  global_scope_->ClearListener();

  // return the fetch tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

// At the end of each step, the holders of the Tensors in LoDTensorArray are
// null. Clear these Tensors and leave the LoDTensorArray empty; otherwise an
// exception will occur in the next step.
void InterpreterCore::ClearLoDTensorArrayInLocalScope() {
  auto vars = local_scope_->LocalVars();
  for (auto var : vars) {
    if (var->IsType<LoDTensorArray>()) {
      auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
      lod_tensor_arr->clear();
    }
  }
}

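// Dependency analysis: dependecy_count_[i] records how many upstream
// instructions must finish before instruction i can run; instructions whose
// count stays zero are the entry points scheduled first by
// ExecuteInstructionList.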
void InterpreterCore::BuildOperatorDependences() {
  // analyze the dependences between ops, set dependecy_count_ and call
  // Schedule
  auto op_nums = vec_instruction_.size();
  dependecy_count_.resize(op_nums);
  auto op2downstream = interpreter::build_op_downstream_map(
      vec_instruction_, &op_happens_before_);
  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
    auto op_list = op2downstream[op];
    std::vector<size_t> downstream_vector(op_list.begin(), op_list.end());
    stream_analyzer_.Schedule(downstream_vector, &vec_instruction_, op);

    for (auto inst_id : op_list) {
      dependecy_count_[inst_id]++;
    }
  }
}

void InterpreterCore::Convert(
    std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
  auto& vec_meta_info = global_scope_->MutableVecMetaInfo();
  auto var_nums = global_scope_->VarSize();
  input_var2op_info_.resize(var_nums);
  auto nodes = *op_func_nodes;

  auto op_nums = nodes.size();
  vec_instruction_.reserve(op_nums);
  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& op_func_node = nodes[op_idx];
    auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);
    vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
  }

  BuildOperatorDependences();

  // calculate last_live_ops_
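  // last_live_ops_[var_id] collects the ops after which var_id is no longer
  // needed; the reference counts derived from it below drive garbage
  // collection.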
  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& instr = vec_instruction_[op_idx];
    OpInOutInfo info;
    std::set<size_t> gc_check_inputs;

    for (auto& item : instr.Inputs()) {
      for (auto id : item.second) {
        if (id == kEmptyVarIndex) {
          continue;
        }
        input_var2op_info_.at(id).push_back(op_idx);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(instr.OpBase());
        }
        auto* var_desc = global_scope_->VarDesc(id);
        if (var_desc) {
          if (info.IsInArgBufferNeeded(var_desc->Name())) {
            gc_check_inputs.insert(id);
          }
        } else {
          gc_check_inputs.insert(id);
        }
      }
    }

    for (auto var_id : gc_check_inputs) {
      paddle::framework::Variable* var = global_scope_->Var(var_id);
      if (var->IsType<LoDTensor>() || var->IsType<phi::SelectedRows>() ||
          var->IsType<LoDTensorArray>()) {
        last_live_ops_[var_id].insert(op_idx);
      } else {
        VLOG(4) << "not clear " << global_scope_->GetNameById(var_id)
                << " after " << instr.OpBase()->Type()
                << " because its type is "
                << framework::ToTypeName(var->Type());
      }
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check output
    for (auto& item : vec_instruction_[i].Outputs()) {
      for (auto var_id : item.second) {
        if (input_var2op_info_.at(var_id).size() == 0) {
          last_live_ops_[var_id].insert(i);
        }
      }
    }
  }

  // Shrink: in each last-live set, keep only the ops that do not happen
  // before any other op in the same set (an op can be dropped if another op
  // in the set always runs after it).
  // For example,
  // b = op1(a)
  // c = op2(a, b)
  // in this case, a is the input of op1 and op2; we only need to check
  // a after op2, because op2 always uses a after op1.
  for (size_t i = 0; i < last_live_ops_.size(); ++i) {
    std::set<size_t> minimum_last_live_ops;
    for (size_t item : last_live_ops_[i]) {
      bool not_before_any = true;
      // keep the op only if it does not happen before any other op in the set
      for (size_t other_item : last_live_ops_[i]) {
        if (op_happens_before_[item][other_item]) {
          VLOG(8) << "happens_before: " << item << "->" << other_item
                  << ", so skip " << item;
          not_before_any = false;
          break;
        }
      }
      if (not_before_any) {
        VLOG(8) << "last live op of var " << i << " "
                << global_scope_->GetNameById(i) << " : " << item << " "
                << vec_instruction_[item].OpBase()->Type();
        minimum_last_live_ops.insert(item);
        vec_instruction_[item].AddGCCheckVar(i);
      }
    }
    last_live_ops_[i] = minimum_last_live_ops;
    vec_meta_info[i].var_ref_count_ = last_live_ops_[i].size();
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].DeviceContext().GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }
  bool inplaced = false;
  for (auto inst : vec_instruction_) {
    if (inst.OpBase()->Type() == "share_buffer" ||
        inst.OpBase()->Type() == "share_data") {
      VLOG(4) << "Already inplaced, skip inplace now.";
      inplaced = true;
    }
  }

  if (FLAGS_new_executor_use_inplace && !inplaced) {
    BuildInplace();
  }
}

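// A variable may only be in-placed when exactly one op actually reads its
// buffer; otherwise sharing the buffer with an output would corrupt the data
// seen by the other readers.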
bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!global_scope_->VarDesc(var_index)) {
    return input_var2op_info_.at(var_index).size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_.at(var_index)) {
      OpInOutInfo info;
      info.Build(vec_instruction_.at(inst_id).OpBase());
      if (info.IsInArgBufferNeeded(global_scope_->VarDesc(var_index)->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    auto& instr = vec_instruction_[i];
    auto* op_base = instr.OpBase();
    if (!op_base->Info().infer_inplace_) {
      continue;
    }

    auto in_to_outs = op_base->Info().infer_inplace_(
        platform::is_gpu_place(instr.DeviceContext().GetPlace()));

    auto& inputs = instr.Inputs();
    auto& outputs = instr.Outputs();
    for (auto& pair : in_to_outs) {
      auto iter = inputs.find(pair.first);
      if (iter != inputs.end() && !iter->second.empty()) {
        auto in_var_desc = global_scope_->VarDesc(iter->second[0]);
        if (in_var_desc && in_var_desc->Persistable()) {
          continue;
        }
        if (global_scope_->GetVarSikpInplace(iter->second[0])) {
          continue;
        }
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = outputs.find(pair.second);
          if (iterout != outputs.end() && !iterout->second.empty()) {
            auto invar = global_scope_->Var(iter->second[0]);
            auto outvar = global_scope_->Var(iterout->second[0]);
            if (invar && outvar && invar->IsType<LoDTensor>() &&
                outvar->IsType<LoDTensor>()) {
              instr.AddInplace(invar, outvar);
              VLOG(3) << "inplace " << vec_instruction_[i].OpBase()->Type()
                      << " " << global_scope_->GetNameById(iter->second[0])
                      << " -> "
                      << global_scope_->GetNameById(iterout->second[0])
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->Inputs()) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(global_scope_->Var(id));
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->Outputs()) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(global_scope_->Var(id));
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }

  // set runtime_ctx and infershape_ctx_
  if (instr_node->OpBase()->Type() == "cinn_launch") {
    // the cinn_launch op uses the scope inside its kernel
    Scope* local_scope = create_local_scope_
                             ? global_scope_->GetMutableLocalScope()
                             : global_scope_->GetMutableScope();
    instr_node->ResetContextWithScope(ins_map, outs_map, *local_scope);
  } else {
    instr_node->ResetContext(ins_map, outs_map);
  }
}

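// If none of an instruction's LoDTensor inputs carries LoD information,
// InferShape can skip sharing LoD with the outputs, which avoids unnecessary
// work at run time.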
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
  }
}

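// Run a single instruction: infer shape, share input/output buffers for
// inplace ops, launch the kernel (phi or fluid), copy inplace-transferred
// variables back, and optionally run the benchmark and nan/inf checks.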
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  auto* op = instr_node.OpBase();
  auto place = instr_node.DeviceContext().GetPlace();
  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(global_scope_);
  Scope* local_scope = create_local_scope_
                           ? global_scope_->GetMutableLocalScope()
                           : global_scope_->GetMutableScope();
  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
  {
    // If it is OperatorBase, InferShape does nothing.
    if (op_with_kernel != nullptr) {
      platform::RecordEvent infershape_event(
          "infer_shape", platform::TracerEventType::OperatorInner, 1,
          platform::EventRole::kInnerOp);

      // see OperatorWithKernel::RunImpl in operator.cc for why
      if (!(op_with_kernel->HasAttr(kAllKernelsMustComputeRuntimeShape) &&
            op_with_kernel->Attr<bool>(kAllKernelsMustComputeRuntimeShape))) {
        op_with_kernel->Info().infer_shape_(
            instr_node.InnerInferShapeContext().get());
      }
    }
  }

  if (op_with_kernel != nullptr && FLAGS_new_executor_use_inplace) {
    // TODO(xiongkun03) Does operator base support inplace?
    for (auto& pair : instr_node.InplaceInfo()) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }

  {
    platform::RecordEvent compute_event(
        "compute", platform::TracerEventType::OperatorInner, 1,
        platform::EventRole::kInnerOp);
    if (op_with_kernel == nullptr) {
      instr_node.OpBase()->Run(*local_scope, place_);
    } else {
      // fit for phi
      if (instr_node.PhiKernel() && instr_node.PhiKernel()->IsValid()) {
        VLOG(4) << "Run phi kernel: " << op->Type();
        VLOG(4) << instr_node.InnerRuntimeContext().get() << " "
                << &instr_node.DeviceContext();
        phi::KernelContext pt_kernel_context;
        op_with_kernel->BuildPhiKernelContext(
            *instr_node.InnerRuntimeContext().get(),
            const_cast<platform::DeviceContext*>(&instr_node.DeviceContext()),
            &pt_kernel_context);

        (*instr_node.PhiKernel())(&pt_kernel_context);

      } else {
        instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
      }
    }
  }

  VLOG(4) << "End run " << place << " " << op->DebugStringEx(global_scope_);

  if (!instr_node.InplaceBackMap().empty()) {
    platform::RecordEvent inplaceback_event(
        "InplaceVarsBack", platform::TracerEventType::UserDefined, 10);
    auto& m = instr_node.InplaceBackMap();
    // NOTE(zhiqiu): same logic as TransferInplaceVarsBack() in operator.cc
    for (auto& p : m) {
      auto* transformed_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
          global_scope_->Var(p.first));
      auto* original_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
          global_scope_->Var(p.second));
      original_tensor->ShareDataWith(*transformed_tensor);
      VLOG(4) << "Transfer inplace variable back from "
              << global_scope_->GetNameById(p.first) << " to "
              << global_scope_->GetNameById(p.second);
    }
  }

  /*For profiling/benchmark only*/
  if (FLAGS_benchmark) {
    instr_node.DeviceContext().Wait();
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
    PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError());
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
  }

  // for debug nan/inf
  if (op_with_kernel != nullptr && FLAGS_check_nan_inf) {
    VLOG(4) << "Check nan/inf";
    framework::details::CheckOpHasNanOrInf(
        *op, *global_scope_,
        place);  // TODO(xiongkun03) change it to inner scope.
  }
}

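// Kick off every instruction whose dependency count is zero and block the
// main thread until either all ops have finished (kTaskCompletion) or a
// worker thread reports an exception (kExceptionCaught).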
void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  unfinished_op_numer_ = vec_instr.size();
  if (unfinished_op_numer_ == 0) {
    VLOG(4) << "No op to run, return";
    return;
  }

  platform::RecordEvent record_prepare(
      "PrepareAtomic", platform::TracerEventType::UserDefined, 1);
  // NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
  // those for the next step
  auto atomic_deps = async_work_queue_->AtomicDeps();
  auto atomic_var_ref = async_work_queue_->AtomicVarRef();

  async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
  record_prepare.End();

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_->AddTask(vec_instr.at(i).KernelType(),
                                 [this, i, atomic_deps = atomic_deps.get(),
                                  atomic_var_ref = atomic_var_ref.get()] {
                                   RunInstructionAsync(i, atomic_deps,
                                                       atomic_var_ref);
                                 });
    }
  }

  auto event_name = main_thread_blocker_.WaitEvent();
  VLOG(1) << "event_name: " << event_name;

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(1) << "Exception caught " << exception_holder_.Type();
    // Graceful exit when the executor encountered a fatal error.
    // EOF is not a fatal error.
    if (exception_holder_.Type() != "EOF") {
      async_work_queue_->Cancel();
    }
    VLOG(4) << "Cancel ok";
    PADDLE_ENFORCE_EQ(
        main_thread_blocker_.Clear(), 0,
        platform::errors::PreconditionNotMet(
            "main_thread_blocker_.Clear() return -1, clear failed"));
    VLOG(4) << "clear ok";
    exception_holder_.ReThrow();
  }
}

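// Scheduling policy for downstream ops: after an async (device) op, sync ops
// are handed to the work queue while async ops stay on this thread; after a
// sync op, event/async ops go to the queue and at most one directly-runnable
// op is kept on the current thread to avoid an extra thread switch.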
void InterpreterCore::RunNextInstructions(
    const Instruction& instr, std::queue<size_t>* reserved_next_ops,
    std::vector<std::atomic<size_t>>* atomic_deps,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  platform::RecordEvent record("RunNextInstructions",
                               platform::TracerEventType::UserDefined, 10);
  VLOG(4) << "atomic 1:" << atomic_deps;
  auto& next_instr = instr.NextInstructions();

  auto IsReady = [atomic_deps](size_t next_id) {
    VLOG(4) << "atomic:" << atomic_deps << " op_id: " << next_id
            << ", remain deps: " << (*atomic_deps)[next_id];
    return (*atomic_deps)[next_id].fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.KernelType() == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.SyncRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref]() {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.DirectRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref] {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    auto direct_run_ops = interpreter::merge_vector(next_instr.SyncRunIds(),
                                                    next_instr.DirectRunIds());
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move rest ops into other threads
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref] {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

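// Worker-side loop: run the given instruction, then keep draining the ops
// that became ready and were reserved for this thread, so a chain of ops can
// execute without going back through the work queue.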
void InterpreterCore::RunInstructionAsync(
    size_t instr_id, std::vector<std::atomic<size_t>>* atomic_deps,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_.at(instr_id);
    VLOG(5) << __func__ << " OP id:" << instr_node.Id()
            << " name:" << instr_node.OpBase()->Type() << " type:"
            << (instr_node.KernelType() == OpFuncType::kQueueSync
                    ? "kQueueSync"
                    : "kQueueAsync")
            << " runs on " << platform::GetCurrentThreadName();

    auto* op = instr_node.OpBase();
    platform::RecordEvent instruction_event(
        op->Type(), platform::TracerEventType::Operator, 1);

    try {
      interpreter::WaitEvent(instr_node, place_);

      RunInstruction(instr_node);

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      RecordStreamForGC(instr_node);
#endif
      CheckGC(instr_node, atomic_var_ref);

      interpreter::RecordEvent(instr_node, place_);
    } catch (platform::EnforceNotMet& ex) {
      framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
      exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
    } catch (platform::EOFException&) {
      exception_holder_.Catch(std::current_exception());
    } catch (std::exception& ex) {
      LOG(WARNING) << op->Type() << " raises an exception "
                   << platform::demangle(typeid(ex).name()) << ", "
                   << ex.what();
      exception_holder_.Catch(std::current_exception());
    } catch (...) {
      LOG(WARNING) << op->Type() << " raises an unknown exception";
      exception_holder_.Catch(std::current_exception());
    }

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }

    VLOG(4) << "unfinished_op_numer_: " << unfinished_op_numer_;
    if (UNLIKELY(unfinished_op_numer_.fetch_sub(1, std::memory_order_relaxed) ==
                 1)) {
      if (completion_notifier_ != nullptr) {
        completion_notifier_->NotifyEvent();
      }
    }

    RunNextInstructions(instr_node, &ready_ops, atomic_deps, atomic_var_ref);
  }
}

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
  if (!IsInterpretercoreFastGCEnabled() ||
      instr.KernelType() != OpFuncType::kQueueAsync) {
    return;
  }
  platform::RecordEvent record("RecordStreamForGC",
                               platform::TracerEventType::UserDefined, 10);

  gpuStream_t stream = reinterpret_cast<const platform::CUDADeviceContext&>(
                           instr.DeviceContext())
                           .stream();
  auto TensorRecordStream = [&stream](Tensor& tensor) {
    auto allocation = tensor.Holder();
    if (allocation == nullptr) {
      return;
    }

    const platform::Place& place = allocation->place();
    if (platform::is_gpu_place(place)) {
      memory::RecordStream(allocation, stream);
    } else if (platform::is_cuda_pinned_place(place)) {
      // TODO(Ruibiao): Here we should do something to make sure that the
      // tensor is not freed until the H2D copies are done. However, simply
      // launching a CUDA runtime callback to the H2D stream may lead to high
      // performance overhead. As all the H2D cases we meet at present are
      // copies from CPUPlace, we just log a WARNING here. A better design is
      // required.
      LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous "
                      "manner may lead to data inconsistency";
    } else {
      // memory copies involving CPUPlace are always synchronous, so just do
      // nothing here
    }
  };

  /* NOTE(Ruibiao): Cross-stream tensor synchronization is required only when
   * all the following conditions are satisfied:
   * 1. The tensor will be GC-ed after running the instruction, i.e., it is in
   * instr.GCCheckVars.
   * 2. The stream which initializes this tensor is different from the stream
   * which the instruction runs in.
   * 3. The tensor is the instruction's input, because we assume that an
   * instruction initializes all of its output tensors with its running stream.
   * 4. In the OP function of this instruction, the tensor is an input of an
   * async CUDA kernel.
   *
   * Here we only process the first condition, because:
   * 1. Since the RecordStream function returns directly when the recorded
   * stream is equal to the owning stream, recording a stream that is the same
   * as the one which initialized this tensor has less time overhead.
   * Conversely, it may take more time if we try to extract those cross-stream
   * input vars from instr.GCCheckVars.
   * 2. Currently the instruction has no idea of which vars are involved in
   * async running in the OP function, and thus we cannot recognize condition
   * 4. It should be supported later.
   */
  for (int var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC sync " << global_scope_->GetNameById(var_id) << " "
            << global_scope_->VarDesc(var_id);

    // persistable vars will be ignored during GC
    if (global_scope_->VarDesc(var_id) &&
        global_scope_->VarDesc(var_id)->Persistable()) {
      continue;
    }

    paddle::framework::Variable* var = global_scope_->Var(var_id);
    if (var == nullptr) {
      continue;
    }

    if (var->IsType<LoDTensor>()) {
      TensorRecordStream(*(var->GetMutable<LoDTensor>()));
    } else if (var->IsType<
                   operators::reader::
                       OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
      // do nothing
    } else if (var->IsType<phi::SelectedRows>()) {
      TensorRecordStream(
          *(var->GetMutable<phi::SelectedRows>()->mutable_value()));
    } else if (var->IsType<LoDTensorArray>()) {
      auto* tensor_arr = var->GetMutable<LoDTensorArray>();
      for (auto& tensor : *tensor_arr) {
        TensorRecordStream(tensor);
      }
    } else if (var->IsType<std::vector<Scope*>>()) {
      // do nothing
    } else {
      PADDLE_THROW(platform::errors::Unimplemented(
          "The variable(%s) is not supported in eager deletion.",
          framework::ToTypeName(var->Type())));
    }
  }
}
#endif

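// Decrease the reference count of each variable checked by this instruction;
// when a count reaches zero and the variable is not persistable, hand it to
// the garbage collector asynchronously.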
void InterpreterCore::CheckGC(
    const Instruction& instr,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  platform::RecordEvent record("CheckGC",
                               platform::TracerEventType::UserDefined, 10);
  size_t instr_id = instr.Id();
  auto& var_scope = *global_scope_;

  for (auto var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC " << global_scope_->GetNameById(var_id) << " "
            << var_scope.VarDesc(var_id);
    VLOG(4) << "atomic:" << atomic_var_ref << " " << &(*atomic_var_ref)[var_id]
            << " " << var_id;
    bool is_ready =
        (*atomic_var_ref)[var_id].fetch_sub(1, std::memory_order_relaxed) == 1;
    // ignore all persistable vars while GC
    if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
      continue;
    }
    if (is_ready) {
      VLOG(6) << "Async delete variable with name : "
              << var_scope.GetNameById(var_id);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      if (IsInterpretercoreFastGCEnabled()) {
        static_cast<InterpreterCoreFastGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id));

      } else {
        static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id), &gc_event_.at(instr_id),
            &instr.DeviceContext());
      }
#else
      static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
          var_scope.Var(var_id), &gc_event_.at(instr_id),
          &instr.DeviceContext());
#endif
    }
  }
}

void InterpreterCore::Prepare(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors, bool prepare_feed) {
  PADDLE_ENFORCE_EQ(feed_names.size(), feed_tensors.size(),
                    platform::errors::PreconditionNotMet(
                        "Required feed_names.size() == feed_tensors.size(), "
                        "but received %d != %d",
                        feed_names.size(), feed_tensors.size()));

  auto FeedInput = [&] {
    VLOG(4) << "Feed inputs";
    for (size_t i = 0; i < feed_names.size(); ++i) {
      auto* feed_var = global_scope_->FindVar(feed_names[i]);
      PADDLE_ENFORCE_NOT_NULL(
          feed_var, platform::errors::NotFound(
                        "Variable %s should not be nullptr.", feed_names[i]));

      auto feed_tensor = feed_var->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
      feed_tensor->set_lod(feed_tensors[i].lod());
    }
  };

  if (!is_build_) {
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    FeedInput();
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert the op_func_node list to the instruction graph
    Convert(&op_func_nodes);
  }
  // NOTE: Because feed_tensor will be GC'ed after
  // paddle::framework::build_op_func_list, we should
  // call FeedInput again.
  if (prepare_feed) {
    FeedInput();
  }
}

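// DryRun executes the program once under a ProfilerGuard so that cost
// statistics for the run are collected into cost_info; no fetch result is
// returned.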
interpreter::CostInfo InterpreterCore::DryRun(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, true);
  interpreter::CostInfo cost_info;
  {
    interpreter::ProfilerGuard profiler_guard(place_, &cost_info);

    // For a program that runs only once, there is no need to create the
    // work queue, so async_work_queue_ is not created until the second run.
    if (async_work_queue_ == nullptr) {
      async_work_queue_ = std::make_unique<interpreter::AsyncWorkQueue>(
          kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_);
      // prepare for the first time.
      async_work_queue_->PrepareAtomicDeps(dependecy_count_);
      async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
    }

    ExecuteInstructionList(vec_instruction_);
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  return cost_info;
}

void InterpreterCore::SetFeedVarsInplaceSkip(
    const std::vector<std::string>& feed_names) {
  for (auto& feed_name : feed_names) {
    global_scope_->SetVarSikpInplace(feed_name, true);
  }
}

}  // namespace framework
}  // namespace paddle