Resource.h 3.4 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once

#include <string>
#include <vector>
#include <memory>
#include <thread>
#include <functional>
#include <condition_variable>

W
wxyu 已提交
15 16 17 18
#include "../event/Event.h"
#include "../event/StartUpEvent.h"
#include "../event/CopyCompletedEvent.h"
#include "../event/FinishTaskEvent.h"
W
wxyu 已提交
19
#include "../event/TaskTableUpdatedEvent.h"
W
wxyu 已提交
20
#include "../TaskTable.h"
21
#include "../task/Task.h"
W
wxyu 已提交
22 23
#include "../Cost.h"
#include "Connection.h"
X
xj.lin 已提交
24 25
#include "Node.h"
#include "RegisterHandler.h"
W
wxyu 已提交
26 27 28 29 30 31 32 33 34 35 36 37


namespace zilliz {
namespace milvus {
namespace engine {

enum class ResourceType {
    DISK = 0,
    CPU = 1,
    GPU = 2
};

X
xj.lin 已提交
38 39 40 41 42 43 44
enum class RegisterType {
    START_UP,
    ON_FINISH_TASK,
    ON_COPY_COMPLETED,
    ON_TASK_TABLE_UPDATED,
};

W
wxyu 已提交
45
class Resource : public Node, public std::enable_shared_from_this<Resource> {
W
wxyu 已提交
46
public:
X
xj.lin 已提交
47 48 49
    /*
     * Event function MUST be a short function, never blocking;
     */
W
wxyu 已提交
50 51
    template<typename T>
    void Register_T(const RegisterType &type) {
X
xj.lin 已提交
52
        register_table_.emplace(type, [] { return std::make_shared<T>(); });
W
wxyu 已提交
53 54
    }

X
xj.lin 已提交
55
    RegisterHandlerPtr
W
wxyu 已提交
56 57 58 59 60 61 62 63 64 65 66
    GetRegisterFunc(const RegisterType &type);

    inline void
    RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
        subscriber_ = std::move(subscriber);
    }

    inline ResourceType
    Type() const {
        return type_;
    }
X
xj.lin 已提交
67

W
wxyu 已提交
68
    void
X
xj.lin 已提交
69 70 71 72
    Start();

    void
    Stop();
W
wxyu 已提交
73 74

    TaskTable &
X
xj.lin 已提交
75
    task_table();
W
wxyu 已提交
76 77 78

public:
    /*
W
wxyu 已提交
79
     * wake up loader;
W
wxyu 已提交
80 81
     */
    void
W
wxyu 已提交
82
    WakeupLoader();
W
wxyu 已提交
83

W
wxyu 已提交
84 85
    /*
     * wake up executor;
W
wxyu 已提交
86 87
     */
    void
W
wxyu 已提交
88
    WakeupExecutor();
W
wxyu 已提交
89 90

protected:
X
xj.lin 已提交
91
    Resource(std::string name, ResourceType type);
W
wxyu 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117

    // TODO: SearchContextPtr to TaskPtr
    /*
     * Implementation by inherit class;
     * Blocking function;
     */
    virtual void
    LoadFile(TaskPtr task) = 0;

    /*
     * Implementation by inherit class;
     * Blocking function;
     */
    virtual void
    Process(TaskPtr task) = 0;

private:
    /*
     * These function should move to cost.h ???
     * COST.H ???
     */

    /*
     * Pick one task to load;
     * Order by start time;
     */
118
    TaskTableItemPtr
X
xj.lin 已提交
119
    pick_task_load();
W
wxyu 已提交
120 121 122 123 124

    /*
     * Pick one task to execute;
     * Pick by start time and priority;
     */
125
    TaskTableItemPtr
X
xj.lin 已提交
126
    pick_task_execute();
W
wxyu 已提交
127 128 129 130 131 132

private:
    /*
     * Only called by load thread;
     */
    void
X
xj.lin 已提交
133
    loader_function();
W
wxyu 已提交
134 135 136 137 138

    /*
     * Only called by worker thread;
     */
    void
X
xj.lin 已提交
139
    executor_function();
W
wxyu 已提交
140 141 142 143 144 145 146

private:
    std::string name_;
    ResourceType type_;

    TaskTable task_table_;

X
xj.lin 已提交
147
    std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
W
wxyu 已提交
148
    std::function<void(EventPtr)> subscriber_ = nullptr;
W
wxyu 已提交
149 150

    bool running_;
W
wxyu 已提交
151 152
    bool loader_running_ = false;
    bool executor_running_ = false;
W
wxyu 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
    std::thread loader_thread_;
    std::thread executor_thread_;

    bool load_flag_;
    bool exec_flag_;
    std::mutex load_mutex_;
    std::mutex exec_mutex_;
    std::condition_variable load_cv_;
    std::condition_variable exec_cv_;
};

using ResourcePtr = std::shared_ptr<Resource>;
using ResourceWPtr = std::weak_ptr<Resource>;

}
}
}