// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include <unordered_set>
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/os_info.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#ifdef PADDLE_WITH_MKLDNN
#include "paddle/fluid/platform/mkldnn_helper.h"
#endif

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true,
                            "Use local_scope in new executor(especially used "
                            "in UT), can turn off for better performance");

DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);
DECLARE_bool(fast_eager_deletion_mode);

constexpr const char* kExceptionCaught = "ExceptionCaught";
constexpr const char* kTaskCompletion = "TaskCompletion";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;
static constexpr size_t kDeviceNumThreads = 1;

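// Fast GC is available only when the stream-safe CUDA allocator is in use and
// FLAGS_fast_eager_deletion_mode is enabled.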
bool IsInterpretercoreFastGCEnabled() {
  return memory::allocation::AllocatorFacade::Instance()
             .IsStreamSafeCUDAAllocatorUsed() &&
         FLAGS_fast_eager_deletion_mode;
}

InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const BlockDesc& block,
                                 VariableScope* global_scope)
    : place_(place),
      block_(block),
      global_scope_(global_scope),
      stream_analyzer_(place) {
  VLOG(4) << "InterpreterCore(): " << this << " on " << place_;
  is_build_ = false;
  async_work_queue_.reset(new interpreter::AsyncWorkQueue(
      kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_));

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
  if (IsInterpretercoreFastGCEnabled()) {
    gc_ = std::make_unique<InterpreterCoreFastGarbageCollector>();
  } else {
    gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
  }
#else
  gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
#endif

  exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
  completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);

  create_local_scope_ = FLAGS_new_executor_use_local_scope;
  if (FLAGS_new_executor_use_local_scope) {
    auto local_scope = &global_scope->GetMutableScope()->NewScope();
    local_scope->AddListener(global_scope->Listener());
    local_scope_ = local_scope;
  }
  VLOG(4) << "create_local_scope_ is " << create_local_scope_;

  // prune

  // optimize graph pass

  // convert to run graph
}

InterpreterCore::~InterpreterCore() {
  // cancel gc's thread
  gc_.reset(nullptr);

  async_work_queue_.reset(nullptr);
  VLOG(4) << "~InterpreterCore(): " << this;
  VLOG(4) << " on " << place_;

#ifdef PADDLE_WITH_MKLDNN
  // Clear mkl-dnn cache,
  // this is needed to have mkl-dnn unit tests working
  platform::ClearMKLDNNCache(place_, this);
#endif
}

void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
  copy_program_ = prog;
}

paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
  bool is_build = is_build_;
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, is_build);

  if (is_build) {
    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names) {
#ifdef PADDLE_WITH_MKLDNN
  platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
  if (!is_build_) {
    if (create_local_scope_ &&
        global_scope_->GetMutableLocalScope() !=
            global_scope_->GetMutableScope() &&
        global_scope_->GetMutableLocalScope()) {
      VLOG(4) << "Clear previous local scope before run";
      VLOG(4) << global_scope_->GetMutableScope() << " "
              << global_scope_->GetMutableLocalScope();
      platform::DeviceContextPool::Instance().Get(place_)->Wait();
      // TODO(zhiqiu): clear the tensor holder of all vars in previous local
      // scope?
    }
    global_scope_->SetLocalScope(local_scope_);
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);

  } else {
    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

// At the end of each step, the holders of the Tensors in LoDTensorArray are
// null. Clear these Tensors and leave the LoDTensorArray empty; otherwise an
// exception will occur in the next step.
void InterpreterCore::ClearLoDTensorArrayInLocalScope() {
  auto vars = local_scope_->LocalVars();
  for (auto var : vars) {
    if (var->IsType<LoDTensorArray>()) {
      auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
      lod_tensor_arr->clear();
    }
  }
}

void InterpreterCore::BuildOperatorDependences() {
  // Analyze the dependencies between ops, set dependecy_count_, and call
  // Schedule.
  auto op_nums = vec_instruction_.size();
  dependecy_count_.resize(op_nums);
  auto op2downstream = interpreter::build_op_downstream_map(vec_instruction_);
  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
    auto op_list = op2downstream[op];
    std::vector<size_t> downstream_vector(op_list.begin(), op_list.end());
    stream_analyzer_.Schedule(downstream_vector, &vec_instruction_, op);

    for (auto inst_id : op_list) {
      dependecy_count_[inst_id]++;
    }
  }
}

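// Convert the OpFuncNode list produced by build_op_func_list into
// Instructions: collect the input vars each instruction should GC-check,
// build operator dependences, cache runtime/infershape contexts, create GC
// events, and apply inplace when FLAGS_new_executor_use_inplace is on.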
void InterpreterCore::Convert(
    std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
  auto& vec_meta_info = global_scope_->MutableVecMetaInfo();
  auto var_nums = global_scope_->VarSize();
  input_var2op_info_.resize(var_nums);
  auto nodes = *op_func_nodes;

  auto op_nums = nodes.size();
  vec_instruction_.reserve(op_nums);

  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& op_func_node = nodes[op_idx];
    auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);

    vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
    auto& instr = vec_instruction_.back();

    OpInOutInfo info;
    std::vector<size_t> gc_check_input_list;

    for (auto& item : op_func_node.input_index) {
      for (auto id : item.second) {
        if (id == kEmptyVarIndex) {
          continue;
        }
        input_var2op_info_.at(id).push_back(op_idx);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(op_func_node.operator_base_.get());
        }
        auto* var_desc = global_scope_->VarDesc(id);
        if (var_desc) {
          if (info.IsInArgBufferNeeded(var_desc->Name())) {
            gc_check_input_list.push_back(id);
          }
        } else {
          gc_check_input_list.push_back(id);
        }
      }
    }
    std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
    auto last =
        std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
    gc_check_input_list.erase(last, gc_check_input_list.end());

    for (auto var_id : gc_check_input_list) {
      paddle::framework::Variable* var = global_scope_->Var(var_id);
      if (var->IsType<LoDTensor>() || var->IsType<phi::SelectedRows>() ||
          var->IsType<LoDTensorArray>()) {
        vec_meta_info[var_id].var_ref_count_++;
        // TODO(zhiqiu): not all vars need to be checked; a var only needs to
        // be checked after its last_live_op. For example,
        // b = op1(a)
        // c = op2(a, b)
        // in this case, a is the input of op1 and op2, we only need to check
        // a after op2, because op2 always uses a after op1.
        instr.AddGCCheckVar(var_id);
        VLOG(4) << "clear " << global_scope_->GetNameById(var_id) << " after "
                << instr.OpBase()->Type();
      } else {
        VLOG(4) << "not clear " << global_scope_->GetNameById(var_id)
                << " after " << instr.OpBase()->Type()
                << " because its type is "
                << framework::ToTypeName(var->Type());
      }
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check output
    for (auto& item : vec_instruction_[i].Outputs()) {
      for (auto id : item.second) {
        if (input_var2op_info_.at(id).size() == 0) {
          // output var is not used by any kernel
          vec_instruction_[i].AddGCCheckVar(id);
          VLOG(4) << "clear " << global_scope_->GetNameById(id) << " after "
                  << vec_instruction_[i].OpBase()->Type();
          vec_meta_info[id].var_ref_count_++;
        }
      }
    }
  }

  BuildOperatorDependences();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].DeviceContext().GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }

  if (FLAGS_new_executor_use_inplace) {
    BuildInplace();
  }

  // prepare for the first time.
  async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  async_work_queue_->PrepareAtomicVarRef(vec_meta_info);
}

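// Check whether the var's buffer is consumed as an input by exactly one op,
// which is required before BuildInplace may reuse it as an output buffer.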
bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!global_scope_->VarDesc(var_index)) {
    return input_var2op_info_.at(var_index).size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_.at(var_index)) {
      OpInOutInfo info;
      info.Build(vec_instruction_.at(inst_id).OpBase());
      if (info.IsInArgBufferNeeded(global_scope_->VarDesc(var_index)->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    auto& instr = vec_instruction_[i];
    auto* op_base = instr.OpBase();
    if (!op_base->Info().infer_inplace_) {
      continue;
    }

    auto in_to_outs = op_base->Info().infer_inplace_(
        platform::is_gpu_place(instr.DeviceContext().GetPlace()));

    auto& inputs = instr.Inputs();
    auto& outputs = instr.Outputs();
    for (auto& pair : in_to_outs) {
      auto iter = inputs.find(pair.first);
      if (iter != inputs.end() && !iter->second.empty()) {
        auto in_var_desc = global_scope_->VarDesc(iter->second[0]);
        if (in_var_desc && in_var_desc->Persistable()) {
          continue;
        }
        if (global_scope_->GetVarSikpInplace(iter->second[0])) {
          continue;
        }
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = outputs.find(pair.second);
          if (iterout != outputs.end() && !iterout->second.empty()) {
            auto invar = global_scope_->Var(iter->second[0]);
            auto outvar = global_scope_->Var(iterout->second[0]);
            if (invar && outvar && invar->IsType<LoDTensor>() &&
                outvar->IsType<LoDTensor>()) {
              instr.AddInplace(invar, outvar);
              VLOG(3) << "inplace " << vec_instruction_[i].OpBase()->Type()
                      << " " << global_scope_->GetNameById(iter->second[0])
                      << " -> "
                      << global_scope_->GetNameById(iterout->second[0])
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->Inputs()) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(global_scope_->Var(id));
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->Outputs()) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(global_scope_->Var(id));
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }
  // set runtime_ctx and infershape_ctx_
  instr_node->ResetContext(ins_map, outs_map);
}

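// If all inputs of an instruction are LoD-free LoDTensors, mark its
// infershape context so that sharing LoD information can be skipped.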
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
  }
}

void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  auto* op = instr_node.OpBase();
  auto place = instr_node.DeviceContext().GetPlace();
  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(global_scope_);
  Scope* local_scope = create_local_scope_
                           ? global_scope_->GetMutableLocalScope()
                           : global_scope_->GetMutableScope();
  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
  {
    if (op_with_kernel != nullptr) {
      platform::RecordEvent infershape_event(
          "infer_shape", platform::TracerEventType::OperatorInner, 1,
          platform::EventRole::kInnerOp);
      // If it is OperatorBase, InferShape do nothing.
      op_with_kernel->Info().infer_shape_(
          instr_node.InnerInferShapeContext().get());
    }
  }

  if (op_with_kernel != nullptr && FLAGS_new_executor_use_inplace) {
    // TODO(xiongkun03) Does operator base support inplace ?
    for (auto& pair : instr_node.InplaceInfo()) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }

  {
    platform::RecordEvent compute_event(
        "compute", platform::TracerEventType::OperatorInner, 1,
        platform::EventRole::kInnerOp);
    if (op_with_kernel == nullptr) {
      instr_node.OpBase()->Run(*local_scope, place_);
    } else {
      // fit for phi
      if (instr_node.PhiKernel() && instr_node.PhiKernel()->IsValid()) {
        VLOG(4) << "Run phi kernel: " << op->Type();
        VLOG(4) << instr_node.InnerRuntimeContext().get() << " "
                << &instr_node.DeviceContext();
        phi::KernelContext pt_kernel_context;
        op_with_kernel->BuildPhiKernelContext(
            *instr_node.InnerRuntimeContext().get(),
            const_cast<platform::DeviceContext*>(&instr_node.DeviceContext()),
            &pt_kernel_context);

        (*instr_node.PhiKernel())(&pt_kernel_context);

      } else {
        instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
      }
    }
  }

  VLOG(4) << "End run " << place << " " << op->DebugStringEx(global_scope_);

  if (!instr_node.InplaceBackMap().empty()) {
    auto& m = instr_node.InplaceBackMap();
    // NOTE(zhiqiu): same logic as TransferInplaceVarsBack() in operator.cc
    for (auto& p : m) {
      auto* transformed_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
          global_scope_->Var(p.first));
      auto* original_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
          global_scope_->Var(p.second));
      original_tensor->ShareDataWith(*transformed_tensor);
      VLOG(4) << "Transfer inplace variable back from "
              << global_scope_->GetNameById(p.first) << " to "
              << global_scope_->GetNameById(p.second);
    }
  }

  /*For profiling/benchmark only*/
  if (FLAGS_benchmark) {
    instr_node.DeviceContext().Wait();
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
    PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError());
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
  }

  // for debug nan/inf
  if (op_with_kernel != nullptr && FLAGS_check_nan_inf) {
    VLOG(4) << "Check nan/inf";
    framework::details::CheckOpHasNanOrInf(
        *op, *global_scope_,
        place);  // TODO(xiongkun03) change it to inner scope.
  }
}

void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  // NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
  // those for the next step
  auto atomic_deps = async_work_queue_->AtomicDeps();
  auto atomic_var_ref = async_work_queue_->AtomicVarRef();

  async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());

  unfinished_op_numer_ = vec_instr.size();

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_->AddTask(vec_instr.at(i).KernelType(), [
        this, i, atomic_deps = atomic_deps.get(),
        atomic_var_ref = atomic_var_ref.get()
      ] { RunInstructionAsync(i, atomic_deps, atomic_var_ref); });
    }
  }

  auto event_name = main_thread_blocker_.WaitEvent();
  VLOG(1) << "event_name: " << event_name;

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(1) << "Exception caught " << exception_holder_.Type();
    // Graceful exit when the executor encountered a fatal error.
    // EOF is not a fatal error.
    if (exception_holder_.Type() != "EOF") {
      async_work_queue_->Cancel();
    }
    VLOG(4) << "Cancel ok";
    PADDLE_ENFORCE_EQ(
        main_thread_blocker_.Clear(), 0,
        platform::errors::PreconditionNotMet(
            "main_thread_blocker_.Clear() return -1, clear failed"));
    VLOG(4) << "clear ok";
    exception_holder_.ReThrow();
  }
}

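// After an instruction finishes, decrement the dependency count of its
// downstream instructions. Ready instructions are either pushed to the async
// work queue or kept in the current thread; after a sync op at most one
// direct-run op stays local to reduce thread switching.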
void InterpreterCore::RunNextInstructions(
    const Instruction& instr, std::queue<size_t>* reserved_next_ops,
    std::vector<std::atomic<size_t>>* atomic_deps,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  VLOG(4) << "atomic 1:" << atomic_deps;
  auto& next_instr = instr.NextInstructions();

  auto IsReady = [atomic_deps](size_t next_id) {
    VLOG(4) << "atomic:" << atomic_deps << " " << &(*atomic_deps)[next_id]
            << " " << next_id;
    return (*atomic_deps)[next_id].fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.KernelType() == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.SyncRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref]() {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.DirectRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref] {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    auto direct_run_ops = interpreter::merge_vector(next_instr.SyncRunIds(),
                                                    next_instr.DirectRunIds());
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move rest ops into other threads
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [this, next_id, atomic_deps, atomic_var_ref] {
              RunInstructionAsync(next_id, atomic_deps, atomic_var_ref);
            });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

void InterpreterCore::RunInstructionAsync(
    size_t instr_id, std::vector<std::atomic<size_t>>* atomic_deps,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_.at(instr_id);
    VLOG(5) << __func__ << " OP id:" << instr_node.Id()
            << " name:" << instr_node.OpBase()->Type()
            << " type:" << (instr_node.KernelType() == OpFuncType::kQueueSync
                                ? "kQueueSync"
                                : "kQueueAsync")
            << " runs on " << platform::GetCurrentThreadName();

    auto* op = instr_node.OpBase();
    platform::RecordEvent instruction_event(
        op->Type(), platform::TracerEventType::Operator, 1);

    try {
      interpreter::WaitEvent(instr_node, place_);

      RunInstruction(instr_node);

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      RecordStreamForGC(instr_node);
#endif
      CheckGC(instr_node, atomic_var_ref);

      interpreter::RecordEvent(instr_node, place_);
    } catch (platform::EnforceNotMet& ex) {
      framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
      exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
    } catch (platform::EOFException&) {
      exception_holder_.Catch(std::current_exception());
    } catch (std::exception& ex) {
      LOG(WARNING) << op->Type() << " raises an exception "
                   << platform::demangle(typeid(ex).name()) << ", "
                   << ex.what();
      exception_holder_.Catch(std::current_exception());
    } catch (...) {
      LOG(WARNING) << op->Type() << " raises an unknown exception";
      exception_holder_.Catch(std::current_exception());
    }

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }

    VLOG(4) << "unfinished_op_numer_: " << unfinished_op_numer_;
    if (UNLIKELY(unfinished_op_numer_.fetch_sub(1, std::memory_order_relaxed) ==
                 1)) {
      if (completion_notifier_ != nullptr) {
        completion_notifier_->NotifyEvent();
      }
    }

    RunNextInstructions(instr_node, &ready_ops, atomic_deps, atomic_var_ref);
  }
}

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
  if (!IsInterpretercoreFastGCEnabled() ||
      instr.KernelType() != OpFuncType::kQueueAsync) {
    return;
  }

  gpuStream_t stream = reinterpret_cast<const platform::CUDADeviceContext&>(
                           instr.DeviceContext())
                           .stream();
  auto TensorRecordStream = [&stream](Tensor& tensor) {
    auto allocation = tensor.Holder();
    if (allocation == nullptr) {
      return;
    }

    const platform::Place& place = allocation->place();
    if (platform::is_gpu_place(place)) {
      memory::RecordStream(allocation, stream);
    } else if (platform::is_cuda_pinned_place(place)) {
      // TODO(Ruibiao): Here we should do something to make sure that the
      // tensor is not freed until the H2D copies are done. However, simply
      // launching a CUDA runtime callback to the H2D stream may introduce a
      // high performance overhead. As all the H2D cases we meet at present
      // are copies from CPUPlace, we just log a WARNING here. A better design
      // is required.
      LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous "
                      "manner may lead to data inconsistency";
    } else {
      // memory copies involve CPUPlace are always synchronous, so just do
      // nothing here
    }
  };

  /* NOTE(Ruibiao): Cross-stream tensor synchronization is required only when
   * all the following conditions are satisfied:
   * 1. The tensor will be GC'ed after running the instruction, i.e., it is in
   * instr.GCCheckVars.
   * 2. The stream which initializes this tensor is different from the stream
   * which the instruction runs in.
   * 3. The tensor is the instruction's input, because we assume that an
   * instruction will initialize all output tensors with its running stream.
   * 4. In the OP function of this instruction, the tensor is an input of an
   * async CUDA kernel.
   *
   * Here we only process the first condition, because:
   * 1. Since the RecordStream function returns directly when the recorded
   * stream is equal to the owning stream, recording the same stream that
   * initialized this tensor has little time overhead. Conversely, it may take
   * more time if we try to extract those cross-stream input vars from
   * instr.GCCheckVars.
   * 2. Currently the instruction has no idea of which vars involve async
   * running in the OP function, and thus we cannot recognize condition 4. It
   * should be supported later.
   */
  for (int var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC sync " << global_scope_->GetNameById(var_id) << " "
            << global_scope_->VarDesc(var_id);

    // persistable vars will be ignored during GC
    if (global_scope_->VarDesc(var_id) &&
        global_scope_->VarDesc(var_id)->Persistable()) {
      continue;
    }

    paddle::framework::Variable* var = global_scope_->Var(var_id);
    if (var == nullptr) {
      continue;
    }

    if (var->IsType<LoDTensor>()) {
      TensorRecordStream(*(var->GetMutable<LoDTensor>()));
    } else if (var->IsType<
                   operators::reader::
                       OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
      // do nothing
    } else if (var->IsType<phi::SelectedRows>()) {
      TensorRecordStream(
          *(var->GetMutable<phi::SelectedRows>()->mutable_value()));
    } else if (var->IsType<LoDTensorArray>()) {
      auto* tensor_arr = var->GetMutable<LoDTensorArray>();
      for (auto& tensor : *tensor_arr) {
        TensorRecordStream(tensor);
      }
    } else if (var->IsType<std::vector<Scope*>>()) {
      // do nothing
    } else {
      PADDLE_THROW(platform::errors::Unimplemented(
          "The variable(%s) is not supported in eager deletion.",
          framework::ToTypeName(var->Type())));
    }
  }
}
#endif

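// Decrement the reference count of each GC-checked var of this instruction;
// when it reaches zero and the var is not persistable, hand the var to the
// garbage collector.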
void InterpreterCore::CheckGC(
    const Instruction& instr,
    std::vector<std::atomic<size_t>>* atomic_var_ref) {
  size_t instr_id = instr.Id();
  auto& var_scope = *global_scope_;

  for (auto var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC " << global_scope_->GetNameById(var_id) << " "
            << var_scope.VarDesc(var_id);
    VLOG(4) << "atomic:" << atomic_var_ref << " " << &(*atomic_var_ref)[var_id]
            << " " << var_id;
    bool is_ready =
        (*atomic_var_ref)[var_id].fetch_sub(1, std::memory_order_relaxed) == 1;
    // ignore all persistable vars during GC
    if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
      continue;
    }
    if (is_ready) {
      VLOG(6) << "Async delete variable with name : "
              << var_scope.GetNameById(var_id);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      if (IsInterpretercoreFastGCEnabled()) {
        static_cast<InterpreterCoreFastGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id));

      } else {
        static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id), &gc_event_.at(instr_id),
            &instr.DeviceContext());
      }
#else
      static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
          var_scope.Var(var_id), &gc_event_.at(instr_id),
          &instr.DeviceContext());
#endif
    }
  }
}

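// Build the variable scope and instruction list on the first call, and share
// the feed tensors into their corresponding feed variables.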
void InterpreterCore::Prepare(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors, bool prepare_feed) {
  PADDLE_ENFORCE_EQ(feed_names.size(), feed_tensors.size(),
                    platform::errors::PreconditionNotMet(
                        "Required feed_names.size() == feed_tensors.size(), "
                        "but received %d != %d",
                        feed_names.size(), feed_tensors.size()));

  auto FeedInput = [&] {
    VLOG(4) << "Feed inputs";
    for (size_t i = 0; i < feed_names.size(); ++i) {
      auto* feed_var = global_scope_->FindVar(feed_names[i]);
      PADDLE_ENFORCE_NOT_NULL(
          feed_var, platform::errors::NotFound(
                        "Variable %s should not be nullptr.", feed_names[i]));

      auto feed_tensor = feed_var->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
      feed_tensor->set_lod(feed_tensors[i].lod());
    }
  };

  if (!is_build_) {
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    FeedInput();
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);
  }
  // NOTE: Because the feed_tensor will be GC'ed after
  // paddle::framework::build_op_func_list, we should
  // call FeedInput again.
  if (prepare_feed) {
    FeedInput();
  }
}

interpreter::CostInfo InterpreterCore::DryRun(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, true);
  interpreter::CostInfo cost_info;
  {
    // Name the guard so it lives for the whole block instead of being
    // destroyed immediately as a temporary.
    interpreter::ProfilerGuard profiler_guard(place_, &cost_info);
    ExecuteInstructionList(vec_instruction_);
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  return cost_info;
}

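// Mark feed vars so that BuildInplace will not reuse their buffers for
// outputs.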
void InterpreterCore::SetFeedVarsInplaceSkip(
    const std::vector<std::string>& feed_names) {
  for (auto& feed_name : feed_names) {
    global_scope_->SetVarSikpInplace(feed_name, true);
  }
}

}  // namespace framework
}  // namespace paddle