interpretercore.cc
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include <unordered_set>
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/os_info.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true,
                            "Use local_scope in new executor(especially used "
                            "in UT), can turn off for better performance");

DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);
DECLARE_bool(fast_eager_deletion_mode);
DECLARE_bool(use_stream_safe_cuda_allocator);

constexpr const char* kExceptionCaught = "ExceptionCaught";
constexpr const char* kTaskCompletion = "TaskCompletion";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;

bool IsInterpretercoreFastGCEnabled() {
  return FLAGS_fast_eager_deletion_mode && FLAGS_use_stream_safe_cuda_allocator;
}

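// Build an interpreter for a single block: wire up the async work queue,
// the garbage collector (fast GC when the stream-safe CUDA allocator and
// fast eager deletion are both enabled, event-based GC otherwise), the
// exception/completion notifiers and, optionally, a local scope.
//
// Typical usage (a sketch only; assumes a prepared BlockDesc and
// VariableScope plus matching feed data):
//   InterpreterCore core(place, block, &global_scope);
//   auto fetch_list = core.Run(feed_names, feed_tensors);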
InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const BlockDesc& block,
                                 VariableScope* global_scope)
    : place_(place),
      block_(block),
      global_scope_(global_scope),
      stream_analyzer_(place) {
  is_build_ = false;
  async_work_queue_.reset(
      new interpreter::AsyncWorkQueue(kHostNumThreads, &main_thread_blocker_));

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
  if (IsInterpretercoreFastGCEnabled()) {
    gc_ = std::make_unique<InterpreterCoreFastGarbageCollector>();
  } else {
    gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
  }
#else
  gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
#endif

  exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
  completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);

  create_local_scope_ = FLAGS_new_executor_use_local_scope;
  if (FLAGS_new_executor_use_local_scope) {
    auto local_scope = &global_scope->GetMutableScope()->NewScope();
    local_scope->AddListener(global_scope->Listener());
    local_scope_ = local_scope;
  }
  VLOG(4) << "create_local_scope_ is " << create_local_scope_;

  // prune

  // optimize graph pass

  // convert to run graph
}

InterpreterCore::~InterpreterCore() {
  // cancel gc's thread
  gc_.reset(nullptr);

  exception_notifier_->UnregisterEvent();
  completion_notifier_->UnregisterEvent();

  async_work_queue_.reset(nullptr);
}

void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
  copy_program_ = prog;
}

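// Run the block with explicit feed tensors. Prepare() shares the feeds into
// their scope variables and, on the first call, builds the variable scope
// and the instruction list; later calls execute the cached instruction list.
// Note that the first call returns fetch results produced during the build
// step rather than by ExecuteInstructionList.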
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  bool is_build = is_build_;
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, is_build);

  if (is_build) {
    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

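// Run the block assuming the feed variables are already populated in the
// scope (e.g. by feed ops). The first call lowers the block through
// build_op_func_list/Convert; subsequent calls execute the cached
// instruction list.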
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names) {
  if (!is_build_) {
    if (create_local_scope_ &&
        global_scope_->GetMutableLocalScope() !=
            global_scope_->GetMutableScope() &&
        global_scope_->GetMutableLocalScope()) {
      VLOG(4) << "Clear previous local scope before run";
      VLOG(4) << global_scope_->GetMutableScope() << " "
              << global_scope_->GetMutableLocalScope();
      platform::DeviceContextPool::Instance().Get(place_)->Wait();
      // TODO(zhiqiu): clear the tensor holder of all vars in previous local
      // scope?
    }
    global_scope_->SetLocalScope(local_scope_);
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);

  } else {
    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

// At the end of each step, the holder of Tensor in LoDTensorArray is null.
// Clear these Tensors and leave LoDTensorArray empty, otherwise an exception
// will occur in the next step
void InterpreterCore::ClearLoDTensorArrayInLocalScope() {
  auto vars = local_scope_->LocalVars();
  for (auto var : vars) {
    if (var->IsType<LoDTensorArray>()) {
      auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
      lod_tensor_arr->clear();
    }
  }
}

void InterpreterCore::BuildOperatorDependences() {
  // analyze the dependencies between ops, set dependecy_count_, and call
  // Schedule
  auto op_nums = vec_instruction_.size();
  dependecy_count_.resize(op_nums);
  auto op2downstream = interpreter::build_op_downstream_map(vec_instruction_);
  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
    auto op_list = op2downstream[op];
    std::vector<size_t> downsteam_vector(op_list.begin(), op_list.end());
    stream_analyzer_.Schedule(downsteam_vector, &vec_instruction_, op);

    for (auto inst_id : op_list) {
      dependecy_count_[inst_id]++;
    }
  }
}

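// Lower the op-func list into Instructions: pick a device context per op,
// collect the variables each instruction must reference-count for GC
// (inputs that may be freed afterwards plus outputs nobody consumes),
// build operator dependencies, cache runtime/infershape contexts, create
// per-instruction GC events and, optionally, apply inplace.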
void InterpreterCore::Convert(
    std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
  auto& vec_meta_info = global_scope_->MutableVecMetaInfo();
  auto var_nums = global_scope_->VarSize();
  input_var2op_info_.resize(var_nums);
  auto nodes = *op_func_nodes;

  auto op_nums = nodes.size();
  vec_instruction_.reserve(op_nums);

  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& op_func_node = nodes[op_idx];
    auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);

    vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
    auto& instr = vec_instruction_.back();

    OpInOutInfo info;
    std::vector<size_t> gc_check_input_list;

    for (auto& item : op_func_node.input_index) {
      for (auto id : item.second) {
        if (id == kEmptyVarIndex) {
          continue;
        }
        input_var2op_info_.at(id).push_back(op_idx);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(op_func_node.operator_base_.get());
        }
        auto* var_desc = global_scope_->VarDesc(id);
        if (var_desc) {
          if (info.IsInArgBufferNeeded(var_desc->Name())) {
            gc_check_input_list.push_back(id);
          }
        } else {
          gc_check_input_list.push_back(id);
        }
      }
    }
    std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
    auto last =
        std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
    gc_check_input_list.erase(last, gc_check_input_list.end());

    for (auto var_id : gc_check_input_list) {
      vec_meta_info[var_id].var_ref_count_++;
      instr.AddGCCheckVar(var_id);
      VLOG(4) << "clear " << global_scope_->GetNameById(var_id) << " after "
              << instr.OpBase()->Type();
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check outputs
    for (auto& item : vec_instruction_[i].Outputs()) {
      for (auto id : item.second) {
        if (input_var2op_info_.at(id).size() == 0) {
          // output var is not used by any kernel
          vec_instruction_[i].AddGCCheckVar(id);
          VLOG(4) << "clear " << global_scope_->GetNameById(id) << " after "
                  << vec_instruction_[i].OpBase()->Type();
          vec_meta_info[id].var_ref_count_++;
        }
      }
    }
  }

  BuildOperatorDependences();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].DeviceContext().GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }

  if (FLAGS_new_executor_use_inplace) {
    BuildInplace();
  }
}

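// A variable is a safe inplace source only if it is consumed, as a kernel
// buffer input, by exactly one instruction; otherwise sharing its buffer
// with an output could corrupt another reader.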
bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!global_scope_->VarDesc(var_index)) {
    return input_var2op_info_.at(var_index).size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_.at(var_index)) {
      OpInOutInfo info;
      info.Build(vec_instruction_.at(inst_id).OpBase());
      if (info.IsInArgBufferNeeded(global_scope_->VarDesc(var_index)->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

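// For every op that provides an inplace inferer, share the input buffer
// with the matching output when the input is not persistable, is not
// marked skip-inplace (e.g. feed vars), is read only by this op, and both
// sides are LoDTensors.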
void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    auto& instr = vec_instruction_[i];
    auto* op_base = instr.OpBase();
    if (!op_base->Info().infer_inplace_) {
      continue;
    }

    auto in_to_outs = op_base->Info().infer_inplace_(
        platform::is_gpu_place(instr.DeviceContext().GetPlace()));

    auto& inputs = instr.Inputs();
    auto& outputs = instr.Outputs();
    for (auto& pair : in_to_outs) {
      auto iter = inputs.find(pair.first);
      if (iter != inputs.end() && !iter->second.empty()) {
        auto in_var_desc = global_scope_->VarDesc(iter->second[0]);
        if (in_var_desc && in_var_desc->Persistable()) {
          continue;
        }
        if (global_scope_->GetVarSikpInplace(iter->second[0])) {
          continue;
        }
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = outputs.find(pair.second);
          if (iterout != outputs.end() && !iterout->second.empty()) {
            auto invar = global_scope_->Var(iter->second[0]);
            auto outvar = global_scope_->Var(iterout->second[0]);
            if (invar && outvar && invar->IsType<LoDTensor>() &&
                outvar->IsType<LoDTensor>()) {
              instr.AddInplace(invar, outvar);
              VLOG(3) << "inplace " << vec_instruction_[i].OpBase()->Type()
                      << " " << global_scope_->GetNameById(iter->second[0])
                      << " -> "
                      << global_scope_->GetNameById(iterout->second[0])
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

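// Resolve the instruction's input/output variable ids to Variable pointers
// and cache them in its RuntimeContext/InferShapeContext so they need not
// be rebuilt on every step.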
void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->Inputs()) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(global_scope_->Var(id));
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->Outputs()) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(global_scope_->Var(id));
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }
  // set runtime_ctx and infershape_ctx_
  instr_node->ResetContext(ins_map, outs_map);
}

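// If none of an instruction's LoDTensor inputs carries LoD information,
// its InferShapeContext can skip sharing LoD with the outputs.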
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
  }
}

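// Execute a single instruction: InferShape (for operators with kernels),
// buffer sharing for cached inplace pairs, then either the phi kernel or
// the fluid kernel function; FLAGS_benchmark and FLAGS_check_nan_inf add
// extra synchronization and checks afterwards.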
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  auto* op = instr_node.OpBase();
  auto place = instr_node.DeviceContext().GetPlace();
  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(global_scope_);
  Scope* local_scope = create_local_scope_
                           ? global_scope_->GetMutableLocalScope()
                           : global_scope_->GetMutableScope();
  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
  {
    platform::RecordEvent infershape_event(
        "InferShape", platform::TracerEventType::OperatorInner, 1,
        platform::EventRole::kInnerOp);
    // If it is OperatorBase, InferShape does nothing.
    if (op_with_kernel != nullptr)
      op_with_kernel->Info().infer_shape_(
          instr_node.InnerInferShapeContext().get());
  }

  if (op_with_kernel != nullptr &&
      FLAGS_new_executor_use_inplace) {  // TODO(xiongkun03) Does operator
                                         // base support inplace ?
    for (auto& pair : instr_node.InplaceInfo()) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }
  {
    platform::RecordEvent compute_event(
        "Compute", platform::TracerEventType::OperatorInner, 1,
        platform::EventRole::kInnerOp);
    if (op_with_kernel == nullptr) {
      instr_node.OpBase()->Run(*local_scope, place_);
    } else {
      // fit for phi
      if (instr_node.PhiKernel() && instr_node.PhiKernel()->IsValid()) {
        VLOG(4) << "Run phi kernel: " << op->Type();
        VLOG(4) << instr_node.InnerRuntimeContext().get() << " "
                << &instr_node.DeviceContext();
        phi::KernelContext pt_kernel_context;
        op_with_kernel->BuildPhiKernelContext(
            *instr_node.InnerRuntimeContext().get(),
            const_cast<platform::DeviceContext*>(&instr_node.DeviceContext()),
            &pt_kernel_context);

        (*instr_node.PhiKernel())(&pt_kernel_context);

      } else {
        instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
      }
    }
  }

  VLOG(4) << "End run " << place << " " << op->DebugStringEx(global_scope_);

  /*For profiling/benchmark only*/
  if (FLAGS_benchmark) {
    instr_node.DeviceContext().Wait();
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
    PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError());
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
  }

  // for debug nan/inf
  if (FLAGS_check_nan_inf) {
    VLOG(4) << "Check nan/inf";
    framework::details::CheckOpHasNanOrInf(
        *op, *global_scope_,
        place);  // TODO(xiongkun03) change it to inner scope.
  }
}

void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
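  // Reset the atomic dependency/ref counters, enqueue every instruction
  // whose dependency count is zero, then block until all ops finish or an
  // exception is reported; non-EOF exceptions cancel pending work and are
  // rethrown on this thread.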
  async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
  unfinished_op_numer_ = vec_instr.size();

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_->AddTask(vec_instr.at(i).KernelType(),
                                 [&, i] { RunInstructionAsync(i); });
    }
  }

  auto event_name = main_thread_blocker_.WaitEvent();
  VLOG(1) << "event_name: " << event_name;

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(1) << "Exception caught " << exception_holder_.Type();
    // Graceful exit when the executor encountered a fatal error.
    // EOF is not a fatal error.
    if (exception_holder_.Type() != "EOF") {
      async_work_queue_->Cancel();
    }
    PADDLE_ENFORCE_EQ(
        main_thread_blocker_.Clear(), 0,
        platform::errors::PreconditionNotMet(
            "main_thread_blocker_.Clear() return -1, clear failed"));
    exception_holder_.ReThrow();
  }
}

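// After an instruction finishes, decrement the dependency counters of its
// downstream instructions and schedule the ones that become ready: ops that
// should run asynchronously go to the work queue, while ops suited to the
// current thread are pushed into reserved_next_ops (for sync producers only
// the first ready direct-run op is kept local, the rest are dispatched).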
void InterpreterCore::RunNextInstructions(
    const Instruction& instr, std::queue<size_t>* reserved_next_ops) {
  auto& next_instr = instr.NextInstructions();
  auto& atomic_deps = async_work_queue_->AtomicDeps();
  auto IsReady = [&](size_t next_id) {
    return atomic_deps[next_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.KernelType() == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.SyncRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.DirectRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    auto direct_run_ops = interpreter::merge_vector(next_instr.SyncRunIds(),
                                                    next_instr.DirectRunIds());
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move rest ops into other threads
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

void InterpreterCore::RunInstructionAsync(size_t instr_id) {
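  // Drain a thread-local queue of ready instructions: run each one, record
  // any exception into exception_holder_, update the unfinished-op counter,
  // and push newly ready downstream instructions back onto the queue.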
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_.at(instr_id);
    VLOG(5) << __func__ << " OP id:" << instr_node.Id()
            << " name:" << instr_node.OpBase()->Type()
            << " type:" << (instr_node.KernelType() == OpFuncType::kQueueSync
                                ? "kQueueSync"
                                : "kQueueAsync")
            << " runs on " << platform::GetCurrentThreadName();

    auto* op = instr_node.OpBase();
    platform::RecordEvent instruction_event(op->Type().c_str());
    interpreter::WaitEvent(instr_node, place_);

    try {
      RunInstruction(instr_node);

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      RecordStreamForGC(instr_node);
#endif
      CheckGC(instr_node);
    } catch (platform::EnforceNotMet& ex) {
      framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
      exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
    } catch (platform::EOFException&) {
      exception_holder_.Catch(std::current_exception());
    } catch (std::exception& ex) {
      LOG(WARNING) << op->Type() << " raises an exception "
                   << platform::demangle(typeid(ex).name()) << ", "
                   << ex.what();
      exception_holder_.Catch(std::current_exception());
    } catch (...) {
      LOG(WARNING) << op->Type() << " raises an unknown exception";
      exception_holder_.Catch(std::current_exception());
    }

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }

    VLOG(4) << "unfinished_op_numer_: " << unfinished_op_numer_;
    if (UNLIKELY(unfinished_op_numer_.fetch_sub(1, std::memory_order_relaxed) ==
                 1)) {
      if (completion_notifier_ != nullptr) {
        completion_notifier_->NotifyEvent();
      }
    }

    interpreter::RecordEvent(instr_node, place_);

    RunNextInstructions(instr_node, &ready_ops);
  }
}

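// With fast GC and the stream-safe CUDA allocator, record the instruction's
// CUDA stream on every allocation that may be garbage-collected afterwards,
// so the memory is not reused before the queued kernels finish.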
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
  if (!IsInterpretercoreFastGCEnabled() ||
      instr.KernelType() != OpFuncType::kQueueAsync) {
    return;
  }

  gpuStream_t stream = reinterpret_cast<const platform::CUDADeviceContext&>(
                           instr.DeviceContext())
                           .stream();
  auto TensorRecordStream = [&stream](Tensor& tensor) {
    auto allocation = tensor.Holder();
    if (allocation == nullptr) {
      return;
    }

    const platform::Place& place = allocation->place();
    if (platform::is_gpu_place(place)) {
      memory::RecordStream(allocation, stream);
    } else if (platform::is_cuda_pinned_place(place)) {
      // TODO(Ruibiao): Something should be done here to make sure that the
      // tensor is not freed until the H2D copies are done. However, simply
      // launching a CUDA runtime callback to the H2D stream may lead to high
      // performance overhead. As all the cases we meet in H2D are copies from
      // CPUPlace at present, we just log a WARNING here. A better design is
      // required.
      LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous "
                      "manner may lead a data inconsistent";
    } else {
      // memory copies involving CPUPlace are always synchronous, so just do
      // nothing here
    }
  };

  /* NOTE(Ruibiao): Cross-stream tensor synchronization is required only when
   * all the following conditions are satisfied:
   * 1. The tensor will be GC'ed after running the instruction, i.e., it is in
   * instr.GCCheckVars.
   * 2. The stream which initializes this tensor is different from the stream
   * which the instruction runs in.
   * 3. The tensor is the instruction's input, because we assume that an
   * instruction will initialize all output tensors with its running stream.
   * 4. In the OP function of this instruction, the tensor is an input of an
   * async CUDA kernel.
   *
   * Here we only process the first condition, because:
   * 1. Since the RecordStream function directly returns when the recorded
   * stream is equal to the owning stream, recording a stream same as the one
   * that initialized this tensor has little time overhead. Conversely, it may
   * take more time if we try to extract those cross-stream input vars from
   * instr.GCCheckVars.
   * 2. Now the instruction has no idea of which vars involve async running in
   * the OP function, and thus we cannot recognize condition 4. It should be
   * supported later.
   */
  for (int var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC sync " << global_scope_->GetNameById(var_id) << " "
            << global_scope_->VarDesc(var_id);

    // persistable vars will be ignored during GC
    if (global_scope_->VarDesc(var_id) &&
        global_scope_->VarDesc(var_id)->Persistable()) {
      continue;
    }

    paddle::framework::Variable* var = global_scope_->Var(var_id);
    if (var == nullptr) {
      continue;
    }

    if (var->IsType<LoDTensor>()) {
      TensorRecordStream(*(var->GetMutable<LoDTensor>()));
    } else if (var->IsType<
                   operators::reader::
                       OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
      // do nothing
    } else if (var->IsType<phi::SelectedRows>()) {
      TensorRecordStream(
          *(var->GetMutable<phi::SelectedRows>()->mutable_value()));
    } else if (var->IsType<LoDTensorArray>()) {
      auto* tensor_arr = var->GetMutable<LoDTensorArray>();
      for (auto& tensor : *tensor_arr) {
        TensorRecordStream(tensor);
      }
    } else if (var->IsType<std::vector<Scope*>>()) {
      // do nothing
    } else {
      PADDLE_THROW(platform::errors::Unimplemented(
          "The variable(%s) is not supported in eager deletion.",
          framework::ToTypeName(var->Type())));
    }
  }
}
#endif

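// Decrement the reference counts of this instruction's GC-check variables;
// once a non-persistable variable's count reaches zero, hand it to the
// garbage collector (fast GC, or event-based GC tied to gc_event_).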
void InterpreterCore::CheckGC(const Instruction& instr) {
  size_t instr_id = instr.Id();
  auto& var_scope = *global_scope_;
  auto& atomic_var_ref = async_work_queue_->AtomicVarRef();

  for (auto var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC " << global_scope_->GetNameById(var_id) << " "
            << var_scope.VarDesc(var_id);

    bool is_ready =
        atomic_var_ref[var_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
    // ignore all persistable vars during GC
    if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
      continue;
    }
    if (is_ready) {
      VLOG(6) << "Async delete variable with name : "
              << var_scope.GetNameById(var_id);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      if (IsInterpretercoreFastGCEnabled()) {
        static_cast<InterpreterCoreFastGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id));

      } else {
        static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id), &gc_event_.at(instr_id),
            &instr.DeviceContext());
      }
#else
      static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
          var_scope.Var(var_id), &gc_event_.at(instr_id),
          &instr.DeviceContext());
#endif
    }
  }
}

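// Share the feed tensors into their scope variables and, if the block has
// not been built yet, build the variable scope and instruction list. When
// prepare_feed is set the feeds are shared again afterwards, since the
// build step may have garbage-collected them (see the NOTE below).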
void InterpreterCore::Prepare(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors, bool prepare_feed) {
  PADDLE_ENFORCE_EQ(feed_names.size(), feed_tensors.size(),
                    platform::errors::PreconditionNotMet(
                        "Required feed_names.size() == feed_tensors.size(), "
                        "but received %d != %d",
                        feed_names.size(), feed_tensors.size()));

  auto FeedInput = [&] {
    VLOG(4) << "Feed inputs";
    for (size_t i = 0; i < feed_names.size(); ++i) {
      auto* feed_var = global_scope_->FindVar(feed_names[i]);
      PADDLE_ENFORCE_NOT_NULL(
          feed_var, platform::errors::NotFound(
                        "Variable %s should not be nullptr.", feed_names[i]));

      auto feed_tensor = feed_var->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
      feed_tensor->set_lod(feed_tensors[i].lod());
    }
  };

  if (!is_build_) {
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    FeedInput();
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);
  }
  // NOTE: Because feed_tensor will be GC'ed after
  // paddle::framework::build_op_func_list, we should call FeedInput again.
  if (prepare_feed) {
    FeedInput();
  }
}

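// Build (if necessary) and execute the block once under a ProfilerGuard,
// then return the collected cost information; intended for cost estimation
// rather than for fetching results.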
interpreter::CostInfo InterpreterCore::DryRun(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, true);
  interpreter::CostInfo cost_info;
  {
    // Name the guard so profiling covers the whole scope instead of ending
    // immediately with a destroyed temporary.
    interpreter::ProfilerGuard profiler_guard(place_, &cost_info);
    ExecuteInstructionList(vec_instruction_);
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  return cost_info;
}

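// Mark every feed variable as skip-inplace so that BuildInplace never
// reuses its buffer; the feeds share storage with caller-provided tensors
// (see FeedInput), which must not be overwritten.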
void InterpreterCore::SetFeedVarsInplaceSkip(
    const std::vector<std::string>& feed_names) {
  for (auto& feed_name : feed_names) {
    global_scope_->SetVarSikpInplace(feed_name, true);
  }
}

}  // namespace framework
}  // namespace paddle