// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/executor_cache.h"

namespace paddle {
namespace framework {
class ProgramDesc;
}  // namespace framework
}  // namespace paddle

namespace paddle {
namespace framework {

namespace details {

static ExecutionStrategy GetExecutionStrategy(
    const ExecutorInfoCache::CacheKey &cache_key) {
  // Build an ExecutionStrategy whose thread count matches the target
  // device recorded in the cache key.
  framework::ExecutionStrategy execution_strategy;

  switch (cache_key.device_type_) {
    case platform::DeviceType::CPU:
      execution_strategy.num_threads_ = 2;
      break;
    case platform::DeviceType::CUDA:
      // NOTE: According to experiments, one thread is faster in
      // most model training.
      execution_strategy.num_threads_ = 1;
      break;
    case platform::DeviceType::XPU:
      execution_strategy.num_threads_ = 1;
      break;
    default:
      PADDLE_THROW(platform::errors::Unavailable("Unsupported Device type %d.",
                                                 cache_key.device_type_));
  }
  // Propagate the device choice so the executor runs on the same device.
  execution_strategy.use_device_ = cache_key.device_type_;

  return execution_strategy;
}

// Appends every name in `append_vars` to `all_vars` (the accumulated list of
// variables whose eager deletion must be skipped).
void AppendSkipDeletionVars(const std::vector<std::string> &append_vars,
                            std::vector<std::string> *all_vars) {
  // Range insert instead of a per-element loop: at most one capacity growth.
  all_vars->insert(all_vars->end(), append_vars.begin(), append_vars.end());
}

/*
 * NOTE(Aurelius84): In ParallelExecutor, memory optimized pass will be applied.
 * To avoid eagerly deleting last alive variables which are necessary in
 * backward program, we firstly parse these variable names as
 * skip_eager_vars. While executing pe.run skip_eager_vars are used to
 * skip memory optimization.
 *
 * Variables satisfying the following rules are considered as skip_eager_var:
 *
 *   1. it is an output var in run_program_op
 *   2. it is an input var used in backward_op
 */
void ParseSafeEagerDeletionSkipVars(
    const ProgramDesc &program, int64_t forward_op_nums,
    const std::vector<std::string> &output_var_names,
    std::vector<std::string> *skip_eager_delete_vars) {
  auto all_ops = program.Block(0).AllOps();
  // NOTE: skip `shape` and `fill_constant` op created by
  // fluid.backward.gradients, one forward output will generate one `shape`
  // and `fill_constant`.
  size_t backward_op_start_index =
      forward_op_nums + (output_var_names.size() * 2);

  // step 2: parse the necessary variable of backward op
  std::unordered_set<std::string> op_outputs;
  std::unordered_set<std::string> op_inputs;
  for (auto i = backward_op_start_index; i < all_ops.size(); ++i) {
    framework::OpDesc *op = all_ops[i];
91
    for (const std::string &in_arg_name : op->InputArgumentNames()) {
92
      op_inputs.emplace(in_arg_name);
93 94
    }
    for (const std::string &out_arg_name : op->OutputArgumentNames()) {
95
      op_outputs.emplace(out_arg_name);
96 97 98 99 100 101
    }
  }

  // For the grad op input variables, if it is not output of grad_op, it may
  // be output of forward op and we should set the variables as skip_var to
  // prevent it being deleted when grad op is called multiple times.
102 103 104 105
  for (const std::string &var_name : op_inputs) {
    if (op_outputs.find(var_name) == op_outputs.end()) {
      VLOG(2) << "skip eager var: " << var_name;
      skip_eager_delete_vars->emplace_back(var_name);
106 107
    }
  }
108
  VLOG(3) << "Found skip_eager_delete_vars: " << skip_eager_delete_vars->size();
109
}
110

}  // namespace details

// C++11 removes the need for manual locking. Concurrent execution shall wait if
// a static local variable is already being initialized.
// https://stackoverflow.com/questions/11711920/how-to-implement-multithread-safe-singleton-in-c11-without-using-mutex
ExecutorInfoCache &ExecutorInfoCache::Instance() {
  // Meyers singleton: since C++11 the initialization of a function-local
  // static is guaranteed to be thread-safe by the language.
  static ExecutorInfoCache cache_instance;
  return cache_instance;
}

// Drops every cached entry. Must be called explicitly rather than relying
// on static teardown.
void ExecutorInfoCache::Finalize() {
  // NOTE(Aurelius84): DO NOT perform finalize in destructor
  // to avoid problems caused by destructor order of static
  // object.
  info_map_.clear();
}
// Returns a ParallelExecutor for `cache_key`, creating and caching one on a
// miss. The bool in the returned CacheInfo pair is true iff the executor was
// newly created by this call. On a hit, the cached executor is re-bound to
// the given `scope` before being returned.
CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey &cache_key,
                                   framework::Scope *scope) {
  auto &cached_exe_info = framework::ExecutorInfoCache::Instance();

  if (!cached_exe_info.Has(cache_key)) {
    VLOG(1) << "create exe_info for " << cache_key.DebugString();

    // TODO(Aurelius84): Consider to use LRU algorithm to replace this.
    constexpr size_t kMaxCachedSize = 4;  // named instead of a magic `4u`
    if (cached_exe_info.Size() > kMaxCachedSize) {
      VLOG(2) << "The cached info size has exceeded max_cached_size: 4, clear "
                 "all cache!";
      cached_exe_info.Finalize();
    }

    framework::BuildStrategy build_strategy;
    auto execution_strategy = details::GetExecutionStrategy(cache_key);

    // Build a graph covering ops [start_op_index_, end_op_index_) of the
    // program, and an executor that runs it in `scope`.
    auto graph = std::make_shared<framework::ir::Graph>(
        *cache_key.program_desc_, cache_key.start_op_index_,
        cache_key.end_op_index_);
    auto parallel_executor = std::make_shared<framework::ParallelExecutor>(
        cache_key.place_, scope, execution_strategy, build_strategy,
        graph.get());
    parallel_executor->PrepareVariables(scope);

    // Cache the graph alongside the executor: the executor only holds a raw
    // pointer to the graph, so the entry keeps the graph alive.
    framework::ExecutorInfoCache::ValueType cache_val = {parallel_executor,
                                                         graph};
    cached_exe_info.Insert(cache_key, cache_val);

    return std::make_pair(parallel_executor, /*is_new_created=*/true);
  } else {
    VLOG(1) << "get exe_info from cache by: " << cache_key.DebugString();
    auto cache_val = cached_exe_info.GetMutable(cache_key);
    auto parallel_executor = cache_val.first;

    // update op_handle scope_map in pe->executor_->Graph so the cached
    // executor reads from / writes to the caller's current scope
    std::unordered_map<Scope *, Scope *> scope_map = {
        {parallel_executor->GetLocalScopes().front(), scope}};
    parallel_executor->ResetOpHandleScopeMapOfGraphs(scope_map);
    // need to recreate tmp variables in new scope
    parallel_executor->PrepareVariables(scope);

    return std::make_pair(parallel_executor, /*is_new_created=*/false);
  }
}

}  // namespace framework
}  // namespace paddle