ResourceMgr.cpp 2.6 KB
Newer Older
G
groot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105

/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include "ResourceMgr.h"
#include "db/Log.h"

namespace zilliz {
namespace milvus {
namespace engine {

ResourceMgr::ResourceMgr()
    : running_(false) {

}

/**
 * Registers a resource with the manager and hooks its start-up and
 * finish-task notifications into the manager's per-resource event flags.
 *
 * @param resource  The resource to register.
 * @return A weak handle created from `resource`. The handle is returned even
 *         when the manager is already running; in that case the resource is
 *         NOT stored, no callbacks are registered, and an error is logged.
 */
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
    ResourceWPtr ret(resource);

    std::lock_guard<std::mutex> lck(resources_mutex_);
    if (running_) {
        ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
        return ret;
    }

    resources_.emplace_back(resource);

    // Position of the resource that was just appended; the callbacks use it
    // to select the matching entry in the event flag containers.
    const size_t index = resources_.size() - 1;

    // `index` is captured BY VALUE on purpose. A registered callback can be
    // invoked after Add() has returned (presumably from the resource's own
    // thread -- confirm against Resource), at which point a by-reference
    // capture of this local would dangle.
    // NOTE(review): the flags are written here without holding
    // resources_mutex_, while EventProcess() reads them under that mutex;
    // confirm the flag containers tolerate that and are sized for `index`.
    resource->RegisterOnStartUp([this, index] {
        start_up_event_[index] = true;
        event_cv_.notify_one();
    });
    resource->RegisterOnFinishTask([this, index] {
        finish_task_event_[index] = true;
        event_cv_.notify_one();
    });
    return ret;
}

/**
 * Links two resources: when both weak handles can be locked, the resource
 * behind `res2` is added as a neighbour of the resource behind `res1`, using
 * `connection`. Only `res1`'s resource receives the neighbour.
 *
 * Does nothing if either handle fails to lock.
 *
 * @param res1        Handle of the resource that receives the neighbour.
 * @param res2        Handle of the resource added as the neighbour.
 * @param connection  Connection passed through to AddNeighbour().
 */
void
ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) {
    auto source = res1.lock();
    if (!source) {
        return;
    }

    auto target = res2.lock();
    if (!target) {
        return;
    }

    source->AddNeighbour(std::static_pointer_cast<Node>(target), connection);
}

void
ResourceMgr::EventProcess() {
    while (running_) {
        std::unique_lock <std::mutex> lock(resources_mutex_);
        event_cv_.wait(lock, [this] { return !resources_.empty(); });

        if(!running_) {
            break;
        }

        for (uint64_t i = 0; i < resources_.size(); ++i) {
            ResourceWPtr res(resources_[i]);
            if (start_up_event_[i]) {
                on_start_up_(res);
            }
            if (finish_task_event_[i]) {
                on_finish_task_(res);
            }
            if (copy_completed_event_[i]) {
                on_copy_completed_(res);
            }
            if (task_table_updated_event_[i]) {
                on_task_table_updated_(res);
            }
        }
    }
}

/**
 * Starts every registered resource and then launches the worker thread that
 * runs EventProcess().
 */
void
ResourceMgr::Start() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    for (auto &resource : resources_) {
        resource->Start();
    }

    // The running flag must be raised BEFORE the worker is spawned:
    // EventProcess() loops on `while (running_)`, so a worker that gets
    // scheduled while the flag is still false would return immediately and
    // no events would ever be processed.
    running_ = true;
    worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
}

void
ResourceMgr::Stop() {
    std::lock_guard<std::mutex> lck(resources_mutex_);

    running_ = false;
    worker_thread_.join();

    for (auto &resource : resources_) {
        resource->Stop();
    }
}

}
}
}