GrpcRequestScheduler.h 2.8 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

K
kun yu 已提交
18 19
#pragma once

S
starlord 已提交
20 21
#include "grpc/gen-status/status.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
S
starlord 已提交
22 23
#include "utils/BlockingQueue.h"
#include "utils/Status.h"
K
kun yu 已提交
24 25

#include <map>
S
starlord 已提交
26 27
#include <memory>
#include <string>
S
starlord 已提交
28 29
#include <thread>
#include <vector>
K
kun yu 已提交
30 31 32 33

namespace zilliz {
namespace milvus {
namespace server {
Y
Yu Kun 已提交
34
namespace grpc {
K
kun yu 已提交
35

Y
Yu Kun 已提交
36
class GrpcBaseTask {
S
starlord 已提交
37
 protected:
S
starlord 已提交
38
    explicit GrpcBaseTask(const std::string& task_group, bool async = false);
Y
Yu Kun 已提交
39

Y
Yu Kun 已提交
40
    virtual ~GrpcBaseTask();
K
kun yu 已提交
41

S
starlord 已提交
42
 public:
S
starlord 已提交
43 44
    Status
    Execute();
K
kun yu 已提交
45

S
starlord 已提交
46 47
    void
    Done();
K
kun yu 已提交
48

S
starlord 已提交
49 50
    Status
    WaitToFinish();
K
kun yu 已提交
51

S
starlord 已提交
52 53
    std::string
    TaskGroup() const {
S
starlord 已提交
54 55
        return task_group_;
    }
Y
Yu Kun 已提交
56

S
starlord 已提交
57 58
    const Status&
    status() const {
S
starlord 已提交
59 60
        return status_;
    }
S
starlord 已提交
61

S
starlord 已提交
62 63
    bool
    IsAsync() const {
S
starlord 已提交
64 65
        return async_;
    }
K
kun yu 已提交
66

S
starlord 已提交
67
 protected:
S
starlord 已提交
68 69
    virtual Status
    OnExecute() = 0;
K
kun yu 已提交
70

S
starlord 已提交
71
    Status
S
starlord 已提交
72
    SetStatus(ErrorCode error_code, const std::string& error_msg);
K
kun yu 已提交
73

S
starlord 已提交
74
 protected:
K
kun yu 已提交
75 76 77 78 79 80
    mutable std::mutex finish_mtx_;
    std::condition_variable finish_cond_;

    std::string task_group_;
    bool async_;
    bool done_;
S
starlord 已提交
81
    Status status_;
K
kun yu 已提交
82 83
};

Y
Yu Kun 已提交
84
using BaseTaskPtr = std::shared_ptr<GrpcBaseTask>;
K
kun yu 已提交
85 86 87 88
using TaskQueue = BlockingQueue<BaseTaskPtr>;
using TaskQueuePtr = std::shared_ptr<TaskQueue>;
using ThreadPtr = std::shared_ptr<std::thread>;

Y
Yu Kun 已提交
89
class GrpcRequestScheduler {
S
starlord 已提交
90
 public:
S
starlord 已提交
91 92
    static GrpcRequestScheduler&
    GetInstance() {
Y
Yu Kun 已提交
93
        static GrpcRequestScheduler scheduler;
K
kun yu 已提交
94 95 96
        return scheduler;
    }

S
starlord 已提交
97 98
    void
    Start();
Y
Yu Kun 已提交
99

S
starlord 已提交
100 101
    void
    Stop();
K
kun yu 已提交
102

S
starlord 已提交
103 104
    Status
    ExecuteTask(const BaseTaskPtr& task_ptr);
K
kun yu 已提交
105

S
starlord 已提交
106 107
    static void
    ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status);
K
kun yu 已提交
108

S
starlord 已提交
109
 protected:
Y
Yu Kun 已提交
110
    GrpcRequestScheduler();
Y
Yu Kun 已提交
111

Y
Yu Kun 已提交
112
    virtual ~GrpcRequestScheduler();
K
kun yu 已提交
113

S
starlord 已提交
114 115
    void
    TakeTaskToExecute(TaskQueuePtr task_queue);
S
starlord 已提交
116

S
starlord 已提交
117 118
    Status
    PutTaskToQueue(const BaseTaskPtr& task_ptr);
K
kun yu 已提交
119

S
starlord 已提交
120
 private:
K
kun yu 已提交
121 122 123 124 125 126 127 128 129
    mutable std::mutex queue_mtx_;

    std::map<std::string, TaskQueuePtr> task_groups_;

    std::vector<ThreadPtr> execute_threads_;

    bool stopped_;
};

S
starlord 已提交
130 131 132 133
}  // namespace grpc
}  // namespace server
}  // namespace milvus
}  // namespace zilliz