TaskTable.cpp 8.7 KB
Newer Older
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

S
starlord 已提交
12
#include "scheduler/TaskTable.h"
W
wxyu 已提交
13
#include "Utils.h"
S
starlord 已提交
14
#include "event/TaskTableUpdatedEvent.h"
Y
Yu Kun 已提交
15
#include "scheduler/SchedInst.h"
16
#include "scheduler/task/FinishedTask.h"
17
#include "utils/Log.h"
W
wxyu 已提交
18
#include "utils/TimeRecorder.h"
W
wxyu 已提交
19

W
Wang Xiangyu 已提交
20
#include <src/scheduler/task/SearchTask.h>
21
#include <ctime>
S
starlord 已提交
22 23
#include <sstream>
#include <vector>
W
wxyu 已提交
24 25

namespace milvus {
W
wxyu 已提交
26
namespace scheduler {
W
wxyu 已提交
27

28 29 30
// Map a TaskTableItemState to its upper-case name.
// Values not listed below yield an empty string.
std::string
ToString(TaskTableItemState state) {
    const char* name = "";
    switch (state) {
        case TaskTableItemState::INVALID:
            name = "INVALID";
            break;
        case TaskTableItemState::START:
            name = "START";
            break;
        case TaskTableItemState::LOADING:
            name = "LOADING";
            break;
        case TaskTableItemState::LOADED:
            name = "LOADED";
            break;
        case TaskTableItemState::EXECUTING:
            name = "EXECUTING";
            break;
        case TaskTableItemState::EXECUTED:
            name = "EXECUTED";
            break;
        case TaskTableItemState::MOVING:
            name = "MOVING";
            break;
        case TaskTableItemState::MOVED:
            name = "MOVED";
            break;
        default:
            break;
    }
    return name;
}

W
wxyu 已提交
52
// Serialize every recorded timestamp into a JSON object, one key per lifecycle step.
json
TaskTimestamp::Dump() const {
    return json{
        {"start", start},
        {"load", load},
        {"loaded", loaded},
        {"execute", execute},
        {"executed", executed},
        {"move", move},
        {"moved", moved},
        {"finish", finish},
    };
}

61 62 63 64 65
// An item counts as finished once it has reached either terminal state: MOVED or EXECUTED.
bool
TaskTableItem::IsFinish() {
    switch (state) {
        case TaskTableItemState::MOVED:
        case TaskTableItemState::EXECUTED:
            return true;
        default:
            return false;
    }
}

66 67 68 69 70 71
bool
TaskTableItem::Load() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::START) {
        state = TaskTableItemState::LOADING;
        lock.unlock();
W
wxyu 已提交
72
        timestamp.load = get_current_timestamp();
73 74 75 76
        return true;
    }
    return false;
}
S
starlord 已提交
77

78 79 80 81 82 83
bool
TaskTableItem::Loaded() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADING) {
        state = TaskTableItemState::LOADED;
        lock.unlock();
W
wxyu 已提交
84
        timestamp.loaded = get_current_timestamp();
85 86 87 88
        return true;
    }
    return false;
}
S
starlord 已提交
89

90 91 92 93 94 95
bool
TaskTableItem::Execute() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADED) {
        state = TaskTableItemState::EXECUTING;
        lock.unlock();
W
wxyu 已提交
96
        timestamp.execute = get_current_timestamp();
97 98 99 100
        return true;
    }
    return false;
}
S
starlord 已提交
101

102 103 104 105 106 107
// Try the EXECUTING -> EXECUTED transition.
// The state check and update happen under the item's mutex. After the lock has been
// released, both the executed and the finish timestamps are recorded, each from its
// own get_current_timestamp() call.
// Returns true if the transition happened, false if the item was not in EXECUTING.
bool
TaskTableItem::Executed() {
    {
        std::lock_guard<std::mutex> guard(mutex);
        if (state != TaskTableItemState::EXECUTING) {
            return false;
        }
        state = TaskTableItemState::EXECUTED;
    }
    timestamp.executed = get_current_timestamp();
    timestamp.finish = get_current_timestamp();
    return true;
}
S
starlord 已提交
114

115 116 117 118 119 120
bool
TaskTableItem::Move() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADED) {
        state = TaskTableItemState::MOVING;
        lock.unlock();
W
wxyu 已提交
121
        timestamp.move = get_current_timestamp();
122 123 124 125
        return true;
    }
    return false;
}
S
starlord 已提交
126

127 128 129 130 131 132
// Try the MOVING -> MOVED transition.
// The state check and update happen under the item's mutex. After the lock has been
// released, both the moved and the finish timestamps are recorded, each from its
// own get_current_timestamp() call.
// Returns true if the transition happened, false if the item was not in MOVING.
bool
TaskTableItem::Moved() {
    {
        std::lock_guard<std::mutex> guard(mutex);
        if (state != TaskTableItemState::MOVING) {
            return false;
        }
        state = TaskTableItemState::MOVED;
    }
    timestamp.moved = get_current_timestamp();
    timestamp.finish = get_current_timestamp();
    return true;
}

W
wxyu 已提交
140
// Serialize this item into a JSON object with its id, task, state name and timestamps.
json
TaskTableItem::Dump() const {
    // The task is reported as the integer value of the raw pointer it holds.
    const auto task_address = reinterpret_cast<int64_t>(task.get());
    return json{
        {"id", id},
        {"task", task_address},
        {"state", ToString(state)},
        {"timestamp", timestamp.Dump()},
    };
}
W
wxyu 已提交
150

151 152 153 154 155
void
TaskTableItem::SetFinished(const TaskPtr& t) {
    task = t;
}

156 157
std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) {
W
wxyu 已提交
158
#if 1
159
    // TimeRecorder rc("");
W
wxyu 已提交
160 161 162
    std::vector<uint64_t> indexes;
    bool cross = false;

163 164
    uint64_t available_begin = table_.front() + 1;
    for (uint64_t i = 0, loaded_count = 0, pick_count = 0; i < table_.size() && pick_count < limit; ++i) {
W
wxyu 已提交
165
        auto index = available_begin + i;
C
Cai Yudong 已提交
166
        if (table_[index] == nullptr) {
W
wxyu 已提交
167
            break;
C
Cai Yudong 已提交
168 169
        }
        if (index % table_.capacity() == table_.rear()) {
W
wxyu 已提交
170
            break;
C
Cai Yudong 已提交
171
        }
172 173 174
        if (not cross && table_[index]->IsFinish()) {
            table_.set_front(index);
        } else if (table_[index]->state == TaskTableItemState::LOADED) {
W
wxyu 已提交
175 176
            cross = true;
            ++loaded_count;
C
Cai Yudong 已提交
177
            if (loaded_count > 2) {
W
wxyu 已提交
178
                return std::vector<uint64_t>();
C
Cai Yudong 已提交
179
            }
180 181
        } else if (table_[index]->state == TaskTableItemState::START) {
            auto task = table_[index]->task;
W
wxyu 已提交
182 183 184

            // if task is a build index task, limit it
            if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") {
185
                if (BuildMgrInst::GetInstance()->NumOfAvailable() < 1) {
186
                    LOG_SERVER_WARNING_ << "BuildMgr doesnot have available place for building index";
W
wxyu 已提交
187 188 189 190 191 192 193 194
                    continue;
                }
            }
            cross = true;
            indexes.push_back(index);
            ++pick_count;
        }
    }
195
    // rc.ElapseFromBegin("PickToLoad ");
W
wxyu 已提交
196 197
    return indexes;
#else
198
    size_t count = 0;
199 200
    for (uint64_t j = last_finish_ + 1; j < table_.size(); ++j) {
        if (not table_[j]) {
201
            LOG_SERVER_WARNING_ << "collection[" << j << "] is nullptr";
202
        }
203

204 205
        if (table_[j]->task->path().Current() == "cpu") {
            if (table_[j]->task->Type() == TaskType::BuildIndexTask && BuildMgrInst::GetInstance()->numoftasks() < 1) {
206 207 208 209
                return std::vector<uint64_t>();
            }
        }

210
        if (table_[j]->state == TaskTableItemState::LOADED) {
211
            ++count;
212 213
            if (count > 2)
                return std::vector<uint64_t>();
214 215 216
        }
    }

217 218
    std::vector<uint64_t> indexes;
    bool cross = false;
219 220
    for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
        if (not cross && table_[i]->IsFinish()) {
221
            last_finish_ = i;
222 223
        } else if (table_[i]->state == TaskTableItemState::START) {
            auto task = table_[i]->task;
224 225 226 227 228 229 230 231 232 233 234 235 236 237
            if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") {
                if (BuildMgrInst::GetInstance()->numoftasks() == 0) {
                    break;
                } else {
                    cross = true;
                    indexes.push_back(i);
                    ++count;
                    BuildMgrInst::GetInstance()->take();
                }
            } else {
                cross = true;
                indexes.push_back(i);
                ++count;
            }
238 239 240
        }
    }
    return indexes;
W
wxyu 已提交
241
#endif
242 243 244 245
}

// Select up to `limit` items that are in LOADED state, scanning forward from the slot
// right after the queue front.
//
// A null slot, or an index that wraps onto the queue rear, stops the scan. Finished
// items seen before the first picked item advance the queue front.
//
// Returns the picked indexes in scan order. Each index is `front + 1 + offset` and is
// not reduced modulo the queue capacity.
std::vector<uint64_t>
TaskTable::PickToExecute(uint64_t limit) {
    std::vector<uint64_t> picked;
    // Set once a LOADED item has been picked; afterwards the front is left where it is.
    bool front_frozen = false;

    const uint64_t first = table_.front() + 1;
    // table_.size() is re-read every iteration because set_front() below may change it.
    for (uint64_t offset = 0; offset < table_.size() && picked.size() < limit; ++offset) {
        const uint64_t pos = first + offset;
        if (not table_[pos] || pos % table_.capacity() == table_.rear()) {
            break;
        }

        if (not front_frozen && table_[pos]->IsFinish()) {
            table_.set_front(pos);
            continue;
        }
        if (table_[pos]->state == TaskTableItemState::LOADED) {
            front_frozen = true;
            picked.push_back(pos);
        }
    }
    return picked;
}

W
wxyu 已提交
271
void
272 273
TaskTable::Put(TaskPtr task, TaskTableItemPtr from) {
    auto item = std::make_shared<TaskTableItem>(std::move(from));
274
    item->id = id_++;
W
wxyu 已提交
275
    item->task = std::move(task);
W
wxyu 已提交
276
    item->state = TaskTableItemState::START;
W
wxyu 已提交
277
    item->timestamp.start = get_current_timestamp();
278
    table_.put(std::move(item));
W
wxyu 已提交
279 280 281
    if (subscriber_) {
        subscriber_();
    }
W
wxyu 已提交
282 283
}

W
wxyu 已提交
284 285 286
size_t
TaskTable::TaskToExecute() {
    size_t count = 0;
287 288
    auto begin = table_.front() + 1;
    for (size_t i = 0; i < table_.size(); ++i) {
W
wxyu 已提交
289
        auto index = begin + i;
290
        if (table_[index] && table_[index]->state == TaskTableItemState::LOADED) {
W
wxyu 已提交
291 292 293 294 295
            ++count;
        }
    }
    return count;
}
W
wxyu 已提交
296

W
wxyu 已提交
297
// Dumping the whole table is not implemented: the returned JSON carries only an
// "error.message" entry saying so.
json
TaskTable::Dump() const {
    return json{{"error.message", "not support yet."}};
}

S
starlord 已提交
303 304
}  // namespace scheduler
}  // namespace milvus