Scheduler.cpp 3.5 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6 7
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

#include "Scheduler.h"
W
wxyu 已提交
8
#include "Cost.h"
9
#include "action/Action.h"
W
wxyu 已提交
10 11 12 13 14 15


namespace zilliz {
namespace milvus {
namespace engine {

16 17 18 19 20 21 22 23 24
/**
 * Construct a scheduler attached to a resource manager.
 *
 * The scheduler is created in the "not running" state. When the weak handle
 * can still be locked, PostEvent() is registered with the manager as a
 * subscriber callback.
 *
 * @param res_mgr weak handle to the resource manager; moved into res_mgr_.
 */
Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
    : running_(false),
      res_mgr_(std::move(res_mgr)) {
    auto manager = res_mgr_.lock();
    if (manager) {
        // The callback captures `this`; NOTE(review): nothing here unregisters
        // it, so the scheduler presumably must outlive the subscription.
        manager->RegisterSubscriber([this](const EventPtr &event) { PostEvent(event); });
    }
}


W
wxyu 已提交
25
void
26 27 28 29 30 31 32 33 34 35 36 37
Scheduler::Start() {
    running_ = true;
    worker_thread_ = std::thread(&Scheduler::worker_function, this);
}

void
Scheduler::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        event_queue_.push(nullptr);
        event_cv_.notify_one();
W
wxyu 已提交
38
    }
39 40 41 42 43
    worker_thread_.join();
}

void
Scheduler::PostEvent(const EventPtr &event) {
44 45 46 47
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        event_queue_.push(event);
    }
48 49 50 51 52 53
    event_cv_.notify_one();
}

/**
 * Produce a textual dump of the scheduler.
 *
 * @return currently always an empty string (no state is reported yet).
 */
std::string
Scheduler::Dump() {
    return {};
}

void
57 58 59 60 61
Scheduler::worker_function() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
        auto event = event_queue_.front();
62
        event_queue_.pop();
63 64
        if (event == nullptr) {
            break;
W
wxyu 已提交
65 66
        }

67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
        Process(event);
    }
}

/**
 * Dispatch an event to the handler matching its type.
 *
 * Event types without a dedicated handler are ignored.
 *
 * @param event non-null event to dispatch (it is dereferenced unconditionally).
 */
void
Scheduler::Process(const EventPtr &event) {
    switch (event->Type()) {
        case EventType::START_UP:
            OnStartUp(event);
            break;
        case EventType::COPY_COMPLETED:
            OnCopyCompleted(event);
            break;
        case EventType::FINISH_TASK:
            OnFinishTask(event);
            break;
        case EventType::TASK_TABLE_UPDATED:
            OnTaskTableUpdated(event);
            break;
        default:
            // TODO: logging
            break;
    }
}

97

W
wxyu 已提交
98
void
W
wxyu 已提交
99
Scheduler::OnStartUp(const EventPtr &event) {
100 101 102
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
W
wxyu 已提交
103 104 105
}

void
W
wxyu 已提交
106
Scheduler::OnFinishTask(const EventPtr &event) {
W
wxyu 已提交
107 108
}

W
wxyu 已提交
109
void
W
wxyu 已提交
110
Scheduler::OnCopyCompleted(const EventPtr &event) {
111
    auto load_completed_event = std::static_pointer_cast<CopyCompletedEvent>(event);
112 113
    if (auto resource = event->resource_.lock()) {
        resource->WakeupExecutor();
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129

        auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
        switch (task_table_type) {
            case TaskLabelType::DEFAULT: {
                if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
                    Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource);
                }
                break;
            }
            case TaskLabelType::BROADCAST: {
                Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
                break;
            }
            default: {
                break;
            }
130 131
        }
    }
W
wxyu 已提交
132
}
W
wxyu 已提交
133 134

void
W
wxyu 已提交
135
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
136 137 138
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
W
wxyu 已提交
139 140 141 142 143
}

}
}
}