TaskTable.cpp 6.0 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6 7
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

#include "TaskTable.h"
W
wxyu 已提交
8
#include "event/TaskTableUpdatedEvent.h"
W
wxyu 已提交
9 10
#include "Utils.h"

W
wxyu 已提交
11
#include <vector>
12
#include <sstream>
13
#include <ctime>
W
wxyu 已提交
14 15 16 17 18 19


namespace zilliz {
namespace milvus {
namespace engine {

20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
// Map a TaskTableItemState value to its upper-case name.
// Any value not listed below yields an empty string.
std::string
ToString(TaskTableItemState state) {
    const char *name = "";
    switch (state) {
        case TaskTableItemState::INVALID:
            name = "INVALID";
            break;
        case TaskTableItemState::START:
            name = "START";
            break;
        case TaskTableItemState::LOADING:
            name = "LOADING";
            break;
        case TaskTableItemState::LOADED:
            name = "LOADED";
            break;
        case TaskTableItemState::EXECUTING:
            name = "EXECUTING";
            break;
        case TaskTableItemState::EXECUTED:
            name = "EXECUTED";
            break;
        case TaskTableItemState::MOVING:
            name = "MOVING";
            break;
        case TaskTableItemState::MOVED:
            name = "MOVED";
            break;
        default:
            break;
    }
    return name;
}

std::string
ToString(const TaskTimestamp &timestamp) {
    std::stringstream ss;
    ss << "<start=" << timestamp.start;
    ss << ", load=" << timestamp.load;
    ss << ", loaded=" << timestamp.loaded;
    ss << ", execute=" << timestamp.execute;
    ss << ", executed=" << timestamp.executed;
    ss << ", move=" << timestamp.move;
    ss << ", moved=" << timestamp.moved;
45
    ss << ", finish=" << timestamp.finish;
46 47 48 49
    ss << ">";
    return ss.str();
}

50 51 52 53 54
// True when the item is in one of the two terminal states, MOVED or EXECUTED.
// The state is read without taking the item's mutex.
bool
TaskTableItem::IsFinish() {
    if (state == TaskTableItemState::MOVED) {
        return true;
    }
    return state == TaskTableItemState::EXECUTED;
}

55 56 57 58 59 60
bool
TaskTableItem::Load() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::START) {
        state = TaskTableItemState::LOADING;
        lock.unlock();
W
wxyu 已提交
61
        timestamp.load = get_current_timestamp();
62 63 64 65 66 67 68 69 70 71
        return true;
    }
    return false;
}
bool
TaskTableItem::Loaded() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADING) {
        state = TaskTableItemState::LOADED;
        lock.unlock();
W
wxyu 已提交
72
        timestamp.loaded = get_current_timestamp();
73 74 75 76 77 78 79 80 81 82
        return true;
    }
    return false;
}
bool
TaskTableItem::Execute() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADED) {
        state = TaskTableItemState::EXECUTING;
        lock.unlock();
W
wxyu 已提交
83
        timestamp.execute = get_current_timestamp();
84 85 86 87 88 89 90 91 92 93
        return true;
    }
    return false;
}
// Try the EXECUTING -> EXECUTED transition.
// Returns true and records both timestamp.executed and timestamp.finish
// (each from its own clock read) when the item was in EXECUTING; otherwise
// leaves the item unchanged and returns false.
// The state check-and-set runs under the item's mutex; the timestamps are
// written after the lock has been released.
bool
TaskTableItem::Executed() {
    std::unique_lock<std::mutex> guard(mutex);
    if (state != TaskTableItemState::EXECUTING) {
        return false;
    }

    state = TaskTableItemState::EXECUTED;
    guard.unlock();

    timestamp.executed = get_current_timestamp();
    timestamp.finish = get_current_timestamp();
    return true;
}
bool
TaskTableItem::Move() {
    std::unique_lock<std::mutex> lock(mutex);
    if (state == TaskTableItemState::LOADED) {
        state = TaskTableItemState::MOVING;
        lock.unlock();
W
wxyu 已提交
106
        timestamp.move = get_current_timestamp();
107 108 109 110 111 112 113 114 115 116
        return true;
    }
    return false;
}
// Try the MOVING -> MOVED transition.
// Returns true and records both timestamp.moved and timestamp.finish
// (each from its own clock read) when the item was in MOVING; otherwise
// leaves the item unchanged and returns false.
// The state check-and-set runs under the item's mutex; the timestamps are
// written after the lock has been released.
bool
TaskTableItem::Moved() {
    std::unique_lock<std::mutex> guard(mutex);
    if (state != TaskTableItemState::MOVING) {
        return false;
    }

    state = TaskTableItemState::MOVED;
    guard.unlock();

    timestamp.moved = get_current_timestamp();
    timestamp.finish = get_current_timestamp();
    return true;
}

124 125 126 127 128 129 130 131 132 133
std::string
TaskTableItem::Dump() {
    std::stringstream ss;
    ss << "<id=" << id;
    ss << ", task=" << task;
    ss << ", state=" << ToString(state);
    ss << ", timestamp=" << ToString(timestamp);
    ss << ">";
    return ss.str();
}
W
wxyu 已提交
134

135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
// Collect up to `limit` table indexes whose item is in the START state.
// The scan begins at last_finish_. While nothing has been picked yet, every
// finished item encountered advances last_finish_ to its index.
std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) {
    std::vector<uint64_t> picked;
    uint64_t pos = last_finish_;
    while (pos < table_.size() && picked.size() < limit) {
        const auto &entry = table_[pos];
        // last_finish_ only moves forward until the first pick is made.
        if (picked.empty() && entry->IsFinish()) {
            last_finish_ = pos;
        } else if (entry->state == TaskTableItemState::START) {
            picked.push_back(pos);
        }
        ++pos;
    }
    return picked;
}

// Collect up to `limit` table indexes whose item is in the LOADED state.
// The scan begins at last_finish_. While nothing has been picked yet, every
// finished item encountered advances last_finish_ to its index.
std::vector<uint64_t>
TaskTable::PickToExecute(uint64_t limit) {
    std::vector<uint64_t> ready;
    for (uint64_t pos = last_finish_; pos < table_.size(); ++pos) {
        if (ready.size() >= limit) {
            break;
        }
        const auto &entry = table_[pos];
        // last_finish_ only moves forward until the first pick is made.
        if (ready.empty() && entry->IsFinish()) {
            last_finish_ = pos;
        } else if (entry->state == TaskTableItemState::LOADED) {
            ready.push_back(pos);
        }
    }
    return ready;
}

W
wxyu 已提交
167 168
// Append one task to the table as a new item in the START state.
// The item receives the next id, takes ownership of `task`, and has its
// start timestamp recorded. If a subscriber callback is set it is invoked
// afterwards, while id_mutex_ is still held.
void
TaskTable::Put(TaskPtr task) {
    std::lock_guard<std::mutex> guard(id_mutex_);

    auto entry = std::make_shared<TaskTableItem>();
    entry->id = id_++;
    entry->task = std::move(task);
    entry->state = TaskTableItemState::START;
    entry->timestamp.start = get_current_timestamp();
    table_.push_back(entry);

    if (!subscriber_) {
        return;
    }
    subscriber_();
}

void
TaskTable::Put(std::vector<TaskPtr> &tasks) {
183
    std::lock_guard<std::mutex> lock(id_mutex_);
W
wxyu 已提交
184 185
    for (auto &task : tasks) {
        auto item = std::make_shared<TaskTableItem>();
186
        item->id = id_++;
W
wxyu 已提交
187
        item->task = std::move(task);
W
wxyu 已提交
188
        item->state = TaskTableItemState::START;
W
wxyu 已提交
189
        item->timestamp.start = get_current_timestamp();
W
wxyu 已提交
190 191
        table_.push_back(item);
    }
W
wxyu 已提交
192 193 194
    if (subscriber_) {
        subscriber_();
    }
W
wxyu 已提交
195 196 197
}


W
wxyu 已提交
198
// Return the item stored at position `index` of the table.
//
// index:   zero-based position in table_.
// returns: the item at that position, or nullptr when `index` is not a valid
//          position. table_[index] performs no range check, so without this
//          guard an out-of-range index would be undefined behavior.
TaskTableItemPtr
TaskTable::Get(uint64_t index) {
    if (index >= table_.size()) {
        return nullptr;
    }
    return table_[index];
}

// Currently a no-op: the body contains only disabled code, so calling this
// does not remove anything from table_.
void
TaskTable::Clear() {
// Disabled sketch of the intended behavior: find the first task that is NOT
// (executed or moved) and erase everything from the beginning up to it.
// NOTE(review): as written the sketch never compares the iterator against
// table_.end() — confirm and fix before re-enabling.
//        auto iterator = table_.begin();
//        while (iterator->state == TaskTableItemState::EXECUTED or
//            iterator->state == TaskTableItemState::MOVED)
//            iterator++;
//        table_.erase(table_.begin(), iterator);
}

213

W
wxyu 已提交
214 215
// Concatenate the Dump() text of every item in the table, in table order,
// terminating each item's text with a newline.
std::string
TaskTable::Dump() {
    std::string text;
    for (const auto &entry : table_) {
        text += entry->Dump();
        text += '\n';
    }
    return text;
}

}
}
}