interpreter_impl.h 10.3 KB
Newer Older
1 2 3 4 5
#pragma once

#include <deque>
#include <future>
#include <list>
6
#include <stack>
7 8 9
#include <thread>
#include <unordered_set>
#include <variant>
10
#include "megbrain/comp_node.h"
11 12
#include "megbrain/imperative/interpreter.h"
#include "megbrain/imperative/profiler.h"
M
Megvii Engine Team 已提交
13
#include "megbrain/utils/mempool.h"
14 15 16

#include "./commands.h"
#include "./option_manager.h"
17
#include "./stack_manager.h"
M
Megvii Engine Team 已提交
18
#include "./tensor_info.h"
19 20

#include "../profiler/events.h"
21 22 23 24 25 26 27 28 29

namespace mgb::imperative::interpreter::intl {

using Handle = Interpreter::Handle;

//! Interpreter implementation declared by this header; the only member it
//! adds is the create_channel() override, whose definition is not part of
//! this header.
struct InterpreterImpl : Interpreter {
    //! create a Channel; ownership goes to the caller via the returned
    //! std::unique_ptr (presumably a ChannelImpl -- confirm in the .cpp)
    std::unique_ptr<Channel> create_channel() override;
};

30 31 32 33 34
/*!
 * \brief implement Channel to execute the commands asynchronously,
 * almost commands are executed by the worker threads, commands are sent
 * by the interface
 */
35
struct ChannelImpl : Interpreter::Channel, NonCopyableObj, NonMoveableObj {
36 37 38 39
    ChannelImpl();
    ~ChannelImpl() override;

    Handle put(const HostTensorND& value, bool no_cache) override;
40
    Handle put(const DeviceTensorND& value, const HostTensorND& hvalue) override;
41 42 43 44 45

    void del(Handle) override;
    void drop(Handle) override;

    SmallVector<Handle> apply_op(
M
Megvii Engine Team 已提交
46
            std::shared_ptr<OpDef> op, const SmallVector<Handle>& inputs) override;
47 48 49 50 51 52 53 54

    HostTensorND get_value(Handle) override;
    TensorShape get_shape(Handle) override;
    DType get_dtype(Handle) override;
    CompNode get_device(Handle) override;

    DeviceTensorND get_dev_tensor(Handle) override;

55
    bool check_available() override;
56 57 58
    void sync() override;
    void close() override;

59 60
    size_t get_option(std::string name) override;
    void set_option(std::string name, size_t value) override;
61
    void clear_candidates() override;
62

63 64
    void start_profile() override;
    void stop_profile() override;
65 66 67

    void push_scope(std::string) override;
    void pop_scope(std::string) override;
M
Megvii Engine Team 已提交
68

69 70 71 72 73 74 75
    bool worker_started() const;
    void update_status_to_forked(void);
    void assert_available() const;

    static std::unordered_set<ChannelImpl*> m_all_active_channels;
    static MGB_MUTEX m_all_active_channels_mutex;

76
private:
77 78 79
    struct WorkQueue;
    struct State;

80
    TensorInfo* alloc();
81
    void init(TensorInfo*, LogicalTensorDesc&& desc);
82
    void free(TensorInfo*);
83 84 85
    void real_free(TensorInfo*);
    void recursive_free(TensorInfo*);
    void do_drop(TensorInfo*, bool);
86 87
    void detach_users(TensorInfo*);

88
    TensorInfo* put_impl(const HostTensorND& value, bool no_cache);
89
    TensorInfo* put_impl(const DeviceTensorND& value, const HostTensorND& hvalue);
90 91
    void del_impl(Handle);
    void sync_impl();
92
    SmallVector<Handle> apply_op_impl(
M
Megvii Engine Team 已提交
93
            std::shared_ptr<OpDef> op, const SmallVector<Handle>& inputs);
94 95 96
    TensorPtr wait_tensor(TensorInfo* info, profiler::TensorProp prop);
    void notify_tensor_unsafe(TensorInfo* info);

97
    void process_one_task(Command&);
98 99 100

    void check_worker_exc_unsafe();

101
    void produce_tensor(TensorInfo* dest, TensorPtr ptr);
102 103 104 105

    void release_tensor(TensorInfo* dest);

    void regenerate(TensorInfo* dest);
106
    void flush_apply_stack();
107
    void do_apply_op(const ApplyOp& cmd, std::string reason);
M
Megvii Engine Team 已提交
108

109
    void dispatch_default_cpu(
M
Megvii Engine Team 已提交
110 111 112
            std::shared_ptr<OpDef> op, const SmallVector<TensorInfo*>& input_infos,
            const SmallVector<LogicalTensorDesc>& input_descs,
            SmallVector<Handle>* outputs);
113
    void dispatch_kernel(
M
Megvii Engine Team 已提交
114 115 116
            std::shared_ptr<OpDef> op, const SmallVector<TensorInfo*>& input_infos,
            const SmallVector<LogicalTensorDesc>& input_descs,
            SmallVector<Handle>* outputs);
117

118 119 120
    void push_scope(std::string, State&);
    void pop_scope(std::string, State&);

121 122 123 124
    void assert_in_channel();
    void assert_in_worker();
    std::thread::id get_worker_tid();

125 126 127 128 129
    void sample_on_device(CompNode device, bool force);

    // valid => status != Deleted
    std::unordered_set<TensorInfo*> collect_valid_tensors();

130
    std::mutex m_mutex;
131
    Spinlock m_spin;
132 133 134 135
    std::condition_variable m_cv;
    MemPool<TensorInfo> m_pool;
    std::unordered_set<Handle> m_valid_handle;
    TensorInfo* m_waitee = nullptr;
136 137
    Spinlock m_pool_spin;
    Spinlock m_info_spin;
138
    uint64_t m_waitee_id = 0;
139
    std::exception_ptr m_worker_exc;
140
    std::function<void(std::string, std::string)> m_profile_dump_callback;
141
    size_t m_storage_id = 0;
142 143
    // TODO: use explicit struct
    std::stack<std::tuple<ApplyOp, size_t, TensorInfo*, std::string>> m_apply_stack;
144
    bool m_applying = false;
145 146 147

    enum class ChannelRunningStatus { RUNING, CLOSED, FORKED };
    ChannelRunningStatus m_status = ChannelRunningStatus::RUNING;
148

149
    struct WorkQueue : AsyncQueueSC<Command, WorkQueue> {
150 151
        // set max_spin=0 to prevent Queue fetch task in busy wait manner.
        // this won't affect throughput when python interpreter is sending enough task,
M
Megvii Engine Team 已提交
152 153
        // but will significantly save CPU time when waiting for task, e.g. wait for
        // data input limit pending tasks to 10000
154
        WorkQueue(ChannelImpl* owner)
155
                : AsyncQueueSC<Command, WorkQueue>(0, 10000), m_owner(owner) {
156
            sys::set_thread_name("interpreter");
157 158
            if (const char* env_val = MGB_GETENV("MEGENGINE_ASYNC_QUEUE_SIZE")) {
                int len = strlen(env_val);
M
Megvii Engine Team 已提交
159 160 161 162
                for (int i = 0; i < len; i++) {
                    mgb_assert(
                            env_val[i] >= '0' && env_val[i] <= '9',
                            "async queue size should be an integer");
163 164 165 166 167
                }
                size_t val;
                sscanf(env_val, "%zu", &val);
                update_max_items(val);
            }
168
        }
M
Megvii Engine Team 已提交
169
        void process_one_task(Command& icmd) { m_owner->process_one_task(icmd); }
170
        void on_async_queue_worker_thread_start() override;
M
Megvii Engine Team 已提交
171

172 173 174 175
    private:
        ChannelImpl* m_owner;
    } m_worker;

176
    struct State {
177
        std::thread::id tid;
178
        OptionManager options;
179 180
    };

M
Megvii Engine Team 已提交
181
    struct ChannelState : State {
182
        StackManager stack_manager;
183 184
    };

M
Megvii Engine Team 已提交
185
    struct WorkerState : State {};
186

187 188
    ChannelState m_channel_state;
    WorkerState m_worker_state;
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204

    /*!
     * \brief A framework of dynamic sublienar memory optimization
     *
     * Note: The main idea is that during the training process, if the memory
     * usage exceeds the threshold, select some tensors to evict until the
     * memory usage is below the threshold.
     */
    struct DynamicSublinear {
        /*!
         * \brief find an available tensor with the largest evaluation function
         *
         * Note: An available tensor must satisfy: (1) has computing path,
         * (2) is in memory, (3) is not pinned. Evaluation function refers to:
         * @see: TensorInfo::eval_func.
         *
M
Megvii Engine Team 已提交
205
         * \return the pointer of the best tensor; nullptr is returned if no
206 207
         * available tensor is found
         */
208
        TensorInfo* find_best_tensor(bool);
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226

        /*!
         * \brief estimate the cost of recomputing tensor ptr
         *
         * Note: We define the cost as the sum of the costs of each evicted
         * components where all the neighbors of ptr are located.
         */
        double estimate_neighbor_cost(TensorInfo* ptr);

        /*!
         * \brief update the last used time of the tensor ptr
         */
        void update_used_time(TensorInfo* ptr);

        /*!
         * \brief merge the two specified sets (the set in which the element x
         * is located, and the set in which the element y is located)
         */
M
Megvii Engine Team 已提交
227
        void merge(std::shared_ptr<DsuNode>& x, std::shared_ptr<DsuNode>& y);
228 229 230 231 232

        /*!
         * \brief return the representative of the set that contains the
         * element x
         */
M
Megvii Engine Team 已提交
233
        std::shared_ptr<DsuNode> find_father(std::shared_ptr<DsuNode>& x);
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260

        /*!
         * \brief update DSU after recomputing tensor ptr
         *
         * Delete ptr from the set where ptr is located. Since DSU does not
         * support this operation, instead, we reset the DSU father of ptr, and
         * subtract the recomputation cost of ptr from the cost of the original
         * set.
         */
        void update_dsu_after_recompute(TensorInfo* ptr);

        /*!
         * \brief update DSU after evicting tensor ptr
         *
         * Check the neighbors of x, that is, the input and output tensors, and
         * if they are evicted, merge their respective sets.
         */
        void update_dsu_after_evict(TensorInfo* ptr);

        /*!
         * \brief pin the tensors in vec
         */
        void pin(const SmallVector<TensorInfo*>& vec);

        /*!
         * \brief unpin the tensors in vec
         */
261
        void unpin(const SmallVector<TensorInfo*>& vec, WorkerState& state);
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285

        /*!
         * \brief add the tensor to the candidate set
         *
         * If the size of the tensor does not exceed the minimum threshold,
         * it will do nothing.
         */
        void insert_candidate(TensorInfo* ptr);

        /*!
         * \brief erase the tensor from the candidate set
         *
         * If the size of the tensor does not exceed the minimum threshold,
         * it will do nothing.
         */
        void erase_candidate(TensorInfo* ptr);

        //! estimate the current time, in order to reduce the overhead of timer
        double estimate_timestamp = 0;

        //! the comp node where dynamic sublinear memory optimization works
        CompNode comp_node;

        //! store all tensors that may be evicted
286
        SmallVector<TensorInfo*> candidates;
287

288
        bool is_bad_op(std::string op_name) {
M
Megvii Engine Team 已提交
289 290
            return std::find(op_blacklist.begin(), op_blacklist.end(), op_name) !=
                   op_blacklist.end();
291 292
        }

293 294
        // operators that cannot be re-computed, including :
        // distributed operators, inplace operator, random generator operators
M
Megvii Engine Team 已提交
295 296 297 298
        std::vector<std::string> op_blacklist = {
                "CollectiveComm", "InplaceAdd", "ParamPackSplit", "ParamPackConcat",
                "GaussianRNG",    "UniformRNG", "GammaRNG",       "PermutationRNG",
                "PoissonRNG",     "BetaRNG"};
299 300 301
    } m_dtr;

    //! automatically evict an optimal tensor
302 303
    bool auto_evict(size_t);

304
    void alloc_tensor_with_evict(OwnedBlob*);
305 306 307 308

    // assert thread id when call get_xxx_state to avoid misuse
    ChannelState& get_channel_state();
    WorkerState& get_worker_state();
309 310
};

M
Megvii Engine Team 已提交
311
}  // namespace mgb::imperative::interpreter::intl
312 313

// vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}}