TaskTable.h 5.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

W
wxyu 已提交
18 19 20
#pragma once

#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "CircleQueue.h"
#include "event/Event.h"
#include "interface/interfaces.h"
#include "task/SearchTask.h"
W
wxyu 已提交
32 33

namespace milvus {
W
wxyu 已提交
34
namespace scheduler {
W
wxyu 已提交
35 36 37

/*
 * State of one task table item.
 * The per-enumerator comments describe the intended meaning of each state;
 * the transitions themselves are implemented outside this header.
 */
enum class TaskTableItemState {
    INVALID,
    START,      // idle
    LOADING,    // loading data from other resource
    LOADED,     // ready to exec or move
    EXECUTING,  // executing, locking until executed or failed
    EXECUTED,   // executed, termination state
    MOVING,     // moving to another resource, locking until executed or failed
    MOVED,      // moved, termination state
};

W
wxyu 已提交
47
struct TaskTimestamp : public interface::dumpable {
48 49 50 51 52 53 54
    uint64_t start = 0;
    uint64_t move = 0;
    uint64_t moved = 0;
    uint64_t load = 0;
    uint64_t loaded = 0;
    uint64_t execute = 0;
    uint64_t executed = 0;
55
    uint64_t finish = 0;
W
wxyu 已提交
56 57

    json
W
wxyu 已提交
58
    Dump() const override;
59 60
};

W
wxyu 已提交
61
struct TaskTableItem : public interface::dumpable {
S
starlord 已提交
62 63
    TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
    }
W
wxyu 已提交
64

S
starlord 已提交
65 66
    TaskTableItem(const TaskTableItem& src) = delete;
    TaskTableItem(TaskTableItem&&) = delete;
W
wxyu 已提交
67

S
starlord 已提交
68 69 70
    uint64_t id;               // auto increment from 0;
    TaskPtr task;              // the task;
    TaskTableItemState state;  // the state;
W
wxyu 已提交
71
    std::mutex mutex;
72
    TaskTimestamp timestamp;
W
wxyu 已提交
73

74 75 76
    bool
    IsFinish();

77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
    bool
    Load();

    bool
    Loaded();

    bool
    Execute();

    bool
    Executed();

    bool
    Move();

    bool
    Moved();
94

W
wxyu 已提交
95
    json
W
wxyu 已提交
96
    Dump() const override;
W
wxyu 已提交
97 98
};

W
wxyu 已提交
99 100
// Shared-ownership handle to a TaskTableItem; this is the element type stored by TaskTable.
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;

W
wxyu 已提交
101
class TaskTable : public interface::dumpable {
S
starlord 已提交
102
 public:
W
wxyu 已提交
103 104
    TaskTable() : table_(1ULL << 16ULL) {
    }
W
wxyu 已提交
105

S
starlord 已提交
106 107
    TaskTable(const TaskTable&) = delete;
    TaskTable(TaskTable&&) = delete;
108

109 110 111 112 113
 public:
    json
    Dump() const override;

 public:
W
wxyu 已提交
114 115 116 117 118
    inline void
    RegisterSubscriber(std::function<void(void)> subscriber) {
        subscriber_ = std::move(subscriber);
    }

W
wxyu 已提交
119 120 121 122
    /*
     * Put one task;
     */
    void
W
wxyu 已提交
123
    Put(TaskPtr task);
W
wxyu 已提交
124 125 126 127 128 129

    /*
     * Put tasks back of task table;
     * Called by DBImpl;
     */
    void
S
starlord 已提交
130
    Put(std::vector<TaskPtr>& tasks);
W
wxyu 已提交
131

W
wxyu 已提交
132 133 134
    size_t
    TaskToExecute();

135 136 137 138 139 140
    std::vector<uint64_t>
    PickToLoad(uint64_t limit);

    std::vector<uint64_t>
    PickToExecute(uint64_t limit);

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
 public:
    inline const TaskTableItemPtr& operator[](uint64_t index) {
        return table_[index];
    }

    inline const TaskTableItemPtr&
    at(uint64_t index) {
        return table_[index];
    }

    inline size_t
    capacity() {
        return table_.capacity();
    }

    inline size_t
    size() {
        return table_.size();
    }

S
starlord 已提交
161
 public:
W
wxyu 已提交
162 163
    /******** Action ********/

W
wxyu 已提交
164
    // TODO(wxyu): bool to Status
W
wxyu 已提交
165 166 167 168 169
    /*
     * Load a task;
     * Set state loading;
     * Called by loader;
     */
170 171 172 173
    inline bool
    Load(uint64_t index) {
        return table_[index]->Load();
    }
W
wxyu 已提交
174 175 176 177 178 179

    /*
     * Load task finished;
     * Set state loaded;
     * Called by loader;
     */
180 181 182 183
    inline bool
    Loaded(uint64_t index) {
        return table_[index]->Loaded();
    }
W
wxyu 已提交
184 185 186 187 188 189

    /*
     * Execute a task;
     * Set state executing;
     * Called by executor;
     */
190 191 192 193
    inline bool
    Execute(uint64_t index) {
        return table_[index]->Execute();
    }
W
wxyu 已提交
194 195 196 197 198 199

    /*
     * Execute task finished;
     * Set state executed;
     * Called by executor;
     */
200
    inline bool
201
    Executed(uint64_t index) {
202 203 204 205 206 207 208 209 210 211
        return table_[index]->Executed();
    }

    /*
     * Move a task;
     * Set state moving;
     * Called by scheduler;
     */

    inline bool
212
    Move(uint64_t index) {
213 214 215 216 217 218 219 220 221
        return table_[index]->Move();
    }

    /*
     * Move task finished;
     * Set state moved;
     * Called by scheduler;
     */
    inline bool
222
    Moved(uint64_t index) {
223 224
        return table_[index]->Moved();
    }
W
wxyu 已提交
225

S
starlord 已提交
226
 private:
227
    std::uint64_t id_ = 0;
W
wxyu 已提交
228
    CircleQueue<TaskTableItemPtr> table_;
W
wxyu 已提交
229
    std::function<void(void)> subscriber_ = nullptr;
230 231

    // cache last finish avoid Pick task from begin always
W
wxyu 已提交
232 233 234
    // pick from (last_finish_ + 1)
    // init with -1, pick from (last_finish_ + 1) = 0
    uint64_t last_finish_ = -1;
W
wxyu 已提交
235 236
};

S
starlord 已提交
237 238
}  // namespace scheduler
}  // namespace milvus