// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include <unordered_set>
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/profiler.h"

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
#include "paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.h"
#endif

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true,
                            "Use local_scope in new executor (especially used "
                            "in UT), can turn off for better performance");

DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);
DECLARE_bool(fast_eager_deletion_mode);
DECLARE_bool(use_stream_safe_cuda_allocator);

constexpr const char* kExceptionCaught = "ExceptionCaught";
constexpr const char* kTaskCompletion = "TaskCompletion";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;

bool IsInterpretercoreFastGCEnabled() {
  return FLAGS_fast_eager_deletion_mode && FLAGS_use_stream_safe_cuda_allocator;
}

InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const BlockDesc& block,
                                 VariableScope* global_scope)
    : place_(place),
      block_(block),
      global_scope_(global_scope),
      stream_analyzer_(place) {
  is_build_ = false;
  async_work_queue_.reset(
      new interpreter::AsyncWorkQueue(kHostNumThreads, &main_thread_blocker_));

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
  if (IsInterpretercoreFastGCEnabled()) {
    gc_ = std::make_unique<InterpreterCoreFastGarbageCollector>();
  } else {
    gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
  }
#else
  gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
#endif

  exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
  completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);

  create_local_scope_ = FLAGS_new_executor_use_local_scope;
  if (FLAGS_new_executor_use_local_scope) {
    auto local_scope = &global_scope->GetMutableScope()->NewScope();
    local_scope->AddListener(global_scope->Listener());
    local_scope_ = local_scope;
  }
  VLOG(4) << "create_local_scope_ is " << create_local_scope_;

  // prune

  // optimize graph pass

  // convert to run graph
}

InterpreterCore::~InterpreterCore() {
  // cancel the GC thread
  gc_.reset(nullptr);

  exception_notifier_->UnregisterEvent();
  completion_notifier_->UnregisterEvent();

  async_work_queue_.reset(nullptr);
}

void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
  copy_program_ = prog;
}

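// Run with explicitly provided feed tensors. The first call builds the
// instruction list via Prepare; ExecuteInstructionList runs only on later
// calls.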
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  bool is_build = is_build_;
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, is_build);

  if (is_build) {
    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

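// Run assuming feed variables are already set in the scope: build the
// variable scope and instruction list on the first call, execute the cached
// instruction list afterwards.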
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names) {
  if (!is_build_) {
    if (create_local_scope_ &&
        global_scope_->GetMutableLocalScope() !=
            global_scope_->GetMutableScope() &&
        global_scope_->GetMutableLocalScope()) {
      VLOG(4) << "Clear previous local scope before run";
      VLOG(4) << global_scope_->GetMutableScope() << " "
              << global_scope_->GetMutableLocalScope();
      platform::DeviceContextPool::Instance().Get(place_)->Wait();
      // TODO(zhiqiu): clear the tensor holder of all vars in previous local
      // scope?
    }
    global_scope_->SetLocalScope(local_scope_);
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);

  } else {
    ExecuteInstructionList(vec_instruction_);
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return std::move(*fetch_var->GetMutable<framework::FetchList>());
}

// At the end of each step, the holder of Tensor in LoDTensorArray is null.
// Clear these Tensors and leave LoDTensorArray empty, otherwise an exception
// will occur in the next step
void InterpreterCore::ClearLoDTensorArrayInLocalScope() {
  auto vars = local_scope_->LocalVars();
  for (auto var : vars) {
    if (var->IsType<LoDTensorArray>()) {
      auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
      lod_tensor_arr->clear();
    }
  }
}

void InterpreterCore::BuildOperatorDependences() {
  // analyze the dependences between ops, set the dependecy_count_ and call
  // Schedule
  auto op_nums = vec_instruction_.size();
  dependecy_count_.resize(op_nums);
  auto op2downstream = interpreter::build_op_downstream_map(vec_instruction_);
  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
    auto op_list = op2downstream[op];
    std::vector<size_t> downstream_vector(op_list.begin(), op_list.end());
    stream_analyzer_.Schedule(downstream_vector, &vec_instruction_, op);

    for (auto inst_id : op_list) {
      dependecy_count_[inst_id]++;
    }
  }
}

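// Convert the built OpFuncNode list into the instruction list: create
// Instructions with their device contexts, collect GC-check variables,
// build operator dependencies, cache contexts and set up inplace metadata.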
void InterpreterCore::Convert(
    std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
  auto& vec_meta_info = global_scope_->MutableVecMetaInfo();
  auto var_nums = global_scope_->VarSize();
  input_var2op_info_.resize(var_nums);
  auto nodes = *op_func_nodes;

  auto op_nums = nodes.size();
  vec_instruction_.reserve(op_nums);

  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& op_func_node = nodes[op_idx];
    auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);

    vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
    auto& instr = vec_instruction_.back();

    OpInOutInfo info;
    std::vector<size_t> gc_check_input_list;

    for (auto& item : op_func_node.input_index) {
      for (auto id : item.second) {
        if (id == kEmptyVarIndex) {
          continue;
        }
        input_var2op_info_.at(id).push_back(op_idx);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(op_func_node.operator_base_.get());
        }
        auto* var_desc = global_scope_->VarDesc(id);
        if (var_desc) {
          if (info.IsInArgBufferNeeded(var_desc->Name())) {
            gc_check_input_list.push_back(id);
          }
        } else {
          gc_check_input_list.push_back(id);
        }
      }
    }
    std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
    auto last =
        std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
    gc_check_input_list.erase(last, gc_check_input_list.end());

    for (auto var_id : gc_check_input_list) {
      vec_meta_info[var_id].var_ref_count_++;
      instr.AddGCCheckVar(var_id);
      VLOG(4) << "clear " << global_scope_->GetNameById(var_id) << " after "
              << instr.OpBase()->Type();
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check output
    for (auto& item : vec_instruction_[i].Outputs()) {
      for (auto id : item.second) {
        if (input_var2op_info_.at(id).size() == 0) {
          // output var is not used by any kernel
          vec_instruction_[i].AddGCCheckVar(id);
          VLOG(4) << "clear " << global_scope_->GetNameById(id) << " after "
                  << vec_instruction_[i].OpBase()->Type();
          vec_meta_info[id].var_ref_count_++;
        }
      }
    }
  }

  BuildOperatorDependences();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].DeviceContext().GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }

  if (FLAGS_new_executor_use_inplace) {
    BuildInplace();
  }
}

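// An input variable is an inplace candidate only if exactly one instruction
// needs its buffer as input.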
bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!global_scope_->VarDesc(var_index)) {
    return input_var2op_info_.at(var_index).size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_.at(var_index)) {
      OpInOutInfo info;
      info.Build(vec_instruction_.at(inst_id).OpBase());
      if (info.IsInArgBufferNeeded(global_scope_->VarDesc(var_index)->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

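// For every op that declares an inplace inference function, share the buffer
// of a non-persistable, non-skipped input with its matched output when the
// input is consumed by this op only.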
void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    auto& instr = vec_instruction_[i];
    auto* op_base = instr.OpBase();
    if (!op_base->Info().infer_inplace_) {
      continue;
    }

    auto in_to_outs = op_base->Info().infer_inplace_(
        platform::is_gpu_place(instr.DeviceContext().GetPlace()));

    auto& inputs = instr.Inputs();
    auto& outputs = instr.Outputs();
    for (auto& pair : in_to_outs) {
      auto iter = inputs.find(pair.first);
      if (iter != inputs.end() && !iter->second.empty()) {
        auto in_var_desc = global_scope_->VarDesc(iter->second[0]);
        if (in_var_desc && in_var_desc->Persistable()) {
          continue;
        }
        if (global_scope_->GetVarSikpInplace(iter->second[0])) {
          continue;
        }
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = outputs.find(pair.second);
          if (iterout != outputs.end() && !iterout->second.empty()) {
            auto invar = global_scope_->Var(iter->second[0]);
            auto outvar = global_scope_->Var(iterout->second[0]);
            if (invar && outvar && invar->IsType<LoDTensor>() &&
                outvar->IsType<LoDTensor>()) {
              instr.AddInplace(invar, outvar);
              VLOG(3) << "inplace " << vec_instruction_[i].OpBase()->Type()
                      << " " << global_scope_->GetNameById(iter->second[0])
                      << " -> "
                      << global_scope_->GetNameById(iterout->second[0])
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

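// Resolve the instruction's input/output variable ids to Variable* and cache
// them in its runtime and infershape contexts.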
void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->Inputs()) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(global_scope_->Var(id));
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->Outputs()) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(global_scope_->Var(id));
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }
  // set runtime_ctx and infershape_ctx_
  instr_node->ResetContext(ins_map, outs_map);
}

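// If none of an instruction's LoDTensor inputs carries LoD information, mark
// its infershape context so that sharing LoD can be skipped.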
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
  }
}

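// Run a single instruction: InferShape (for kernel ops), apply cached inplace
// buffer sharing, launch the kernel or OperatorBase::Run, then do optional
// benchmark sync and nan/inf checks.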
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  auto* op = instr_node.OpBase();
  auto place = instr_node.DeviceContext().GetPlace();
  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(global_scope_);
  Scope* local_scope = create_local_scope_
                           ? global_scope_->GetMutableLocalScope()
                           : global_scope_->GetMutableScope();
  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
  {
    platform::RecordEvent infershape_event("InferShape");
    // If it is OperatorBase, InferShape does nothing.
    if (op_with_kernel != nullptr)
      op_with_kernel->InferShape(instr_node.InnerInferShapeContext().get());
  }

  if (op_with_kernel != nullptr &&
      FLAGS_new_executor_use_inplace) {  // TODO(xiongkun03) Does operator
                                         // base support inplace ?
    for (auto& pair : instr_node.InplaceInfo()) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }
  {
    platform::RecordEvent compute_event("Compute");
    if (op_with_kernel == nullptr) {
      instr_node.OpBase()->Run(*local_scope, place_);
    } else {
      instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
    }
  }

  VLOG(4) << "End run " << place << " " << op->DebugStringEx(global_scope_);

  /*For profiling/benchmark only*/
  if (FLAGS_benchmark) {
    instr_node.DeviceContext().Wait();
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
    PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError());
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
  }

  // for debug nan/inf
  if (FLAGS_check_nan_inf) {
    VLOG(4) << "Check nan/inf";
    framework::details::CheckOpHasNanOrInf(
        *op, *global_scope_,
        place);  // TODO(xiongkun03) change it to inner scope.
  }
}

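// Enqueue all instructions whose dependency count is zero, then block the
// main thread until completion or an exception is notified.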
void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
  unfinished_op_numer_ = vec_instr.size();

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_->AddTask(vec_instr.at(i).KernelType(),
                                 [&, i] { RunInstructionAsync(i); });
    }
  }

  auto event_name = main_thread_blocker_.WaitEvent();
  VLOG(3) << "event_name: " << event_name;

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(4) << "Exception caught " << exception_holder_.Type();
    async_work_queue_->Cancel();
    exception_holder_.ReThrow();
  }
}

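// Schedule the downstream instructions of `instr` that become ready: push
// them to the async work queue, keeping ops suited to the current thread in
// `reserved_next_ops`.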
void InterpreterCore::RunNextInstructions(
    const Instruction& instr, std::queue<size_t>* reserved_next_ops) {
  auto& next_instr = instr.NextInstructions();
  auto& atomic_deps = async_work_queue_->AtomicDeps();
  auto IsReady = [&](size_t next_id) {
    return atomic_deps[next_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.KernelType() == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.SyncRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.DirectRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    auto direct_run_ops = interpreter::merge_vector(next_instr.SyncRunIds(),
                                                    next_instr.DirectRunIds());
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move rest ops into other threads
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

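// Worker entry: run the given instruction and keep draining the ops this
// thread reserves for itself, recording any exception in exception_holder_.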
void InterpreterCore::RunInstructionAsync(size_t instr_id) {
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_.at(instr_id);
    auto* op = instr_node.OpBase();
    platform::RecordEvent instruction_event(op->Type().c_str());
    interpreter::WaitEvent(instr_node, place_);

    try {
      RunInstruction(instr_node);

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      RecordStreamForGC(instr_node);
#endif
      CheckGC(instr_node);
    } catch (platform::EnforceNotMet& ex) {
      framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
      exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
    } catch (platform::EOFException&) {
      exception_holder_.Catch(std::current_exception());
    } catch (std::exception& ex) {
      LOG(WARNING) << op->Type() << " raises an exception "
                   << platform::demangle(typeid(ex).name()) << ", "
                   << ex.what();
      exception_holder_.Catch(std::current_exception());
    } catch (...) {
      LOG(WARNING) << op->Type() << " raises an unknown exception";
      exception_holder_.Catch(std::current_exception());
    }

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }

    VLOG(4) << "unfinished_op_numer_: " << unfinished_op_numer_;
    if (UNLIKELY(unfinished_op_numer_.fetch_sub(1, std::memory_order_relaxed) ==
                 1)) {
      if (completion_notifier_ != nullptr) {
        completion_notifier_->NotifyEvent();
      }
    }

    interpreter::RecordEvent(instr_node, place_);

    RunNextInstructions(instr_node, &ready_ops);
  }
}

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
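// For async instructions under fast GC, record the instruction's CUDA stream
// on the allocations of GC-check vars so they are not freed while in use.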
void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
  if (!IsInterpretercoreFastGCEnabled() ||
      instr.KernelType() != OpFuncType::kQueueAsync) {
    return;
  }

  gpuStream_t stream = reinterpret_cast<const platform::CUDADeviceContext&>(
                           instr.DeviceContext())
                           .stream();
  auto TensorRecordStream = [&stream](Tensor& tensor) {
    auto allocation = tensor.Holder();
    if (allocation == nullptr) {
      return;
    }

    const platform::Place& place = allocation->place();
    if (platform::is_gpu_place(place)) {
      memory::RecordStream(allocation, stream);
    } else if (platform::is_cuda_pinned_place(place)) {
      // TODO(Ruibiao): Here we should do something to make sure that the
      // tensor is not freed until the H2D copies are done. However, simply
      // launching a CUDA runtime callback to the H2D stream may lead to high
      // performance overhead. As all the cases we meet in H2D are copies from
      // CPUPlace at present, we just log a WARNING here. A better design is
      // required.
      LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous "
                      "manner may lead to data inconsistency";
    } else {
      // memory copies involving CPUPlace are always synchronous, so just do
      // nothing here
    }
  };

  /* NOTE(Ruibiao): Cross-stream tensor synchronization is required only when
   * all the following conditions are satisfied:
   * 1. The tensor will be GC-ed after running the instruction, i.e., it is in
   * instr.GCCheckVars.
   * 2. The stream which initializes this tensor is different from the stream
   * which the instruction runs in.
   * 3. The tensor is the instruction's input, because we assume that an
   * instruction will initialize all output tensors with its running stream.
   * 4. In the OP function of this instruction, the tensor is an input of an
   * async CUDA kernel.
   *
   * Here we only process the first condition, because:
   * 1. Since the RecordStream function will directly return when the recorded
   * stream is equal to the owning stream, recording a stream same as the one
   * which initialized this tensor has less time overhead. Conversely, it may
   * take more time if we try to extract those cross-stream input vars from
   * instr.GCCheckVars.
   * 2. Now the instruction has no idea of which vars are involved in async
   * running in the OP function, and thus we can not recognize condition 4. It
   * should be supported later.
   */
  for (int var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC sync " << global_scope_->GetNameById(var_id) << " "
            << global_scope_->VarDesc(var_id);

    // persistable vars will be ignored while GC
    if (global_scope_->VarDesc(var_id) &&
        global_scope_->VarDesc(var_id)->Persistable()) {
      continue;
    }

    paddle::framework::Variable* var = global_scope_->Var(var_id);
    if (var == nullptr) {
      continue;
    }

    if (var->IsType<LoDTensor>()) {
      TensorRecordStream(*(var->GetMutable<LoDTensor>()));
    } else if (var->IsType<
                   operators::reader::
                       OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
      // do nothing
    } else if (var->IsType<SelectedRows>()) {
      TensorRecordStream(*(var->GetMutable<SelectedRows>()->mutable_value()));
    } else if (var->IsType<LoDTensorArray>()) {
      auto* tensor_arr = var->GetMutable<LoDTensorArray>();
      for (auto& tensor : *tensor_arr) {
        TensorRecordStream(tensor);
      }
    } else if (var->IsType<std::vector<Scope*>>()) {
      // do nothing
    } else {
      PADDLE_THROW(platform::errors::Unimplemented(
          "The variable(%s) is not supported in eager deletion.",
          framework::ToTypeName(var->Type())));
    }
  }
}
#endif

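// Decrease the reference count of the instruction's GC-check vars and hand
// those reaching zero to the garbage collector.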
void InterpreterCore::CheckGC(const Instruction& instr) {
  size_t instr_id = instr.Id();
  auto& var_scope = *global_scope_;
  auto& atomic_var_ref = async_work_queue_->AtomicVarRef();

  for (auto var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC " << global_scope_->GetNameById(var_id) << " "
            << var_scope.VarDesc(var_id);

    bool is_ready =
        atomic_var_ref[var_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
    // ignore all persistable var while GC
    if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
      continue;
    }
    if (is_ready) {
      VLOG(6) << "Async delete variable with name : "
              << var_scope.GetNameById(var_id);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
      if (IsInterpretercoreFastGCEnabled()) {
        static_cast<InterpreterCoreFastGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id));

      } else {
        static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
            var_scope.Var(var_id), gc_event_.at(instr_id),
            &instr.DeviceContext());
      }
#else
      static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
          var_scope.Var(var_id), gc_event_.at(instr_id),
          &instr.DeviceContext());
#endif
    }
  }
}

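// Share the feed tensors into their variables and, if the instruction list
// has not been built yet, build it. When prepare_feed is set, feed again,
// since the build step may have freed the feed tensors.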
void InterpreterCore::Prepare(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors, bool prepare_feed) {
  PADDLE_ENFORCE_EQ(feed_names.size(), feed_tensors.size(),
                    platform::errors::PreconditionNotMet(
                        "Required feed_names.size() == feed_tensors.size(), "
                        "but received %d != %d",
                        feed_names.size(), feed_tensors.size()));

  auto FeedInput = [&] {
    VLOG(4) << "Feed inputs";
    for (size_t i = 0; i < feed_names.size(); ++i) {
      auto* feed_var = global_scope_->FindVar(feed_names[i]);
      PADDLE_ENFORCE_NOT_NULL(
          feed_var, platform::errors::NotFound(
                        "Variable %s should not be nullptr.", feed_names[i]));

      auto feed_tensor = feed_var->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
      feed_tensor->set_lod(feed_tensors[i].lod());
    }
  };

  if (!is_build_) {
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_,
                                                         create_local_scope_);
    FeedInput();
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_, create_local_scope_);
    is_build_ = true;
    SetFeedVarsInplaceSkip(feed_names);
    // convert vec func_list to graph
    Convert(&op_func_nodes);
  }
  // NOTE: Because feed_tensor will be GC-ed after
  // paddle::framework::build_op_func_list, we should call FeedInput again.
  if (prepare_feed) {
    FeedInput();
  }
}

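// Execute the program once under the profiler and return the collected cost
// information.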
interpreter::CostInfo InterpreterCore::DryRun(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  global_scope_->SetLocalScope(local_scope_);
  Prepare(feed_names, feed_tensors, true);
  interpreter::CostInfo cost_info;
  {
    interpreter::ProfilerGuard(place_, &cost_info);
    ExecuteInstructionList(vec_instruction_);
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  if (create_local_scope_) {
    ClearLoDTensorArrayInLocalScope();
  }

  return cost_info;
}

void InterpreterCore::SetFeedVarsInplaceSkip(
    const std::vector<std::string>& feed_names) {
  for (auto& feed_name : feed_names) {
    global_scope_->SetVarSikpInplace(feed_name, true);
  }
}

}  // namespace framework
}  // namespace paddle