program_interpreter.h 6.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "paddle/fluid/framework/new_executor/interpreter_base_impl.h"

namespace paddle {
namespace framework {

///
/// \brief Derived Class to interpret the instructions transformed
/// from legacy ProgramDesc.
///

class ProgramInterpreter : public InterpreterBaseImpl {
  using ExecutionConfig = interpreter::ExecutionConfig;
  using InstructionSchedulingPriorityLess = std::function<bool(size_t, size_t)>;
  using SchedulingQueue =
      std::priority_queue<size_t,
                          std::vector<size_t>,
                          InstructionSchedulingPriorityLess>;

 public:
  ProgramInterpreter(
      const platform::Place& place,
      const BlockDesc& block,
      Scope* scope,
      const ExecutionConfig& execution_config = ExecutionConfig());

  ~ProgramInterpreter();

  paddle::framework::FetchList Run(
      const std::vector<std::string>& feed_names,
      const std::vector<phi::DenseTensor>& feed_tensors) override;

  paddle::framework::FetchList Run(const std::vector<std::string>& feed_names,
                                   bool need_fetch = true) override;

  void ShareWorkQueueFrom(InterpreterBaseImpl* src) override;

53 54 55
  void ShareBuildResultsFrom(const InterpreterBaseImpl& src) override;

  // op dependences
Z
zhangbo9674 已提交
56
  const interpreter::DependencyBuilder& GetDependencyBuilder() const;
57 58 59

  std::shared_ptr<std::vector<size_t>> GetDependencyCount() const override;

Z
zhangbo9674 已提交
60
  const interpreter::StreamAnalyzer& GetStreamAnalyzer() const;
61

62 63
  bool IsSharedResultsBuild() const override;

64 65 66 67 68 69 70 71 72 73 74 75
  void SetCopyProgram(std::shared_ptr<ProgramDesc> prog) override;

  void SetSkipGcVars(const std::set<std::string>& skip_gc_vars) override;

  const std::set<std::string>& JitInputVars() const override;

  void SetJitInputVars(const std::set<std::string>& jit_input_vars) override;

  const VariableScope* GetVariableScope() const override;

  void reset_scope(Scope* new_scope) override;

76 77
  const Scope* local_scope() const override;

78 79 80 81 82 83
  const platform::Place& GetPlace() const override { return place_; }

  void SetOutputHooks(const std::vector<HookFunc>& hookfuncs) override {
    hookfuncs_ = hookfuncs;
  }

84 85 86 87 88 89 90 91 92 93 94
  std::unordered_map<std::string, std::shared_ptr<EventInter>>*
  GetForceEventsToWaitInfo() {
    return force_evnets_to_wait_;
  }

  void SetForceEventsToWaitInfo(
      std::unordered_map<std::string, std::shared_ptr<EventInter>>*
          force_evnets_to_wait) {
    force_evnets_to_wait_ = force_evnets_to_wait;
  }

95 96
  bool IsStaticBuild() const { return static_build_; }

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
 private:
  // build graph
  void Convert(std::vector<paddle::framework::OpFuncNode>* op_func_nodes);
  void BuildOperatorDependences();
  void BuildAndCacheInstructionCtx(Instruction* instr_node);
  void BuildSkipShareLoDInfo();
  void UpdateSyncOpNum();
  void AnalyseExecuteOrderForTrace();

  // inplace
  void BuildInplace();
  bool BuildInplaceCheckVarIsOnlyInput(
      const std::vector<std::vector<size_t>>& input_var2op, size_t var_index);
  void SetFeedVarsInplaceSkip(const std::vector<std::string>& feed_names);

  // cuda graph
  void CheckCUDAGraphBeforeRun(const std::vector<std::string>& feed_names);
  void PrepareForCUDAGraphCapture();

  // execution
  void RunImpl();
  void ExecuteInstructionList(const std::vector<Instruction>& vec_instr);
  void RunInstructionAsync(size_t instr_id);
  void RunInstruction(const Instruction& instr_node);
  void RunNextInstructions(const Instruction& instr_id,
                           SchedulingQueue* reserved_next_ops);
  void RunOperator(const Instruction& instr_node);
  // Trace
  void TraceInstructionList(const std::vector<Instruction>& vec_instr);

  // only used when program contains no feed op
  void Prepare(const std::vector<std::string>& feed_names,
               const std::vector<phi::DenseTensor>& feed_tensors,
               bool prepare_feed);

  void RecordMemcpyD2H(const Instruction& instr_node);

  // gc
  void RecordStreamForGC(const Instruction& instr);
  void CheckGC(const Instruction& instr);
  void ClearLoDTensorArrayInLocalScope();

  // workqueue
  std::shared_ptr<interpreter::AsyncWorkQueue> GetWorkQueue();

  // scope
  bool HasLocalScope() const;

  // For log and debug
  std::string GetDepsString() const;

  bool is_build_{false};
  bool static_build_{false};
150 151
  // Note(sonder): share the op dependency and event analysis procedure.
  bool is_shared_results_build_{false};
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175

  const platform::Place place_;
  const BlockDesc& block_;  // not owned

  interpreter::DependencyBuilder dependency_builder_;
  interpreter::StreamAnalyzer stream_analyzer_;

  // NOTE(zhiqiu): when add fetch ops in GetInterpreterCore, we will
  // copy a new program and block, the copy_program_ here is used to
  // hold the program, otherwise block_ maybe not valid after the
  // new program is deleted.
  std::shared_ptr<ProgramDesc> copy_program_{nullptr};

  // from variable scope
  std::vector<Variable*> var_list_;
  std::map<std::string, int> name2id_;
  std::vector<VariableMetaInfo> vec_meta_info_;

  std::vector<Instruction> vec_instruction_;  // deconstruct before OpFuncNode

  std::atomic<size_t> unfinished_op_number_{0};

  ExecutionConfig execution_config_;

176 177 178
  std::unordered_map<std::string, std::shared_ptr<EventInter>>*
      force_evnets_to_wait_;

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
  VariableScope var_scope_;
  Scope* local_scope_{nullptr};  // not owned

  EventsWaiter main_thread_blocker_;
  std::shared_ptr<interpreter::AsyncWorkQueue> async_work_queue_;

  details::ExceptionHolder exception_holder_;
  std::shared_ptr<EventsWaiter::EventNotifier> exception_notifier_{nullptr};
  std::shared_ptr<EventsWaiter::EventNotifier> completion_notifier_{nullptr};

  std::unique_ptr<InterpreterCoreGarbageCollector> gc_;

  // last_live_ops_[i] contains the id of operators that last access the i-th
  // var
  std::map<size_t, std::set<size_t>> last_live_ops_;

195
  // (*dependecy_count_)[i] contains the number of dependencies that the i-th op
196
  // need to wait
197
  std::shared_ptr<std::vector<size_t>> dependecy_count_;
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212

  std::vector<std::shared_ptr<interpreter::OpDepInfo>> deps_;
  std::vector<std::shared_ptr<interpreter::VarRefInfo>> refs_;

  // used for Trace
  int64_t sync_op_num_{-1};
  std::vector<size_t> trace_execute_order_;

  InstructionSchedulingPriorityLess instruction_scheduling_priority_less;

  std::vector<HookFunc> hookfuncs_;
};

}  // namespace framework
}  // namespace paddle