Scheduler.cpp 3.1 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6 7
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

#include "Scheduler.h"
W
wxyu 已提交
8
#include "Cost.h"
9
#include "action/Action.h"
W
wxyu 已提交
10 11 12 13 14 15


namespace zilliz {
namespace milvus {
namespace engine {

16 17 18 19 20 21 22 23 24
Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
    : running_(false),
      res_mgr_(std::move(res_mgr)) {
    if (auto mgr = res_mgr_.lock()) {
        mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
    }
}


W
wxyu 已提交
25
void
26 27 28 29 30 31 32 33 34 35 36 37
Scheduler::Start() {
    running_ = true;
    worker_thread_ = std::thread(&Scheduler::worker_function, this);
}

void
Scheduler::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        event_queue_.push(nullptr);
        event_cv_.notify_one();
W
wxyu 已提交
38
    }
39 40 41 42 43 44 45 46 47 48 49 50 51 52
    worker_thread_.join();
}

void
Scheduler::PostEvent(const EventPtr &event) {
    std::lock_guard<std::mutex> lock(event_mutex_);
    event_queue_.push(event);
    event_cv_.notify_one();
//    SERVER_LOG_DEBUG << "Scheduler post " << *event;
}

std::string
Scheduler::Dump() {
    return std::string();
W
wxyu 已提交
53 54 55
}

void
56 57 58 59 60 61 62
Scheduler::worker_function() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
        auto event = event_queue_.front();
        if (event == nullptr) {
            break;
W
wxyu 已提交
63 64
        }

65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
//        SERVER_LOG_DEBUG << "Scheduler process " << *event;
        event_queue_.pop();
        Process(event);
    }
}

void
Scheduler::Process(const EventPtr &event) {
    switch (event->Type()) {
        case EventType::START_UP: {
            OnStartUp(event);
            break;
        }
        case EventType::COPY_COMPLETED: {
            OnCopyCompleted(event);
            break;
        }
        case EventType::FINISH_TASK: {
            OnFinishTask(event);
            break;
        }
        case EventType::TASK_TABLE_UPDATED: {
            OnTaskTableUpdated(event);
            break;
        }
        default: {
            // TODO: logging
            break;
        }
W
wxyu 已提交
94
    }
W
wxyu 已提交
95 96
}

97

W
wxyu 已提交
98
void
W
wxyu 已提交
99
Scheduler::OnStartUp(const EventPtr &event) {
100 101 102
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
W
wxyu 已提交
103 104 105
}

void
W
wxyu 已提交
106
Scheduler::OnFinishTask(const EventPtr &event) {
107 108 109
    if (auto resource = event->resource_.lock()) {
        resource->WakeupExecutor();
    }
W
wxyu 已提交
110 111
}

W
wxyu 已提交
112
void
W
wxyu 已提交
113
Scheduler::OnCopyCompleted(const EventPtr &event) {
114 115 116 117 118 119 120
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
        resource->WakeupExecutor();
        if (resource->Type()== ResourceType::DISK) {
            Action::PushTaskToNeighbour(event->resource_);
        }
    }
W
wxyu 已提交
121
}
W
wxyu 已提交
122 123

void
W
wxyu 已提交
124
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
125 126 127 128
//    Action::PushTaskToNeighbour(event->resource_);
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
W
wxyu 已提交
129 130 131 132 133
}

}
}
}