提交 077c36ee 编写于 作者: X xj.lin

Merge branch 'release-v1' of 192.168.1.105:jinhai/vecwise_engine into release-v1


Former-commit-id: ec5221f0b08674678633ec4415b48399bb52b820
......@@ -14,3 +14,4 @@ cmake_build
cpp/third_party/thrift-0.12.0/
cpp/third_party/faiss-1.5.1
cpp/megasearch/
......@@ -17,8 +17,8 @@ set(license_check_files
set(service_files
thrift/gen-cpp/VecService.cpp
thrift/gen-cpp/VectorService_constants.cpp
thrift/gen-cpp/VectorService_types.cpp
thrift/gen-cpp/megasearch_constants.cpp
thrift/gen-cpp/megasearch_types.cpp
)
set(vecwise_engine_src
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "RocksIdMapper.h"
#include "ServerConfig.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include <exception>
namespace zilliz {
namespace vecwise {
namespace server {
RocksIdMapper::RocksIdMapper()
: db_(nullptr) {
OpenDb();
}
RocksIdMapper::~RocksIdMapper() {
CloseDb();
}
void RocksIdMapper::OpenDb() {
if(db_) {
return;
}
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
db_path += "/id_mapping";
CommonUtil::CreateDirectory(db_path);
rocksdb::Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
options.max_open_files = config.GetInt32Value(CONFIG_DB_IDMAPPER_MAX_FILE, 512);
//load column families
std::vector<std::string> column_names;
rocksdb::Status s = rocksdb::DB::ListColumnFamilies(options, db_path, &column_names);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
}
if(column_names.empty()) {
column_names.push_back("default");
}
SERVER_LOG_INFO << "ID mapper has " << std::to_string(column_names.size()) << " groups";
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
for(auto& column_name : column_names) {
rocksdb::ColumnFamilyDescriptor desc;
desc.name = column_name;
column_families.emplace_back(desc);
}
// open DB
std::vector<rocksdb::ColumnFamilyHandle*> column_handles;
s = rocksdb::DB::Open(options, db_path, column_families, &column_handles, &db_);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
db_ = nullptr;
}
column_handles_.clear();
for(auto handler : column_handles) {
column_handles_.insert(std::make_pair(handler->GetName(), handler));
}
}
void RocksIdMapper::CloseDb() {
for(auto& iter : column_handles_) {
delete iter.second;
}
column_handles_.clear();
if(db_) {
db_->Close();
delete db_;
}
}
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Slice value(sid);
if(group.empty()) {//to default group
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), key, value);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
} else {
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) == 0) {
try {//add group
rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString();
} else {
column_handles_.insert(std::make_pair(group, cfh));
}
} catch(std::exception& ex) {
std::cout << ex.what() << std::endl;
}
} else {
cfh = column_handles_[group];
}
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), cfh, key, value);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = Put(nid[i], sid[i], group);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
sid = "";
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
rocksdb::Slice key(nid);
rocksdb::Status s;
if(cfh){
s = db_->Get(rocksdb::ReadOptions(), cfh, key, &sid);
} else {
s = db_->Get(rocksdb::ReadOptions(), key, &sid);
}
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to get:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = Get(nid[i], str_id, group);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
rocksdb::Slice key(nid);
rocksdb::Status s;
if(cfh){
s = db_->Delete(rocksdb::WriteOptions(), cfh, key);
} else {
s = db_->Delete(rocksdb::WriteOptions(), key);
}
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to delete:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
if(cfh) {
db_->DropColumnFamily(cfh);
db_->DestroyColumnFamilyHandle(cfh);
column_handles_.erase(group);
}
return SERVER_SUCCESS;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include "VecIdMapper.h"
#include "rocksdb/db.h"
#include <string>
#include <vector>
#include <unordered_map>
namespace zilliz {
namespace vecwise {
namespace server {
class RocksIdMapper : public IVecIdMapper{
public:
RocksIdMapper();
~RocksIdMapper();
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;
private:
void OpenDb();
void CloseDb();
private:
rocksdb::DB* db_;
std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
};
}
}
}
......@@ -5,6 +5,7 @@
******************************************************************************/
#include "VecIdMapper.h"
#include "RocksIdMapper.h"
#include "ServerConfig.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
......@@ -13,6 +14,8 @@
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include <exception>
namespace zilliz {
namespace vecwise {
namespace server {
......@@ -36,26 +39,33 @@ SimpleIdMapper::~SimpleIdMapper() {
}
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid) {
ids_[nid] = sid;
//not thread-safe
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
mapping[nid] = sid;
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) {
//not thread-safe
ServerError SimpleIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ID_MAPPING& mapping = id_groups_[group];
for(size_t i = 0; i < nid.size(); i++) {
ids_[nid[i]] = sid[i];
mapping[nid[i]] = sid[i];
}
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const {
auto iter = ids_.find(nid);
if(iter == ids_.end()) {
//not thread-safe
ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
ID_MAPPING& mapping = id_groups_[group];
auto iter = mapping.find(nid);
if(iter == mapping.end()) {
return SERVER_INVALID_ARGUMENT;
}
......@@ -64,13 +74,16 @@ ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const {
//not thread-safe
ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
ID_MAPPING& mapping = id_groups_[group];
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
auto iter = ids_.find(nid[i]);
if(iter == ids_.end()) {
auto iter = mapping.find(nid[i]);
if(iter == mapping.end()) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to find id: " << nid[i];
err = SERVER_INVALID_ARGUMENT;
......@@ -83,120 +96,16 @@ ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector
return err;
}
ServerError SimpleIdMapper::Delete(const std::string& nid) {
ids_.erase(nid);
//not thread-safe
ServerError SimpleIdMapper::Delete(const std::string& nid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
mapping.erase(nid);
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
RocksIdMapper::RocksIdMapper() {
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
db_path += "/id_mapping";
CommonUtil::CreateDirectory(db_path);
rocksdb::Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
options.max_open_files = config.GetInt32Value(CONFIG_DB_IDMAPPER_MAX_FILE, 128);
// open DB
rocksdb::Status s = rocksdb::DB::Open(options, db_path, &db_);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
db_ = nullptr;
}
}
RocksIdMapper::~RocksIdMapper() {
if(db_) {
db_->Close();
delete db_;
}
}
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Slice value(sid);
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), key, value);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = Put(nid[i], sid[i]);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid) const {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Status s = db_->Get(rocksdb::ReadOptions(), key, &sid);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to get:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const {
sid.clear();
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = Get(nid[i], str_id);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Status s = db_->Delete(rocksdb::WriteOptions(), key);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to delete:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
//not thread-safe
ServerError SimpleIdMapper::DeleteGroup(const std::string& group) {
id_groups_.erase(group);
return SERVER_SUCCESS;
}
......
......@@ -25,14 +25,15 @@ public:
virtual ~IVecIdMapper(){}
virtual ServerError Put(const std::string& nid, const std::string& sid) = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) = 0;
virtual ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") = 0;
virtual ServerError Get(const std::string& nid, std::string& sid) const = 0;
virtual ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const = 0;
//NOTE: the 'sid' will be cleared at begin of the function
virtual ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const = 0;
virtual ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const = 0;
virtual ServerError Delete(const std::string& nid) = 0;
virtual ServerError Delete(const std::string& nid, const std::string& group = "") = 0;
virtual ServerError DeleteGroup(const std::string& group) = 0;
};
class SimpleIdMapper : public IVecIdMapper{
......@@ -40,33 +41,18 @@ public:
SimpleIdMapper();
~SimpleIdMapper();
ServerError Put(const std::string& nid, const std::string& sid) override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) override;
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
ServerError Get(const std::string& nid, std::string& sid) const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const override;
ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;
ServerError Delete(const std::string& nid) override;
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;
private:
std::unordered_map<std::string, std::string> ids_;
};
class RocksIdMapper : public IVecIdMapper{
public:
RocksIdMapper();
~RocksIdMapper();
ServerError Put(const std::string& nid, const std::string& sid) override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) override;
ServerError Get(const std::string& nid, std::string& sid) const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const override;
ServerError Delete(const std::string& nid) override;
private:
rocksdb::DB* db_;
using ID_MAPPING = std::unordered_map<std::string, std::string>;
mutable std::unordered_map<std::string, ID_MAPPING> id_groups_;
};
}
......
......@@ -6,7 +6,6 @@
#include "VecServiceHandler.h"
#include "VecServiceTask.h"
#include "ServerConfig.h"
#include "VecIdMapper.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
#include "utils/TimeRecorder.h"
......@@ -23,12 +22,12 @@ namespace {
public:
TimeRecordWrapper(const std::string& func_name)
: recorder_(func_name), func_name_(func_name) {
SERVER_LOG_TRACE << func_name << " called";
//SERVER_LOG_TRACE << func_name << " called";
}
~TimeRecordWrapper() {
recorder_.Elapse("cost");
SERVER_LOG_TRACE << func_name_ << " finished";
//SERVER_LOG_TRACE << func_name_ << " finished";
}
private:
......@@ -38,87 +37,128 @@ namespace {
void TimeRecord(const std::string& func_name) {
}
const std::map<ServerError, zilliz::VecErrCode::type>& ErrorMap() {
static const std::map<ServerError, zilliz::VecErrCode::type> code_map = {
{SERVER_UNEXPECTED_ERROR, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_NULL_POINTER, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_INVALID_ARGUMENT, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_FILE_NOT_FOUND, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_NOT_IMPLEMENT, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_BLOCKING_QUEUE_EMPTY, zilliz::VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_GROUP_NOT_EXIST, zilliz::VecErrCode::GROUP_NOT_EXISTS},
{SERVER_INVALID_TIME_RANGE, zilliz::VecErrCode::ILLEGAL_TIME_RANGE},
{SERVER_INVALID_VECTOR_DIMENSION, zilliz::VecErrCode::ILLEGAL_VECTOR_DIMENSION},
};
return code_map;
}
const std::map<ServerError, std::string>& ErrorMessage() {
static const std::map<ServerError, std::string> msg_map = {
{SERVER_UNEXPECTED_ERROR, "unexpected error occurs"},
{SERVER_NULL_POINTER, "null pointer error"},
{SERVER_INVALID_ARGUMENT, "invalid argument"},
{SERVER_FILE_NOT_FOUND, "file not found"},
{SERVER_NOT_IMPLEMENT, "not implemented"},
{SERVER_BLOCKING_QUEUE_EMPTY, "queue empty"},
{SERVER_GROUP_NOT_EXIST, "group not exist"},
{SERVER_INVALID_TIME_RANGE, "invalid time range"},
{SERVER_INVALID_VECTOR_DIMENSION, "invalid vector dimension"},
};
return msg_map;
}
void ExecTask(BaseTaskPtr& task_ptr) {
if(task_ptr == nullptr) {
return;
}
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
if(!task_ptr->IsAsync()) {
task_ptr->WaitToFinish();
ServerError err = task_ptr->ErrorCode();
if (err != SERVER_SUCCESS) {
zilliz::VecException ex;
ex.__set_code(ErrorMap().at(err));
std::string msg = task_ptr->ErrorMsg();
if(msg.empty()){
msg = ErrorMessage().at(err);
}
ex.__set_reason(msg);
throw ex;
}
}
}
}
void
VecServiceHandler::add_group(const VecGroup &group) {
TimeRecordWrapper rc("add_group()");
SERVER_LOG_TRACE << "group.id = " << group.id << ", group.dimension = " << group.dimension
<< ", group.index_type = " << group.index_type;
std::string info = "add_group() " + group.id + " dimension = " + std::to_string(group.dimension)
+ " index_type = " + std::to_string(group.index_type);
TimeRecordWrapper rc(info);
BaseTaskPtr task_ptr = AddGroupTask::Create(group.dimension, group.id);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::get_group(VecGroup &_return, const std::string &group_id) {
TimeRecordWrapper rc("get_group()");
SERVER_LOG_TRACE << "group_id = " << group_id;
TimeRecordWrapper rc("get_group() " + group_id);
_return.id = group_id;
BaseTaskPtr task_ptr = GetGroupTask::Create(group_id, _return.dimension);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::del_group(const std::string &group_id) {
TimeRecordWrapper rc("del_group()");
SERVER_LOG_TRACE << "group_id = " << group_id;
TimeRecordWrapper rc("del_group() " + group_id);
BaseTaskPtr task_ptr = DeleteGroupTask::Create(group_id);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::add_vector(std::string& _return, const std::string &group_id, const VecTensor &tensor) {
TimeRecordWrapper rc("add_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size();
TimeRecordWrapper rc("add_vector() to " + group_id);
BaseTaskPtr task_ptr = AddVectorTask::Create(group_id, &tensor, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::add_vector_batch(std::vector<std::string> & _return,
const std::string &group_id,
const VecTensorList &tensor_list) {
TimeRecordWrapper rc("add_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector list size = "
<< tensor_list.tensor_list.size();
TimeRecordWrapper rc("add_vector_batch() to " + group_id);
BaseTaskPtr task_ptr = AddBatchVectorTask::Create(group_id, &tensor_list, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::add_binary_vector(std::string& _return,
const std::string& group_id,
const VecBinaryTensor& tensor) {
TimeRecordWrapper rc("add_binary_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size()/4;
TimeRecordWrapper rc("add_binary_vector() to " + group_id);
BaseTaskPtr task_ptr = AddVectorTask::Create(group_id, &tensor, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::add_binary_vector_batch(std::vector<std::string> & _return,
const std::string& group_id,
const VecBinaryTensorList& tensor_list) {
TimeRecordWrapper rc("add_binary_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector list size = "
<< tensor_list.tensor_list.size();
TimeRecordWrapper rc("add_binary_vector_batch() to " + group_id);
BaseTaskPtr task_ptr = AddBatchVectorTask::Create(group_id, &tensor_list, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -127,16 +167,13 @@ VecServiceHandler::search_vector(VecSearchResult &_return,
const int64_t top_k,
const VecTensor &tensor,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector dimension = " << tensor.tensor.size();
TimeRecordWrapper rc("search_vector() in " + group_id);
VecTensorList tensor_list;
tensor_list.tensor_list.push_back(tensor);
VecSearchResultList result;
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, result);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
if(!result.result_list.empty()) {
_return = result.result_list[0];
......@@ -151,13 +188,10 @@ VecServiceHandler::search_vector_batch(VecSearchResultList &_return,
const int64_t top_k,
const VecTensorList &tensor_list,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector list size = " << tensor_list.tensor_list.size();
TimeRecordWrapper rc("search_vector_batch() in " + group_id);
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
......@@ -166,16 +200,13 @@ VecServiceHandler::search_binary_vector(VecSearchResult& _return,
const int64_t top_k,
const VecBinaryTensor& tensor,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_binary_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector dimension = " << tensor.tensor.size();
TimeRecordWrapper rc("search_binary_vector() in " + group_id);
VecBinaryTensorList tensor_list;
tensor_list.tensor_list.push_back(tensor);
VecSearchResultList result;
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, result);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
if(!result.result_list.empty()) {
_return = result.result_list[0];
......@@ -190,13 +221,10 @@ VecServiceHandler::search_binary_vector_batch(VecSearchResultList& _return,
const int64_t top_k,
const VecBinaryTensorList& tensor_list,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_binary_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector list size = " << tensor_list.tensor_list.size();
TimeRecordWrapper rc("search_binary_vector_batch() in " + group_id);
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
......
......@@ -27,6 +27,7 @@ public:
std::string TaskGroup() const { return task_group_; }
ServerError ErrorCode() const { return error_code_; }
std::string ErrorMsg() const { return error_msg_; }
bool IsAsync() const { return async_; }
......@@ -41,6 +42,7 @@ protected:
bool async_;
bool done_;
ServerError error_code_;
std::string error_msg_;
};
using BaseTaskPtr = std::shared_ptr<BaseTask>;
......
......@@ -91,13 +91,16 @@ ServerError AddGroupTask::OnExecute() {
group_info.dimension = (size_t)dimension_;
group_info.group_id = group_id_;
engine::Status stat = DB()->add_group(group_info);
if(!stat.ok()) {
if(!stat.ok()) {//could exist
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
SERVER_LOG_ERROR << error_msg_;
return SERVER_SUCCESS;
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
}
......@@ -124,14 +127,18 @@ ServerError GetGroupTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
} else {
dimension_ = (int32_t)group_info.dimension;
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
}
......@@ -150,14 +157,13 @@ BaseTaskPtr DeleteGroupTask::Create(const std::string& group_id) {
}
ServerError DeleteGroupTask::OnExecute() {
try {
error_code_ = SERVER_NOT_IMPLEMENT;
error_msg_ = "delete group not implemented";
SERVER_LOG_ERROR << error_msg_;
//IVecIdMapper::GetInstance()->DeleteGroup(group_id_);
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
}
return SERVER_SUCCESS;
return SERVER_NOT_IMPLEMENT;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -237,22 +243,7 @@ const AttribMap& AddVectorTask::GetVecAttrib() const {
ServerError AddVectorTask::OnExecute() {
try {
engine::meta::GroupSchema group_info;
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_INVALID_ARGUMENT;
}
uint64_t group_dim = group_info.dimension;
uint64_t vec_dim = GetVecDimension();
if(group_dim != vec_dim) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_dim;
return SERVER_INVALID_ARGUMENT;
}
std::vector<float> vec_f;
vec_f.resize(vec_dim);
const double* d_p = GetVecData();
......@@ -261,13 +252,16 @@ ServerError AddVectorTask::OnExecute() {
}
engine::IDNumbers vector_ids;
stat = DB()->add_vectors(group_id_, 1, vec_f.data(), vector_ids);
engine::Status stat = DB()->add_vectors(group_id_, 1, vec_f.data(), vector_ids);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
} else {
if(vector_ids.empty()) {
SERVER_LOG_ERROR << "Vector ID not returned";
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
} else {
std::string uid = GetVecID();
......@@ -283,14 +277,16 @@ ServerError AddVectorTask::OnExecute() {
attrib[VECTOR_UID] = tensor_id_;
std::string attrib_str;
AttributeSerializer::Encode(attrib, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str);
SERVER_LOG_TRACE << "nid = " << vector_ids[0] << ", uid = " << uid;
IVecIdMapper::GetInstance()->Put(nid, attrib_str, group_id_);
//SERVER_LOG_TRACE << "nid = " << vector_ids[0] << ", uid = " << uid;
}
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;
......@@ -416,7 +412,7 @@ void AddBatchVectorTask::ProcessIdMapping(engine::IDNumbers& vector_ids,
attrib[VECTOR_UID] = uid;
std::string attrib_str;
AttributeSerializer::Encode(attrib, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str, group_id_);
}
}
......@@ -433,8 +429,10 @@ ServerError AddBatchVectorTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
rc.Record("check group dimension");
......@@ -447,7 +445,9 @@ ServerError AddBatchVectorTask::OnExecute() {
if(vec_dim != group_dim) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_dim;
return SERVER_INVALID_ARGUMENT;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
}
const double* d_p = GetVecData(i);
......@@ -462,43 +462,48 @@ ServerError AddBatchVectorTask::OnExecute() {
stat = DB()->add_vectors(group_id_, vec_count, vec_f.data(), vector_ids);
rc.Record("add vectors to engine");
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
if(vector_ids.size() < vec_count) {
SERVER_LOG_ERROR << "Vector ID not returned";
return SERVER_UNEXPECTED_ERROR;
} else {
if(vector_ids.size() < vec_count) {
SERVER_LOG_ERROR << "Vector ID not returned";
return SERVER_UNEXPECTED_ERROR;
tensor_ids_.resize(vector_ids.size());
if(vec_count < USE_MT) {
ProcessIdMapping(vector_ids, 0, vec_count, tensor_ids_);
rc.Record("built id mapping");
} else {
tensor_ids_.resize(vector_ids.size());
if(vec_count < USE_MT) {
ProcessIdMapping(vector_ids, 0, vec_count, tensor_ids_);
rc.Record("built id mapping");
} else {
std::list<std::future<void>> threads_list;
uint64_t begin_index = 0, end_index = USE_MT;
while(end_index < vec_count) {
threads_list.push_back(
GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping,
this, vector_ids, begin_index, end_index, tensor_ids_));
begin_index = end_index;
end_index += USE_MT;
if(end_index > vec_count) {
end_index = vec_count;
}
}
for (std::list<std::future<void>>::iterator it = threads_list.begin(); it != threads_list.end(); it++) {
it->wait();
std::list<std::future<void>> threads_list;
uint64_t begin_index = 0, end_index = USE_MT;
while(end_index < vec_count) {
threads_list.push_back(
GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping,
this, vector_ids, begin_index, end_index, tensor_ids_));
begin_index = end_index;
end_index += USE_MT;
if(end_index > vec_count) {
end_index = vec_count;
}
}
rc.Record("built id mapping by multi-threads:" + std::to_string(threads_list.size()));
for (std::list<std::future<void>>::iterator it = threads_list.begin(); it != threads_list.end(); it++) {
it->wait();
}
rc.Record("built id mapping by multi-threads:" + std::to_string(threads_list.size()));
}
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;
......@@ -612,15 +617,19 @@ ServerError SearchVectorTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
uint64_t vec_dim = GetTargetDimension();
if(vec_dim != group_info.dimension) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_info.dimension;
return SERVER_INVALID_ARGUMENT;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
}
rc.Record("check group dimension");
......@@ -654,18 +663,30 @@ ServerError SearchVectorTask::OnExecute() {
for(auto id : res) {
std::string attrib_str;
std::string nid = nid_prefix + std::to_string(id);
IVecIdMapper::GetInstance()->Get(nid, attrib_str);
IVecIdMapper::GetInstance()->Get(nid, attrib_str, group_id_);
AttribMap attrib_map;
AttributeSerializer::Decode(attrib_str, attrib_map);
AttribMap attrib_return;
VecSearchResultItem item;
item.__set_attrib(attrib_map);
item.uid = item.attrib[VECTOR_UID];
item.uid = attrib_map[VECTOR_UID];
if(filter_.return_attribs.empty()) {//return all attributes
attrib_return.swap(attrib_map);
} else {//filter attributes
for(auto& name : filter_.return_attribs) {
if(attrib_map.count(name) == 0)
continue;
attrib_return[name] = attrib_map[name];
}
}
item.__set_attrib(attrib_return);
item.distance = 0.0;////TODO: return distance
v_res.result_list.emplace_back(item);
SERVER_LOG_TRACE << "nid = " << nid << ", uid = " << item.uid;
//SERVER_LOG_TRACE << "nid = " << nid << ", uid = " << item.uid;
}
result_.result_list.push_back(v_res);
......@@ -674,8 +695,10 @@ ServerError SearchVectorTask::OnExecute() {
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;
......
......@@ -10,7 +10,7 @@
#include "utils/AttributeSerializer.h"
#include "db/Types.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include <condition_variable>
#include <memory>
......
......@@ -10,8 +10,8 @@
#include "utils/Log.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/VectorService_constants.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include "thrift/gen-cpp/megasearch_constants.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
......
#!/bin/bash
../../third_party/build/bin/thrift -r --gen cpp ./VectorService.thrift
../../third_party/build/bin/thrift -r --gen cpp ./megasearch.thrift
......@@ -9,7 +9,7 @@
#include <thrift/TDispatchProcessor.h>
#include <thrift/async/TConcurrentClientSyncInfo.h>
#include "VectorService_types.h"
#include "megasearch_types.h"
namespace zilliz {
......
......@@ -4,13 +4,13 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "VectorService_constants.h"
#include "megasearch_constants.h"
namespace zilliz {
const VectorServiceConstants g_VectorService_constants;
const megasearchConstants g_megasearch_constants;
VectorServiceConstants::VectorServiceConstants() {
megasearchConstants::megasearchConstants() {
}
} // namespace
......
......@@ -4,20 +4,20 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef VectorService_CONSTANTS_H
#define VectorService_CONSTANTS_H
#ifndef megasearch_CONSTANTS_H
#define megasearch_CONSTANTS_H
#include "VectorService_types.h"
#include "megasearch_types.h"
namespace zilliz {
class VectorServiceConstants {
class megasearchConstants {
public:
VectorServiceConstants();
megasearchConstants();
};
extern const VectorServiceConstants g_VectorService_constants;
extern const megasearchConstants g_megasearch_constants;
} // namespace
......
......@@ -4,7 +4,7 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "VectorService_types.h"
#include "megasearch_types.h"
#include <algorithm>
#include <ostream>
......
......@@ -4,8 +4,8 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef VectorService_TYPES_H
#define VectorService_TYPES_H
#ifndef megasearch_TYPES_H
#define megasearch_TYPES_H
#include <iosfwd>
......
......@@ -82,8 +82,8 @@ def test_vecwise():
transport.close()
time_start = print_time_cost('close connection', time_start)
except Thrift.TException as ex:
print(ex.message)
except VecService.VecException as ex:
print(ex.reason)
test_vecwise()
\ No newline at end of file
......@@ -743,9 +743,9 @@ class VecSearchResultList(object):
class VecDateTime(object):
"""
second; Seconds. [0-60] (1 leap second)
minute; Minutes. [0-59]
hour; Hours. [0-23]
second; Seconds. [0-59] reserved
minute; Minutes. [0-59] reserved
hour; Hours. [0-23] reserved
day; Day. [1-31]
month; Month. [0-11]
year; Year - 1900.
......@@ -874,9 +874,9 @@ class VecDateTime(object):
class VecTimeRange(object):
"""
time_begin; time range begin
begine_closed; true means '[', false means '('
begine_closed; true means '[', false means '(' reserved
time_end; set to true to return tensor double array
end_closed; time range end
end_closed; time range end reserved
Attributes:
- time_begin
......@@ -977,7 +977,7 @@ class VecTimeRange(object):
class VecSearchFilter(object):
"""
attrib_filter; search condition, for example: "color=red"
attrib_filter; reserved
time_ranges; search condition, for example: "date between 1999-02-12 and 2008-10-14"
return_attribs; specify required attribute names
......
......@@ -69,9 +69,9 @@ struct VecSearchResultList {
}
/**
* second; Seconds. [0-60] (1 leap second)
* minute; Minutes. [0-59]
* hour; Hours. [0-23]
* second; Seconds. [0-59] reserved
* minute; Minutes. [0-59] reserved
* hour; Hours. [0-23] reserved
* day; Day. [1-31]
* month; Month. [0-11]
* year; Year - 1900.
......@@ -87,9 +87,9 @@ struct VecDateTime {
/**
* time_begin; time range begin
* begine_closed; true means '[', false means '('
* begine_closed; true means '[', false means '(' reserved
* time_end; set to true to return tensor double array
* end_closed; time range end
* end_closed; time range end reserved
*/
struct VecTimeRange {
1: required VecDateTime time_begin;
......@@ -99,7 +99,7 @@ struct VecTimeRange {
}
/**
* attrib_filter; search condition, for example: "color=red"
* attrib_filter; reserved
* time_ranges; search condition, for example: "date between 1999-02-12 and 2008-10-14"
* return_attribs; specify required attribute names
*/
......
#!/bin/bash
../../third_party/build/bin/thrift -r --gen py ./VectorService.thrift
../../third_party/build/bin/thrift -r --gen py ./megasearch.thrift
......@@ -21,7 +21,7 @@ constexpr ServerError SERVER_ERROR_CODE_BASE = 0x30000;
constexpr ServerError
ToGlobalServerErrorCode(const ServerError error_code) {
return SERVER_ERROR_CODE_BASE + SERVER_ERROR_CODE_BASE;
return SERVER_ERROR_CODE_BASE + error_code;
}
constexpr ServerError SERVER_UNEXPECTED_ERROR = ToGlobalServerErrorCode(0x001);
......@@ -31,6 +31,9 @@ constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(0x004);
constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(0x005);
constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(0x006);
constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(0x007);
constexpr ServerError SERVER_GROUP_NOT_EXIST = ToGlobalServerErrorCode(0x008);
constexpr ServerError SERVER_INVALID_TIME_RANGE = ToGlobalServerErrorCode(0x009);
constexpr ServerError SERVER_INVALID_VECTOR_DIMENSION = ToGlobalServerErrorCode(0x00a);
class ServerException : public std::exception {
public:
......
......@@ -16,8 +16,8 @@ set(util_files
set(service_files
../src/thrift/gen-cpp/VecService.cpp
../src/thrift/gen-cpp/VectorService_constants.cpp
../src/thrift/gen-cpp/VectorService_types.cpp)
../src/thrift/gen-cpp/megasearch_constants.cpp
../src/thrift/gen-cpp/megasearch_types.cpp)
......
......@@ -6,8 +6,8 @@
#include "ClientSession.h"
#include "Log.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/VectorService_constants.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include "thrift/gen-cpp/megasearch_constants.h"
#include <exception>
......
......@@ -21,7 +21,7 @@ using namespace zilliz::vecwise::client;
namespace {
static const int32_t VEC_DIMENSION = 256;
static const int64_t BATCH_COUNT = 1000;
static const int64_t BATCH_COUNT = 10000;
static const int64_t REPEAT_COUNT = 1;
static const int64_t TOP_K = 10;
......@@ -126,6 +126,28 @@ TEST(AddVector, CLIENT_TEST) {
GetServerAddress(address, port, protocol);
client::ClientSession session(address, port, protocol);
//verify get invalid group
try {
std::string id;
zilliz::VecTensor tensor;
for(int32_t i = 0; i < VEC_DIMENSION; i++) {
tensor.tensor.push_back(0.5);
}
session.interface()->add_vector(id, GetGroupID(), tensor);
} catch (zilliz::VecException& ex) {
CLIENT_LOG_ERROR << "request encounter exception: " << ex.what();
ASSERT_EQ(ex.code, zilliz::VecErrCode::ILLEGAL_ARGUMENT);
}
try {
VecGroup temp_group;
session.interface()->get_group(temp_group, GetGroupID());
//ASSERT_TRUE(temp_group.id.empty());
} catch (zilliz::VecException& ex) {
CLIENT_LOG_ERROR << "request encounter exception: " << ex.what();
ASSERT_EQ(ex.code, zilliz::VecErrCode::GROUP_NOT_EXISTS);
}
//add group
VecGroup group;
group.id = GetGroupID();
......@@ -214,7 +236,6 @@ TEST(SearchVector, CLIENT_TEST) {
//search vector
{
const int32_t anchor_index = 100;
server::TimeRecorder rc("Search top_k");
VecTensor tensor;
for (int32_t i = 0; i < VEC_DIMENSION; i++) {
tensor.tensor.push_back((double) (i + anchor_index));
......@@ -232,35 +253,60 @@ TEST(SearchVector, CLIENT_TEST) {
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
//do search
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
//normal search
{
server::TimeRecorder rc("Search top_k");
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
rc.Elapse("done!");
//build result
std::cout << "Search result: " << std::endl;
for(VecSearchResultItem& item : res.result_list) {
std::cout << "\t" << item.uid << std::endl;
//build result
std::cout << "Search result: " << std::endl;
for (VecSearchResultItem &item : res.result_list) {
std::cout << "\t" << item.uid << std::endl;
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_NUM) != 0);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_NUM) != 0);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
}
ASSERT_EQ(res.result_list.size(), (uint64_t) TOP_K);
if (!res.result_list.empty()) {
ASSERT_TRUE(!res.result_list[0].uid.empty());
}
}
rc.Elapse("done!");
ASSERT_EQ(res.result_list.size(), (uint64_t)TOP_K);
if(!res.result_list.empty()) {
ASSERT_TRUE(!res.result_list[0].uid.empty());
//filter attribute search
{
std::vector<std::string> require_attributes = {TEST_ATTRIB_COMMENT};
filter.__set_return_attribs(require_attributes);
server::TimeRecorder rc("Search top_k with attribute filter");
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
rc.Elapse("done!");
//build result
std::cout << "Search result attributes: " << std::endl;
for (VecSearchResultItem &item : res.result_list) {
ASSERT_EQ(item.attrib.size(), 1UL);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
std::cout << "\t" << item.uid << ":" << item.attrib[TEST_ATTRIB_COMMENT] << std::endl;
}
ASSERT_EQ(res.result_list.size(), (uint64_t) TOP_K);
}
//empty search
date.day > 0 ? date.day -= 1 : date.day += 1;
range.time_begin = date;
range.time_end = date;
time_ranges.clear();
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
ASSERT_EQ(res.result_list.size(), 0);
{
date.day > 0 ? date.day -= 1 : date.day += 1;
range.time_begin = date;
range.time_end = date;
time_ranges.clear();
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
ASSERT_EQ(res.result_list.size(), 0);
}
}
//search binary vector
......
......@@ -14,6 +14,7 @@ link_directories("/usr/local/cuda/lib64")
set(require_files
../../src/server/VecIdMapper.cpp
../../src/server/RocksIdMapper.cpp
../../src/server/ServerConfig.cpp
../../src/utils/CommonUtil.cpp
../../src/utils/TimeRecorder.cpp
......
......@@ -16,6 +16,7 @@ aux_source_directory(./ test_srcs)
set(require_files
../../src/server/VecIdMapper.cpp
../../src/server/RocksIdMapper.cpp
../../src/server/ServerConfig.cpp
../../src/utils/CommonUtil.cpp
../../src/utils/TimeRecorder.cpp
......
......@@ -11,12 +11,13 @@ using namespace zilliz::vecwise;
TEST(CommonTest, COMMON_TEST) {
std::string path1 = "/tmp/vecwise_test/common_test_12345/";
std::string path2 = path1 + "abcdef";
server::ServerError err = server::CommonUtil::CreateDirectory(path2);
std::string path1 = "/tmp/vecwise_test/";
std::string path2 = path1 + "common_test_12345/";
std::string path3 = path2 + "abcdef";
server::ServerError err = server::CommonUtil::CreateDirectory(path3);
ASSERT_EQ(err, server::SERVER_SUCCESS);
ASSERT_TRUE(server::CommonUtil::IsDirectoryExit(path2));
ASSERT_TRUE(server::CommonUtil::IsDirectoryExit(path3));
err = server::CommonUtil::DeleteDirectory(path1);
ASSERT_EQ(err, server::SERVER_SUCCESS);
......
......@@ -9,44 +9,93 @@
#include "utils/TimeRecorder.h"
#include "utils/CommonUtil.h"
#include <time.h>
using namespace zilliz::vecwise;
namespace {
std::string CurrentTime() {
time_t tt;
time(&tt);
tt = tt + 8 * 3600;
tm *t = gmtime(&tt);
std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1)
+ "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour)
+ "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec);
return str;
}
std::string GetGroupID() {
static std::string s_id(CurrentTime());
return s_id;
}
}
TEST(IdMapperTest, IDMAPPER_TEST) {
server::IVecIdMapper* mapper = server::IVecIdMapper::GetInstance();
std::string group_id = GetGroupID();
std::vector<std::string> nid = {"1", "50", "900", "10000"};
std::vector<std::string> sid = {"one", "fifty", "nine zero zero", "many"};
server::ServerError err = mapper->Put(nid, sid);
server::ServerError err = mapper->Put(nid, sid, group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Put(nid, std::vector<std::string>());
err = mapper->Put(nid, std::vector<std::string>(), group_id);
ASSERT_NE(err, server::SERVER_SUCCESS);
std::vector<std::string> res;
err = mapper->Get(nid, res);
err = mapper->Get(nid, res, group_id);
ASSERT_EQ(res.size(), nid.size());
for(size_t i = 0; i < res.size(); i++) {
ASSERT_EQ(res[i], sid[i]);
}
std::string str_id;
err = mapper->Get(nid[1], str_id);
err = mapper->Get(nid[1], str_id, group_id);
ASSERT_EQ(str_id, "fifty");
err = mapper->Delete(nid[2]);
err = mapper->Get(nid[1], str_id);
ASSERT_EQ(str_id, "");
err = mapper->Get(nid[2], str_id, group_id);
ASSERT_EQ(str_id, "nine zero zero");
err = mapper->Delete(nid[2], group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[2], str_id);
ASSERT_NE(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[2], str_id, group_id);
ASSERT_EQ(str_id, "");
err = mapper->Get(nid[3], str_id, group_id);
ASSERT_EQ(str_id, "many");
err = mapper->DeleteGroup(group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[3], str_id, group_id);
ASSERT_EQ(str_id, "");
std::string ct = CurrentTime();
err = mapper->Put("current_time", ct, "time");
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get("current_time", str_id, "time");
ASSERT_EQ(str_id, ct);
//test performance
nid.clear();
sid.clear();
const int64_t count = 1000000;
for(int64_t i = 0; i < count; i++) {
nid.push_back(std::to_string(i+100000));
sid.push_back("val_" + std::to_string(i));
{
server::TimeRecorder rc("prepare id data");
for (int64_t i = 0; i < count; i++) {
nid.push_back(std::to_string(i + 100000));
sid.push_back("val_" + std::to_string(i));
}
rc.Record("done!");
}
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册