Resource.cpp 3.6 KB
Newer Older
X
xj.lin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include "Resource.h"


namespace zilliz {
namespace milvus {
namespace engine {

Resource::Resource(std::string name, ResourceType type)
    : name_(std::move(name)),
      type_(type),
      running_(false),
      load_flag_(false),
      exec_flag_(false) {
W
wxyu 已提交
19 20 21 22 23 24
    task_table_.RegisterSubscriber([&] {
        if (subscriber_) {
            auto event = std::make_shared<TaskTableUpdatedEvent>(shared_from_this());
            subscriber_(std::static_pointer_cast<Event>(event));
        }
    });
X
xj.lin 已提交
25 26 27
}

void Resource::Start() {
W
wxyu 已提交
28
    running_ = true;
X
xj.lin 已提交
29 30 31 32 33 34 35 36
    loader_thread_ = std::thread(&Resource::loader_function, this);
    executor_thread_ = std::thread(&Resource::executor_function, this);
}

void Resource::Stop() {
    running_ = false;
    WakeupLoader();
    WakeupExecutor();
W
wxyu 已提交
37 38
    loader_thread_.join();
    executor_thread_.join();
X
xj.lin 已提交
39 40 41 42 43 44 45
}

TaskTable &Resource::task_table() {
    return task_table_;
}

void Resource::WakeupLoader() {
W
wxyu 已提交
46 47
    std::lock_guard<std::mutex> lock(load_mutex_);
    load_flag_ = true;
X
xj.lin 已提交
48 49 50
    load_cv_.notify_one();
}

W
wxyu 已提交
51 52 53 54 55 56
void Resource::WakeupExecutor() {
    std::lock_guard<std::mutex> lock(exec_mutex_);
    exec_flag_ = true;
    exec_cv_.notify_one();
}

57
TaskTableItemPtr Resource::pick_task_load() {
X
xj.lin 已提交
58 59 60 61
    auto indexes = PickToLoad(task_table_, 3);
    for (auto index : indexes) {
        // try to set one task loading, then return
        if (task_table_.Load(index))
62
            return task_table_.Get(index);
X
xj.lin 已提交
63 64 65 66 67
        // else try next
    }
    return nullptr;
}

68
TaskTableItemPtr Resource::pick_task_execute() {
X
xj.lin 已提交
69 70 71 72
    auto indexes = PickToExecute(task_table_, 3);
    for (auto index : indexes) {
        // try to set one task executing, then return
        if (task_table_.Execute(index))
73
            return task_table_.Get(index);
X
xj.lin 已提交
74 75 76 77 78 79 80 81 82
        // else try next
    }
    return nullptr;
}

void Resource::loader_function() {
    while (running_) {
        std::unique_lock<std::mutex> lock(load_mutex_);
        load_cv_.wait(lock, [&] { return load_flag_; });
W
wxyu 已提交
83
        load_flag_ = false;
84 85 86
        auto task_item = pick_task_load();
        if (task_item) {
            LoadFile(task_item->task);
W
wxyu 已提交
87 88
            // TODO: wrapper loaded
            task_item->state = TaskTableItemState::LOADED;
W
wxyu 已提交
89
            if (subscriber_) {
90 91
                auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item);
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
92
            }
X
xj.lin 已提交
93 94 95 96 97
        }
    }
}

void Resource::executor_function() {
W
wxyu 已提交
98
    if (subscriber_) {
99 100
        auto event = std::make_shared<StartUpEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
101
    }
X
xj.lin 已提交
102 103 104
    while (running_) {
        std::unique_lock<std::mutex> lock(exec_mutex_);
        exec_cv_.wait(lock, [&] { return exec_flag_; });
W
wxyu 已提交
105
        exec_flag_ = false;
106 107 108
        auto task_item = pick_task_execute();
        if (task_item) {
            Process(task_item->task);
W
wxyu 已提交
109
            if (subscriber_) {
110 111
                auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
112
            }
X
xj.lin 已提交
113 114 115 116 117 118 119 120 121 122 123 124
        }
    }
}

RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) {
    // construct object each time.
    return register_table_[type]();
}

}
}
}