ResourceMgr.cpp 4.2 KB
Newer Older
S
starlord 已提交
1 2 3 4 5 6 7 8 9

/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include "ResourceMgr.h"
#include "db/Log.h"

W
wxyu 已提交
10

S
starlord 已提交
11 12 13 14 15 16 17 18 19
namespace zilliz {
namespace milvus {
namespace engine {

ResourceMgr::ResourceMgr()
    : running_(false) {

}

W
wxyu 已提交
20 21 22 23 24 25 26 27 28 29 30
uint64_t
ResourceMgr::GetNumOfComputeResource() {
    uint64_t count = 0;
    for (auto &res : resources_) {
        if (res->HasExecutor()) {
            ++count;
        }
    }
    return count;
}

S
starlord 已提交
31 32 33 34 35
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
    ResourceWPtr ret(resource);

    std::lock_guard<std::mutex> lck(resources_mutex_);
W
wxyu 已提交
36
    if (running_) {
S
starlord 已提交
37 38 39 40
        ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
        return ret;
    }

W
wxyu 已提交
41 42 43
    if (resource->Type() == ResourceType::DISK) {
        disk_resources_.emplace_back(ResourceWPtr(resource));
    }
S
starlord 已提交
44 45 46
    resources_.emplace_back(resource);

    size_t index = resources_.size() - 1;
47
    resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1));
S
starlord 已提交
48 49 50
    return ret;
}

51 52 53 54 55 56
void
ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connection &connection) {
    auto res1 = get_resource_by_name(name1);
    auto res2 = get_resource_by_name(name2);
    if (res1 && res2) {
        res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
57
//        res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
58 59 60
    }
}

S
starlord 已提交
61 62 63 64 65
void
ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) {
    if (auto observe_a = res1.lock()) {
        if (auto observe_b = res2.lock()) {
            observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
66
            observe_b->AddNeighbour(std::static_pointer_cast<Node>(observe_a), connection);
S
starlord 已提交
67 68 69 70 71 72 73 74 75 76 77 78
        }
    }
}


void
ResourceMgr::Start() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    for (auto &resource : resources_) {
        resource->Start();
    }
    running_ = true;
79
    worker_thread_ = std::thread(&ResourceMgr::event_process, this);
S
starlord 已提交
80 81 82 83
}

void
ResourceMgr::Stop() {
84 85 86 87 88 89
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        queue_.push(nullptr);
        event_cv_.notify_one();
    }
S
starlord 已提交
90 91
    worker_thread_.join();

92
    std::lock_guard<std::mutex> lck(resources_mutex_);
S
starlord 已提交
93 94 95 96 97
    for (auto &resource : resources_) {
        resource->Stop();
    }
}

W
wxyu 已提交
98 99 100 101 102 103 104
void
ResourceMgr::Clear() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    disk_resources_.clear();
    resources_.clear();
}

105 106
void
ResourceMgr::PostEvent(const EventPtr &event) {
107
    std::lock_guard<std::mutex> lock(event_mutex_);
108 109 110 111
    queue_.emplace(event);
    event_cv_.notify_one();
}

S
starlord 已提交
112 113 114 115 116 117
std::string
ResourceMgr::Dump() {
    std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";

    for (uint64_t i = 0; i < resources_.size(); ++i) {
        str += "Resource No." + std::to_string(i) + ":\n";
S
starlord 已提交
118
        //str += resources_[i]->Dump();
S
starlord 已提交
119 120 121 122 123
    }

    return str;
}

124 125 126 127 128 129
std::string
ResourceMgr::DumpTaskTables() {
    std::stringstream ss;
    ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
    for (auto &resource : resources_) {
        ss << resource->Dump() << std::endl;
130 131
        ss << resource->task_table().Dump();
        ss << resource->Dump() << std::endl << std::endl;
132 133 134 135
    }
    return ss.str();
}

136 137 138 139 140 141 142 143 144 145
ResourcePtr
ResourceMgr::get_resource_by_name(const std::string &name) {
    for (auto &res : resources_) {
        if (res->Name() == name) {
            return res;
        }
    }
    return nullptr;
}

146 147 148 149 150 151 152
void
ResourceMgr::event_process() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] { return !queue_.empty(); });

        auto event = queue_.front();
153 154
        queue_.pop();
        lock.unlock();
155 156 157 158 159 160 161 162 163 164 165 166
        if (event == nullptr) {
            break;
        }

//        ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;

        if (subscriber_) {
            subscriber_(event);
        }
    }
}

S
starlord 已提交
167 168 169
}
}
}