TaskTable.cpp 6.6 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "scheduler/TaskTable.h"
W
wxyu 已提交
19
#include "Utils.h"
S
starlord 已提交
20
#include "event/TaskTableUpdatedEvent.h"
W
wxyu 已提交
21

22
#include <ctime>
S
starlord 已提交
23 24
#include <sstream>
#include <vector>
W
wxyu 已提交
25 26

namespace milvus {
W
wxyu 已提交
27
namespace scheduler {
W
wxyu 已提交
28

29 30 31
// Maps a TaskTableItemState value to its upper-case name.
// Any value not listed below yields an empty string.
std::string
ToString(TaskTableItemState state) {
    const char* label = "";
    switch (state) {
        case TaskTableItemState::INVALID:
            label = "INVALID";
            break;
        case TaskTableItemState::START:
            label = "START";
            break;
        case TaskTableItemState::LOADING:
            label = "LOADING";
            break;
        case TaskTableItemState::LOADED:
            label = "LOADED";
            break;
        case TaskTableItemState::EXECUTING:
            label = "EXECUTING";
            break;
        case TaskTableItemState::EXECUTED:
            label = "EXECUTED";
            break;
        case TaskTableItemState::MOVING:
            label = "MOVING";
            break;
        case TaskTableItemState::MOVED:
            label = "MOVED";
            break;
        default:
            break;
    }
    return label;
}

std::string
S
starlord 已提交
54
ToString(const TaskTimestamp& timestamp) {
55 56 57 58 59 60 61 62
    std::stringstream ss;
    ss << "<start=" << timestamp.start;
    ss << ", load=" << timestamp.load;
    ss << ", loaded=" << timestamp.loaded;
    ss << ", execute=" << timestamp.execute;
    ss << ", executed=" << timestamp.executed;
    ss << ", move=" << timestamp.move;
    ss << ", moved=" << timestamp.moved;
63
    ss << ", finish=" << timestamp.finish;
64 65 66 67
    ss << ">";
    return ss.str();
}

68 69 70 71 72
// Returns true when the item's state is MOVED or EXECUTED, false otherwise.
bool
TaskTableItem::IsFinish() {
    switch (state) {
        case TaskTableItemState::MOVED:
        case TaskTableItemState::EXECUTED:
            return true;
        default:
            return false;
    }
}

73 74 75 76 77 78
bool
TaskTableItem::Load() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::START) {
        state = TaskTableItemState::LOADING;
        lock.unlock();
W
wxyu 已提交
79
        timestamp.load = get_current_timestamp();
80 81 82 83
        return true;
    }
    return false;
}
S
starlord 已提交
84

85 86 87 88 89 90
bool
TaskTableItem::Loaded() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADING) {
        state = TaskTableItemState::LOADED;
        lock.unlock();
W
wxyu 已提交
91
        timestamp.loaded = get_current_timestamp();
92 93 94 95
        return true;
    }
    return false;
}
S
starlord 已提交
96

97 98 99 100 101 102
bool
TaskTableItem::Execute() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADED) {
        state = TaskTableItemState::EXECUTING;
        lock.unlock();
W
wxyu 已提交
103
        timestamp.execute = get_current_timestamp();
104 105 106 107
        return true;
    }
    return false;
}
S
starlord 已提交
108

109 110 111 112 113 114
// Attempts the EXECUTING -> EXECUTED transition.
// The state check and update happen under the item mutex; the lock is released
// before timestamp.executed and timestamp.finish are recorded (each from its
// own clock sample). Returns true if the transition was made, false (leaving
// the item unchanged) if the state was not EXECUTING.
bool
TaskTableItem::Executed() {
    std::unique_lock<std::mutex> guard(mutex);
    if (state != TaskTableItemState::EXECUTING) {
        return false;
    }

    state = TaskTableItemState::EXECUTED;
    guard.unlock();
    timestamp.executed = get_current_timestamp();
    timestamp.finish = get_current_timestamp();
    return true;
}
S
starlord 已提交
121

122 123 124 125 126 127
bool
TaskTableItem::Move() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADED) {
        state = TaskTableItemState::MOVING;
        lock.unlock();
W
wxyu 已提交
128
        timestamp.move = get_current_timestamp();
129 130 131 132
        return true;
    }
    return false;
}
S
starlord 已提交
133

134 135 136 137 138 139
// Attempts the MOVING -> MOVED transition.
// The state check and update happen under the item mutex; the lock is released
// before timestamp.moved and timestamp.finish are recorded (each from its own
// clock sample). Returns true if the transition was made, false (leaving the
// item unchanged) if the state was not MOVING.
bool
TaskTableItem::Moved() {
    std::unique_lock<std::mutex> guard(mutex);
    if (state != TaskTableItemState::MOVING) {
        return false;
    }

    state = TaskTableItemState::MOVED;
    guard.unlock();
    timestamp.moved = get_current_timestamp();
    timestamp.finish = get_current_timestamp();
    return true;
}

147 148 149 150 151 152 153 154 155 156
std::string
TaskTableItem::Dump() {
    std::stringstream ss;
    ss << "<id=" << id;
    ss << ", task=" << task;
    ss << ", state=" << ToString(state);
    ss << ", timestamp=" << ToString(timestamp);
    ss << ">";
    return ss.str();
}
W
wxyu 已提交
157

158 159 160 161
std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) {
    std::vector<uint64_t> indexes;
    bool cross = false;
W
wxyu 已提交
162
    for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
        if (not cross && table_[i]->IsFinish()) {
            last_finish_ = i;
        } else if (table_[i]->state == TaskTableItemState::START) {
            cross = true;
            indexes.push_back(i);
            ++count;
        }
    }
    return indexes;
}

std::vector<uint64_t>
TaskTable::PickToExecute(uint64_t limit) {
    std::vector<uint64_t> indexes;
    bool cross = false;
W
wxyu 已提交
178
    for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
179 180 181 182 183 184 185 186 187 188 189
        if (not cross && table_[i]->IsFinish()) {
            last_finish_ = i;
        } else if (table_[i]->state == TaskTableItemState::LOADED) {
            cross = true;
            indexes.push_back(i);
            ++count;
        }
    }
    return indexes;
}

W
wxyu 已提交
190 191
// Appends a single task to the table as a new item in the START state.
// While holding id_mutex_, the item receives the next id from id_, takes
// ownership of `task`, has its start timestamp recorded and is pushed onto
// table_. The subscriber callback, if one is set, is then invoked (still
// within the scope of the lock).
void
TaskTable::Put(TaskPtr task) {
    std::lock_guard<std::mutex> guard(id_mutex_);

    auto entry = std::make_shared<TaskTableItem>();
    entry->state = TaskTableItemState::START;
    entry->id = id_++;
    entry->task = std::move(task);
    entry->timestamp.start = get_current_timestamp();
    table_.push_back(entry);

    if (subscriber_) {
        subscriber_();
    }
}

void
S
starlord 已提交
205
TaskTable::Put(std::vector<TaskPtr>& tasks) {
206
    std::lock_guard<std::mutex> lock(id_mutex_);
S
starlord 已提交
207
    for (auto& task : tasks) {
W
wxyu 已提交
208
        auto item = std::make_shared<TaskTableItem>();
209
        item->id = id_++;
W
wxyu 已提交
210
        item->task = std::move(task);
W
wxyu 已提交
211
        item->state = TaskTableItemState::START;
W
wxyu 已提交
212
        item->timestamp.start = get_current_timestamp();
W
wxyu 已提交
213 214
        table_.push_back(item);
    }
W
wxyu 已提交
215 216 217
    if (subscriber_) {
        subscriber_();
    }
W
wxyu 已提交
218 219
}

W
wxyu 已提交
220
// Returns the table item stored at `index`.
// The index is passed straight to table_'s operator[]; no range check is
// performed here.
TaskTableItemPtr
TaskTable::Get(uint64_t index) {
    TaskTableItemPtr entry = table_[index];
    return entry;
}

S
starlord 已提交
225 226
// void
// TaskTable::Clear() {
W
wxyu 已提交
227 228 229 230 231 232 233
//// Find the first task that is NOT (executed or moved); erase everything from begin up to it.
////        auto iterator = table_.begin();
////        while (iterator->state == TaskTableItemState::EXECUTED or
////            iterator->state == TaskTableItemState::MOVED)
////            iterator++;
////        table_.erase(table_.begin(), iterator);
//}
W
wxyu 已提交
234 235 236

std::string
TaskTable::Dump() {
237
    std::stringstream ss;
S
starlord 已提交
238
    for (auto& item : table_) {
239
        ss << item->Dump() << std::endl;
240 241
    }
    return ss.str();
W
wxyu 已提交
242 243
}

S
starlord 已提交
244 245
}  // namespace scheduler
}  // namespace milvus