// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"

#include <unordered_set>

#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/platform/profiler.h"

PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
                            "Use inplace in new executor");

namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;

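// A minimal usage sketch (illustrative only; `place`, `prog`, `scope` and the
// feed tensor `x_tensor` are assumed to be prepared by the caller):
//
//   InterpreterCore core(place, prog, &scope, /*feed_names=*/{"x"},
//                        /*fetch_names=*/{"out"});
//   paddle::framework::FetchList fetched = core.Run({x_tensor});
//
// The first Run() builds the variable scope and instruction list; subsequent
// calls reuse them.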
InterpreterCore::InterpreterCore(const platform::Place& place,
                                 const ProgramDesc& main_prog,
                                 VariableScope* global_scope,
                                 const std::vector<std::string>& feed_names,
                                 const std::vector<std::string>& fetch_names)
    : place_(place),
      main_program_(main_prog),
      global_scope_(global_scope),
      stream_analyzer_(place),
      async_work_queue_(kHostNumThreads, &main_thread_blocker_) {
  is_build_ = false;

  feed_names_ = feed_names;

  // Step 1: add feed and fetch ops to main_program
  AddFetch(fetch_names);

  // prune

  // optimize graph pass

  // convert to run graph
}

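// Appends a persistable FETCH_LIST variable ("fetch_vars") to block 0 of the
// main program, plus one fetch_v2 op per fetch target that writes into its
// own column of "fetch_vars".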
void InterpreterCore::AddFetch(const std::vector<std::string>& fetch_names) {
  auto* fetch_holder = main_program_.MutableBlock(0)->Var("fetch_vars");
  fetch_holder->SetType(proto::VarType::FETCH_LIST);
  fetch_holder->SetPersistable(true);

  int i = 0;
  for (auto& fetch_name : fetch_names) {
    // append fetch op
    auto* op = main_program_.MutableBlock(0)->AppendOp();
    op->SetType("fetch_v2");
    op->SetInput("X", {fetch_name});
    op->SetOutput("Out", {"fetch_vars"});
    op->SetAttr("col", {static_cast<int>(i)});
    op->CheckAttrs();
    i++;
  }
}

paddle::framework::FetchList InterpreterCore::Run(
    const std::vector<framework::Tensor>& feed_tensors) {
  auto FeedInput = [&] {
    for (size_t i = 0; i < feed_names_.size(); ++i) {
      auto it = global_scope_->name2id.find(feed_names_[i]);
      assert(it != global_scope_->name2id.end());

      auto feed_tensor = global_scope_->var_list[it->second]
                             ->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
    }
  };

  if (is_build_ == false) {
    paddle::framework::interpretercore::build_variable_scope(main_program_,
                                                             global_scope_);
    FeedInput();
    paddle::framework::interpretercore::build_op_func_list(
        place_, main_program_, &op_list_, &vec_func_list_, global_scope_);
    is_build_ = true;
    // convert vec func_list to graph
    Convert();
  } else {
    FeedInput();
    ExecuteInstructionList(vec_instruction_);
  }

  // return Fetch Tensors
  return *(global_scope_->var_list[global_scope_->name2id["fetch_vars"]]
               ->GetMutable<framework::FetchList>());
}

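// Converts the op_func_list produced by build_op_func_list into the
// instruction list: fills each instruction's kernel and context info,
// collects the variables to garbage-collect after it runs, wires up
// downstream instructions and dependency counts, caches the run-time
// contexts, and sets up inplace reuse when FLAGS_new_executor_use_inplace
// is enabled.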
void InterpreterCore::Convert() {
  input_var2op_info_.resize(global_scope_->var_list.size());

  vec_instruction_.reserve(vec_func_list_.size());
  dependecy_count_.resize(vec_func_list_.size());
  vec_meta_info_.resize(global_scope_->var_list.size());
  for (size_t i = 0; i < vec_func_list_.size(); ++i) {
    Instruction temp_inst;
    auto* op_base = op_list_[i];
    temp_inst.dev_ctx_ =
        stream_analyzer_.ParseDeviceContext(vec_func_list_[i], *op_base);
    temp_inst.kernel_func_.compute_func_ = vec_func_list_[i].kernel_func_;
    temp_inst.kernel_func_.operator_base_ = op_base;
    temp_inst.input_index_ = vec_func_list_[i].input_index;
    temp_inst.output_index_ = vec_func_list_[i].output_index;
    temp_inst.type_ = vec_func_list_[i].type_;
    temp_inst.no_data_transform_index_ =
        vec_func_list_[i].no_data_transform_index;

    OpInOutInfo info;

    std::vector<size_t> gc_check_input_list;
    for (auto& item : vec_func_list_[i].input_index) {
      for (auto id : item.second) {
        input_var2op_info_[id].push_back(i);
        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(op_list_[i]);
        }
        if (global_scope_->vec_meta_info_[id].vardesc_) {
          if (info.IsInArgBufferNeeded(
                  global_scope_->vec_meta_info_[id].vardesc_->Name())) {
            gc_check_input_list.push_back(id);
          }
        } else {
          gc_check_input_list.push_back(id);
        }
      }
    }
    std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
    auto last =
        std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
    gc_check_input_list.erase(last, gc_check_input_list.end());
    for (auto var_id : gc_check_input_list) {
      vec_meta_info_[var_id].var_ref_count_++;
    }

    temp_inst.gc_check_var_list.swap(gc_check_input_list);

    vec_instruction_.push_back(temp_inst);
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    // check output
    for (auto& item : vec_instruction_[i].output_index_) {
      for (auto id : item.second) {
        if (input_var2op_info_[id].size() == 0) {
          // this output var is not used by any kernel
          vec_instruction_[i].gc_check_var_list.push_back(id);
          vec_meta_info_[id].var_ref_count_++;
        }
      }
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    std::vector<size_t> vec_temp;
    for (auto& item : vec_instruction_[i].output_index_) {
      for (auto id : item.second) {
        vec_temp =
            interpretercore::merge_vector(vec_temp, input_var2op_info_[id]);
      }
    }

    // In a Program, op order is very important information:
    // an op can only take ops that come after it as its next ops.
    std::vector<size_t> filter_next;
    filter_next.reserve(vec_temp.size());
    for (auto item : vec_temp) {
      if (item > i) {
        filter_next.push_back(item);
      }
    }

    stream_analyzer_.Schedule(filter_next, &vec_instruction_, i);

    for (auto inst_id : filter_next) {
      dependecy_count_[inst_id]++;
    }
  }

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    BuildAndCacheInstructionCtx(&vec_instruction_[i], *global_scope_, place_);
  }

  BuildSkipShareLoDInfo();

  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    gc_event_.emplace_back(vec_instruction_[i].execution_ctx_.get()->GetPlace(),
                           platform::GenerateDeviceEventFlag());
  }

  if (FLAGS_new_executor_use_inplace) {
    BuildInplace();
  }
}

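// Returns true only if the variable's buffer is consumed by exactly one
// instruction (counting just the ops that actually need the buffer), so it
// is safe to reuse the buffer for an inplace output.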
bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
  if (!global_scope_->vec_meta_info_[var_index].vardesc_) {
    return input_var2op_info_[var_index].size() == 1;
  } else {
    int is_input_cnt = 0;
    for (auto inst_id : input_var2op_info_[var_index]) {
      OpInOutInfo info;
      info.Build(vec_instruction_[inst_id].kernel_func_.operator_base_);
      if (info.IsInArgBufferNeeded(
              global_scope_->vec_meta_info_[var_index].vardesc_->Name())) {
        is_input_cnt++;
      }
    }
    return is_input_cnt == 1;
  }
}

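// For every instruction whose op registers an inplace inferer, record the
// (input, output) variable pairs whose buffers may be shared at run time,
// provided the input is not used anywhere else.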
void InterpreterCore::BuildInplace() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    if (!vec_instruction_[i]
             .kernel_func_.operator_base_->Info()
             .infer_inplace_) {
      continue;
    }

    auto in_to_outs =
        vec_instruction_[i].kernel_func_.operator_base_->Info().infer_inplace_(
            platform::is_gpu_place(vec_instruction_[i].dev_ctx_->GetPlace()));

    for (auto& pair : in_to_outs) {
      auto iter = vec_instruction_[i].input_index_.find(pair.first);
      if (iter != vec_instruction_[i].input_index_.end()) {
        if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
          auto iterout = vec_instruction_[i].output_index_.find(pair.second);
          if (iterout != vec_instruction_[i].output_index_.end()) {
            auto invar = global_scope_->var_list[iter->second[0]];
            auto outvar = global_scope_->var_list[iterout->second[0]];
            if (invar && outvar) {
              vec_instruction_[i].vec_inplace_in_to_out_.emplace_back(invar,
                                                                      outvar);
              VLOG(3) << "inplace "
                      << vec_instruction_[i].kernel_func_.operator_base_->Type()
                      << " "
                      << global_scope_->vec_meta_info_[iter->second[0]]
                             .vardesc_->Name()
                      << " -> "
                      << global_scope_->vec_meta_info_[iterout->second[0]]
                             .vardesc_->Name()
                      << std::endl;
            }
          }
        }
      }
    }
  }
}

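// Builds and caches the RuntimeContext, InferShapeContext and
// ExecutionContext of one instruction from the variable ids stored in its
// input/output index maps.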
void InterpreterCore::BuildAndCacheInstructionCtx(
    Instruction* instr_node, const VariableScope& var_scope,
    const platform::Place& place) {
  auto op_base = instr_node->kernel_func_.operator_base_;

  VariableValueMap ins_map;
  for (auto& var_name_item : instr_node->input_index_) {
    std::vector<Variable*> input_vars;

    input_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      input_vars.emplace_back(var_scope.var_list[id]);
    }
    ins_map.emplace(var_name_item.first, std::move(input_vars));
  }

  VariableValueMap outs_map;
  for (auto& var_name_item : instr_node->output_index_) {
    std::vector<Variable*> out_vars;

    out_vars.reserve(var_name_item.second.size());
    for (auto& id : var_name_item.second) {
      out_vars.emplace_back(var_scope.var_list[id]);
    }
    outs_map.emplace(var_name_item.first, std::move(out_vars));
  }

  instr_node->runtime_ctx_.reset(new RuntimeContext({}, {}));
  instr_node->runtime_ctx_->inputs.swap(ins_map);
  instr_node->runtime_ctx_->outputs.swap(outs_map);

  instr_node->infershape_ctx_.reset(new InterpretercoreInferShapeContext(
      *op_base, *instr_node->runtime_ctx_.get()));

  auto* dev_ctx = instr_node->dev_ctx_;
  Scope scope;

  instr_node->execution_ctx_.reset(new ExecutionContext(
      *op_base, scope, *dev_ctx, *instr_node->runtime_ctx_.get()));
}

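// Marks instructions whose inputs carry no LoD so that InferShape can skip
// sharing LoD information.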
void InterpreterCore::BuildSkipShareLoDInfo() {
  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
    bool can_skip_lod = true;
    for (auto& input : vec_instruction_[i].runtime_ctx_.get()->inputs) {
      for (auto& var : input.second) {
        if (var->IsType<LoDTensor>()) {
          if (var->Get<LoDTensor>().lod().size() != 0) {
            can_skip_lod = false;
            break;
          }
        } else {
          can_skip_lod = false;
          break;
        }
      }
    }
    vec_instruction_[i].infershape_ctx_.get()->SetSkipLoD(can_skip_lod);
  }
}

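// Runs a single instruction: InferShape, optional inplace buffer sharing,
// then the cached kernel compute function.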
void InterpreterCore::RunInstruction(const Instruction& instr_node) {
  VLOG(3) << "RunInstruction:  "
          << instr_node.kernel_func_.operator_base_->Type();

  {
    platform::RecordEvent infershape_event("InferShape");
    static_cast<const framework::OperatorWithKernel*>(
        instr_node.kernel_func_.operator_base_)
        ->InferShape(instr_node.infershape_ctx_.get());
  }

  if (FLAGS_new_executor_use_inplace) {
    for (auto& pair : instr_node.vec_inplace_in_to_out_) {
      const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
      auto* out =
          paddle::framework::details::GetMutableTensorFromVar(pair.second);
      if (in.dims() == out->dims()) {
        out->ShareBufferWith(in);
      }
    }
  }
  {
    platform::RecordEvent compute_event("Compute");
    instr_node.kernel_func_.compute_func_(*instr_node.execution_ctx_.get());
  }
}

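// Enqueues every instruction whose dependency count is zero on the async
// work queue and blocks until all instructions have finished.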
void InterpreterCore::ExecuteInstructionList(
    const std::vector<Instruction>& vec_instr) {
  async_work_queue_.PrepareAtomicDeps(dependecy_count_);
  async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
  op_run_number_ = 0;

  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
    if (dependecy_count_[i] == 0) {
      async_work_queue_.AddTask(vec_instr[i].type_,
                                [&, i] { RunInstructionAsync(i); });
    }
  }

  auto event_id = main_thread_blocker_.WaitEvent();
  VLOG(3) << "event_id " << event_id;

  PADDLE_ENFORCE_EQ(
      op_run_number_.load(), vec_instr.size(),
      platform::errors::Fatal(
          "Required op_run_number == %d, but received op_run_number = %d.",
          vec_instr.size(), op_run_number_.load()));
}

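// Decrements the dependency count of the instructions that follow `instr`
// and dispatches the ones that become ready: most are handed to the async
// work queue, while ops that should stay on the current thread are pushed
// into `reserved_next_ops`.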
void InterpreterCore::RunNextInstructions(
    const Instruction& instr, std::queue<size_t>* reserved_next_ops) {
  auto& next_instr = instr.next_instruction_;
  auto& atomic_deps = async_work_queue_.AtomicDeps();
  auto IsReady = [&](size_t next_id) {
    return atomic_deps[next_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
  };

  if (instr.type_ == OpFuncType::kQueueAsync) {
    // move all sync_ops into other threads
    for (auto next_id : next_instr.synchronize_run_) {
      if (IsReady(next_id)) {
        async_work_queue_.AddTask(
            vec_instruction_[next_id].type_,
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    // keep all async_ops running in current thread
    for (auto next_id : next_instr.direct_run_) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
    for (auto next_id : next_instr.event_wait_run_) {
      if (IsReady(next_id)) {
        reserved_next_ops->push(next_id);
      }
    }
  } else {
    // move async_ops into async_thread
    for (auto next_id : next_instr.event_wait_run_) {
      if (IsReady(next_id)) {
        async_work_queue_.AddTask(
            vec_instruction_[next_id].type_,
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    auto direct_run_ops = interpretercore::merge_vector(
        next_instr.synchronize_run_, next_instr.direct_run_);
    size_t first_op = 0;
    for (auto next_id : direct_run_ops) {
      if (IsReady(next_id)) {
        // only keep one op running in current thread
        if (first_op == 0) {
          first_op = next_id;
          continue;
        }
        // move the rest of the ops into other threads
        async_work_queue_.AddTask(
            vec_instruction_[next_id].type_,
            [&, next_id] { RunInstructionAsync(next_id); });
      }
    }
    if (first_op != 0) reserved_next_ops->push(first_op);
  }
}

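// Runs `instr_id` and keeps draining the ops that become ready for the
// current thread: wait for events, run the kernel, record events, bump the
// run counter, and trigger the garbage-collection check.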
void InterpreterCore::RunInstructionAsync(size_t instr_id) {
  std::queue<size_t> ready_ops;
  ready_ops.push(instr_id);
  while (!ready_ops.empty()) {
    instr_id = ready_ops.front();
    ready_ops.pop();
    auto& instr_node = vec_instruction_[instr_id];
    platform::RecordEvent instruction_event(
        instr_node.kernel_func_.operator_base_->Type());
    event_manager_.WaitEvent(instr_node, place_);

    RunInstruction(instr_node);

    event_manager_.RecordEvent(instr_node, place_);
    op_run_number_.fetch_add(1, std::memory_order_relaxed);

    // GC information
    CheckGC(instr_id, instr_node.gc_check_var_list);

    RunNextInstructions(instr_node, &ready_ops);
  }
}

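// Decrements the reference count of every candidate variable and hands the
// non-persistable ones that reach zero to the garbage collector, together
// with this instruction's device event and context.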
void InterpreterCore::CheckGC(size_t instr_id,
                              const std::vector<size_t>& gc_check_list) {
  auto& var_scope = *global_scope_;
  auto& atomic_var_ref = async_work_queue_.AtomicVarRef();

  for (auto var_id : gc_check_list) {
    bool is_ready =
        atomic_var_ref[var_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
    if (is_ready && var_scope.vec_meta_info_[var_id].vardesc_ &&
        !var_scope.vec_meta_info_[var_id].vardesc_->Persistable()) {
      gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
              vec_instruction_[instr_id].dev_ctx_);
    } else if (is_ready &&
               var_scope.vec_meta_info_[var_id].vardesc_ == nullptr) {
      gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
              vec_instruction_[instr_id].dev_ctx_);
    }
  }
}

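// Builds the variable scope and instruction list if they have not been
// built yet, and (re)feeds the input tensors so DryRun can execute the
// program.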
void InterpreterCore::DryRunPrepare(
    const std::vector<framework::Tensor>& feed_tensors) {
  auto FeedInput = [&] {
    for (size_t i = 0; i < feed_names_.size(); ++i) {
      auto it = global_scope_->name2id.find(feed_names_[i]);
      assert(it != global_scope_->name2id.end());

      auto feed_tensor = global_scope_->var_list[it->second]
                             ->GetMutable<framework::LoDTensor>();
      feed_tensor->ShareDataWith(feed_tensors[i]);
    }
  };

  if (is_build_ == false) {
    paddle::framework::interpretercore::build_variable_scope(main_program_,
                                                             global_scope_);
    FeedInput();
    paddle::framework::interpretercore::build_op_func_list(
        place_, main_program_, &op_list_, &vec_func_list_, global_scope_);
    is_build_ = true;
    // convert vec func_list to graph
    Convert();
  }
  // NOTE: Because feed_tensor will be GC'ed after
  // paddle::framework::build_op_func_list, we need to call
  // FeedInput again.
  FeedInput();
}

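// Executes the whole instruction list once under the dry-run profiler and
// returns the collected cost information (e.g. CUDA allocated memory).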
const CostInfo& InterpreterCore::DryRun(
    const std::vector<framework::Tensor>& feed_tensors) {
  DryRunPrepare(feed_tensors);
  // DryRun may be called many times.
  dry_run_profiler_.Reset();
  dry_run_profiler_.Start();
  ExecuteInstructionList(vec_instruction_);
  platform::DeviceContextPool::Instance().Get(place_)->Wait();

  dry_run_profiler_.Pause();
  dry_run_profiler_.TotalCUDAAllocatedMemorySize(place_);
  return dry_run_profiler_.GetCostInfo();
}

}  // namespace framework
}  // namespace paddle