// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/executor_cache.h"

#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/ir/transforms/inplace_pass.h"
#include "paddle/fluid/ir/transforms/pd_op_to_kernel_pass.h"
#include "paddle/fluid/ir_adaptor/translator/translate.h"
#include "paddle/ir/core/program.h"
#include "paddle/ir/core/value.h"
#include "paddle/ir/pass/pass.h"
#include "paddle/ir/pass/pass_manager.h"
#include "paddle/phi/core/flags.h"

PHI_DECLARE_bool(new_ir_apply_inplace_pass);

namespace paddle {
namespace framework {
class ProgramDesc;
}  // namespace framework
}  // namespace paddle

namespace paddle {
namespace framework {

namespace details {

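// Builds an ExecutionStrategy whose thread count is tuned per device type;
// the values below are empirical defaults (see the CUDA note).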
static ExecutionStrategy GetExecutionStrategy(const platform::Place &place) {
  framework::ExecutionStrategy execution_strategy;

  auto device_type = platform::Place2DeviceType(place);
  switch (device_type) {
    case platform::DeviceType::CPU: {
      execution_strategy.num_threads_ = 2;
      break;
    }
    case platform::DeviceType::CUDA: {
      // NOTE: According to experiments, one thread is faster for
      // most model training.
      execution_strategy.num_threads_ = 1;
      break;
    }
    case platform::DeviceType::XPU: {
      execution_strategy.num_threads_ = 1;
      break;
    }
    case platform::DeviceType::IPU: {
      execution_strategy.num_threads_ = 1;
      break;
    }
    case platform::DeviceType::CUSTOM_DEVICE: {
      execution_strategy.num_threads_ = 1;
      break;
    }
    default:
      PADDLE_THROW(platform::errors::Unavailable("Unsupported Device type %d.",
                                                 device_type));
  }
  execution_strategy.use_device_ = device_type;

  return execution_strategy;
}

void AppendSkipDeletionVars(const std::vector<std::string> &append_vars,
                            std::vector<std::string> *all_vars) {
  for (auto &var : append_vars) {
    all_vars->emplace_back(var);
  }
}

/*
 * NOTE(Aurelius84): ParallelExecutor applies a memory-optimization pass. To
 * avoid eagerly deleting the last alive variables that the backward program
 * still needs, we first collect their names as skip_eager_vars; while
 * executing pe.run, skip_eager_vars are used to skip memory optimization.
 *
 * A variable satisfying either of the following rules is considered a
 * skip_eager_var:
 *
 *   1. it is an output var of run_program_op
 *   2. it is an input var used in a backward op
 */
void ParseSafeEagerDeletionSkipVars(
    const ProgramDesc &program,
    int64_t forward_op_nums,
    const std::vector<std::string> &output_var_names,
    std::vector<std::string> *skip_eager_delete_vars) {
  auto all_ops = program.Block(0).AllOps();
  auto &op_info_map = OpInfoMap::Instance();
  // NOTE: skip the `shape` and `fill_constant` ops created by
  // fluid.backward.gradients; each forward output generates one `shape`
  // and one `fill_constant` op.
  size_t backward_op_start_index =
      forward_op_nums + (output_var_names.size() * 2);
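  // e.g., with forward_op_nums = 10 and two forward output vars, the
  // backward ops are assumed to start at index 10 + 2 * 2 = 14 of Block(0).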

  // Parse the necessary variables of the backward ops.
  std::unordered_set<std::string> op_outputs;
  std::unordered_set<std::string> op_inputs;
  std::unordered_set<std::string> no_need_buffer_ins;

  for (auto i = backward_op_start_index; i < all_ops.size(); ++i) {
    framework::OpDesc *op = all_ops[i];
    // NOTE: skip NoNeedBufferVars of grad_op and GC its memory in advance.
    auto &op_info = op_info_map.Get(op->Type());
    auto &inferer = op_info.NoNeedBufferVarsInferer();
    no_need_buffer_ins.clear();
    if (inferer != nullptr) {
      no_need_buffer_ins =
          inferer(op->Inputs(), op->Outputs(), op->GetAttrMap());
    }
    for (auto &in_names : op->Inputs()) {
      if (no_need_buffer_ins.count(in_names.first) == 0) {
        for (auto &in_name : in_names.second) {
          op_inputs.emplace(in_name);
        }
      } else {
        VLOG(2) << op->Type() << " has no_need_buffer_in: " << in_names.first
                << ", skip it.";
      }
    }

    for (const std::string &out_arg_name : op->OutputArgumentNames()) {
      op_outputs.emplace(out_arg_name);
    }
  }
  // For each grad-op input variable: if it is not an output of a grad op, it
  // may be an output of a forward op, so mark it as a skip var to prevent it
  // from being deleted when the grad op is called multiple times.
  for (const std::string &var_name : op_inputs) {
    if (op_outputs.find(var_name) == op_outputs.end()) {
      VLOG(2) << "skip eager var: " << var_name;
      skip_eager_delete_vars->emplace_back(var_name);
    }
  }
  VLOG(3) << "Found skip_eager_delete_vars: " << skip_eager_delete_vars->size();
}

void AppendSkipDeletionVars(const std::vector<std::string> &append_vars,
                            std::set<std::string> *all_vars) {
  for (auto &var : append_vars) {
    all_vars->insert(var);
  }
}

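// Variant of the above for a standalone backward program: collects every
// grad-op input that is not produced by another grad op (and is not a
// no-need-buffer input), since such variables must survive eager deletion.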
std::set<std::string> ParseSafeEagerDeletionSkipVarsSet(
    const ProgramDesc &backward_program, bool skip_no_need_buffer) {
  std::set<std::string> skip_eager_delete_vars;
  auto backward_ops = backward_program.Block(0).AllOps();
  auto &op_info_map = OpInfoMap::Instance();
  std::unordered_set<std::string> op_outputs;
  std::unordered_set<std::string> op_inputs;
  std::unordered_set<std::string> no_need_buffer_ins;
  for (auto op : backward_ops) {
    VLOG(4) << "parse op type: " << op->Type();
    if (op->Type() == "share_buffer") {
      VLOG(1) << "skip share_buffer op";
      continue;
    }
    // NOTE: skip NoNeedBufferVars of grad_op and GC its memory in advance.
    auto &op_info = op_info_map.Get(op->Type());
    auto &inferer = op_info.NoNeedBufferVarsInferer();
    no_need_buffer_ins.clear();
    // TODO(Aurelius84): Remove skip_no_need_buffer once CINN fixes this
    // problem.
    if (inferer != nullptr && !skip_no_need_buffer) {
      no_need_buffer_ins =
          inferer(op->Inputs(), op->Outputs(), op->GetAttrMap());
    }
    for (auto &in_names : op->Inputs()) {
      if (no_need_buffer_ins.count(in_names.first) == 0) {
        for (auto &in_name : in_names.second) {
          op_inputs.emplace(in_name);
        }
      } else {
        VLOG(2) << op->Type() << " has no_need_buffer_in: " << in_names.first
                << ", skip it.";
      }
    }
    for (const std::string &out_arg_name : op->OutputArgumentNames()) {
      op_outputs.emplace(out_arg_name);
    }
  }
  for (const std::string &var_name : op_inputs) {
    VLOG(4) << "parse op.input: " << var_name;
    if (op_outputs.find(var_name) == op_outputs.end()) {
      VLOG(1) << "skip eager var: " << var_name;
      skip_eager_delete_vars.insert(var_name);
    }
  }
  VLOG(1) << "Found skip_eager_delete_vars: " << skip_eager_delete_vars.size();
  return skip_eager_delete_vars;
}
}  // namespace details

// C++11 removes the need for manual locking. Concurrent execution shall wait if
// a static local variable is already being initialized.
// https://stackoverflow.com/questions/11711920/how-to-implement-multithread-safe-singleton-in-c11-without-using-mutex
ExecutorInfoCache &ExecutorInfoCache::Instance() {
  static ExecutorInfoCache g_exe_cache_info_map;
  return g_exe_cache_info_map;
}

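// Builds a Graph over ops [start_op_index, end_op_index) of program_desc and
// a ParallelExecutor that runs it; both are returned so the caller can keep
// the graph alive for as long as the executor uses it.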
static PEAndGraphPair CreateExecutorInfo(
    const ProgramDesc &program_desc,
    const platform::Place &place,
    int64_t start_op_index,
    int64_t end_op_index,
    framework::Scope *scope,
    const details::BuildStrategy &build_strategy) {
  auto execution_strategy = details::GetExecutionStrategy(place);
  auto graph = std::make_shared<framework::ir::Graph>(
      program_desc, start_op_index, end_op_index);
  auto parallel_executor = std::make_shared<framework::ParallelExecutor>(
      place, scope, execution_strategy, build_strategy, graph.get());
  parallel_executor->PrepareVariables(scope);
  return std::make_pair(parallel_executor, graph);
}

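// Like CreateExecutorInfo, but with fix_op_run_order_ enabled so that ops run
// in a fixed, deterministic order.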
PEAndGraphPair CreateFixOrderExecutorInfo(const ProgramDesc &program_desc,
                                          const platform::Place &place,
                                          int64_t start_op_index,
                                          int64_t end_op_index,
                                          framework::Scope *scope) {
  details::BuildStrategy build_strategy;
  build_strategy.fix_op_run_order_ = true;
  auto pe_and_graph = CreateExecutorInfo(
      program_desc, place, start_op_index, end_op_index, scope, build_strategy);
  return pe_and_graph;
}

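// Returns the ParallelExecutor cached under (program_id, is_grad), creating
// and caching a new one on a miss; the bool in the returned pair is true iff
// a new executor was created by this call.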
CacheInfo GetExecutorInfoFromCache(const ProgramDesc &program_desc,
                                   const platform::Place &place,
                                   int64_t start_op_index,
                                   int64_t end_op_index,
                                   bool is_grad,
                                   int64_t program_id,
                                   framework::Scope *scope) {
  auto &cached_exe_info = framework::ExecutorInfoCache::Instance();

  if (!cached_exe_info.Has(program_id, is_grad)) {
    // TODO(Aurelius84): Consider using an LRU algorithm here instead.
    if (cached_exe_info.Size() > 4u /* max_cached_size */) {
      VLOG(2) << "The cached info size has exceeded max_cached_size: 4, clear "
                 "all cache!";
      cached_exe_info.Finalize();
    }

    VLOG(1) << "create exe_info for " << program_id << " is_grad: " << is_grad;
    auto &build_strategy = cached_exe_info.GetBuildStrategy(program_id);

    // Construct the Graph and ParallelExecutor.
    auto pe_and_graph = CreateExecutorInfo(program_desc,
                                           place,
                                           start_op_index,
                                           end_op_index,
                                           scope,
                                           build_strategy);

    // Insert the new value into the cached map.
    auto &cached_value = cached_exe_info.GetMutable(program_id, is_grad);
    cached_value.executor_ = pe_and_graph.first;
    cached_value.graph_ = pe_and_graph.second;
    return std::make_pair(pe_and_graph.first, true);
  } else {
    VLOG(1) << "get exe_info from cache by: " << program_id
            << " is_grad: " << is_grad;
    auto &cached_value = cached_exe_info.GetMutable(program_id, is_grad);

    auto &parallel_executor = cached_value.executor_;
    // Update the op_handle scope map in the executor's graph.
    std::unordered_map<Scope *, Scope *> scope_map = {
        {parallel_executor->GetLocalScopes().front(), scope}};
    parallel_executor->ResetOpHandleScopeMapOfGraphs(scope_map);
    // Temporary variables need to be recreated in the new scope.
    parallel_executor->PrepareVariables(scope);

    return std::make_pair(parallel_executor, false);
  }
}
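
// A minimal usage sketch (hypothetical caller; names such as op_num and
// program_id are illustrative, not part of this file):
//
//   auto cache_info = GetExecutorInfoFromCache(program_desc,
//                                              place,
//                                              /*start_op_index=*/0,
//                                              /*end_op_index=*/op_num,
//                                              /*is_grad=*/false,
//                                              program_id,
//                                              scope);
//   auto &parallel_executor = cache_info.first;
//   bool is_new_created = cache_info.second;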

InterpreterCoreInfoCache &InterpreterCoreInfoCache::Instance() {
  static InterpreterCoreInfoCache g_info_cache;
  return g_info_cache;
}

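// Creates an InterpreterCore for Block(0) of program_desc and registers it in
// the InterpreterCoreInfoCache under (program_id, scope, is_grad).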
std::shared_ptr<InterpreterCore> CreateProgramInterpreterCoreInfoToCache(
    const ProgramDesc &program_desc,
    const platform::Place &place,
    bool is_grad,
    int64_t program_id,
    framework::Scope *scope) {
  auto &interpretercore_info_cache =
      framework::InterpreterCoreInfoCache::Instance();
  if (interpretercore_info_cache.Size() > 10u /* max_cached_size */) {
    VLOG(2) << "The cached info size has exceeded max_cached_size: 10, clear "
               "all cache!";
    interpretercore_info_cache.Finalize();
  }
  interpreter::ExecutionConfig execution_config;
  execution_config.create_local_scope = false;
  execution_config.used_for_jit = true;

  std::shared_ptr<InterpreterCore> core = nullptr;

  core.reset(new InterpreterCore(
      place, program_desc.Block(0), scope, execution_config));

  auto &cached_value =
      interpretercore_info_cache.GetMutable(program_id, scope, is_grad);
  cached_value.core_ = core;
  return core;
}

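// New-IR counterpart of CreateProgramInterpreterCoreInfoToCache: takes
// ownership of an already-lowered ::ir::Program instead of a ProgramDesc.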
std::shared_ptr<InterpreterCore> CreateNewIRInterpreterCoreInfoToCache(
    std::unique_ptr<::ir::Program> ir_program,
    const platform::Place &place,
    bool is_grad,
    int64_t program_id,
    framework::Scope *scope) {
  auto &interpretercore_info_cache =
      framework::InterpreterCoreInfoCache::Instance();
  if (interpretercore_info_cache.Size() > 10u /* max_cached_size */) {
    VLOG(2) << "The cached info size has exceeded max_cached_size: 10, clear "
               "all cache!";
    interpretercore_info_cache.Finalize();
  }
  interpreter::ExecutionConfig execution_config;
  execution_config.create_local_scope = false;
  execution_config.used_for_jit = true;

  std::shared_ptr<InterpreterCore> core = nullptr;

  core.reset(new InterpreterCore(
      place, {}, std::move(ir_program), scope, execution_config));

  auto &cached_value =
      interpretercore_info_cache.GetMutable(program_id, scope, is_grad);
  cached_value.core_ = core;
  return core;
}

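// Translates the forward block into a new-IR program: `data` ops are
// prepended for the inputs `x` and `params`, `shadow_output` ops are appended
// for forward values that the backward block (or output_names) consumes, and
// the result is lowered via PdOpLowerToKernelPass.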
std::unique_ptr<::ir::Program> ConstructFowardIrProgram(
    const paddle::framework::BlockDesc *forward_global_block,
    const paddle::framework::BlockDesc *backward_global_block,
    const std::vector<std::string> output_names,
    const std::vector<paddle::Tensor> &x,
    const std::vector<paddle::Tensor> &params) {
  auto ir_ctx = ::ir::IrContext::Instance();
  auto program = std::make_unique<::ir::Program>(ir_ctx);

  std::set<std::string> set_output_names;
  auto local_program =
      paddle::framework::ProgramDesc(*(forward_global_block->Program()));

  for (auto op_desc : local_program.Block(0).AllOps()) {
    for (const auto &n : op_desc->Outputs()) {
      const auto &output_var_names = n.second;
      for (const auto &var_name : output_var_names) {
        set_output_names.insert(var_name);
      }
    }
  }

  // Prepend a `data` op for each input tensor that is a variable of this
  // block, so the translated program can feed it.
  auto *block = local_program.MutableBlock(0);
  for (auto &in_t : x) {
    auto name = in_t.name();
    if (block->FindVarRecursive(name) == nullptr) {
      continue;
    }
    auto place = in_t.place().GetType();

    auto op_desc = block->PrependOp();
    op_desc->SetType("data");
    op_desc->SetAttr("shape", std::vector<int64_t>());
    // TODO(phlrain): use the tensor dtype.
    op_desc->SetAttr("dtype", 0);
    op_desc->SetAttr("place", static_cast<int>(place));
    op_desc->SetAttr("name", name);
    op_desc->SetOutput("out", {name});
  }

  std::set<std::string> input_param_names;
  for (auto &param : params) {
    auto &name = param.name();
    auto place = param.place().GetType();

    auto op_desc = local_program.MutableBlock(0)->PrependOp();
    op_desc->SetType("data");
    op_desc->SetAttr("shape", std::vector<int64_t>());
    // TODO(phlrain): use the tensor dtype.
    op_desc->SetAttr("dtype", 0);
    op_desc->SetAttr("place", static_cast<int>(place));
    op_desc->SetAttr("name", name);
    op_desc->SetOutput("out", {name});

    input_param_names.insert(name);
  }

  std::set<std::string> set_parameter_names;
  for (auto op_desc : backward_global_block->Program()->Block(0).AllOps()) {
    for (const auto &n : op_desc->Inputs()) {
      const auto &input_var_names = n.second;
      for (const auto &var_name : input_var_names) {
        set_parameter_names.insert(var_name);
      }
    }
  }

  for (auto &t : output_names) {
    set_parameter_names.insert(t);
  }

  for (auto &name : set_parameter_names) {
    if (!set_output_names.count(name)) {
      continue;
    }

    if (input_param_names.count(name)) {
      continue;
    }

    auto op_desc = local_program.MutableBlock(0)->AppendOp();
    op_desc->SetType("shadow_output");
    op_desc->SetAttr("name", name);
    op_desc->SetInput("x", {name});
    op_desc->SetOutput("out", {"@EMPTY@"});
  }

  paddle::translator::ProgramTranslator program_translator(&local_program,
                                                           program.get());

  program_translator.Translate();

  auto ir_res = paddle::dialect::PdOpLowerToKernelPass(program.get());

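  // When FLAGS_new_ir_apply_inplace_pass is set, run the inplace pass over
  // the lowered program (the 3 passed to PassManager is, presumably, the
  // optimization level).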
  if (FLAGS_new_ir_apply_inplace_pass) {
    ::ir::PassManager pm(::ir::IrContext::Instance(), 3);
    pm.AddPass(::ir::CreateInplacePass());
    pm.Run(ir_res.get());
  }

  return ir_res;
}

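// Backward counterpart of ConstructFowardIrProgram: `data` ops are prepended
// for every backward input present in the scope, and `shadow_output` ops are
// appended for the names in x_grad and params_grad.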
std::unique_ptr<::ir::Program> ConstructBackwardIrProgram(
    const paddle::framework::BlockDesc *backward_global_block,
    const std::vector<paddle::Tensor> &out_grad,
    const std::vector<paddle::Tensor *> &x_grad,
    const std::vector<paddle::Tensor *> &params_grad,
    const paddle::framework::Scope *scope) {
  auto ir_ctx = ::ir::IrContext::Instance();
  auto program = std::make_unique<::ir::Program>(ir_ctx);

  auto local_program =
      paddle::framework::ProgramDesc(*(backward_global_block->Program()));

  // Collect every backward-op input; inputs that exist in the scope get a
  // `data` op prepended below.
  std::set<std::string> set_parameter_names;
  for (auto op_desc : backward_global_block->Program()->Block(0).AllOps()) {
    for (const auto &n : op_desc->Inputs()) {
      const auto &input_var_names = n.second;
      for (const auto &var_name : input_var_names) {
        set_parameter_names.insert(var_name);
      }
    }
  }

  for (auto &var_name : set_parameter_names) {
    if (scope->FindVar(var_name)) {
      auto tensor = scope->FindVar(var_name)->Get<phi::DenseTensor>();
      phi::AllocationType place(phi::AllocationType::UNDEFINED);
      if (tensor.initialized()) {
        place = tensor.place().GetType();
      }

      if (var_name == "@EMPTY@") {
        continue;
      }
      auto op_desc = local_program.MutableBlock(0)->PrependOp();
      op_desc->SetType("data");
      op_desc->SetAttr("shape", std::vector<int64_t>());
      // TODO(phlrain): use the tensor dtype.
      op_desc->SetAttr("dtype", 0);
      op_desc->SetAttr("place", static_cast<int>(place));
      op_desc->SetAttr("name", var_name);
      op_desc->SetOutput("out", {var_name});
    }
  }

  std::vector<std::string> param_grad_names;
  for (auto &p_g : params_grad) {
    param_grad_names.push_back(p_g->name());
  }

  for (auto &t : x_grad) {
    param_grad_names.push_back(t->name());
  }
  for (auto &name : param_grad_names) {
    if (name == "@EMPTY@") {
      continue;
    }
    auto op_desc = local_program.MutableBlock(0)->AppendOp();
    op_desc->SetType("shadow_output");
    op_desc->SetAttr("name", name);
    op_desc->SetInput("x", {name});
    op_desc->SetOutput("out", {"@EMPTY@"});
  }

  paddle::translator::ProgramTranslator program_translator(&local_program,
                                                           program.get());
  program_translator.Translate();

  auto res = paddle::dialect::PdOpLowerToKernelPass(program.get());

  if (FLAGS_new_ir_apply_inplace_pass) {
    ::ir::PassManager pm(::ir::IrContext::Instance(), 3);
    pm.AddPass(::ir::CreateInplacePass());
    pm.Run(res.get());
  }

  return res;
}

}  // namespace framework
}  // namespace paddle