GrpcRequestScheduler.cpp 7.6 KB
Newer Older
K
kun yu 已提交
1
/*******************************************************************************
Y
Yu Kun 已提交
2 3 4 5
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
Y
Yu Kun 已提交
6
#include "GrpcRequestScheduler.h"
K
kun yu 已提交
7 8
#include "utils/Log.h"

K
kun yu 已提交
9
#include "src/grpc/gen-status/status.pb.h"
K
kun yu 已提交
10 11 12 13

namespace zilliz {
namespace milvus {
namespace server {
Y
Yu Kun 已提交
14
namespace grpc {
K
kun yu 已提交
15 16 17 18

using namespace ::milvus;

namespace {
Y
Yu Kun 已提交
19
    const std::map<ServerError, ::milvus::grpc::ErrorCode> &ErrorMap() {
K
kun yu 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
        static const std::map<ServerError, ::milvus::grpc::ErrorCode> code_map = {
                {SERVER_UNEXPECTED_ERROR,         ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_UNSUPPORTED_ERROR,        ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_NULL_POINTER,             ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_INVALID_ARGUMENT,         ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
                {SERVER_FILE_NOT_FOUND,           ::milvus::grpc::ErrorCode::FILE_NOT_FOUND},
                {SERVER_NOT_IMPLEMENT,            ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_BLOCKING_QUEUE_EMPTY,     ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_CANNOT_CREATE_FOLDER,     ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER},
                {SERVER_CANNOT_CREATE_FILE,       ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE},
                {SERVER_CANNOT_DELETE_FOLDER,     ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER},
                {SERVER_CANNOT_DELETE_FILE,       ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE},
                {SERVER_TABLE_NOT_EXIST,          ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS},
                {SERVER_INVALID_TABLE_NAME,       ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME},
                {SERVER_INVALID_TABLE_DIMENSION,  ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION},
                {SERVER_INVALID_TIME_RANGE,       ::milvus::grpc::ErrorCode::ILLEGAL_RANGE},
                {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION},

                {SERVER_INVALID_INDEX_TYPE,       ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE},
                {SERVER_INVALID_ROWRECORD,        ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
                {SERVER_INVALID_ROWRECORD_ARRAY,  ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
                {SERVER_INVALID_TOPK,             ::milvus::grpc::ErrorCode::ILLEGAL_TOPK},
42 43 44
                {SERVER_INVALID_NPROBE,           ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
                {SERVER_INVALID_INDEX_NLIST,      ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
                {SERVER_INVALID_INDEX_METRIC_TYPE,::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
K
kun yu 已提交
45 46 47 48 49
                {SERVER_ILLEGAL_VECTOR_ID,        ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID},
                {SERVER_ILLEGAL_SEARCH_RESULT,    ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT},
                {SERVER_CACHE_ERROR,              ::milvus::grpc::ErrorCode::CACHE_FAILED},
                {DB_META_TRANSACTION_FAILED,      ::milvus::grpc::ErrorCode::META_FAILED},
                {SERVER_BUILD_INDEX_ERROR,        ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR},
K
kun yu 已提交
50 51 52 53 54 55 56
        };

        return code_map;
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
57
GrpcBaseTask::GrpcBaseTask(const std::string &task_group, bool async)
K
kun yu 已提交
58 59 60 61 62 63 64
        : task_group_(task_group),
          async_(async),
          done_(false),
          error_code_(SERVER_SUCCESS) {

}

Y
Yu Kun 已提交
65
GrpcBaseTask::~GrpcBaseTask() {
K
kun yu 已提交
66 67 68
    WaitToFinish();
}

G
groot 已提交
69
ServerError GrpcBaseTask::Execute() {
K
kun yu 已提交
70
    error_code_ = OnExecute();
G
groot 已提交
71 72 73 74 75
    Done();
    return error_code_;
}

void GrpcBaseTask::Done() {
K
kun yu 已提交
76 77 78 79
    done_ = true;
    finish_cond_.notify_all();
}

G
groot 已提交
80
ServerError GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
K
kun yu 已提交
81 82 83 84 85 86 87
    error_code_ = error_code;
    error_msg_ = error_msg;

    SERVER_LOG_ERROR << error_msg_;
    return error_code_;
}

G
groot 已提交
88
ServerError GrpcBaseTask::WaitToFinish() {
Y
Yu Kun 已提交
89
    std::unique_lock<std::mutex> lock(finish_mtx_);
K
kun yu 已提交
90 91 92 93 94 95
    finish_cond_.wait(lock, [this] { return done_; });

    return error_code_;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
96
GrpcRequestScheduler::GrpcRequestScheduler()
K
kun yu 已提交
97 98 99 100
        : stopped_(false) {
    Start();
}

Y
Yu Kun 已提交
101
GrpcRequestScheduler::~GrpcRequestScheduler() {
K
kun yu 已提交
102 103 104
    Stop();
}

G
groot 已提交
105
void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
Y
Yu Kun 已提交
106
    if (task_ptr == nullptr) {
K
kun yu 已提交
107 108 109
        return;
    }

Y
Yu Kun 已提交
110
    GrpcRequestScheduler &scheduler = GrpcRequestScheduler::GetInstance();
K
kun yu 已提交
111 112
    scheduler.ExecuteTask(task_ptr);

Y
Yu Kun 已提交
113
    if (!task_ptr->IsAsync()) {
K
kun yu 已提交
114 115 116 117
        task_ptr->WaitToFinish();
        ServerError err = task_ptr->ErrorCode();
        if (err != SERVER_SUCCESS) {
            grpc_status->set_reason(task_ptr->ErrorMsg());
K
kun yu 已提交
118
            grpc_status->set_error_code(ErrorMap().at(err));
K
kun yu 已提交
119 120 121 122
        }
    }
}

G
groot 已提交
123
void GrpcRequestScheduler::Start() {
Y
Yu Kun 已提交
124
    if (!stopped_) {
K
kun yu 已提交
125 126 127 128 129 130
        return;
    }

    stopped_ = false;
}

G
groot 已提交
131
void GrpcRequestScheduler::Stop() {
Y
Yu Kun 已提交
132
    if (stopped_) {
K
kun yu 已提交
133 134 135 136 137 138
        return;
    }

    SERVER_LOG_INFO << "Scheduler gonna stop...";
    {
        std::lock_guard<std::mutex> lock(queue_mtx_);
Y
Yu Kun 已提交
139 140
        for (auto iter : task_groups_) {
            if (iter.second != nullptr) {
K
kun yu 已提交
141 142 143 144 145
                iter.second->Put(nullptr);
            }
        }
    }

Y
Yu Kun 已提交
146 147
    for (auto iter : execute_threads_) {
        if (iter == nullptr)
K
kun yu 已提交
148 149 150 151 152 153 154 155
            continue;

        iter->join();
    }
    stopped_ = true;
    SERVER_LOG_INFO << "Scheduler stopped";
}

G
groot 已提交
156
ServerError GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
Y
Yu Kun 已提交
157
    if (task_ptr == nullptr) {
K
kun yu 已提交
158 159 160
        return SERVER_NULL_POINTER;
    }

Y
Yu Kun 已提交
161 162 163
    ServerError err = PutTaskToQueue(task_ptr);
    if (err != SERVER_SUCCESS) {
        SERVER_LOG_ERROR << "Put task to queue failed with code: " << err;
K
kun yu 已提交
164 165 166
        return err;
    }

Y
Yu Kun 已提交
167
    if (task_ptr->IsAsync()) {
K
kun yu 已提交
168 169 170 171 172 173 174
        return SERVER_SUCCESS;//async execution, caller need to call WaitToFinish at somewhere
    }

    return task_ptr->WaitToFinish();//sync execution
}


G
groot 已提交
175 176 177 178 179 180 181 182 183 184 185
void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
    if (task_queue == nullptr) {
        return;
    }

    while (true) {
        BaseTaskPtr task = task_queue->Take();
        if (task == nullptr) {
            SERVER_LOG_ERROR << "Take null from task queue, stop thread";
            break;//stop the thread
        }
K
kun yu 已提交
186

G
groot 已提交
187 188 189 190
        try {
            ServerError err = task->Execute();
            if (err != SERVER_SUCCESS) {
                SERVER_LOG_ERROR << "Task failed with code: " << err;
K
kun yu 已提交
191
            }
G
groot 已提交
192 193
        } catch (std::exception &ex) {
            SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
K
kun yu 已提交
194 195 196 197
        }
    }
}

G
groot 已提交
198
ServerError GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
K
kun yu 已提交
199 200 201
    std::lock_guard<std::mutex> lock(queue_mtx_);

    std::string group_name = task_ptr->TaskGroup();
Y
Yu Kun 已提交
202
    if (task_groups_.count(group_name) > 0) {
K
kun yu 已提交
203 204 205 206 207 208 209
        task_groups_[group_name]->Put(task_ptr);
    } else {
        TaskQueuePtr queue = std::make_shared<TaskQueue>();
        queue->Put(task_ptr);
        task_groups_.insert(std::make_pair(group_name, queue));

        //start a thread
G
groot 已提交
210
        ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
K
kun yu 已提交
211 212 213 214 215 216 217 218 219 220
        execute_threads_.push_back(thread);
        SERVER_LOG_INFO << "Create new thread for task group: " << group_name;
    }

    return SERVER_SUCCESS;
}

}
}
}
Y
Yu Kun 已提交
221
}