#pragma once #include #include #include #include #include #include #include #include "megbrain/comp_node.h" #include "megbrain/imperative/interpreter.h" #include "megbrain/imperative/profiler.h" #include "megbrain/utils/mempool.h" #include "./commands.h" #include "./option_manager.h" #include "./stack_manager.h" #include "./tensor_info.h" #include "../profiler/events.h" #include "megbrain/imperative/backtrace.h" namespace mgb::imperative::interpreter::intl { using Handle = Interpreter::Handle; struct InterpreterImpl : Interpreter { std::unique_ptr create_channel() override; }; /*! * \brief implement Channel to execute the commands asynchronously, * almost commands are executed by the worker threads, commands are sent * by the interface */ struct ChannelImpl : Interpreter::Channel, NonCopyableObj, NonMoveableObj { ChannelImpl(); ~ChannelImpl() override; Handle put(const HostTensorND& value, bool no_cache) override; Handle put(const DeviceTensorND& value, const HostTensorND& hvalue) override; void del(Handle) override; void drop(Handle) override; SmallVector apply_op( std::shared_ptr op, const SmallVector& inputs) override; HostTensorND get_value(Handle) override; TensorShape get_shape(Handle) override; DType get_dtype(Handle) override; CompNode get_device(Handle) override; DeviceTensorND get_dev_tensor(Handle) override; bool check_available() override; void sync() override; void close() override; size_t get_option(std::string name) override; void set_option(std::string name, size_t value) override; void clear_candidates() override; void start_profile() override; void stop_profile() override; void stop_step() override; void push_scope(std::string, ScopeType type = ScopeType::DEFAULT) override; void pop_scope(std::string, ScopeType type = ScopeType::DEFAULT) override; BackTraceInfoPtr& get_backtrace() override; void set_backtrace(BackTraceInfoPtr bt) override; void clear_backtrace() override; bool worker_started() const; void update_status_to_forked(void); void assert_available() const; static std::unordered_set m_all_active_channels; static MGB_MUTEX m_all_active_channels_mutex; private: struct WorkQueue; struct State; TensorInfo* alloc(); void init(TensorInfo*, LogicalTensorDesc&& desc); void free(TensorInfo*); void real_free(TensorInfo*); void recursive_free(TensorInfo*); void do_drop(TensorInfo*, bool); void detach_users(TensorInfo*); TensorInfo* put_impl(const HostTensorND& value, bool no_cache); TensorInfo* put_impl(const DeviceTensorND& value, const HostTensorND& hvalue); void del_impl(Handle); void sync_impl(); SmallVector apply_op_impl( std::shared_ptr op, const SmallVector& inputs); TensorPtr wait_tensor(TensorInfo* info, profiler::TensorProp prop); void notify_tensor_unsafe(TensorInfo* info); void process_one_task(Command&); void check_worker_exc_unsafe(); void produce_tensor(TensorInfo* dest, TensorPtr ptr); void release_tensor(TensorInfo* dest); void regenerate(TensorInfo* dest); void flush_apply_stack(); void do_apply_op(const ApplyOp& cmd, std::string reason); void dispatch_default_cpu( std::shared_ptr op, const SmallVector& input_infos, const SmallVector& input_descs, SmallVector* outputs); void dispatch_kernel( std::shared_ptr op, const SmallVector& input_infos, const SmallVector& input_descs, SmallVector* outputs); void push_scope(std::string, State&); void pop_scope(std::string, State&); void assert_in_channel(); void assert_in_worker(); std::thread::id get_worker_tid(); void sample_on_device(CompNode device, bool force); // valid => status != Deleted std::unordered_set collect_valid_tensors(); std::mutex m_mutex; Spinlock m_spin; std::condition_variable m_cv; MemPool m_pool; std::unordered_set m_valid_handle; TensorInfo* m_waitee = nullptr; BackTraceInfoPtr m_bt = nullptr; Spinlock m_pool_spin; Spinlock m_info_spin; uint64_t m_waitee_id = 0; std::exception_ptr m_worker_exc; std::function m_profile_dump_callback; size_t m_storage_id = 0; // TODO: use explicit struct std::stack> m_apply_stack; bool m_applying = false; enum class ChannelRunningStatus { RUNING, CLOSED, FORKED }; ChannelRunningStatus m_status = ChannelRunningStatus::RUNING; struct WorkQueue : AsyncQueueSC { // set max_spin=0 to prevent Queue fetch task in busy wait manner. // this won't affect throughput when python interpreter is sending enough task, // but will significantly save CPU time when waiting for task, e.g. wait for // data input limit pending tasks to 10000 WorkQueue(ChannelImpl* owner) : AsyncQueueSC(0, 10000), m_owner(owner) { sys::set_thread_name("interpreter"); if (const char* env_val = MGB_GETENV("MEGENGINE_ASYNC_QUEUE_SIZE")) { int len = strlen(env_val); for (int i = 0; i < len; i++) { mgb_assert( env_val[i] >= '0' && env_val[i] <= '9', "async queue size should be an integer"); } size_t val; sscanf(env_val, "%zu", &val); update_max_items(val); } } void process_one_task(Command& icmd) { m_owner->process_one_task(icmd); } void on_async_queue_worker_thread_start() override; private: ChannelImpl* m_owner; } m_worker; struct State { std::thread::id tid; OptionManager options; }; struct ChannelState : State { StackManager stack_manager; }; struct WorkerState : State {}; ChannelState m_channel_state; WorkerState m_worker_state; /*! * \brief A framework of dynamic sublienar memory optimization * * Note: The main idea is that during the training process, if the memory * usage exceeds the threshold, select some tensors to evict until the * memory usage is below the threshold. */ struct DynamicSublinear { /*! * \brief find an available tensor with the largest evaluation function * * Note: An available tensor must satisfy: (1) has computing path, * (2) is in memory, (3) is not pinned. Evaluation function refers to: * @see: TensorInfo::eval_func. * * \return the pointer of the best tensor; nullptr is returned if no * available tensor is found */ TensorInfo* find_best_tensor(bool); /*! * \brief estimate the cost of recomputing tensor ptr * * Note: We define the cost as the sum of the costs of each evicted * components where all the neighbors of ptr are located. */ double estimate_neighbor_cost(TensorInfo* ptr); /*! * \brief update the last used time of the tensor ptr */ void update_used_time(TensorInfo* ptr); /*! * \brief merge the two specified sets (the set in which the element x * is located, and the set in which the element y is located) */ void merge(std::shared_ptr& x, std::shared_ptr& y); /*! * \brief return the representative of the set that contains the * element x */ std::shared_ptr find_father(std::shared_ptr& x); /*! * \brief update DSU after recomputing tensor ptr * * Delete ptr from the set where ptr is located. Since DSU does not * support this operation, instead, we reset the DSU father of ptr, and * subtract the recomputation cost of ptr from the cost of the original * set. */ void update_dsu_after_recompute(TensorInfo* ptr); /*! * \brief update DSU after evicting tensor ptr * * Check the neighbors of x, that is, the input and output tensors, and * if they are evicted, merge their respective sets. */ void update_dsu_after_evict(TensorInfo* ptr); /*! * \brief pin the tensors in vec */ void pin(const SmallVector& vec); /*! * \brief unpin the tensors in vec */ void unpin( const SmallVector& vec, size_t& dtr_evictee_minimum_size); /*! * \brief add the tensor to the candidate set * * If the size of the tensor does not exceed the minimum threshold, * it will do nothing. */ void insert_candidate(TensorInfo* ptr); /*! * \brief erase the tensor from the candidate set * * If the size of the tensor does not exceed the minimum threshold, * it will do nothing. */ void erase_candidate(TensorInfo* ptr); //! estimate the current time, in order to reduce the overhead of timer double estimate_timestamp = 0; //! the comp node where dynamic sublinear memory optimization works CompNode comp_node; //! store all tensors that may be evicted SmallVector candidates; bool is_bad_op(std::string op_name) { return std::find(op_blacklist.begin(), op_blacklist.end(), op_name) != op_blacklist.end(); } // operators that cannot be re-computed, including : // distributed operators, inplace operator, random generator operators std::vector op_blacklist = { "CollectiveComm", "InplaceAdd", "ParamPackSplit", "ParamPackConcat", "GaussianRNG", "UniformRNG", "GammaRNG", "PermutationRNG", "PoissonRNG", "BetaRNG"}; } m_dtr; //! automatically evict an optimal tensor bool auto_evict(size_t); void alloc_tensor_with_evict(OwnedBlob*); // assert thread id when call get_xxx_state to avoid misuse ChannelState& get_channel_state(); WorkerState& get_worker_state(); }; } // namespace mgb::imperative::interpreter::intl // vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}}