提交 a56333b5 编写于 作者: G groot

throw exception


Former-commit-id: 6a8ae6f0c0b73fc9411b039143a1650e6ab7cc02
上级 354f68a9
......@@ -38,6 +38,62 @@ namespace {
void TimeRecord(const std::string& func_name) {
}
const std::map<ServerError, zilliz::VecErrCode::type>& ErrorMap() {
static const std::map<ServerError, zilliz::VecErrCode::type> code_map = {
{SERVER_UNEXPECTED_ERROR, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_NULL_POINTER, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_INVALID_ARGUMENT, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_FILE_NOT_FOUND, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_NOT_IMPLEMENT, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_BLOCKING_QUEUE_EMPTY, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_GROUP_NOT_EXIST, zilliz::VecErrCode::GROUP_NOT_EXISTS},
{SERVER_INVALID_TIME_RANGE, zilliz::VecErrCode::ILLEGAL_TIME_RANGE},
{SERVER_INVALID_VECTOR_DIMENSION, zilliz::VecErrCode::ILLEGAL_VECTOR_DIMENSION},
};
return code_map;
}
const std::map<ServerError, std::string>& ErrorMessage() {
static const std::map<ServerError, std::string> msg_map = {
{SERVER_UNEXPECTED_ERROR, "unexpected error occurs"},
{SERVER_NULL_POINTER, "null pointer error"},
{SERVER_INVALID_ARGUMENT, "invalid argument"},
{SERVER_FILE_NOT_FOUND, "file not found"},
{SERVER_NOT_IMPLEMENT, "not implemented"},
{SERVER_BLOCKING_QUEUE_EMPTY, "queue empty"},
{SERVER_GROUP_NOT_EXIST, "group not exist"},
{SERVER_INVALID_TIME_RANGE, "invalid time range"},
{SERVER_INVALID_VECTOR_DIMENSION, "invalid vector dimension"},
};
return msg_map;
}
void ExecTask(BaseTaskPtr& task_ptr) {
if(task_ptr == nullptr) {
return;
}
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
if(!task_ptr->IsAsync()) {
task_ptr->WaitToFinish();
ServerError err = task_ptr->ErrorCode();
if (err != SERVER_SUCCESS) {
zilliz::VecException ex;
ex.__set_code(ErrorMap().at(err));
std::string msg = task_ptr->ErrorMsg();
if(msg.empty()){
msg = ErrorMessage().at(err);
}
ex.__set_reason(msg);
throw ex;
}
}
}
}
void
......@@ -47,8 +103,7 @@ VecServiceHandler::add_group(const VecGroup &group) {
<< ", group.index_type = " << group.index_type;
BaseTaskPtr task_ptr = AddGroupTask::Create(group.dimension, group.id);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -58,8 +113,7 @@ VecServiceHandler::get_group(VecGroup &_return, const std::string &group_id) {
_return.id = group_id;
BaseTaskPtr task_ptr = GetGroupTask::Create(group_id, _return.dimension);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -68,8 +122,7 @@ VecServiceHandler::del_group(const std::string &group_id) {
SERVER_LOG_TRACE << "group_id = " << group_id;
BaseTaskPtr task_ptr = DeleteGroupTask::Create(group_id);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
......@@ -79,8 +132,7 @@ VecServiceHandler::add_vector(std::string& _return, const std::string &group_id,
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size();
BaseTaskPtr task_ptr = AddVectorTask::Create(group_id, &tensor, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -92,8 +144,7 @@ VecServiceHandler::add_vector_batch(std::vector<std::string> & _return,
<< tensor_list.tensor_list.size();
BaseTaskPtr task_ptr = AddBatchVectorTask::Create(group_id, &tensor_list, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -104,8 +155,7 @@ VecServiceHandler::add_binary_vector(std::string& _return,
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size()/4;
BaseTaskPtr task_ptr = AddVectorTask::Create(group_id, &tensor, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -117,8 +167,7 @@ VecServiceHandler::add_binary_vector_batch(std::vector<std::string> & _return,
<< tensor_list.tensor_list.size();
BaseTaskPtr task_ptr = AddBatchVectorTask::Create(group_id, &tensor_list, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -135,8 +184,7 @@ VecServiceHandler::search_vector(VecSearchResult &_return,
tensor_list.tensor_list.push_back(tensor);
VecSearchResultList result;
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, result);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
if(!result.result_list.empty()) {
_return = result.result_list[0];
......@@ -156,8 +204,7 @@ VecServiceHandler::search_vector_batch(VecSearchResultList &_return,
<< ", vector list size = " << tensor_list.tensor_list.size();
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -174,8 +221,7 @@ VecServiceHandler::search_binary_vector(VecSearchResult& _return,
tensor_list.tensor_list.push_back(tensor);
VecSearchResultList result;
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, result);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
if(!result.result_list.empty()) {
_return = result.result_list[0];
......@@ -195,8 +241,7 @@ VecServiceHandler::search_binary_vector_batch(VecSearchResultList& _return,
<< ", vector list size = " << tensor_list.tensor_list.size();
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
......
......@@ -27,6 +27,7 @@ public:
std::string TaskGroup() const { return task_group_; }
ServerError ErrorCode() const { return error_code_; }
std::string ErrorMsg() const { return error_msg_; }
bool IsAsync() const { return async_; }
......@@ -41,6 +42,7 @@ protected:
bool async_;
bool done_;
ServerError error_code_;
std::string error_msg_;
};
using BaseTaskPtr = std::shared_ptr<BaseTask>;
......
......@@ -92,12 +92,16 @@ ServerError AddGroupTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->add_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
}
......@@ -124,14 +128,18 @@ ServerError GetGroupTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
} else {
dimension_ = (int32_t)group_info.dimension;
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
}
......@@ -150,14 +158,10 @@ BaseTaskPtr DeleteGroupTask::Create(const std::string& group_id) {
}
ServerError DeleteGroupTask::OnExecute() {
try {
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
}
return SERVER_SUCCESS;
error_code_ = SERVER_NOT_IMPLEMENT;
error_msg_ = "delete group not implemented";
SERVER_LOG_ERROR << error_msg_;
return SERVER_NOT_IMPLEMENT;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -241,8 +245,10 @@ ServerError AddVectorTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_INVALID_ARGUMENT;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
uint64_t group_dim = group_info.dimension;
......@@ -250,7 +256,10 @@ ServerError AddVectorTask::OnExecute() {
if(group_dim != vec_dim) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_dim;
return SERVER_INVALID_ARGUMENT;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
std::vector<float> vec_f;
......@@ -263,8 +272,10 @@ ServerError AddVectorTask::OnExecute() {
engine::IDNumbers vector_ids;
stat = DB()->add_vectors(group_id_, 1, vec_f.data(), vector_ids);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
} else {
if(vector_ids.empty()) {
SERVER_LOG_ERROR << "Vector ID not returned";
......@@ -289,8 +300,10 @@ ServerError AddVectorTask::OnExecute() {
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;
......@@ -433,8 +446,10 @@ ServerError AddBatchVectorTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
rc.Record("check group dimension");
......@@ -447,7 +462,10 @@ ServerError AddBatchVectorTask::OnExecute() {
if(vec_dim != group_dim) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_dim;
return SERVER_INVALID_ARGUMENT;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
const double* d_p = GetVecData(i);
......@@ -462,43 +480,48 @@ ServerError AddBatchVectorTask::OnExecute() {
stat = DB()->add_vectors(group_id_, vec_count, vec_f.data(), vector_ids);
rc.Record("add vectors to engine");
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
if(vector_ids.size() < vec_count) {
SERVER_LOG_ERROR << "Vector ID not returned";
return SERVER_UNEXPECTED_ERROR;
} else {
if(vector_ids.size() < vec_count) {
SERVER_LOG_ERROR << "Vector ID not returned";
return SERVER_UNEXPECTED_ERROR;
tensor_ids_.resize(vector_ids.size());
if(vec_count < USE_MT) {
ProcessIdMapping(vector_ids, 0, vec_count, tensor_ids_);
rc.Record("built id mapping");
} else {
tensor_ids_.resize(vector_ids.size());
if(vec_count < USE_MT) {
ProcessIdMapping(vector_ids, 0, vec_count, tensor_ids_);
rc.Record("built id mapping");
} else {
std::list<std::future<void>> threads_list;
uint64_t begin_index = 0, end_index = USE_MT;
while(end_index < vec_count) {
threads_list.push_back(
GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping,
this, vector_ids, begin_index, end_index, tensor_ids_));
begin_index = end_index;
end_index += USE_MT;
if(end_index > vec_count) {
end_index = vec_count;
}
}
for (std::list<std::future<void>>::iterator it = threads_list.begin(); it != threads_list.end(); it++) {
it->wait();
std::list<std::future<void>> threads_list;
uint64_t begin_index = 0, end_index = USE_MT;
while(end_index < vec_count) {
threads_list.push_back(
GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping,
this, vector_ids, begin_index, end_index, tensor_ids_));
begin_index = end_index;
end_index += USE_MT;
if(end_index > vec_count) {
end_index = vec_count;
}
}
rc.Record("built id mapping by multi-threads:" + std::to_string(threads_list.size()));
for (std::list<std::future<void>>::iterator it = threads_list.begin(); it != threads_list.end(); it++) {
it->wait();
}
rc.Record("built id mapping by multi-threads:" + std::to_string(threads_list.size()));
}
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;
......@@ -612,8 +635,10 @@ ServerError SearchVectorTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
uint64_t vec_dim = GetTargetDimension();
......@@ -674,8 +699,10 @@ ServerError SearchVectorTask::OnExecute() {
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;
......
#!/bin/bash
../../third_party/build/bin/thrift -r --gen cpp ./VectorService.thrift
../../third_party/build/bin/thrift -r --gen cpp ./megasearch.thrift
......@@ -82,8 +82,8 @@ def test_vecwise():
transport.close()
time_start = print_time_cost('close connection', time_start)
except Thrift.TException as ex:
print(ex.message)
except VecService.VecException as ex:
print(ex.reason)
test_vecwise()
\ No newline at end of file
#!/bin/bash
../../third_party/build/bin/thrift -r --gen py ./VectorService.thrift
../../third_party/build/bin/thrift -r --gen py ./megasearch.thrift
......@@ -21,7 +21,7 @@ constexpr ServerError SERVER_ERROR_CODE_BASE = 0x30000;
constexpr ServerError
ToGlobalServerErrorCode(const ServerError error_code) {
return SERVER_ERROR_CODE_BASE + SERVER_ERROR_CODE_BASE;
return SERVER_ERROR_CODE_BASE + error_code;
}
constexpr ServerError SERVER_UNEXPECTED_ERROR = ToGlobalServerErrorCode(0x001);
......@@ -31,6 +31,9 @@ constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(0x004);
constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(0x005);
constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(0x006);
constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(0x007);
constexpr ServerError SERVER_GROUP_NOT_EXIST = ToGlobalServerErrorCode(0x008);
constexpr ServerError SERVER_INVALID_TIME_RANGE = ToGlobalServerErrorCode(0x009);
constexpr ServerError SERVER_INVALID_VECTOR_DIMENSION = ToGlobalServerErrorCode(0x00a);
class ServerException : public std::exception {
public:
......
......@@ -126,6 +126,23 @@ TEST(AddVector, CLIENT_TEST) {
GetServerAddress(address, port, protocol);
client::ClientSession session(address, port, protocol);
//verify get invalid group
try {
std::string id;
zilliz::VecTensor tensor;
session.interface()->add_vector(id, GetGroupID(), tensor);
} catch (zilliz::VecException& ex) {
ASSERT_EQ(ex.code, zilliz::VecErrCode::GROUP_NOT_EXISTS);
}
try {
VecGroup temp_group;
session.interface()->get_group(temp_group, GetGroupID());
ASSERT_TRUE(temp_group.id.empty());
} catch (zilliz::VecException& ex) {
ASSERT_EQ(ex.code, zilliz::VecErrCode::GROUP_NOT_EXISTS);
}
//add group
VecGroup group;
group.id = GetGroupID();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册