GrpcRequestScheduler.cpp 7.8 KB
Newer Older
K
kun yu 已提交
1
/*******************************************************************************
Y
Yu Kun 已提交
2 3 4 5
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
Y
Yu Kun 已提交
6
#include "GrpcRequestScheduler.h"
K
kun yu 已提交
7 8
#include "utils/Log.h"

K
kun yu 已提交
9
#include "src/grpc/gen-status/status.pb.h"
K
kun yu 已提交
10 11 12 13

namespace zilliz {
namespace milvus {
namespace server {
Y
Yu Kun 已提交
14
namespace grpc {
K
kun yu 已提交
15 16 17 18

using namespace ::milvus;

namespace {
S
starlord 已提交
19
    ::milvus::grpc::ErrorCode ErrorMap(ErrorCode code) {
S
starlord 已提交
20
        static const std::map<ErrorCode, ::milvus::grpc::ErrorCode> code_map = {
K
kun yu 已提交
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
                {SERVER_UNEXPECTED_ERROR,         ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_UNSUPPORTED_ERROR,        ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_NULL_POINTER,             ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_INVALID_ARGUMENT,         ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
                {SERVER_FILE_NOT_FOUND,           ::milvus::grpc::ErrorCode::FILE_NOT_FOUND},
                {SERVER_NOT_IMPLEMENT,            ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_BLOCKING_QUEUE_EMPTY,     ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
                {SERVER_CANNOT_CREATE_FOLDER,     ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER},
                {SERVER_CANNOT_CREATE_FILE,       ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE},
                {SERVER_CANNOT_DELETE_FOLDER,     ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER},
                {SERVER_CANNOT_DELETE_FILE,       ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE},
                {SERVER_TABLE_NOT_EXIST,          ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS},
                {SERVER_INVALID_TABLE_NAME,       ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME},
                {SERVER_INVALID_TABLE_DIMENSION,  ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION},
                {SERVER_INVALID_TIME_RANGE,       ::milvus::grpc::ErrorCode::ILLEGAL_RANGE},
                {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION},

                {SERVER_INVALID_INDEX_TYPE,       ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE},
                {SERVER_INVALID_ROWRECORD,        ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
                {SERVER_INVALID_ROWRECORD_ARRAY,  ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
                {SERVER_INVALID_TOPK,             ::milvus::grpc::ErrorCode::ILLEGAL_TOPK},
42
                {SERVER_INVALID_NPROBE,           ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
S
starlord 已提交
43 44 45
                {SERVER_INVALID_INDEX_NLIST,      ::milvus::grpc::ErrorCode::ILLEGAL_NLIST},
                {SERVER_INVALID_INDEX_METRIC_TYPE,::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE},
                {SERVER_INVALID_INDEX_FILE_SIZE,  ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
K
kun yu 已提交
46 47 48 49 50
                {SERVER_ILLEGAL_VECTOR_ID,        ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID},
                {SERVER_ILLEGAL_SEARCH_RESULT,    ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT},
                {SERVER_CACHE_ERROR,              ::milvus::grpc::ErrorCode::CACHE_FAILED},
                {DB_META_TRANSACTION_FAILED,      ::milvus::grpc::ErrorCode::META_FAILED},
                {SERVER_BUILD_INDEX_ERROR,        ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR},
K
kun yu 已提交
51 52
        };

S
starlord 已提交
53 54 55 56 57
        if(code_map.find(code) != code_map.end()) {
            return code_map.at(code);
        } else {
            return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR;
        }
K
kun yu 已提交
58 59 60 61
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
62
GrpcBaseTask::GrpcBaseTask(const std::string &task_group, bool async)
K
kun yu 已提交
63 64 65 66 67 68 69
        : task_group_(task_group),
          async_(async),
          done_(false),
          error_code_(SERVER_SUCCESS) {

}

Y
Yu Kun 已提交
70
GrpcBaseTask::~GrpcBaseTask() {
K
kun yu 已提交
71 72 73
    WaitToFinish();
}

S
starlord 已提交
74
ErrorCode GrpcBaseTask::Execute() {
K
kun yu 已提交
75
    error_code_ = OnExecute();
S
starlord 已提交
76 77 78 79 80
    Done();
    return error_code_;
}

void GrpcBaseTask::Done() {
K
kun yu 已提交
81 82 83 84
    done_ = true;
    finish_cond_.notify_all();
}

S
starlord 已提交
85
ErrorCode GrpcBaseTask::SetError(ErrorCode error_code, const std::string &error_msg) {
K
kun yu 已提交
86 87 88 89 90 91 92
    error_code_ = error_code;
    error_msg_ = error_msg;

    SERVER_LOG_ERROR << error_msg_;
    return error_code_;
}

S
starlord 已提交
93
ErrorCode GrpcBaseTask::WaitToFinish() {
Y
Yu Kun 已提交
94
    std::unique_lock<std::mutex> lock(finish_mtx_);
K
kun yu 已提交
95 96 97 98 99 100
    finish_cond_.wait(lock, [this] { return done_; });

    return error_code_;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
101
GrpcRequestScheduler::GrpcRequestScheduler()
K
kun yu 已提交
102 103 104 105
        : stopped_(false) {
    Start();
}

Y
Yu Kun 已提交
106
GrpcRequestScheduler::~GrpcRequestScheduler() {
K
kun yu 已提交
107 108 109
    Stop();
}

S
starlord 已提交
110
void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
Y
Yu Kun 已提交
111
    if (task_ptr == nullptr) {
K
kun yu 已提交
112 113 114
        return;
    }

Y
Yu Kun 已提交
115
    GrpcRequestScheduler &scheduler = GrpcRequestScheduler::GetInstance();
K
kun yu 已提交
116 117
    scheduler.ExecuteTask(task_ptr);

Y
Yu Kun 已提交
118
    if (!task_ptr->IsAsync()) {
K
kun yu 已提交
119
        task_ptr->WaitToFinish();
S
starlord 已提交
120
        ErrorCode err = task_ptr->ErrorID();
K
kun yu 已提交
121 122
        if (err != SERVER_SUCCESS) {
            grpc_status->set_reason(task_ptr->ErrorMsg());
S
starlord 已提交
123
            grpc_status->set_error_code(ErrorMap(err));
K
kun yu 已提交
124 125 126 127
        }
    }
}

S
starlord 已提交
128
void GrpcRequestScheduler::Start() {
Y
Yu Kun 已提交
129
    if (!stopped_) {
K
kun yu 已提交
130 131 132 133 134 135
        return;
    }

    stopped_ = false;
}

S
starlord 已提交
136
void GrpcRequestScheduler::Stop() {
Y
Yu Kun 已提交
137
    if (stopped_) {
K
kun yu 已提交
138 139 140 141 142 143
        return;
    }

    SERVER_LOG_INFO << "Scheduler gonna stop...";
    {
        std::lock_guard<std::mutex> lock(queue_mtx_);
Y
Yu Kun 已提交
144 145
        for (auto iter : task_groups_) {
            if (iter.second != nullptr) {
K
kun yu 已提交
146 147 148 149 150
                iter.second->Put(nullptr);
            }
        }
    }

Y
Yu Kun 已提交
151 152
    for (auto iter : execute_threads_) {
        if (iter == nullptr)
K
kun yu 已提交
153 154 155 156 157 158 159 160
            continue;

        iter->join();
    }
    stopped_ = true;
    SERVER_LOG_INFO << "Scheduler stopped";
}

S
starlord 已提交
161
ErrorCode GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
Y
Yu Kun 已提交
162
    if (task_ptr == nullptr) {
K
kun yu 已提交
163 164 165
        return SERVER_NULL_POINTER;
    }

S
starlord 已提交
166
    ErrorCode err = PutTaskToQueue(task_ptr);
Y
Yu Kun 已提交
167 168
    if (err != SERVER_SUCCESS) {
        SERVER_LOG_ERROR << "Put task to queue failed with code: " << err;
K
kun yu 已提交
169 170 171
        return err;
    }

Y
Yu Kun 已提交
172
    if (task_ptr->IsAsync()) {
K
kun yu 已提交
173 174 175 176 177 178 179
        return SERVER_SUCCESS;//async execution, caller need to call WaitToFinish at somewhere
    }

    return task_ptr->WaitToFinish();//sync execution
}


S
starlord 已提交
180 181 182 183 184 185 186 187 188 189 190
void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
    if (task_queue == nullptr) {
        return;
    }

    while (true) {
        BaseTaskPtr task = task_queue->Take();
        if (task == nullptr) {
            SERVER_LOG_ERROR << "Take null from task queue, stop thread";
            break;//stop the thread
        }
K
kun yu 已提交
191

S
starlord 已提交
192
        try {
S
starlord 已提交
193
            ErrorCode err = task->Execute();
S
starlord 已提交
194 195
            if (err != SERVER_SUCCESS) {
                SERVER_LOG_ERROR << "Task failed with code: " << err;
K
kun yu 已提交
196
            }
S
starlord 已提交
197 198
        } catch (std::exception &ex) {
            SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
K
kun yu 已提交
199 200 201 202
        }
    }
}

S
starlord 已提交
203
ErrorCode GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
K
kun yu 已提交
204 205 206
    std::lock_guard<std::mutex> lock(queue_mtx_);

    std::string group_name = task_ptr->TaskGroup();
Y
Yu Kun 已提交
207
    if (task_groups_.count(group_name) > 0) {
K
kun yu 已提交
208 209 210 211 212 213 214
        task_groups_[group_name]->Put(task_ptr);
    } else {
        TaskQueuePtr queue = std::make_shared<TaskQueue>();
        queue->Put(task_ptr);
        task_groups_.insert(std::make_pair(group_name, queue));

        //start a thread
S
starlord 已提交
215
        ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
K
kun yu 已提交
216 217 218 219 220 221 222 223 224 225
        execute_threads_.push_back(thread);
        SERVER_LOG_INFO << "Create new thread for task group: " << group_name;
    }

    return SERVER_SUCCESS;
}

}
}
}
Y
Yu Kun 已提交
226
}