/**
 * \file imperative/src/impl/interpreter/interpreter_impl.h
 * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
 *
 * Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */

#pragma once

#include <deque>
#include <future>
#include <list>
#include <thread>
#include <unordered_set>
#include <variant>

#include "megbrain/comp_node.h"
#include "megbrain/utils/mempool.h"
#include "megbrain/imperative/interpreter.h"
#include "megbrain/imperative/profiler.h"

#include "./commands.h"
#include "./tensor_info.h"
#include "./option_manager.h"

#include "../profiler/events.h"

namespace mgb::imperative::interpreter::intl {

using Handle = Interpreter::Handle;

struct InterpreterImpl : Interpreter {
    std::unique_ptr<Channel> create_channel() override;
};

struct ChannelImpl : Interpreter::Channel {
    ChannelImpl();
    ~ChannelImpl() override;

    Handle put(const HostTensorND& value, bool no_cache) override;
    Handle put(const DeviceTensorND& value, const HostTensorND& hvalue) override;

    void del(Handle) override;
    void swap_in(Handle) override;
    void swap_out(Handle) override;
    void drop(Handle) override;

    SmallVector<Handle> apply_op(
            std::shared_ptr<OpDef> op,
            const SmallVector<Handle>& inputs) override;

    HostTensorND get_value(Handle) override;
    TensorShape get_shape(Handle) override;
    DType get_dtype(Handle) override;
    CompNode get_device(Handle) override;

    DeviceTensorND get_dev_tensor(Handle) override;

    bool check_available() override;

    void sync() override;
    void close() override;

    size_t get_option(std::string name) override;
    void set_option(std::string name, size_t value) override;

    void start_profile() override;
    void stop_profile() override;

    void push_scope(std::string) override;
    void pop_scope(std::string) override;
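
    /* Usage sketch (illustrative only, not part of the interface; `host_value`
     * and `op` are hypothetical): a client such as the Python layer typically
     * drives a channel roughly like this:
     *
     *     auto channel = InterpreterImpl{}.create_channel();
     *     Handle x = channel->put(host_value, false);         // upload a host tensor
     *     auto outs = channel->apply_op(op, {x});             // dispatched asynchronously
     *     HostTensorND result = channel->get_value(outs[0]);  // blocks until the value is ready
     *     channel->del(outs[0]);
     *     channel->del(x);
     *     channel->close();
     */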
private:
    struct WorkQueue;
    struct State;

    TensorInfo* alloc();
    void init(TensorInfo*, LogicalTensorDesc desc);
    void free(TensorInfo*);
    void real_free(TensorInfo*);
    void recursive_free(TensorInfo*);
    void do_drop(TensorInfo*, bool);
    void detach_users(TensorInfo*);

    TensorInfo* put_impl(const HostTensorND& value, bool no_cache);
    TensorInfo* put_impl(const DeviceTensorND& value, const HostTensorND& hvalue);
    void del_impl(Handle);
    void sync_impl();
    SmallVector<Handle> apply_op_impl(
            std::shared_ptr<OpDef> op,
            const SmallVector<Handle>& inputs);
    TensorPtr wait_tensor(TensorInfo* info, profiler::TensorProp prop);
    void notify_tensor_unsafe(TensorInfo* info);

    void process_one_task(IdentifiedCommand&);

    void check_worker_exc_unsafe();

    void produce_tensor(TensorInfo* dest, TensorPtr ptr);

    void release_tensor(TensorInfo* dest);

    void regenerate(TensorInfo* dest);
    void recompute(TensorInfo::ComputePath* path);
    void do_apply_op(const ApplyOp& cmd);
    
    std::tuple<SmallVector<MemoryDesc>, SmallVector<TensorPtr>, SmallVector<TensorPtr>> init_output_and_workspace(
        const OpDef& def,
        SmallVector<TensorPtr> inputs,
        SmallVector<MemoryDesc> inputs_mem_desc);

    void dispatch_default_cpu(
        std::shared_ptr<OpDef> op,
        const SmallVector<TensorInfo*>& input_infos,
        const SmallVector<LogicalTensorDesc>& input_descs,
        SmallVector<Handle>* outputs);
    void dispatch_kernel(
        std::shared_ptr<OpDef> op,
        const SmallVector<TensorInfo*>& input_infos,
        const SmallVector<LogicalTensorDesc>& input_descs,
        SmallVector<Handle>* outputs);
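
    // Note (behaviour summary, not asserted by this header alone): dispatch_default_cpu
    // is expected to evaluate the op eagerly on the host/default-CPU path when host
    // values for all inputs are available, while dispatch_kernel records an ApplyOp
    // command for the worker thread; apply_op_impl chooses between the two.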

    void push_scope(std::string, State&);
    void pop_scope(std::string, State&);

    void assert_in_channel();
    void assert_in_worker();
    std::thread::id get_worker_tid();

    template <typename TCommand>
    void enqueue_command(TCommand&& cmd) {
        m_buffer.enqueue(Command{std::forward<TCommand>(cmd)});
    }
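
    // Illustrative call sites for enqueue_command (hypothetical; the concrete
    // Command payloads are defined in ./commands.h):
    //
    //     enqueue_command(Del{info});       // buffered; may be fused into a prior ApplyOp
    //     enqueue_command(GetValue{info});  // payload name assumed for illustration
    //
    // Commands are staged in m_buffer (see CommandBuffer below) and flushed to
    // m_worker later.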

    void sample_on_device(CompNode device, bool force);

    // valid => status != Deleted
    std::unordered_set<TensorInfo*> collect_valid_tensors();

    std::mutex m_mutex;
    Spinlock m_spin;
    std::condition_variable m_cv;
    MemPool<TensorInfo> m_pool;
    std::unordered_set<Handle> m_valid_handle;
    TensorInfo* m_waitee = nullptr;
    uint64_t m_waitee_id = 0;
    std::exception_ptr m_worker_exc;
    std::function<void(std::string, std::string)> m_profile_dump_callback;
    size_t m_storage_id = 0;

    bool m_closed = false;

    struct WorkQueue : AsyncQueueSC<IdentifiedCommand, WorkQueue> {
        // Set max_spin=0 to prevent the queue from fetching tasks in a busy-wait manner.
        // This does not hurt throughput when the Python interpreter submits enough tasks,
        // but it saves significant CPU time while waiting for tasks, e.g. waiting for data input.
        // Limit pending tasks to 1000000.
        WorkQueue(ChannelImpl* owner)
                : AsyncQueueSC<IdentifiedCommand, WorkQueue>(0, 1000000), m_owner(owner) {
            sys::set_thread_name("interpreter");
        }
        void process_one_task(IdentifiedCommand& icmd) {
            m_owner->process_one_task(icmd);
        }
        void on_async_queue_worker_thread_start() override;
    private:
        ChannelImpl* m_owner;
    } m_worker;

    /**
     * Buffer a window of commands so that later commands can be fused into earlier ones.
     * Example:
     *     ---------------------------------------------------------------------
     *     | ..., Apply{in: (i0, i1), out: (o0, o1)}, ... + Del{i0} + Del{i1}  |
     *     ---------------------------------------------------------------------
     *     | ..., Apply{in: (i0, i1), out: (o0, o1), del: (i0)}, ... + Del{i1} |
     *     ---------------------------------------------------------------------
     *     | ..., Apply{in: (i0, i1), out: (o0, o1), del: (i0, i1)}, ...       |
     *     ---------------------------------------------------------------------
     *     The fused Apply may then be invoked in place. See ChannelImpl::process_one_task.
     */
    struct CommandBuffer {
        CommandBuffer(ChannelImpl* owner) : m_owner(owner) {}
        void enqueue(Command cmd);
        bool empty() const {
            return m_commands.empty();
        }
        void flush();
    private:
        ChannelImpl* m_owner;
        std::deque<Command> m_commands;

        using Handle = decltype(m_commands)::iterator;
        // [begin, end)
        using Range = std::array<Handle, 2>;

        // Launch commands in range [m_commands.begin(), pos)
        void flush(Handle pos);
        // Select flush position for incoming cmd
        Handle flush_pos_for(const Command& cmd);
        // Fuse del command into suitable ApplyOp
        bool fuse_del(const Del& cmd);
        // Returns the last handle at which dest is used within the range; if dest is not used, returns range[1]
        Handle find_last_usage(TensorInfo* dest, Range range);
        // Returns the produce position of dest. If not found, returns range[1]
        Handle find_produce(TensorInfo* dest, Range range);
    } m_buffer;

    //! Configure how exactly errors are raised relative to op invocation:
    //! level 2: both device-side and user-side errors are asynchronous;
    //! level 1: user-side errors are synchronous;
    //! level 0: both are synchronous.
    int m_async_level = 2;
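
    // For example (hypothetical call; assumes the option is registered under the
    // name "async_level" in OptionManager):
    //
    //     channel->set_option("async_level", 0);  // report errors synchronously
    //
    // Lower levels trade dispatch throughput for earlier, more precise error reporting.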

    struct Scope {
        std::string name;
        std::unordered_map<std::string, std::unique_ptr<Scope>> children;
        size_t version = 0;
        size_t parent_version = 0;
        size_t tensor_count = 0;
        Scope* active_child = nullptr;
        Scope* parent = nullptr;

        Scope* enter(std::string name) {
            auto& child = children[name];
            if (!child) {
                child = std::make_unique<Scope>();
                child->name = name;
                child->parent = this;
            }
            if (version != child->parent_version) {
                child->version = 0;
                child->parent_version = version;
            } else {
                child->version++;
            }
            child->tensor_count = 0;
            return active_child = child.get();
        }

        Scope* exit(std::string name) {
            mgb_assert(this->name == name, "scope name mismatch");
            parent->active_child = nullptr;
            return parent;
        }
    };
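
    // Behaviour sketch (hypothetical sequence): a child scope re-entered while its
    // parent's version is unchanged gets its version bumped; once the parent itself is
    // re-versioned, the child's count restarts from 0. This keeps repeated scopes such
    // as loop iterations distinguishable in the names produced by ScopeManager below:
    //
    //     Scope root;
    //     Scope* s = root.enter("iter");  // versions of repeated "iter" entries increase
    //     s->exit("iter");
    //     root.enter("iter");             // same parent epoch -> higher version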

    class ScopeManager {
    private:
        Scope m_root;
        Scope* m_current_scope = &m_root;
    public:
        class ScopeGuard{
        private:
            ScopeManager* m_manager;
            std::string m_name;
        public:
            ScopeGuard(ScopeManager* manager, std::string name): m_manager{manager}, m_name{name} {
                m_manager->push(m_name);
            }
            ~ScopeGuard() {
                m_manager->pop(m_name);
            }
        };
        void push(std::string name) {
            m_current_scope = m_current_scope->enter(name);
        }
        void pop(std::string name) {
            m_current_scope = m_current_scope->exit(name);
        }
        std::string next_tensor_name() {
            std::string builder;
            Scope* scope = &m_root;
            while (true) {
                builder.append(scope->name);
                if (scope->version != 0) {
                    builder.append(ssprintf("(%ld)", scope->version));
                }
                if (scope != &m_root) {
                    builder.append(".");
                }
                if (scope->active_child == nullptr) {
                    builder.append(ssprintf(":%%%ld", scope->tensor_count++));
                    break;
                } else {
                    scope = scope->active_child;
                }
            }
            return builder;
        }
    };
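
    // Usage sketch (hypothetical call site): ChannelImpl::push_scope/pop_scope forward
    // to a ScopeManager, and ScopeGuard keeps push/pop balanced via RAII:
    //
    //     {
    //         ScopeManager::ScopeGuard guard{&scopes, "forward"};
    //         auto name = scopes.next_tensor_name();  // active scope path plus a ":%<counter>" suffix
    //     }  // "forward" is popped here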

    struct State {
        std::thread::id tid;
        OptionManager options;
    };

    struct ChannelState: State {
        ScopeManager scopes;
    };

    struct WorkerState: State {};

    ChannelState m_channel_state;
    WorkerState m_worker_state;

    /*!
     * \brief A framework for dynamic sublinear memory optimization
     *
     * Note: The main idea is that during training, whenever memory usage
     * exceeds the threshold, some tensors are selected and evicted until
     * memory usage falls below the threshold.
     */
    struct DynamicSublinear {
        /*!
         * \brief find an available tensor with the largest evaluation function
         *
         * Note: An available tensor must satisfy: (1) has computing path,
         * (2) is in memory, (3) is not pinned. Evaluation function refers to:
         * @see: TensorInfo::eval_func.
         *
         * \return the pointer of the best tensor; nullptr is returned if no 
         * available tensor is found
         */
        TensorInfo* find_best_tensor(bool);

        /*!
         * \brief estimate the cost of recomputing tensor ptr
         *
         * Note: We define the cost as the sum of the costs of the evicted
         * components in which the neighbors of ptr are located.
         */
        double estimate_neighbor_cost(TensorInfo* ptr);

        /*!
         * \brief update the last used time of the tensor ptr
         */
        void update_used_time(TensorInfo* ptr);

        /*!
         * \brief merge the two specified sets (the set in which the element x
         * is located, and the set in which the element y is located)
         */
        void merge(std::shared_ptr<DsuNode> &x, std::shared_ptr<DsuNode> &y);

        /*!
         * \brief return the representative of the set that contains the
         * element x
         */
        std::shared_ptr<DsuNode> find_father(std::shared_ptr<DsuNode> &x);
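
        // A path-compressing implementation could look like the sketch below
        // (DsuNode is defined in tensor_info.h; treating a null `parent` as the
        // root marker is an assumption made for illustration):
        //
        //     std::shared_ptr<DsuNode> find_father(std::shared_ptr<DsuNode>& x) {
        //         if (!x->parent) return x;                   // x is a set representative
        //         return x->parent = find_father(x->parent);  // compress the path
        //     }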

        /*!
         * \brief update DSU after recomputing tensor ptr
         *
         * Delete ptr from the set where ptr is located. Since DSU does not
         * support this operation, instead, we reset the DSU father of ptr, and
         * subtract the recomputation cost of ptr from the cost of the original
         * set.
         */
        void update_dsu_after_recompute(TensorInfo* ptr);

        /*!
         * \brief update DSU after evicting tensor ptr
         *
         * Check the neighbors of ptr, i.e. its input and output tensors; if
         * they have been evicted, merge their respective sets.
         */
        void update_dsu_after_evict(TensorInfo* ptr);

        /*!
         * \brief pin the tensors in vec
         */
        void pin(const SmallVector<TensorInfo*>& vec);

        /*!
         * \brief unpin the tensors in vec
         */
        void unpin(const SmallVector<TensorInfo*>& vec);

        /*!
         * \brief add the tensor to the candidate set
         *
         * If the size of the tensor does not exceed the minimum threshold,
         * it will do nothing.
         */
        void insert_candidate(TensorInfo* ptr);

        /*!
         * \brief erase the tensor from the candidate set
         *
         * If the size of the tensor does not exceed the minimum threshold,
         * it will do nothing.
         */
        void erase_candidate(TensorInfo* ptr);

        //! estimate the current time, in order to reduce the overhead of timer
        double estimate_timestamp = 0;

        //! the comp node where dynamic sublinear memory optimization works
        CompNode comp_node;

        //! store all tensors that may be evicted
        std::unordered_set<TensorInfo*> candidates;

        bool is_bad_op(std::string op_name) {
            return std::find(op_blacklist.begin(), op_blacklist.end(), op_name) != op_blacklist.end();
        }

        std::vector<std::string> op_blacklist = {"CollectiveComm", "InplaceAdd",
                                "ParamPackSplit", "ParamPackConcat", "GaussianRNG", "UniformRNG",
                                "GammaRNG", "PermutationRNG", "PoissonRNG", "BetaRNG"};
    } m_dtr;

    //! automatically evict an optimal tensor
    bool auto_evict(size_t);
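
    // A minimal sketch of the eviction loop auto_evict is expected to run, assuming
    // helpers for the current allocation and the configured budget on m_dtr.comp_node
    // (both helper names below are hypothetical):
    //
    //     bool auto_evict(size_t extra_bytes) {
    //         bool evicted = false;
    //         while (current_usage(m_dtr.comp_node) + extra_bytes > memory_budget()) {
    //             TensorInfo* best = m_dtr.find_best_tensor(false);
    //             if (!best) break;      // nothing evictable is left
    //             do_drop(best, false);
    //             evicted = true;
    //         }
    //         return evicted;
    //     }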

    void alloc_tensor_with_evict(Blob*);

    // Assert the calling thread's id inside get_channel_state/get_worker_state to avoid misuse.
    ChannelState& get_channel_state();
    WorkerState& get_worker_state();
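
    // Expected pattern (sketch of assumed behaviour, not the actual definition):
    //
    //     ChannelState& get_channel_state() {
    //         assert_in_channel();  // e.g. check std::this_thread::get_id() against the recorded tid
    //         return m_channel_state;
    //     }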
};

} // namespace mgb::imperative::interpreter::intl