TaskTable.h 4.5 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6 7 8 9 10 11
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once

#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "task/SearchTask.h"
#include "event/Event.h"
W
wxyu 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30


namespace zilliz {
namespace milvus {
namespace engine {

/*
 * State of one task table item;
 * INVALID is the state assigned by TaskTableItem's default constructor;
 */
enum class TaskTableItemState {
    INVALID,
    START, // idle
    LOADING, // loading data from other resource
    LOADED, // ready to exec or move
    EXECUTING, // executing, locked until executed or failed
    EXECUTED, // executed, termination state
    MOVING, // moving to another resource, locked until executed or failed
    MOVED, // moved, termination state
};

31 32 33 34 35 36 37 38
/*
 * Timestamps attached to one task table item, one field per lifecycle step;
 * Every field is zero-initialized;
 * NOTE(review): the clock/unit and the code that fills these fields are not
 * visible in this header -- confirm against the implementation file;
 */
struct TaskTimestamp {
    uint64_t start = 0;
    uint64_t move = 0;
    uint64_t moved = 0;
    uint64_t load = 0;
    uint64_t loaded = 0;
    uint64_t execute = 0;
    uint64_t executed = 0;
    uint64_t finish = 0;
};

W
wxyu 已提交
42
struct TaskTableItem {
W
wxyu 已提交
43
    TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex() {}
W
wxyu 已提交
44 45

    TaskTableItem(const TaskTableItem &src)
W
wxyu 已提交
46
        : id(src.id), state(src.state), mutex() {}
W
wxyu 已提交
47 48 49 50 51

    uint64_t id; // auto increment from 0;
    TaskPtr task; // the task;
    TaskTableItemState state; // the state;
    std::mutex mutex;
52
    TaskTimestamp timestamp;
W
wxyu 已提交
53

54 55 56
    bool
    IsFinish();

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
    bool
    Load();

    bool
    Loaded();

    bool
    Execute();

    bool
    Executed();

    bool
    Move();

    bool
    Moved();
74 75 76

    std::string
    Dump();
W
wxyu 已提交
77 78
};

W
wxyu 已提交
79 80
// Shared-ownership handle to a TaskTableItem; TaskTable stores its entries as this type.
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;

W
wxyu 已提交
81 82 83 84
class TaskTable {
public:
    TaskTable() = default;

85 86 87
    TaskTable(const TaskTable &) = delete;
    TaskTable(TaskTable &&) = delete;

W
wxyu 已提交
88 89 90 91 92
    inline void
    RegisterSubscriber(std::function<void(void)> subscriber) {
        subscriber_ = std::move(subscriber);
    }

W
wxyu 已提交
93 94 95 96
    /*
     * Put one task;
     */
    void
W
wxyu 已提交
97
    Put(TaskPtr task);
W
wxyu 已提交
98 99 100 101 102 103

    /*
     * Put tasks back of task table;
     * Called by DBImpl;
     */
    void
W
wxyu 已提交
104
    Put(std::vector<TaskPtr> &tasks);
W
wxyu 已提交
105 106 107 108

    /*
     * Return task table item reference;
     */
W
wxyu 已提交
109
    TaskTableItemPtr
W
wxyu 已提交
110
    Get(uint64_t index);
W
wxyu 已提交
111 112

    /*
W
wxyu 已提交
113
     * TODO(wxyu): BIG GC
W
wxyu 已提交
114 115 116 117
     * Remove sequence task which is DONE or MOVED from front;
     * Called by ?
     */
    void
W
wxyu 已提交
118
    Clear();
W
wxyu 已提交
119

W
wxyu 已提交
120 121 122 123 124 125 126
    /*
     * Return true if task table empty, otherwise false;
     */
    inline bool
    Empty() {
        return table_.empty();
    }
W
wxyu 已提交
127

W
wxyu 已提交
128 129 130
    /*
     * Return size of task table;
     */
W
wxyu 已提交
131
    inline size_t
W
wxyu 已提交
132 133 134
    Size() {
        return table_.size();
    }
W
wxyu 已提交
135

W
wxyu 已提交
136 137 138 139 140 141 142 143
public:
    TaskTableItemPtr &
    operator[](uint64_t index) {
        return table_[index];
    }

    std::deque<TaskTableItemPtr>::iterator begin() { return table_.begin(); }
    std::deque<TaskTableItemPtr>::iterator end() { return table_.end(); }
W
wxyu 已提交
144

145 146 147 148 149 150 151
public:
    std::vector<uint64_t>
    PickToLoad(uint64_t limit);

    std::vector<uint64_t>
    PickToExecute(uint64_t limit);

W
wxyu 已提交
152 153 154 155 156 157 158 159 160 161
public:

    /******** Action ********/

    // TODO: bool to Status
    /*
     * Load a task;
     * Set state loading;
     * Called by loader;
     */
162 163 164 165
    inline bool
    Load(uint64_t index) {
        return table_[index]->Load();
    }
W
wxyu 已提交
166 167 168 169 170 171

    /*
     * Load task finished;
     * Set state loaded;
     * Called by loader;
     */
172 173 174 175
    inline bool
    Loaded(uint64_t index) {
        return table_[index]->Loaded();
    }
W
wxyu 已提交
176 177 178 179 180 181

    /*
     * Execute a task;
     * Set state executing;
     * Called by executor;
     */
182 183 184 185
    inline bool
    Execute(uint64_t index) {
        return table_[index]->Execute();
    }
W
wxyu 已提交
186 187 188 189 190 191

    /*
     * Execute task finished;
     * Set state executed;
     * Called by executor;
     */
192
    inline bool
193
    Executed(uint64_t index) {
194 195 196 197 198 199 200 201 202 203
        return table_[index]->Executed();
    }

    /*
     * Move a task;
     * Set state moving;
     * Called by scheduler;
     */

    inline bool
204
    Move(uint64_t index) {
205 206 207 208 209 210 211 212 213
        return table_[index]->Move();
    }

    /*
     * Move task finished;
     * Set state moved;
     * Called by scheduler;
     */
    inline bool
214
    Moved(uint64_t index) {
215 216
        return table_[index]->Moved();
    }
W
wxyu 已提交
217 218 219 220 221 222 223 224 225

public:
    /*
     * Dump;
     */
    std::string
    Dump();

private:
226 227
    std::uint64_t id_ = 0;
    mutable std::mutex id_mutex_;
W
wxyu 已提交
228
    std::deque<TaskTableItemPtr> table_;
W
wxyu 已提交
229
    std::function<void(void)> subscriber_ = nullptr;
230 231 232

    // cache last finish avoid Pick task from begin always
    uint64_t last_finish_ = 0;
W
wxyu 已提交
233 234 235 236 237 238
};


}
}
}