// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"

#include <unordered_set>

#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;

InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const ProgramDesc& main_prog,
                                 VariableScope* global_scope,
                                 const std::vector<std::string>& feed_names,
                                 const std::vector<std::string>& fetch_names)
    : place_(place),
      main_program_(main_prog),
      global_scope_(global_scope),
      stream_analyzer_(place),
      async_work_queue_(kHostNumThreads) {
  is_build_ = false;

  feed_names_ = feed_names;

  // Step 1: add feed and fetch ops to main_program
  AddFetch(fetch_names);

  // prune

  // optimize graph pass

  // convert to run graph
}

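// Append one fetch_v2 op per fetch name to block 0; every fetch op writes its
// result into a single persistable FETCH_LIST variable named "fetch_vars".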
void InterpreterCore::AddFetch(const std::vector<std::string>& fetch_names) {
  auto* fetch_holder = main_program_.MutableBlock(0)->Var("fetch_vars");
  fetch_holder->SetType(proto::VarType::FETCH_LIST);
  fetch_holder->SetPersistable(true);

  int i = 0;
  for (auto& fetch_name : fetch_names) {
    // append fetch op
    auto* op = main_program_.MutableBlock(0)->AppendOp();
    op->SetType("fetch_v2");
    op->SetInput("X", {fetch_name});
    op->SetOutput("Out", {"fetch_vars"});
    op->SetAttr("col", {static_cast<int>(i)});
    op->CheckAttrs();
    i++;
  }
}

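// Run lazily builds the variable scope, op list and instruction graph on the
// first call; later calls only re-feed the inputs and execute the cached
// instruction list before returning the "fetch_vars" FetchList.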
paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<framework::Tensor>& feed_tensors) {
  auto FeedInput = [&] {
    for (size_t i = 0; i < feed_names_.size(); ++i) {
      auto it = global_scope_->name2id.find(feed_names_[i]);
      assert(it != global_scope_->name2id.end());

      auto feed_tensor = global_scope_->var_list[it->second]
                             ->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
    }
  };

  if (is_build_ == false) {
    paddle::framework::interpretercore::build_variable_scope(main_program_,
                                                             global_scope_);
    FeedInput();
    paddle::framework::interpretercore::build_op_func_list(
        place_, main_program_, &op_list_, &vec_func_list_, global_scope_);
    is_build_ = true;
    // convert vec func_list to graph
    Convert();
  } else {
    FeedInput();
    ExecuteInstructionList(vec_instruction_);
  }

  // return Fetch Tensors
  return *(global_scope_->var_list[global_scope_->name2id["fetch_vars"]]
               ->GetMutable<framework::FetchList>());
}

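// Convert lowers vec_func_list_ into vec_instruction_: it records which ops
// consume each variable, builds per-instruction GC check lists and variable
// reference counts, wires up next-op edges and dependency counts, caches the
// per-instruction contexts, and optionally prepares inplace buffer sharing.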
void InterpreterCore::Convert() {
  input_var2op_info_.resize(global_scope_->var_list.size());

  vec_instruction_.reserve(vec_func_list_.size());
  dependecy_count_.resize(vec_func_list_.size());
  vec_meta_info_.resize(global_scope_->var_list.size());
  for (size_t i = 0; i < vec_func_list_.size(); ++i) {
    Instruction temp_inst;
    auto* op_base = op_list_[i];
    temp_inst.dev_ctx_ =
        stream_analyzer_.ParseDeviceContext(vec_func_list_[i], *op_base);
    temp_inst.kernel_func_.compute_func_ = vec_func_list_[i].kernel_func_;
    temp_inst.kernel_func_.operator_base_ = op_base;
    temp_inst.input_index_ = vec_func_list_[i].input_index;
    temp_inst.output_index_ = vec_func_list_[i].output_index;
    temp_inst.type_ = vec_func_list_[i].type_;

    OpInOutInfo info;

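    // Collect the input variables whose buffers this op actually needs; only
    // those participate in reference counting for garbage collection.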
    std::vector<size_t> gc_check_input_list;
    for (auto& item : vec_func_list_[i].input_index) {
      for (auto id : item.second) {
        input_var2op_info_[id].push_back(i);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(op_list_[i]);
        }
        if (global_scope_->vec_meta_info_[id].vardesc_) {
          if (info.IsInArgBufferNeeded(
                  global_scope_->vec_meta_info_[id].vardesc_->Name())) {
            gc_check_input_list.push_back(id);
          }
        } else {
          gc_check_input_list.push_back(id);
        }
      }
    }
    std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
    auto last =
        std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
    gc_check_input_list.erase(last, gc_check_input_list.end());
    for (auto var_id : gc_check_input_list) {
      vec_meta_info_[var_id].var_ref_count_++;
    }

    temp_inst.gc_check_var_list.swap(gc_check_input_list);

    vec_instruction_.push_back(temp_inst);
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check output
    for (auto& item : vec_instruction_[i].output_index_) {
      for (auto id : item.second) {
        if (input_var2op_info_[id].size() == 0) {
          // the output var is not used by any kernel
          vec_instruction_[i].gc_check_var_list.push_back(id);
          vec_meta_info_[id].var_ref_count_++;
        }
      }
    }
  }

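  // For every instruction, gather all ops that consume its outputs, keep only
  // those later in program order, schedule them on streams, and bump their
  // dependency counts.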
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    std::vector<size_t> vec_temp;
    for (auto& item : vec_instruction_[i].output_index_) {
      for (auto id : item.second) {
        vec_temp =
            interpretercore::merge_vector(vec_temp, input_var2op_info_[id]);
      }
    }

    // In a Program, op order carries important information: an op may only
    // take ops that appear after it as its next ops.
    std::vector<size_t> filter_next;
    filter_next.reserve(vec_temp.size());
    for (auto item : vec_temp) {
      if (item > i) {
        filter_next.push_back(item);
      }
    }

    stream_analyzer_.Schedule(filter_next, &vec_instruction_, i);

    for (auto inst_id : filter_next) {
      dependecy_count_[inst_id]++;
    }
    vec_instruction_[i].next_instruction_.all_next_ops_ =
        std::move(filter_next);
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i], *global_scope_, place_);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].execution_ctx_.get()->GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }

  if (FLAGS_new_executor_use_inplace) {
    BuildInplace();
  }
}

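// Record (input, output) variable pairs that may share a buffer at run time:
// the op must provide an inplace inferer and the input variable must be
// consumed by exactly one op.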
void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    if (!vec_instruction_[i]
             .kernel_func_.operator_base_->Info()
             .infer_inplace_) {
      continue;
    }

    auto in_to_outs =
        vec_instruction_[i].kernel_func_.operator_base_->Info().infer_inplace_(
            platform::is_gpu_place(vec_instruction_[i].dev_ctx_->GetPlace()));

    for (auto& pair : in_to_outs) {
      auto iter = vec_instruction_[i].input_index_.find(pair.first);
      if (iter != vec_instruction_[i].input_index_.end()) {
        if (input_var2op_info_[iter->second[0]].size() == 1) {
          auto iterout = vec_instruction_[i].output_index_.find(pair.second);
          if (iterout != vec_instruction_[i].output_index_.end()) {
            auto invar = global_scope_->var_list[iter->second[0]];
            auto outvar = global_scope_->var_list[iterout->second[0]];
            if (invar && outvar) {
              vec_instruction_[i].vec_inplace_in_to_out_.emplace_back(invar,
                                                                      outvar);
            }
          }
        }
      }
    }
  }
}

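// Resolve variable ids to Variable* once and cache the RuntimeContext,
// InferShapeContext and ExecutionContext on the instruction, so they do not
// have to be rebuilt on every run.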
void InterpreterCore::BuildAndCacheInstructionCtx(
    Instruction* instr_node, const VariableScope& var_scope,
    const platform::Place& place) {
  auto op_base = instr_node->kernel_func_.operator_base_;

  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->input_index_) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(var_scope.var_list[id]);
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->output_index_) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(var_scope.var_list[id]);
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }

  instr_node->runtime_ctx_.reset(new RuntimeContext({}, {}));
  instr_node->runtime_ctx_->inputs.swap(ins_map);
  instr_node->runtime_ctx_->outputs.swap(outs_map);

  instr_node->infershape_ctx_.reset(new InterpretercoreInferShapeContext(
      *op_base, *instr_node->runtime_ctx_.get()));

  auto* dev_ctx = instr_node->dev_ctx_;
  Scope scope;

  instr_node->execution_ctx_.reset(new ExecutionContext(
      *op_base, scope, *dev_ctx, *instr_node->runtime_ctx_.get()));
}

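// InferShape may skip sharing LoD only when every input of an instruction is
// a LoDTensor with empty LoD; precompute that flag here.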
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].runtime_ctx_.get()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].infershape_ctx_.get()->SetSkipLoD(can_skip_lod);
  }
}

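// Run a single instruction: infer shapes, share inplace buffers when the
// shapes match, then invoke the cached compute kernel.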
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  VLOG(3) << "RunInstruction:  "
          << instr_node.kernel_func_.operator_base_->Type();

  static_cast<const framework::OperatorWithKernel*>(
      instr_node.kernel_func_.operator_base_)
      ->InferShape(instr_node.infershape_ctx_.get());

  if (FLAGS_new_executor_use_inplace) {
    for (auto& pair : instr_node.vec_inplace_in_to_out_) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }

  instr_node.kernel_func_.compute_func_(*instr_node.execution_ctx_.get());
}

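// Execute the instruction graph: instructions with zero dependencies are
// submitted first; the work queue is drained before checking that every op
// has run.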
void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  auto atomic_deps = async_work_queue_.PrepareAtomicDeps(dependecy_count_);
  auto atomic_var_ref = async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
  std::atomic<size_t> op_run_number{0};

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_.AddTask(vec_instr[i].type_, [&, i]() {
        RunInstructionAsync(i, &atomic_deps, &atomic_var_ref, &op_run_number);
      });
    }
  }

  async_work_queue_.WaitEmpty();

  PADDLE_ENFORCE_EQ(
      op_run_number.load(), vec_instr.size(),
      platform::errors::Fatal(
          "Required op_run_number == %d, but received op_run_number = %d.",
          vec_instr.size(), op_run_number.load()));
}

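// Worker-side execution of one instruction: wait for its input events, run
// it, record its output events, enqueue successors whose dependency count
// drops to zero, then update GC reference counts.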
void InterpreterCore::RunInstructionAsync(size_t instr_id,
                                          AtomicVectorSizeT* atomic_deps,
                                          AtomicVectorSizeT* atomic_var_ref,
                                          std::atomic<size_t>* op_run_number) {
  auto& instr_node = vec_instruction_[instr_id];
  event_manager_.WaitEvent(instr_node, place_);

  RunInstruction(instr_node);

  event_manager_.RecordEvent(instr_node, place_);
  op_run_number->fetch_add(1, std::memory_order_relaxed);

  auto& next_instr = instr_node.next_instruction_.all_next_ops_;

  for (auto next_i : next_instr) {
    // fetch_sub returns the value held before the subtraction
    bool is_ready =
        atomic_deps->at(next_i)->fetch_sub(1, std::memory_order_relaxed) == 1;
    if (is_ready) {
      async_work_queue_.AddTask(vec_instruction_[next_i].type_, [=]() {
        RunInstructionAsync(next_i, atomic_deps, atomic_var_ref, op_run_number);
      });
    }
  }
  // GC information
  CheckGC(instr_id, instr_node.gc_check_var_list, atomic_var_ref);
}

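// Decrease the reference count of each variable in gc_check_list; when a
// count reaches zero and the variable is not persistable (or has no VarDesc),
// hand it to the garbage collector with this instruction's event and context.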
void InterpreterCore::CheckGC(size_t instr_id,
                              const std::vector<size_t>& gc_check_list,
                              AtomicVectorSizeT* atomic_var_ref) {
  auto& var_scope = *global_scope_;

  for (auto var_id : gc_check_list) {
    bool is_ready = atomic_var_ref->at(var_id)->fetch_sub(
                        1, std::memory_order_relaxed) == 1;
    if (is_ready && var_scope.vec_meta_info_[var_id].vardesc_ &&
        !var_scope.vec_meta_info_[var_id].vardesc_->Persistable()) {
      gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
              vec_instruction_[instr_id].dev_ctx_);
    } else if (is_ready &&
               var_scope.vec_meta_info_[var_id].vardesc_ == nullptr) {
      gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
              vec_instruction_[instr_id].dev_ctx_);
    }
  }
}

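// Prepare a dry run: build the scope and instruction graph on the first call,
// feeding the inputs again afterwards because the feed tensors may have been
// released during the build.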
void InterpreterCore::DryRunPrepare(
    const std::vector<framework::Tensor>& feed_tensors) {
  auto FeedInput = [&] {
    for (size_t i = 0; i < feed_names_.size(); ++i) {
      auto it = global_scope_->name2id.find(feed_names_[i]);
      assert(it != global_scope_->name2id.end());

      auto feed_tensor = global_scope_->var_list[it->second]
                             ->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
    }
  };

  if (is_build_ == false) {
    paddle::framework::interpretercore::build_variable_scope(main_program_,
                                                             global_scope_);
    FeedInput();
    paddle::framework::interpretercore::build_op_func_list(
        place_, main_program_, &op_list_, &vec_func_list_, global_scope_);
    is_build_ = true;
    // convert vec func_list to graph
    Convert();
  }
  // NOTE: feed_tensor will be GC-ed after
  // paddle::framework::interpretercore::build_op_func_list,
  // so FeedInput must be called again here.
  FeedInput();
}

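// DryRun executes the instruction list once under the dry-run profiler and
// reports the measured cost, including CUDA memory usage.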
const CostInfo& InterpreterCore::DryRun(
    const std::vector<framework::Tensor>& feed_tensors) {
  DryRunPrepare(feed_tensors);
  // DryRun may be called many times.
  dry_run_profiler_.Reset();
  dry_run_profiler_.Start();
  ExecuteInstructionList(vec_instruction_);
  platform::DeviceContextPool::Instance().Get(place_)->Wait();

  dry_run_profiler_.Pause();
  dry_run_profiler_.TotalCUDAAllocatedMemorySize(place_);
  return dry_run_profiler_.GetCostInfo();
}

}  // namespace framework
}  // namespace paddle