Resource.h 4.0 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

W
wxyu 已提交
12 13
#pragma once

S
starlord 已提交
14 15
#include <condition_variable>
#include <functional>
W
wxyu 已提交
16
#include <memory>
S
starlord 已提交
17
#include <string>
W
wxyu 已提交
18
#include <thread>
S
starlord 已提交
19 20
#include <utility>
#include <vector>
W
wxyu 已提交
21

S
starlord 已提交
22
#include "../TaskTable.h"
W
wxyu 已提交
23 24
#include "../event/Event.h"
#include "../event/FinishTaskEvent.h"
S
starlord 已提交
25 26
#include "../event/LoadCompletedEvent.h"
#include "../event/StartUpEvent.h"
W
wxyu 已提交
27
#include "../event/TaskTableUpdatedEvent.h"
28
#include "../task/Task.h"
W
wxyu 已提交
29
#include "Connection.h"
X
xj.lin 已提交
30
#include "Node.h"
W
wxyu 已提交
31

W
Wang Xiangyu 已提交
32
namespace milvus::scheduler {
W
wxyu 已提交
33

W
wxyu 已提交
34
// TODO(wxyu): Storage, Route, Executor
W
wxyu 已提交
35 36 37
enum class ResourceType {
    DISK = 0,
    CPU = 1,
W
wxyu 已提交
38 39
    GPU = 2,
    TEST = 3,
W
wxyu 已提交
40 41
};

W
wxyu 已提交
42
class Resource : public Node, public std::enable_shared_from_this<Resource> {
Y
Yu Kun 已提交
43
 public:
X
xj.lin 已提交
44
    /*
W
wxyu 已提交
45
     * Start loader and executor if enable;
X
xj.lin 已提交
46
     */
W
wxyu 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
    void
    Start();

    /*
     * Stop loader and executor, join it, blocking util thread exited;
     */
    void
    Stop();

    /*
     * wake up loader;
     */
    void
    WakeupLoader();

    /*
     * wake up executor;
     */
    void
    WakeupExecutor();

W
wxyu 已提交
68 69 70 71 72
    inline void
    RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
        subscriber_ = std::move(subscriber);
    }

W
wxyu 已提交
73 74
    json
    Dump() const override;
W
wxyu 已提交
75 76

 public:
77
    inline std::string
W
wxyu 已提交
78
    name() const {
79 80 81
        return name_;
    }

W
wxyu 已提交
82
    inline ResourceType
W
wxyu 已提交
83
    type() const {
W
wxyu 已提交
84 85
        return type_;
    }
X
xj.lin 已提交
86

87
    inline uint64_t
W
wxyu 已提交
88
    device_id() const {
89 90 91
        return device_id_;
    }

S
starlord 已提交
92
    TaskTable&
W
wxyu 已提交
93 94 95 96
    task_table() {
        return task_table_;
    }

S
starlord 已提交
97
 public:
W
wxyu 已提交
98
    inline bool
W
wxyu 已提交
99
    HasExecutor() const {
W
wxyu 已提交
100 101
        return enable_executor_;
    }
W
wxyu 已提交
102

W
wxyu 已提交
103
    // TODO(wxyu): const
Y
Yu Kun 已提交
104
    uint64_t
W
wxyu 已提交
105
    NumOfTaskToExec();
Y
Yu Kun 已提交
106

W
wxyu 已提交
107
    // TODO(wxyu): need double ?
Y
Yu Kun 已提交
108 109
    inline uint64_t
    TaskAvgCost() const {
F
fishpenguin 已提交
110 111 112
        if (total_task_ == 0) {
            return 0;
        }
Y
Yu Kun 已提交
113 114 115
        return total_cost_ / total_task_;
    }

Y
Yu Kun 已提交
116 117 118 119 120
    inline uint64_t
    TotalTasks() const {
        return total_task_;
    }

S
starlord 已提交
121 122
    friend std::ostream&
    operator<<(std::ostream& out, const Resource& resource);
123

Y
Yu Kun 已提交
124
 protected:
W
Wang XiangYu 已提交
125
    Resource(std::string name, ResourceType type, uint64_t device_id, bool enable_executor);
W
wxyu 已提交
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140

    /*
     * Implementation by inherit class;
     * Blocking function;
     */
    virtual void
    LoadFile(TaskPtr task) = 0;

    /*
     * Implementation by inherit class;
     * Blocking function;
     */
    virtual void
    Process(TaskPtr task) = 0;

Y
Yu Kun 已提交
141
 private:
W
wxyu 已提交
142 143 144 145
    /*
     * Pick one task to load;
     * Order by start time;
     */
146
    TaskTableItemPtr
X
xj.lin 已提交
147
    pick_task_load();
W
wxyu 已提交
148 149 150 151 152

    /*
     * Pick one task to execute;
     * Pick by start time and priority;
     */
153
    TaskTableItemPtr
X
xj.lin 已提交
154
    pick_task_execute();
W
wxyu 已提交
155

Y
Yu Kun 已提交
156
 private:
W
wxyu 已提交
157 158 159 160
    /*
     * Only called by load thread;
     */
    void
X
xj.lin 已提交
161
    loader_function();
W
wxyu 已提交
162 163 164 165 166

    /*
     * Only called by worker thread;
     */
    void
X
xj.lin 已提交
167
    executor_function();
W
wxyu 已提交
168

Y
Yu Kun 已提交
169
 protected:
170
    uint64_t device_id_;
W
wxyu 已提交
171
    std::string name_;
W
wxyu 已提交
172

Y
Yu Kun 已提交
173
 private:
W
wxyu 已提交
174 175 176 177
    ResourceType type_;

    TaskTable task_table_;

Y
fix bug  
Yu Kun 已提交
178 179
    uint64_t total_cost_ = 0;
    uint64_t total_task_ = 0;
Y
Yu Kun 已提交
180

W
wxyu 已提交
181
    std::function<void(EventPtr)> subscriber_ = nullptr;
W
wxyu 已提交
182

W
wxyu 已提交
183
    bool running_ = false;
184
    bool enable_executor_ = true;
W
wxyu 已提交
185 186 187
    std::thread loader_thread_;
    std::thread executor_thread_;

W
wxyu 已提交
188 189
    bool load_flag_ = false;
    bool exec_flag_ = false;
W
wxyu 已提交
190 191 192 193 194 195 196 197 198
    std::mutex load_mutex_;
    std::mutex exec_mutex_;
    std::condition_variable load_cv_;
    std::condition_variable exec_cv_;
};

using ResourcePtr = std::shared_ptr<Resource>;
using ResourceWPtr = std::weak_ptr<Resource>;

W
Wang Xiangyu 已提交
199
}  // namespace milvus::scheduler