Resource.cpp 4.3 KB
Newer Older
X
xj.lin 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include <iostream>
X
xj.lin 已提交
7 8 9 10 11 12 13
#include "Resource.h"


namespace zilliz {
namespace milvus {
namespace engine {

14 15 16 17 18
// Stream insertion for Resource: writes whatever resource.Dump() produces
// to `out` and returns the same stream so insertions can be chained.
std::ostream &operator<<(std::ostream &out, const Resource &resource) {
    return out << resource.Dump();
}

19 20
// Construct a resource in the stopped state (running_ and both wake-up flags
// are false) and hook it up to its task table.
//
// name            - stored in name_ (moved in).
// type            - stored in type_.
// device_id       - stored in device_id_.
// enable_loader   - stored in enable_loader_; Start()/Stop() only manage the
//                   loader thread when this is true.
// enable_executor - stored in enable_executor_; Start()/Stop() only manage the
//                   executor thread when this is true.
Resource::Resource(std::string name,
                   ResourceType type,
                   uint64_t device_id,
                   bool enable_loader,
                   bool enable_executor)
    : name_(std::move(name)),
      type_(type),
      device_id_(device_id),
      running_(false),
      enable_loader_(enable_loader),
      enable_executor_(enable_executor),
      load_flag_(false),
      exec_flag_(false) {
    // Register a callback with the task table. When the callback is invoked it
    // forwards a TaskTableUpdatedEvent to subscriber_, if one is set.
    // shared_from_this() is evaluated when the callback runs, not here in the
    // constructor.
    // NOTE(review): presumably the object must already be owned by a
    // std::shared_ptr by the time the callback fires - confirm against callers.
    task_table_.RegisterSubscriber([this] {
        if (!subscriber_) {
            return;
        }
        auto updated = std::make_shared<TaskTableUpdatedEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(updated));
    });
}

void Resource::Start() {
W
wxyu 已提交
41
    running_ = true;
42 43 44 45 46 47
    if (enable_loader_) {
        loader_thread_ = std::thread(&Resource::loader_function, this);
    }
    if (enable_executor_) {
        executor_thread_ = std::thread(&Resource::executor_function, this);
    }
X
xj.lin 已提交
48 49 50 51
}

void Resource::Stop() {
    running_ = false;
52 53 54 55 56 57 58 59
    if (enable_loader_) {
        WakeupLoader();
        loader_thread_.join();
    }
    if (enable_executor_) {
        WakeupExecutor();
        executor_thread_.join();
    }
X
xj.lin 已提交
60 61 62 63 64 65 66
}

// Accessor: returns a mutable reference to this resource's task table.
TaskTable &Resource::task_table() {
    return this->task_table_;
}

void Resource::WakeupLoader() {
67 68 69 70
    {
        std::lock_guard<std::mutex> lock(load_mutex_);
        load_flag_ = true;
    }
X
xj.lin 已提交
71 72 73
    load_cv_.notify_one();
}

W
wxyu 已提交
74
void Resource::WakeupExecutor() {
75 76 77 78
    {
        std::lock_guard<std::mutex> lock(exec_mutex_);
        exec_flag_ = true;
    }
W
wxyu 已提交
79 80 81
    exec_cv_.notify_one();
}

82
// Choose the next task to load.
//
// Asks PickToLoad() for candidate indexes in task_table_ (limit argument 10)
// and returns the item for the first candidate on which task_table_.Load()
// succeeds. Returns nullptr when no candidate could be claimed.
TaskTableItemPtr Resource::pick_task_load() {
    auto candidates = PickToLoad(task_table_, 10);
    for (auto candidate : candidates) {
        // Try to move this task into the loading state; on failure try the next one.
        if (!task_table_.Load(candidate)) {
            continue;
        }
        return task_table_.Get(candidate);
    }
    return nullptr;
}

93
// Choose the next task to execute.
//
// Asks PickToExecute() for candidate indexes in task_table_ (limit argument 3)
// and returns the item for the first candidate on which task_table_.Execute()
// succeeds. Returns nullptr when no candidate could be claimed.
TaskTableItemPtr Resource::pick_task_execute() {
    auto candidates = PickToExecute(task_table_, 3);
    for (auto candidate : candidates) {
        // Try to move this task into the executing state; on failure try the next one.
        if (!task_table_.Execute(candidate)) {
            continue;
        }
        return task_table_.Get(candidate);
    }
    return nullptr;
}

void Resource::loader_function() {
    while (running_) {
        std::unique_lock<std::mutex> lock(load_mutex_);
        load_cv_.wait(lock, [&] { return load_flag_; });
W
wxyu 已提交
108
        load_flag_ = false;
109
        lock.unlock();
110 111 112 113 114
        while (true) {
            auto task_item = pick_task_load();
            if (task_item == nullptr) {
                break;
            }
115
            LoadFile(task_item->task);
W
wxyu 已提交
116
            // TODO: wrapper loaded
117
            task_item->Loaded();
W
wxyu 已提交
118
            if (subscriber_) {
119 120
                auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item);
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
121
            }
X
xj.lin 已提交
122
        }
123

X
xj.lin 已提交
124 125 126 127
    }
}

void Resource::executor_function() {
W
wxyu 已提交
128
    if (subscriber_) {
129 130
        auto event = std::make_shared<StartUpEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
131
    }
X
xj.lin 已提交
132 133 134
    while (running_) {
        std::unique_lock<std::mutex> lock(exec_mutex_);
        exec_cv_.wait(lock, [&] { return exec_flag_; });
W
wxyu 已提交
135
        exec_flag_ = false;
136
        lock.unlock();
137 138 139 140 141
        while (true) {
            auto task_item = pick_task_execute();
            if (task_item == nullptr) {
                break;
            }
142
            Process(task_item->task);
143
            task_item->Executed();
W
wxyu 已提交
144
            if (subscriber_) {
145 146
                auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
147
            }
X
xj.lin 已提交
148
        }
149

X
xj.lin 已提交
150 151 152 153 154 155 156 157 158 159 160
    }
}

// Look up the factory stored in register_table_ for `type` and invoke it,
// returning the handler it produces. A new handler object is constructed on
// every call.
// NOTE(review): the lookup uses operator[]; if register_table_ is a std::map
// of callables, an unregistered type would insert an empty entry and then
// call it - confirm all callers pass registered types.
RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) {
    auto &factory = register_table_[type];
    return factory();
}

}
}
}