Resource.h 4.6 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once

#include <string>
#include <vector>
#include <memory>
#include <thread>
#include <functional>
#include <condition_variable>

W
wxyu 已提交
15 16
#include "../event/Event.h"
#include "../event/StartUpEvent.h"
17
#include "../event/LoadCompletedEvent.h"
W
wxyu 已提交
18
#include "../event/FinishTaskEvent.h"
W
wxyu 已提交
19
#include "../event/TaskTableUpdatedEvent.h"
W
wxyu 已提交
20
#include "../TaskTable.h"
21
#include "../task/Task.h"
W
wxyu 已提交
22 23
#include "../Cost.h"
#include "Connection.h"
X
xj.lin 已提交
24 25
#include "Node.h"
#include "RegisterHandler.h"
W
wxyu 已提交
26 27 28 29 30 31


namespace zilliz {
namespace milvus {
namespace engine {

W
wxyu 已提交
32
// TODO(wxyu): Storage, Route, Executor
W
wxyu 已提交
33 34 35 36 37 38
// Kind tag stored in Resource::type_ and returned by Resource::Type().
// The numeric values are pinned explicitly.
enum class ResourceType {
    DISK = 0,
    CPU  = 1,
    GPU  = 2,
};

X
xj.lin 已提交
39 40 41 42 43 44 45
// Key type of Resource::register_table_: selects which registered handler
// factory Resource::GetRegisterFunc() is asked for.
enum class RegisterType {
    START_UP              = 0,
    ON_FINISH_TASK        = 1,
    ON_COPY_COMPLETED     = 2,
    ON_TASK_TABLE_UPDATED = 3,
};

W
wxyu 已提交
46
class Resource : public Node, public std::enable_shared_from_this<Resource> {
Y
Yu Kun 已提交
47
 public:
X
xj.lin 已提交
48
    /*
W
wxyu 已提交
49
     * Start the loader and executor if enabled;
X
xj.lin 已提交
50
     */
W
wxyu 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
    void
    Start();

    /*
     * Stop the loader and executor and join them; blocks until the threads have exited;
     */
    void
    Stop();

    /*
     * wake up loader;
     */
    void
    WakeupLoader();

    /*
     * wake up executor;
     */
    void
    WakeupExecutor();

Y
Yu Kun 已提交
72
 public:
W
wxyu 已提交
73 74
    template<typename T>
    void Register_T(const RegisterType &type) {
X
xj.lin 已提交
75
        register_table_.emplace(type, [] { return std::make_shared<T>(); });
W
wxyu 已提交
76 77
    }

X
xj.lin 已提交
78
    RegisterHandlerPtr
W
wxyu 已提交
79 80 81 82 83 84 85
    GetRegisterFunc(const RegisterType &type);

    /*
     * Install the event callback;
     * The argument is taken by value and moved into subscriber_, replacing
     * whatever callback was stored before;
     */
    void
    RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
        subscriber_ = std::move(subscriber);
    }

86 87 88 89 90
    /*
     * Return a copy of name_;
     */
    std::string
    Name() const {
        return name_;
    }

W
wxyu 已提交
91 92 93 94
    /*
     * Return the ResourceType stored in type_;
     */
    ResourceType
    Type() const {
        return type_;
    }
X
xj.lin 已提交
95

96 97 98 99 100
    inline uint64_t
    DeviceId() {
        return device_id_;
    }

W
wxyu 已提交
101 102 103 104 105
    // TODO: better name?
    inline bool
    HasLoader() {
        return enable_loader_;
    }
X
xj.lin 已提交
106

W
wxyu 已提交
107 108 109 110 111
    // TODO: better name?
    inline bool
    HasExecutor() {
        return enable_executor_;
    }
W
wxyu 已提交
112

Y
Yu Kun 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
    // TODO: const
    /*
     * Count the entries of task_table_ whose state is
     * TaskTableItemState::LOADED;
     */
    uint64_t
    NumOfTaskToExec() {
        uint64_t loaded = 0;
        for (auto &item : task_table_) {
            if (item->state == TaskTableItemState::LOADED) {
                ++loaded;
            }
        }
        return loaded;
    }

    // TODO: need double ?
    /*
     * Average cost per task: total_cost_ / total_task_ with integer division,
     * so the fractional part is discarded;
     * Returns 0 while total_task_ is still 0 (its initial value), because an
     * integer division by zero is undefined behavior;
     */
    inline uint64_t
    TaskAvgCost() const {
        if (total_task_ == 0) {
            return 0;
        }
        return total_cost_ / total_task_;
    }

Y
Yu Kun 已提交
129 130 131 132 133
    /*
     * Return the total_task_ counter;
     */
    uint64_t
    TotalTasks() const {
        return total_task_;
    }

W
wxyu 已提交
134
    TaskTable &
X
xj.lin 已提交
135
    task_table();
W
wxyu 已提交
136

137 138 139 140 141 142 143
    /*
     * Textual description of this object;
     * Virtual, so subclasses can supply their own text; this implementation
     * returns the fixed tag "<Resource>";
     */
    virtual std::string
    Dump() const {
        return "<Resource>";
    }

    friend std::ostream &operator<<(std::ostream &out, const Resource &resource);

Y
Yu Kun 已提交
144
 protected:
145 146
    Resource(std::string name,
             ResourceType type,
147
             uint64_t device_id,
W
wxyu 已提交
148 149
             bool enable_loader,
             bool enable_executor);
W
wxyu 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165

    // TODO: SearchContextPtr to TaskPtr
    /*
     * Implemented by derived classes;
     * Blocking function;
     */
    virtual void
    LoadFile(TaskPtr task) = 0;

    /*
     * Implemented by derived classes;
     * Blocking function;
     */
    virtual void
    Process(TaskPtr task) = 0;

Y
Yu Kun 已提交
166
 private:
W
wxyu 已提交
167 168 169 170 171 172 173 174 175
    /*
     * TODO: should these functions move to Cost.h?
     */

    /*
     * Pick one task to load;
     * Order by start time;
     */
176
    TaskTableItemPtr
X
xj.lin 已提交
177
    pick_task_load();
W
wxyu 已提交
178 179 180 181 182

    /*
     * Pick one task to execute;
     * Pick by start time and priority;
     */
183
    TaskTableItemPtr
X
xj.lin 已提交
184
    pick_task_execute();
W
wxyu 已提交
185

Y
Yu Kun 已提交
186
 private:
W
wxyu 已提交
187 188 189 190
    /*
     * Only called by load thread;
     */
    void
X
xj.lin 已提交
191
    loader_function();
W
wxyu 已提交
192 193 194 195 196

    /*
     * Only called by worker thread;
     */
    void
X
xj.lin 已提交
197
    executor_function();
W
wxyu 已提交
198

Y
Yu Kun 已提交
199
 protected:
200
    uint64_t device_id_;
W
wxyu 已提交
201
    std::string name_;
Y
Yu Kun 已提交
202
 private:
W
wxyu 已提交
203 204 205 206
    ResourceType type_;

    TaskTable task_table_;

Y
fix bug  
Yu Kun 已提交
207 208
    uint64_t total_cost_ = 0;
    uint64_t total_task_ = 0;
Y
Yu Kun 已提交
209

X
xj.lin 已提交
210
    std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
W
wxyu 已提交
211
    std::function<void(EventPtr)> subscriber_ = nullptr;
W
wxyu 已提交
212 213

    bool running_;
214 215
    bool enable_loader_ = true;
    bool enable_executor_ = true;
W
wxyu 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
    std::thread loader_thread_;
    std::thread executor_thread_;

    bool load_flag_;
    bool exec_flag_;
    std::mutex load_mutex_;
    std::mutex exec_mutex_;
    std::condition_variable load_cv_;
    std::condition_variable exec_cv_;
};

using ResourcePtr = std::shared_ptr<Resource>;
using ResourceWPtr = std::weak_ptr<Resource>;

}
}
}