// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include <unordered_set>
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/profiler.h"

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");

DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);

constexpr const char* kExceptionCaught = "ExceptionCaught";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;

InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const BlockDesc& block,
                                 VariableScope* global_scope)
    : place_(place),
      block_(block),
      global_scope_(global_scope),
      stream_analyzer_(place) {
  is_build_ = false;
  async_work_queue_.reset(
      new interpreter::AsyncWorkQueue(kHostNumThreads, &main_thread_blocker_));
  gc_.reset(new InterpreterCoreGarbageCollector());

  exception_notifier_ = main_thread_blocker_.RegisterEvent(
      kExceptionCaught, [this]() { return exception_holder_.IsCaught(); });

  // prune

  // optimize graph pass

  // convert to run graph
}

InterpreterCore::~InterpreterCore() {
  // cancel gc's thread
  gc_.reset(nullptr);

  async_work_queue_.reset(nullptr);
}

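// Run executes the whole block once. The first call builds and caches the
// instruction list (see Prepare); later calls reuse it and only re-feed the
// inputs. A minimal usage sketch (names are illustrative, scope setup is
// elided):
//
//   InterpreterCore core(place, block, &scope);
//   FetchList out = core.Run({"x"}, {x_tensor});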
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  bool is_build = is_build_;
  Prepare(feed_names, feed_tensors, is_build);

  if (is_build) {
    ExecuteInstructionList(vec_instruction_);
  }

  // return Fetch Tensors
  auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
  return *(fetch_var->GetMutable<framework::FetchList>());
}

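// dependecy_count_[i] is the number of upstream instructions that must
// finish before instruction i becomes runnable; instructions whose count
// stays 0 are the roots that ExecuteInstructionList submits directly. E.g.
// with a downstream map {0 -> {1, 2}, 1 -> {2}}, op 2 gets a count of 2.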
void InterpreterCore::BuildOperatorDependences() {
  // analyze the dependencies between ops, set dependecy_count_ and call
  // Schedule
  auto op_nums = vec_instruction_.size();
  dependecy_count_.resize(op_nums);
  auto op2downstream = interpreter::build_op_downstream_map(vec_instruction_);
  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
    auto op_list = op2downstream[op];
    std::vector<size_t> downstream_vector(op_list.begin(), op_list.end());
    stream_analyzer_.Schedule(downstream_vector, &vec_instruction_, op);

    for (auto inst_id : op_list) {
      dependecy_count_[inst_id]++;
    }
  }
}

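// Convert turns the OpFuncNode list produced by build_op_func_list into the
// cached Instruction list: each instruction is bound to a device context,
// variable reference counts are collected for garbage collection (buffer
// inputs plus outputs that no kernel reads), the dependency graph is built,
// and the per-instruction contexts are pre-built.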
void InterpreterCore::Convert(
    std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
  auto& vec_meta_info = global_scope_->MutableVecMetaInfo();
  auto var_nums = global_scope_->VarSize();
  input_var2op_info_.resize(var_nums);

  auto nodes = *op_func_nodes;

  auto op_nums = nodes.size();
  vec_instruction_.reserve(op_nums);

  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
    auto& op_func_node = nodes[op_idx];
    auto* dev_ctx = stream_analyzer_.ParseDeviceContext(op_func_node);

    vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx);
    auto& instr = vec_instruction_.back();

    OpInOutInfo info;
    std::vector<size_t> gc_check_input_list;

    for (auto& item : op_func_node.input_index) {
      for (auto id : item.second) {
        if (id == kEmptyVarIndex) {
          continue;
        }
        input_var2op_info_.at(id).push_back(op_idx);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(op_func_node.operator_base_.get());
        }
        auto* var_desc = global_scope_->VarDesc(id);
        if (var_desc) {
          if (info.IsInArgBufferNeeded(var_desc->Name())) {
            gc_check_input_list.push_back(id);
          }
        } else {
          gc_check_input_list.push_back(id);
        }
      }
    }
    std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
    auto last =
        std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
    gc_check_input_list.erase(last, gc_check_input_list.end());

    for (auto var_id : gc_check_input_list) {
      vec_meta_info[var_id].var_ref_count_++;
      instr.AddGCCheckVar(var_id);
      VLOG(4) << "clear " << global_scope_->GetNameById(var_id) << " after "
              << instr.OpBase()->Type();
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check output
    for (auto& item : vec_instruction_[i].Outputs()) {
      for (auto id : item.second) {
        if (input_var2op_info_.at(id).size() == 0) {
          // this output var is not used by any kernel
          vec_instruction_[i].AddGCCheckVar(id);
          VLOG(4) << "clear " << global_scope_->GetNameById(id) << " after "
                  << vec_instruction_[i].OpBase()->Type();
          vec_meta_info[id].var_ref_count_++;
        }
      }
    }
  }

  BuildOperatorDependences();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i]);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].DeviceContext().GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }

  if (FLAGS_new_executor_use_inplace) {
    BuildInplace();
  }
}

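// An input var may only be reused inplace if exactly one instruction needs
// its buffer; otherwise sharing it with an output would clobber data that
// another consumer still has to read.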
bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!global_scope_->VarDesc(var_index)) {
    return input_var2op_info_.at(var_index).size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_.at(var_index)) {
      OpInOutInfo info;
      info.Build(vec_instruction_.at(inst_id).OpBase());
      if (info.IsInArgBufferNeeded(global_scope_->VarDesc(var_index)->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

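// For every op that registered an infer_inplace_ rule, pair the first var of
// the rule's input slot with the first var of its output slot and record the
// pair on the instruction. RunInstruction shares the buffers later, and only
// when the runtime dims match.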
void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    auto& instr = vec_instruction_[i];
    auto* op_base = instr.OpBase();
    if (!op_base->Info().infer_inplace_) {
      continue;
    }

    auto in_to_outs = op_base->Info().infer_inplace_(
        platform::is_gpu_place(instr.DeviceContext().GetPlace()));

    auto& inputs = instr.Inputs();
    auto& outputs = instr.Outputs();
    for (auto& pair : in_to_outs) {
      auto iter = inputs.find(pair.first);
      if (iter != inputs.end() && !iter->second.empty()) {
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = outputs.find(pair.second);
          if (iterout != outputs.end() && !iterout->second.empty()) {
            auto invar = global_scope_->Var(iter->second[0]);
            auto outvar = global_scope_->Var(iterout->second[0]);
            if (invar && outvar && invar->IsType<LoDTensor>() &&
                outvar->IsType<LoDTensor>()) {
              instr.AddInplace(invar, outvar);
              VLOG(3) << "inplace " << vec_instruction_[i].OpBase()->Type()
                      << " " << global_scope_->GetNameById(iter->second[0])
                      << " -> "
                      << global_scope_->GetNameById(iterout->second[0])
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

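// Resolve each instruction's variable ids to Variable* once, so the
// RuntimeContext and InferShapeContext do not have to be rebuilt on every
// step.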
void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->Inputs()) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(global_scope_->Var(id));
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->Outputs()) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(global_scope_->Var(id));
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }
  // set runtime_ctx and infershape_ctx_
  instr_node->ResetContext(ins_map, outs_map);
}

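// Only when every input of an op is a LoDTensor whose LoD is empty can the
// cached InferShapeContext skip LoD sharing between inputs and outputs on
// subsequent runs.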
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
  }
}

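// Run a single instruction on the current thread: InferShape, optional
// inplace buffer sharing, then the kernel launch. OperatorBase ops without a
// kernel fall back to the classic Run(scope, place) path.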
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  auto* op = instr_node.OpBase();
  auto place = instr_node.DeviceContext().GetPlace();
  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(global_scope_);

  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
  {
    platform::RecordEvent infershape_event("InferShape");
    // If it is OperatorBase, InferShape does nothing.
    if (op_with_kernel != nullptr)
      op_with_kernel->InferShape(instr_node.InnerInferShapeContext().get());
  }

  if (op_with_kernel != nullptr &&
      FLAGS_new_executor_use_inplace) {  // TODO(xiongkun03) Does operator
                                         // base support inplace?
    for (auto& pair : instr_node.InplaceInfo()) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }
  {
    platform::RecordEvent compute_event("Compute");
    if (op_with_kernel == nullptr)
      instr_node.OpBase()->Run(*global_scope_->GetScope(), place_);
    else
      instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
  }

  VLOG(4) << "End run " << place << " " << op->DebugStringEx(global_scope_);

  /*For profiling/benchmark only*/
  if (FLAGS_benchmark) {
    instr_node.DeviceContext().Wait();
#if defined(PADDLE_WITH_CUDA)
    PADDLE_ENFORCE_CUDA_SUCCESS(cudaGetLastError());
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
#if defined(PADDLE_WITH_HIP)
    PADDLE_ENFORCE_CUDA_SUCCESS(hipGetLastError());
    VLOG(4) << "Operator(" << op->Type()
            << "): context wait and get last error";
#endif
  }

  // for debug nan/inf
  if (FLAGS_check_nan_inf) {
    VLOG(4) << "Check nan/inf";
    framework::details::CheckOpHasNanOrInf(
        *op, *global_scope_,
        place);  // TODO(xiongkun03) change it to inner scope.
  }
}

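// Reset the atomic dependency / var-ref snapshots, submit every root
// instruction (dependency count 0) to the work queue, then block on
// main_thread_blocker_ until a registered event fires (all work done, or a
// worker caught an exception).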
void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  async_work_queue_->PrepareAtomicDeps(dependecy_count_);
  async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
  op_run_number_ = 0;

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_->AddTask(vec_instr.at(i).KernelType(),
                                 [&, i] { RunInstructionAsync(i); });
    }
  }

  auto event_id = main_thread_blocker_.WaitEvent();
  VLOG(3) << "event_id " << event_id;

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(4) << "Exception caught " << exception_holder_.Type();
    exception_holder_.ReThrow();
  }

  PADDLE_ENFORCE_EQ(
      op_run_number_.load(), vec_instr.size(),
      platform::errors::Fatal(
          "Required op_run_number == %d, but received op_run_number = %d.",
          vec_instr.size(), op_run_number_.load()));
}

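// Scheduling policy for the successors of a finished instruction: an async
// (device) op keeps its async successors on the current thread and pushes
// sync successors to the work queue, while a sync op keeps at most one
// direct-run successor local and dispatches the rest, so each thread keeps
// follow-up work without hoarding it.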
void InterpreterCore::RunNextInstructions(
    const Instruction& instr, std::queue<size_t>* reserved_next_ops) {
  auto& next_instr = instr.NextInstructions();
  auto& atomic_deps = async_work_queue_->AtomicDeps();
  auto IsReady = [&](size_t next_id) {
    return atomic_deps[next_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.KernelType() == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.SyncRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.DirectRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.EventRunIds()) {
      if (IsReady(next_id)) {
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    auto direct_run_ops = interpreter::merge_vector(next_instr.SyncRunIds(),
                                                    next_instr.DirectRunIds());
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move rest ops into other threads
        async_work_queue_->AddTask(
            vec_instruction_[next_id].KernelType(),
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

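// Worker loop: run one instruction, then keep draining whatever
// RunNextInstructions decides should stay on this thread. Any exception is
// stashed in exception_holder_ and the main thread is woken via
// exception_notifier_ instead of letting the worker thread die.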
void InterpreterCore::RunInstructionAsync(size_t instr_id) {
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_.at(instr_id);
    auto* op = instr_node.OpBase();
    platform::RecordEvent instruction_event(op->Type());
    interpreter::WaitEvent(instr_node, place_);

    try {
      RunInstruction(instr_node);
      // GC information
      CheckGC(instr_node);
    } catch (platform::EnforceNotMet& ex) {
      framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
      exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
    } catch (platform::EOFException&) {
      exception_holder_.Catch(std::current_exception());
    } catch (std::exception& ex) {
      LOG(WARNING) << op->Type() << " raises an exception "
                   << platform::demangle(typeid(ex).name()) << ", "
                   << ex.what();
      exception_holder_.Catch(std::current_exception());
    } catch (...) {
      LOG(WARNING) << op->Type() << " raises an unknown exception";
      exception_holder_.Catch(std::current_exception());
    }

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }

    interpreter::RecordEvent(instr_node, place_);
    op_run_number_.fetch_add(1, std::memory_order_relaxed);

    RunNextInstructions(instr_node, &ready_ops);
  }
}

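// Decrement the atomic reference count of each GC-check var of the finished
// instruction. The consumer that performs the last decrement hands the
// variable to the garbage collector (persistable vars are skipped), together
// with the instruction's device event so the deletion can wait for pending
// device work on that variable.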
void InterpreterCore::CheckGC(const Instruction& instr) {
  size_t instr_id = instr.Id();
  auto& var_scope = *global_scope_;
  auto& atomic_var_ref = async_work_queue_->AtomicVarRef();

  for (auto var_id : instr.GCCheckVars()) {
    VLOG(4) << "GC " << global_scope_->GetNameById(var_id) << " "
            << var_scope.VarDesc(var_id);

    bool is_ready =
        atomic_var_ref[var_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
    // ignore all persistable vars while GC
    if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
      continue;
    }
    if (is_ready) {
      VLOG(6) << "Async delete variable with name : "
              << var_scope.GetNameById(var_id);
      gc_->Add(var_scope.Var(var_id), gc_event_.at(instr_id),
               &instr.DeviceContext());
    }
  }
}

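// First call: build the variable scope and the instruction list, feeding
// inputs in between (build_variable_scope -> FeedInput -> build_op_func_list
// -> Convert). Later calls only re-feed: FeedInput shares the caller's
// tensor buffers into the scope's feed variables, so the feed tensors must
// stay alive while the program runs.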
void InterpreterCore::Prepare(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors, bool prepare_feed) {
  PADDLE_ENFORCE_EQ(feed_names.size(), feed_tensors.size(),
                    platform::errors::PreconditionNotMet(
                        "Required feed_names.size() == feed_tensors.size(), "
                        "but received %d != %d",
                        feed_names.size(), feed_tensors.size()));

  auto FeedInput = [&] {
    for (size_t i = 0; i < feed_names.size(); ++i) {
      auto* feed_var = global_scope_->FindVar(feed_names[i]);
      PADDLE_ENFORCE_NOT_NULL(feed_var, platform::errors::NotFound(
                                            "feed_var shall not be nullptr."));

      auto feed_tensor = feed_var->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
      feed_tensor->set_lod(feed_tensors[i].lod());
    }
  };

  if (!is_build_) {
    paddle::framework::interpreter::build_variable_scope(block_, global_scope_);
    FeedInput();
    std::vector<paddle::framework::OpFuncNode> op_func_nodes;
    paddle::framework::interpreter::build_op_func_list(
        place_, block_, &op_func_nodes, global_scope_);
    is_build_ = true;
    // convert vec func_list to graph
    Convert(&op_func_nodes);
  }
  // NOTE: Because feed_tensor will be GC'ed after
  // paddle::framework::interpreter::build_op_func_list, we have to call
  // FeedInput again.
  if (prepare_feed) FeedInput();
}

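// DryRun executes the block once under ProfilerGuard and returns the
// measured cost info instead of fetch results.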
interpreter::CostInfo InterpreterCore::DryRun(
    const std::vector<std::string>& feed_names,
    const std::vector<framework::LoDTensor>& feed_tensors) {
  Prepare(feed_names, feed_tensors, true);
  interpreter::CostInfo cost_info;
  {
    // named guard: an unnamed temporary would be destroyed immediately and
    // profile nothing
    interpreter::ProfilerGuard profiler_guard(place_, &cost_info);
    ExecuteInstructionList(vec_instruction_);
    platform::DeviceContextPool::Instance().Get(place_)->Wait();
  }

  return cost_info;
}

}  // namespace framework
}  // namespace paddle