Resource.cpp 4.4 KB
Newer Older
X
xj.lin 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include <iostream>
Y
Yu Kun 已提交
7
#include "../Utils.h"
X
xj.lin 已提交
8 9 10 11 12 13 14
#include "Resource.h"


namespace zilliz {
namespace milvus {
namespace engine {

W
wxyu 已提交
15 16
std::ostream &
operator<<(std::ostream &out, const Resource &resource) {
17 18 19 20
    out << resource.Dump();
    return out;
}

21 22
// Constructs a resource from its name, type and device id, and records
// whether the loader and the executor are enabled. No thread is started here.
Resource::Resource(std::string name,
                   ResourceType type,
                   uint64_t device_id,
                   bool enable_loader,
                   bool enable_executor)
    : name_(std::move(name)),
      type_(type),
      device_id_(device_id),
      enable_loader_(enable_loader),
      enable_executor_(enable_executor) {
    // Register a callback with the task table. When the table invokes it and a
    // subscriber is set, a TaskTableUpdatedEvent is forwarded to that subscriber.
    // shared_from_this() is evaluated only when the callback runs, not during
    // construction.
    task_table_.RegisterSubscriber([this] {
        if (!subscriber_) {
            return;
        }
        auto update_event = std::make_shared<TaskTableUpdatedEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(update_event));
    });
}

W
wxyu 已提交
40 41
void
Resource::Start() {
W
wxyu 已提交
42
    running_ = true;
43 44 45 46 47 48
    if (enable_loader_) {
        loader_thread_ = std::thread(&Resource::loader_function, this);
    }
    if (enable_executor_) {
        executor_thread_ = std::thread(&Resource::executor_function, this);
    }
X
xj.lin 已提交
49 50
}

W
wxyu 已提交
51 52
void
Resource::Stop() {
X
xj.lin 已提交
53
    running_ = false;
54 55 56 57 58 59 60 61
    if (enable_loader_) {
        WakeupLoader();
        loader_thread_.join();
    }
    if (enable_executor_) {
        WakeupExecutor();
        executor_thread_.join();
    }
X
xj.lin 已提交
62 63
}

W
wxyu 已提交
64 65
void
Resource::WakeupLoader() {
66 67 68 69
    {
        std::lock_guard<std::mutex> lock(load_mutex_);
        load_flag_ = true;
    }
X
xj.lin 已提交
70 71 72
    load_cv_.notify_one();
}

W
wxyu 已提交
73 74
void
Resource::WakeupExecutor() {
75 76 77 78
    {
        std::lock_guard<std::mutex> lock(exec_mutex_);
        exec_flag_ = true;
    }
W
wxyu 已提交
79 80 81
    exec_cv_.notify_one();
}

W
wxyu 已提交
82 83 84 85 86 87 88 89 90
// Returns how many entries of task_table_ are currently in the
// TaskTableItemState::LOADED state.
uint64_t
Resource::NumOfTaskToExec() {
    uint64_t loaded = 0;
    for (auto &item : task_table_) {
        const bool is_loaded = (item->state == TaskTableItemState::LOADED);
        loaded += is_loaded ? 1 : 0;
    }
    return loaded;
}

91
// Picks one task to load: walks the indexes returned by
// task_table_.PickToLoad(10) and returns the item for the first index on which
// task_table_.Load() succeeds. Returns nullptr when none succeeds.
// NOTE(review): the argument 10 is presumably a cap on the number of
// candidates — confirm against TaskTable::PickToLoad.
TaskTableItemPtr Resource::pick_task_load() {
    auto candidates = task_table_.PickToLoad(10);
    for (auto idx : candidates) {
        if (!task_table_.Load(idx)) {
            // could not switch this task to loading; try the next candidate
            continue;
        }
        return task_table_.Get(idx);
    }
    return nullptr;
}

102
// Picks one task to execute: walks the indexes returned by
// task_table_.PickToExecute(3) and returns the item for the first index on
// which task_table_.Execute() succeeds. Returns nullptr when none succeeds.
// NOTE(review): the argument 3 is presumably a cap on the number of
// candidates — confirm against TaskTable::PickToExecute.
TaskTableItemPtr Resource::pick_task_execute() {
    auto candidates = task_table_.PickToExecute(3);
    for (auto idx : candidates) {
        if (!task_table_.Execute(idx)) {
            // could not switch this task to executing; try the next candidate
            continue;
        }
        return task_table_.Get(idx);
    }
    return nullptr;
}

void Resource::loader_function() {
    while (running_) {
        std::unique_lock<std::mutex> lock(load_mutex_);
        load_cv_.wait(lock, [&] { return load_flag_; });
W
wxyu 已提交
117
        load_flag_ = false;
118
        lock.unlock();
119 120 121 122 123
        while (true) {
            auto task_item = pick_task_load();
            if (task_item == nullptr) {
                break;
            }
124
            LoadFile(task_item->task);
125
            task_item->Loaded();
W
wxyu 已提交
126
            if (subscriber_) {
127
                auto event = std::make_shared<LoadCompletedEvent>(shared_from_this(), task_item);
128
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
129
            }
X
xj.lin 已提交
130
        }
131

X
xj.lin 已提交
132 133 134 135
    }
}

void Resource::executor_function() {
W
wxyu 已提交
136
    if (subscriber_) {
137 138
        auto event = std::make_shared<StartUpEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
139
    }
X
xj.lin 已提交
140 141 142
    while (running_) {
        std::unique_lock<std::mutex> lock(exec_mutex_);
        exec_cv_.wait(lock, [&] { return exec_flag_; });
W
wxyu 已提交
143
        exec_flag_ = false;
144
        lock.unlock();
145 146 147 148 149
        while (true) {
            auto task_item = pick_task_execute();
            if (task_item == nullptr) {
                break;
            }
Y
Yu Kun 已提交
150

151
            auto start = get_current_timestamp();
152
            Process(task_item->task);
153
            auto finish = get_current_timestamp();
Y
Yu Kun 已提交
154 155 156
            ++total_task_;
            total_cost_ += finish - start;

157
            task_item->Executed();
W
wxyu 已提交
158
            if (subscriber_) {
159 160
                auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
161
            }
X
xj.lin 已提交
162
        }
163

X
xj.lin 已提交
164 165 166 167 168 169
    }
}

}
}
}