ResourceMgr.cpp 3.5 KB
Newer Older
S
starlord 已提交
1 2 3 4 5 6 7 8 9

/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include "ResourceMgr.h"
#include "db/Log.h"

W
wxyu 已提交
10

S
starlord 已提交
11 12 13 14 15 16 17 18 19
namespace zilliz {
namespace milvus {
namespace engine {

// Builds a manager whose running_ flag starts out false.
ResourceMgr::ResourceMgr() : running_(false) {}

W
wxyu 已提交
20 21 22 23 24 25 26 27 28 29 30
// Returns how many entries of resources_ report HasExecutor() as true.
// NOTE(review): resources_ is read here without taking resources_mutex_.
uint64_t
ResourceMgr::GetNumOfComputeResource() {
    uint64_t executors = 0;
    for (auto &entry : resources_) {
        if (!entry->HasExecutor()) {
            continue;
        }
        ++executors;
    }
    return executors;
}

S
starlord 已提交
31 32 33 34 35
// Registers `resource` with the manager and returns a weak handle to it.
//
// If the manager is already running, the resource is NOT stored: an error is
// logged and the returned handle only observes the caller's own pointer.
// Otherwise the resource is appended to resources_ (and additionally to
// disk_resources_ when its type is ResourceType::DISK), and PostEvent is
// registered on it as a subscriber callback.
//
// The whole operation runs under resources_mutex_.
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
    ResourceWPtr ret(resource);

    std::lock_guard<std::mutex> lck(resources_mutex_);
    if (running_) {
        ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
        return ret;
    }

    if (resource->Type() == ResourceType::DISK) {
        disk_resources_.emplace_back(ResourceWPtr(resource));
    }
    // `resource` is a named rvalue reference, so this copies the pointer; the
    // caller's pointer stays valid for the RegisterSubscriber call below.
    resources_.emplace_back(resource);

    resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1));
    return ret;
}

// Makes the two resources neighbours of each other over `connection`.
// Nothing happens unless both weak handles can still be locked.
void
ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) {
    auto first = res1.lock();
    if (!first) {
        return;
    }

    auto second = res2.lock();
    if (!second) {
        return;
    }

    // Register the link in both directions.
    first->AddNeighbour(std::static_pointer_cast<Node>(second), connection);
    second->AddNeighbour(std::static_pointer_cast<Node>(first), connection);
}


void
ResourceMgr::Start() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    for (auto &resource : resources_) {
        resource->Start();
    }
    running_ = true;
69
    worker_thread_ = std::thread(&ResourceMgr::event_process, this);
S
starlord 已提交
70 71 72 73
}

void
ResourceMgr::Stop() {
74 75 76 77 78 79
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        queue_.push(nullptr);
        event_cv_.notify_one();
    }
S
starlord 已提交
80 81
    worker_thread_.join();

82
    std::lock_guard<std::mutex> lck(resources_mutex_);
S
starlord 已提交
83 84 85 86 87
    for (auto &resource : resources_) {
        resource->Stop();
    }
}

88 89
void
ResourceMgr::PostEvent(const EventPtr &event) {
90
    std::lock_guard<std::mutex> lock(event_mutex_);
91 92 93 94
    queue_.emplace(event);
    event_cv_.notify_one();
}

S
starlord 已提交
95 96 97 98 99 100
// Builds a short textual summary: one line with the number of registered
// resources, followed by one "Resource No.<i>:" header line per resource.
std::string
ResourceMgr::Dump() {
    const auto total = resources_.size();

    std::string text = "ResourceMgr contains ";
    text += std::to_string(total);
    text += " resources.\n";

    for (uint64_t idx = 0; idx < total; ++idx) {
        text += "Resource No.";
        text += std::to_string(idx);
        text += ":\n";
        // Per-resource detail is currently disabled:
        // text += resources_[idx]->Dump();
    }

    return text;
}

107 108 109 110 111 112
std::string
ResourceMgr::DumpTaskTables() {
    std::stringstream ss;
    ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
    for (auto &resource : resources_) {
        ss << resource->Dump() << std::endl;
113 114
        ss << resource->task_table().Dump();
        ss << resource->Dump() << std::endl << std::endl;
115 116 117 118
    }
    return ss.str();
}

119 120 121 122 123 124 125
void
ResourceMgr::event_process() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] { return !queue_.empty(); });

        auto event = queue_.front();
126 127
        queue_.pop();
        lock.unlock();
128 129 130 131 132 133 134 135 136 137 138 139
        if (event == nullptr) {
            break;
        }

//        ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;

        if (subscriber_) {
            subscriber_(event);
        }
    }
}

S
starlord 已提交
140 141 142
}
}
}