TaskTable.h 5.3 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

W
wxyu 已提交
18 19 20
#pragma once

#include <deque>
S
starlord 已提交
21
#include <functional>
S
starlord 已提交
22
#include <memory>
S
starlord 已提交
23
#include <mutex>
S
starlord 已提交
24
#include <string>
S
starlord 已提交
25 26
#include <utility>
#include <vector>
W
wxyu 已提交
27

W
wxyu 已提交
28
#include "event/Event.h"
S
starlord 已提交
29
#include "task/SearchTask.h"
W
wxyu 已提交
30 31

namespace milvus {
W
wxyu 已提交
32
namespace scheduler {
W
wxyu 已提交
33 34 35

enum class TaskTableItemState {
    INVALID,
S
starlord 已提交
36 37 38 39 40 41 42
    START,      // idle
    LOADING,    // loading data from other resource
    LOADED,     // ready to exec or move
    EXECUTING,  // executing, locking util executed or failed
    EXECUTED,   // executed, termination state
    MOVING,     // moving to another resource, locking util executed or failed
    MOVED,      // moved, termination state
W
wxyu 已提交
43 44
};

45 46 47 48 49 50 51 52
struct TaskTimestamp {
    uint64_t start = 0;
    uint64_t move = 0;
    uint64_t moved = 0;
    uint64_t load = 0;
    uint64_t loaded = 0;
    uint64_t execute = 0;
    uint64_t executed = 0;
53
    uint64_t finish = 0;
54 55
};

W
wxyu 已提交
56
struct TaskTableItem {
S
starlord 已提交
57 58
    TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
    }
W
wxyu 已提交
59

S
starlord 已提交
60 61
    TaskTableItem(const TaskTableItem& src) = delete;
    TaskTableItem(TaskTableItem&&) = delete;
W
wxyu 已提交
62

S
starlord 已提交
63 64 65
    uint64_t id;               // auto increment from 0;
    TaskPtr task;              // the task;
    TaskTableItemState state;  // the state;
W
wxyu 已提交
66
    std::mutex mutex;
67
    TaskTimestamp timestamp;
W
wxyu 已提交
68

69 70 71
    bool
    IsFinish();

72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
    bool
    Load();

    bool
    Loaded();

    bool
    Execute();

    bool
    Executed();

    bool
    Move();

    bool
    Moved();
89 90 91

    std::string
    Dump();
W
wxyu 已提交
92 93
};

W
wxyu 已提交
94 95
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;

W
wxyu 已提交
96
class TaskTable {
S
starlord 已提交
97
 public:
W
wxyu 已提交
98 99
    TaskTable() = default;

S
starlord 已提交
100 101
    TaskTable(const TaskTable&) = delete;
    TaskTable(TaskTable&&) = delete;
102

W
wxyu 已提交
103 104 105 106 107
    inline void
    RegisterSubscriber(std::function<void(void)> subscriber) {
        subscriber_ = std::move(subscriber);
    }

W
wxyu 已提交
108 109 110 111
    /*
     * Put one task;
     */
    void
W
wxyu 已提交
112
    Put(TaskPtr task);
W
wxyu 已提交
113 114 115 116 117 118

    /*
     * Put tasks back of task table;
     * Called by DBImpl;
     */
    void
S
starlord 已提交
119
    Put(std::vector<TaskPtr>& tasks);
W
wxyu 已提交
120 121 122 123

    /*
     * Return task table item reference;
     */
W
wxyu 已提交
124
    TaskTableItemPtr
W
wxyu 已提交
125
    Get(uint64_t index);
W
wxyu 已提交
126 127

    /*
W
wxyu 已提交
128
     * TODO(wxyu): BIG GC
W
wxyu 已提交
129 130 131
     * Remove sequence task which is DONE or MOVED from front;
     * Called by ?
     */
S
starlord 已提交
132 133
    //    void
    //    Clear();
W
wxyu 已提交
134

W
wxyu 已提交
135 136 137 138 139 140 141
    /*
     * Return true if task table empty, otherwise false;
     */
    inline bool
    Empty() {
        return table_.empty();
    }
W
wxyu 已提交
142

W
wxyu 已提交
143 144 145
    /*
     * Return size of task table;
     */
W
wxyu 已提交
146
    inline size_t
W
wxyu 已提交
147 148 149
    Size() {
        return table_.size();
    }
W
wxyu 已提交
150

S
starlord 已提交
151
 public:
152 153 154 155 156
    TaskTableItemPtr& operator[](uint64_t index) {
        std::lock_guard<std::mutex> lock(mutex_);
        return table_[index];
    }

S
starlord 已提交
157 158
    std::deque<TaskTableItemPtr>::iterator
    begin() {
S
starlord 已提交
159 160
        return table_.begin();
    }
W
wxyu 已提交
161

S
starlord 已提交
162 163
    std::deque<TaskTableItemPtr>::iterator
    end() {
S
starlord 已提交
164 165 166 167
        return table_.end();
    }

 public:
168 169 170 171 172 173
    std::vector<uint64_t>
    PickToLoad(uint64_t limit);

    std::vector<uint64_t>
    PickToExecute(uint64_t limit);

S
starlord 已提交
174
 public:
W
wxyu 已提交
175 176
    /******** Action ********/

W
wxyu 已提交
177
    // TODO(wxyu): bool to Status
W
wxyu 已提交
178 179 180 181 182
    /*
     * Load a task;
     * Set state loading;
     * Called by loader;
     */
183 184 185 186
    inline bool
    Load(uint64_t index) {
        return table_[index]->Load();
    }
W
wxyu 已提交
187 188 189 190 191 192

    /*
     * Load task finished;
     * Set state loaded;
     * Called by loader;
     */
193 194 195 196
    inline bool
    Loaded(uint64_t index) {
        return table_[index]->Loaded();
    }
W
wxyu 已提交
197 198 199 200 201 202

    /*
     * Execute a task;
     * Set state executing;
     * Called by executor;
     */
203 204 205 206
    inline bool
    Execute(uint64_t index) {
        return table_[index]->Execute();
    }
W
wxyu 已提交
207 208 209 210 211 212

    /*
     * Execute task finished;
     * Set state executed;
     * Called by executor;
     */
213
    inline bool
214
    Executed(uint64_t index) {
215 216 217 218 219 220 221 222 223 224
        return table_[index]->Executed();
    }

    /*
     * Move a task;
     * Set state moving;
     * Called by scheduler;
     */

    inline bool
225
    Move(uint64_t index) {
226 227 228 229 230 231 232 233 234
        return table_[index]->Move();
    }

    /*
     * Move task finished;
     * Set state moved;
     * Called by scheduler;
     */
    inline bool
235
    Moved(uint64_t index) {
236 237
        return table_[index]->Moved();
    }
W
wxyu 已提交
238

S
starlord 已提交
239
 public:
W
wxyu 已提交
240 241 242 243 244 245
    /*
     * Dump;
     */
    std::string
    Dump();

S
starlord 已提交
246
 private:
247
    std::uint64_t id_ = 0;
248
    mutable std::mutex mutex_;
W
wxyu 已提交
249
    std::deque<TaskTableItemPtr> table_;
W
wxyu 已提交
250
    std::function<void(void)> subscriber_ = nullptr;
251 252

    // cache last finish avoid Pick task from begin always
W
wxyu 已提交
253 254 255
    // pick from (last_finish_ + 1)
    // init with -1, pick from (last_finish_ + 1) = 0
    uint64_t last_finish_ = -1;
W
wxyu 已提交
256 257
};

S
starlord 已提交
258 259
}  // namespace scheduler
}  // namespace milvus