// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"

#include <unordered_set>

#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/platform/profiler.h"

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");

constexpr const char* kExceptionCaught = "ExceptionCaught";

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;

InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const ProgramDesc& main_prog,
                                 VariableScope* global_scope,
                                 const std::vector<std::string>& feed_names,
                                 const std::vector<std::string>& fetch_names)
    : place_(place),
      main_program_(main_prog),
      global_scope_(global_scope),
      stream_analyzer_(place),
      async_work_queue_(kHostNumThreads, &main_thread_blocker_) {
  is_build_ = false;

  feed_names_ = feed_names;

  exception_notifier_ = main_thread_blocker_.RegisterEvent(
      kExceptionCaught, [this]() { return exception_holder_.IsCaught(); });

  // Step 1: add feed and fetch ops to main_program
  AddFetch(fetch_names);

  // prune

  // optimize graph pass

  // convert to run graph
}

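// Adds a persistable FETCH_LIST variable named "fetch_vars" to block 0 of the
// main program and appends one fetch_v2 op per fetch target that writes into
// it.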
void InterpreterCore::AddFetch(const std::vector<std::string>& fetch_names) {
  auto* fetch_holder = main_program_.MutableBlock(0)->Var("fetch_vars");
  fetch_holder->SetType(proto::VarType::FETCH_LIST);
  fetch_holder->SetPersistable(true);

  int i = 0;
  for (auto& fetch_name : fetch_names) {
    // append fetch op
    auto* op = main_program_.MutableBlock(0)->AppendOp();
    op->SetType("fetch_v2");
    op->SetInput("X", {fetch_name});
    op->SetOutput("Out", {"fetch_vars"});
    op->SetAttr("col", {static_cast<int>(i)});
    op->CheckAttrs();
    i++;
  }
}

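// Feeds the given tensors and runs the program. On the first call the
// variable scope and instruction list are built from main_program_; on later
// calls the cached instruction list is executed directly. Returns the
// FetchList stored in "fetch_vars".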
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<framework::Tensor>& feed_tensors) {
  auto FeedInput = [&] {
    for (size_t i = 0; i < feed_names_.size(); ++i) {
      auto it = global_scope_->name2id.find(feed_names_[i]);
      assert(it != global_scope_->name2id.end());

      auto feed_tensor = global_scope_->var_list[it->second]
                             ->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
    }
  };

  if (is_build_ == false) {
    paddle::framework::interpretercore::build_variable_scope(main_program_,
                                                             global_scope_);
    FeedInput();
    paddle::framework::interpretercore::build_op_func_list(
        place_, main_program_, &op_list_, &vec_func_list_, global_scope_);
    is_build_ = true;
    // convert vec func_list to graph
    Convert();
  } else {
    FeedInput();
    ExecuteInstructionList(vec_instruction_);
  }

  // return Fetch Tensors
  return *(global_scope_->var_list[global_scope_->name2id["fetch_vars"]]
               ->GetMutable<framework::FetchList>());
}

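// Converts vec_func_list_ into vec_instruction_: records each op's device
// context, kernel and input/output variable indices, collects the variables
// whose reference counts are tracked for GC, wires up downstream dependencies
// in dependecy_count_, and pre-builds the per-instruction contexts.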
void InterpreterCore::Convert() {
  input_var2op_info_.resize(global_scope_->var_list.size());

  vec_instruction_.reserve(vec_func_list_.size());
  dependecy_count_.resize(vec_func_list_.size());
  vec_meta_info_.resize(global_scope_->var_list.size());
  for (size_t i = 0; i < vec_func_list_.size(); ++i) {
    Instruction temp_inst;
    auto* op_base = op_list_[i];
    temp_inst.dev_ctx_ =
        stream_analyzer_.ParseDeviceContext(vec_func_list_[i], *op_base);
    temp_inst.kernel_func_.compute_func_ = vec_func_list_[i].kernel_func_;
    temp_inst.kernel_func_.operator_base_ = op_base;
    temp_inst.input_index_ = vec_func_list_[i].input_index;
    temp_inst.output_index_ = vec_func_list_[i].output_index;
    temp_inst.type_ = vec_func_list_[i].type_;
    temp_inst.no_data_transform_index_ =
        vec_func_list_[i].no_data_transform_index;

    OpInOutInfo info;

    std::vector<size_t> gc_check_input_list;
    for (auto& item : vec_func_list_[i].input_index) {
      for (auto id : item.second) {
        input_var2op_info_[id].push_back(i);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(op_list_[i]);
        }
        if (global_scope_->vec_meta_info_[id].vardesc_) {
          if (info.IsInArgBufferNeeded(
                  global_scope_->vec_meta_info_[id].vardesc_->Name())) {
            gc_check_input_list.push_back(id);
          }
        } else {
          gc_check_input_list.push_back(id);
        }
      }
    }
    std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
    auto last =
        std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
    gc_check_input_list.erase(last, gc_check_input_list.end());
    for (auto var_id : gc_check_input_list) {
      vec_meta_info_[var_id].var_ref_count_++;
    }

    temp_inst.gc_check_var_list.swap(gc_check_input_list);

    vec_instruction_.push_back(temp_inst);
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check output
    for (auto& item : vec_instruction_[i].output_index_) {
      for (auto id : item.second) {
        if (input_var2op_info_[id].size() == 0) {
          // output var is not used by any kernel
          vec_instruction_[i].gc_check_var_list.push_back(id);
          vec_meta_info_[id].var_ref_count_++;
        }
      }
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    std::vector<size_t> vec_temp;
    for (auto& item : vec_instruction_[i].output_index_) {
      for (auto id : item.second) {
        vec_temp =
            interpretercore::merge_vector(vec_temp, input_var2op_info_[id]);
      }
    }

    // In a Program, op order carries important information: an op may only
    // take ops that appear after it as its next ops.
    std::vector<size_t> filter_next;
    filter_next.reserve(vec_temp.size());
    for (auto item : vec_temp) {
      if (item > i) {
        filter_next.push_back(item);
      }
    }

    stream_analyzer_.Schedule(filter_next, &vec_instruction_, i);

    for (auto inst_id : filter_next) {
      dependecy_count_[inst_id]++;
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i], *global_scope_, place_);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].execution_ctx_.get()->GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }

  if (FLAGS_new_executor_use_inplace) {
    BuildInplace();
  }
}

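// Returns true if the variable is consumed as an input by exactly one op
// (counting only ops that actually need its buffer), which makes it a safe
// candidate for in-place reuse.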
bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!global_scope_->vec_meta_info_[var_index].vardesc_) {
    return input_var2op_info_[var_index].size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_[var_index]) {
      OpInOutInfo info;
      info.Build(vec_instruction_[inst_id].kernel_func_.operator_base_);
      if (info.IsInArgBufferNeeded(
              global_scope_->vec_meta_info_[var_index].vardesc_->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

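// For every op that provides an inplace inferer, records (input var, output
// var) pairs whose buffers may be shared at run time, provided the input is
// consumed by this op only.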
void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    if (!vec_instruction_[i]
             .kernel_func_.operator_base_->Info()
             .infer_inplace_) {
      continue;
    }

    auto in_to_outs =
        vec_instruction_[i].kernel_func_.operator_base_->Info().infer_inplace_(
            platform::is_gpu_place(vec_instruction_[i].dev_ctx_->GetPlace()));

    for (auto& pair : in_to_outs) {
      auto iter = vec_instruction_[i].input_index_.find(pair.first);
      if (iter != vec_instruction_[i].input_index_.end()) {
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = vec_instruction_[i].output_index_.find(pair.second);
          if (iterout != vec_instruction_[i].output_index_.end()) {
            auto invar = global_scope_->var_list[iter->second[0]];
            auto outvar = global_scope_->var_list[iterout->second[0]];
            if (invar && outvar) {
              vec_instruction_[i].vec_inplace_in_to_out_.emplace_back(invar,
                                                                      outvar);
              VLOG(3) << "inplace "
                      << vec_instruction_[i].kernel_func_.operator_base_->Type()
                      << " "
                      << global_scope_->vec_meta_info_[iter->second[0]]
                             .vardesc_->Name()
                      << " -> "
                      << global_scope_->vec_meta_info_[iterout->second[0]]
                             .vardesc_->Name()
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

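// Gathers the input/output Variable pointers of one instruction and caches
// the RuntimeContext, InferShapeContext and ExecutionContext it will reuse on
// every run.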
void InterpreterCore::BuildAndCacheInstructionCtx(
    Instruction* instr_node, const VariableScope& var_scope,
    const platform::Place& place) {
  auto op_base = instr_node->kernel_func_.operator_base_;

  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->input_index_) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(var_scope.var_list[id]);
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->output_index_) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(var_scope.var_list[id]);
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }

  instr_node->runtime_ctx_.reset(new RuntimeContext({}, {}));
  instr_node->runtime_ctx_->inputs.swap(ins_map);
  instr_node->runtime_ctx_->outputs.swap(outs_map);

  instr_node->infershape_ctx_.reset(new InterpretercoreInferShapeContext(
      *op_base, *instr_node->runtime_ctx_.get()));

  auto* dev_ctx = instr_node->dev_ctx_;
  Scope scope;

  instr_node->execution_ctx_.reset(new ExecutionContext(
      *op_base, scope, *dev_ctx, *instr_node->runtime_ctx_.get()));
}

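// If none of an instruction's LoDTensor inputs carries LoD information, marks
// its InferShapeContext so that LoD sharing can be skipped at run time.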
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].runtime_ctx_.get()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].infershape_ctx_.get()->SetSkipLoD(can_skip_lod);
  }
}

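// Runs a single instruction: InferShape, optional in-place buffer sharing,
// then the cached kernel compute function.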
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  VLOG(3) << "RunInstruction:  "
          << instr_node.kernel_func_.operator_base_->Type();

  {
    platform::RecordEvent infershape_event("InferShape");
    static_cast<const framework::OperatorWithKernel*>(
        instr_node.kernel_func_.operator_base_)
        ->InferShape(instr_node.infershape_ctx_.get());
  }

  if (FLAGS_new_executor_use_inplace) {
    for (auto& pair : instr_node.vec_inplace_in_to_out_) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }
  {
    platform::RecordEvent compute_event("Compute");
    instr_node.kernel_func_.compute_func_(*instr_node.execution_ctx_.get());
  }
}

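// Enqueues every instruction whose dependency count is zero, then blocks the
// main thread until all instructions have run or an exception is caught.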
void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  async_work_queue_.PrepareAtomicDeps(dependecy_count_);
  async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
  op_run_number_ = 0;

  exception_holder_.Clear();

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_.AddTask(vec_instr[i].type_,
                                [&, i] { RunInstructionAsync(i); });
    }
  }

  auto event_id = main_thread_blocker_.WaitEvent();
  VLOG(3) << "event_id " << event_id;

  if (UNLIKELY(exception_holder_.IsCaught())) {
    VLOG(4) << "Exception caught " << exception_holder_.Type();
    exception_holder_.ReThrow();
  }

  PADDLE_ENFORCE_EQ(
      op_run_number_.load(), vec_instr.size(),
      platform::errors::Fatal(
          "Required op_run_number == %d, but received op_run_number = %d.",
          vec_instr.size(), op_run_number_.load()));
}

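// Decrements the dependency count of every instruction that follows `instr`.
// Ready successors are either kept in `reserved_next_ops`, so the current
// thread can continue with them, or dispatched to the async work queue,
// depending on the op types involved.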
void InterpreterCore::RunNextInstructions(
    const Instruction& instr, std::queue<size_t>* reserved_next_ops) {
  auto& next_instr = instr.next_instruction_;
  auto& atomic_deps = async_work_queue_.AtomicDeps();
  auto IsReady = [&](size_t next_id) {
    return atomic_deps[next_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.type_ == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.synchronize_run_) {
      if (IsReady(next_id)) {
        async_work_queue_.AddTask(
            vec_instruction_[next_id].type_,
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.direct_run_) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.event_wait_run_) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.event_wait_run_) {
      if (IsReady(next_id)) {
        async_work_queue_.AddTask(
            vec_instruction_[next_id].type_,
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    auto direct_run_ops = interpretercore::merge_vector(
        next_instr.synchronize_run_, next_instr.direct_run_);
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move rest ops into other threads
        async_work_queue_.AddTask(
            vec_instruction_[next_id].type_,
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

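// Worker-thread entry point: runs the given instruction and keeps popping
// ready successors from a local queue, handling exceptions, event recording
// and GC bookkeeping for every instruction it executes.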
void InterpreterCore::RunInstructionAsync(size_t instr_id) {
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_[instr_id];
    auto* op = instr_node.kernel_func_.operator_base_;
    platform::RecordEvent instruction_event(op->Type());
    event_manager_.WaitEvent(instr_node, place_);

    try {
      RunInstruction(instr_node);
    } catch (platform::EnforceNotMet& ex) {
      framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
      exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
    } catch (platform::EOFException&) {
      exception_holder_.Catch(std::current_exception());
    } catch (std::exception& ex) {
      LOG(WARNING) << op->Type() << " raises an exception "
                   << platform::demangle(typeid(ex).name()) << ", "
                   << ex.what();
      exception_holder_.Catch(std::current_exception());
    } catch (...) {
      LOG(WARNING) << op->Type() << " raises an unknown exception";
      exception_holder_.Catch(std::current_exception());
    }

    if (UNLIKELY(exception_holder_.IsCaught())) {
      VLOG(4) << "Exception caught";
      if (exception_notifier_ != nullptr) {
        exception_notifier_->NotifyEvent();
      }
      return;
    }

    event_manager_.RecordEvent(instr_node, place_);
    op_run_number_.fetch_add(1, std::memory_order_relaxed);

    // GC information
    CheckGC(instr_id, instr_node.gc_check_var_list);

    RunNextInstructions(instr_node, &ready_ops);
  }
}

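// Decrements the reference count of each listed variable; when a count
// reaches zero and the variable is not persistable, it is handed to the
// garbage collector together with this instruction's device event.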
void InterpreterCore::CheckGC(size_t instr_id,
                              const std::vector<size_t>& gc_check_list) {
  auto& var_scope = *global_scope_;
  auto& atomic_var_ref = async_work_queue_.AtomicVarRef();

  for (auto var_id : gc_check_list) {
    bool is_ready =
        atomic_var_ref[var_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
    if (is_ready && var_scope.vec_meta_info_[var_id].vardesc_ &&
        !var_scope.vec_meta_info_[var_id].vardesc_->Persistable()) {
      gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
              vec_instruction_[instr_id].dev_ctx_);
    } else if (is_ready &&
               var_scope.vec_meta_info_[var_id].vardesc_ == nullptr) {
      gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
              vec_instruction_[instr_id].dev_ctx_);
    }
  }
}

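// Performs the same first-run preparation as Run(): builds the variable scope
// and instruction list if needed and (re-)feeds the input tensors, without
// executing the instruction list.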
void InterpreterCore::DryRunPrepare(
    const std::vector<framework::Tensor>& feed_tensors) {
  auto FeedInput = [&] {
    for (size_t i = 0; i < feed_names_.size(); ++i) {
      auto it = global_scope_->name2id.find(feed_names_[i]);
      assert(it != global_scope_->name2id.end());

      auto feed_tensor = global_scope_->var_list[it->second]
                             ->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
    }
  };

  if (is_build_ == false) {
    paddle::framework::interpretercore::build_variable_scope(main_program_,
                                                             global_scope_);
    FeedInput();
    paddle::framework::interpretercore::build_op_func_list(
        place_, main_program_, &op_list_, &vec_func_list_, global_scope_);
    is_build_ = true;
    // convert vec func_list to graph
    Convert();
  }
  // NOTE: Because the feed tensors may be GC-ed after
  // paddle::framework::build_op_func_list, FeedInput must be called again.
  FeedInput();
}

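// Runs the program once under the dry-run profiler and returns the collected
// cost information (time and CUDA memory allocated) for that run.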
const CostInfo& InterpreterCore::DryRun(
    const std::vector<framework::Tensor>& feed_tensors) {
  DryRunPrepare(feed_tensors);
  // DryRun may be called many times.
  dry_run_profiler_.Reset();
  dry_run_profiler_.Start();
  ExecuteInstructionList(vec_instruction_);
  platform::DeviceContextPool::Instance().Get(place_)->Wait();

  dry_run_profiler_.Pause();
  dry_run_profiler_.TotalCUDAAllocatedMemorySize(place_);
  return dry_run_profiler_.GetCostInfo();
}

}  // namespace framework
}  // namespace paddle