提交 a2a9efb0 编写于 作者: Y yhmo

Merge branch 'rocksdb' into 'release-v1'

Rocksdb

See merge request megasearch/vecwise_engine!17

Former-commit-id: 1f13687ccf185ef7c5fda83d43bf81737a14e148
......@@ -14,3 +14,4 @@ cmake_build
cpp/third_party/thrift-0.12.0/
cpp/third_party/faiss-1.5.1
cpp/megasearch/
server_config:
address: 0.0.0.0
address: 127.0.0.1
port: 33001
transfer_protocol: json #optional: binary, compact, json, debug
server_mode: thread_pool #optional: simple, thread_pool
......
......@@ -17,8 +17,8 @@ set(license_check_files
set(service_files
thrift/gen-cpp/VecService.cpp
thrift/gen-cpp/VectorService_constants.cpp
thrift/gen-cpp/VectorService_types.cpp
thrift/gen-cpp/megasearch_constants.cpp
thrift/gen-cpp/megasearch_types.cpp
)
set(vecwise_engine_src
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "RocksIdMapper.h"
#include "ServerConfig.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include <exception>
namespace zilliz {
namespace vecwise {
namespace server {
RocksIdMapper::RocksIdMapper()
: db_(nullptr) {
OpenDb();
}
RocksIdMapper::~RocksIdMapper() {
CloseDb();
}
void RocksIdMapper::OpenDb() {
if(db_) {
return;
}
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
db_path += "/id_mapping";
CommonUtil::CreateDirectory(db_path);
rocksdb::Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
options.max_open_files = config.GetInt32Value(CONFIG_DB_IDMAPPER_MAX_FILE, 512);
//load column families
std::vector<std::string> column_names;
rocksdb::Status s = rocksdb::DB::ListColumnFamilies(options, db_path, &column_names);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
}
if(column_names.empty()) {
column_names.push_back("default");
}
SERVER_LOG_INFO << "ID mapper has " << std::to_string(column_names.size()) << " groups";
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
for(auto& column_name : column_names) {
rocksdb::ColumnFamilyDescriptor desc;
desc.name = column_name;
column_families.emplace_back(desc);
}
// open DB
std::vector<rocksdb::ColumnFamilyHandle*> column_handles;
s = rocksdb::DB::Open(options, db_path, column_families, &column_handles, &db_);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
db_ = nullptr;
}
column_handles_.clear();
for(auto handler : column_handles) {
column_handles_.insert(std::make_pair(handler->GetName(), handler));
}
}
void RocksIdMapper::CloseDb() {
for(auto& iter : column_handles_) {
delete iter.second;
}
column_handles_.clear();
if(db_) {
db_->Close();
delete db_;
}
}
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Slice value(sid);
if(group.empty()) {//to default group
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), key, value);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
} else {
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) == 0) {
try {//add group
rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString();
} else {
column_handles_.insert(std::make_pair(group, cfh));
}
} catch(std::exception& ex) {
std::cout << ex.what() << std::endl;
}
} else {
cfh = column_handles_[group];
}
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), cfh, key, value);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = Put(nid[i], sid[i], group);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
sid = "";
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
rocksdb::Slice key(nid);
rocksdb::Status s;
if(cfh){
s = db_->Get(rocksdb::ReadOptions(), cfh, key, &sid);
} else {
s = db_->Get(rocksdb::ReadOptions(), key, &sid);
}
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to get:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = Get(nid[i], str_id, group);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
rocksdb::Slice key(nid);
rocksdb::Status s;
if(cfh){
s = db_->Delete(rocksdb::WriteOptions(), cfh, key);
} else {
s = db_->Delete(rocksdb::WriteOptions(), key);
}
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to delete:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
if(cfh) {
db_->DropColumnFamily(cfh);
db_->DestroyColumnFamilyHandle(cfh);
column_handles_.erase(group);
}
return SERVER_SUCCESS;
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include "VecIdMapper.h"
#include "rocksdb/db.h"
#include <string>
#include <vector>
#include <unordered_map>
namespace zilliz {
namespace vecwise {
namespace server {
class RocksIdMapper : public IVecIdMapper{
public:
RocksIdMapper();
~RocksIdMapper();
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;
private:
void OpenDb();
void CloseDb();
private:
rocksdb::DB* db_;
std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
};
}
}
}
......@@ -5,6 +5,7 @@
******************************************************************************/
#include "VecIdMapper.h"
#include "RocksIdMapper.h"
#include "ServerConfig.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
......@@ -13,6 +14,8 @@
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include <exception>
namespace zilliz {
namespace vecwise {
namespace server {
......@@ -36,26 +39,33 @@ SimpleIdMapper::~SimpleIdMapper() {
}
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid) {
ids_[nid] = sid;
//not thread-safe
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
mapping[nid] = sid;
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) {
//not thread-safe
ServerError SimpleIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ID_MAPPING& mapping = id_groups_[group];
for(size_t i = 0; i < nid.size(); i++) {
ids_[nid[i]] = sid[i];
mapping[nid[i]] = sid[i];
}
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const {
auto iter = ids_.find(nid);
if(iter == ids_.end()) {
//not thread-safe
ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
ID_MAPPING& mapping = id_groups_[group];
auto iter = mapping.find(nid);
if(iter == mapping.end()) {
return SERVER_INVALID_ARGUMENT;
}
......@@ -64,13 +74,16 @@ ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const {
//not thread-safe
ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
ID_MAPPING& mapping = id_groups_[group];
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
auto iter = ids_.find(nid[i]);
if(iter == ids_.end()) {
auto iter = mapping.find(nid[i]);
if(iter == mapping.end()) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to find id: " << nid[i];
err = SERVER_INVALID_ARGUMENT;
......@@ -83,120 +96,16 @@ ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector
return err;
}
ServerError SimpleIdMapper::Delete(const std::string& nid) {
ids_.erase(nid);
//not thread-safe
ServerError SimpleIdMapper::Delete(const std::string& nid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
mapping.erase(nid);
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
RocksIdMapper::RocksIdMapper() {
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
db_path += "/id_mapping";
CommonUtil::CreateDirectory(db_path);
rocksdb::Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
options.max_open_files = config.GetInt32Value(CONFIG_DB_IDMAPPER_MAX_FILE, 128);
// open DB
rocksdb::Status s = rocksdb::DB::Open(options, db_path, &db_);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
db_ = nullptr;
}
}
RocksIdMapper::~RocksIdMapper() {
if(db_) {
db_->Close();
delete db_;
}
}
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Slice value(sid);
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), key, value);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = Put(nid[i], sid[i]);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid) const {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Status s = db_->Get(rocksdb::ReadOptions(), key, &sid);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to get:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const {
sid.clear();
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = Get(nid[i], str_id);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Status s = db_->Delete(rocksdb::WriteOptions(), key);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to delete:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
//not thread-safe
ServerError SimpleIdMapper::DeleteGroup(const std::string& group) {
id_groups_.erase(group);
return SERVER_SUCCESS;
}
......
......@@ -25,14 +25,15 @@ public:
virtual ~IVecIdMapper(){}
virtual ServerError Put(const std::string& nid, const std::string& sid) = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) = 0;
virtual ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") = 0;
virtual ServerError Get(const std::string& nid, std::string& sid) const = 0;
virtual ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const = 0;
//NOTE: the 'sid' will be cleared at begin of the function
virtual ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const = 0;
virtual ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const = 0;
virtual ServerError Delete(const std::string& nid) = 0;
virtual ServerError Delete(const std::string& nid, const std::string& group = "") = 0;
virtual ServerError DeleteGroup(const std::string& group) = 0;
};
class SimpleIdMapper : public IVecIdMapper{
......@@ -40,33 +41,18 @@ public:
SimpleIdMapper();
~SimpleIdMapper();
ServerError Put(const std::string& nid, const std::string& sid) override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) override;
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
ServerError Get(const std::string& nid, std::string& sid) const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const override;
ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;
ServerError Delete(const std::string& nid) override;
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;
private:
std::unordered_map<std::string, std::string> ids_;
};
class RocksIdMapper : public IVecIdMapper{
public:
RocksIdMapper();
~RocksIdMapper();
ServerError Put(const std::string& nid, const std::string& sid) override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) override;
ServerError Get(const std::string& nid, std::string& sid) const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const override;
ServerError Delete(const std::string& nid) override;
private:
rocksdb::DB* db_;
using ID_MAPPING = std::unordered_map<std::string, std::string>;
mutable std::unordered_map<std::string, ID_MAPPING> id_groups_;
};
}
......
......@@ -6,7 +6,6 @@
#include "VecServiceHandler.h"
#include "VecServiceTask.h"
#include "ServerConfig.h"
#include "VecIdMapper.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
#include "utils/TimeRecorder.h"
......@@ -23,12 +22,12 @@ namespace {
public:
TimeRecordWrapper(const std::string& func_name)
: recorder_(func_name), func_name_(func_name) {
SERVER_LOG_TRACE << func_name << " called";
//SERVER_LOG_TRACE << func_name << " called";
}
~TimeRecordWrapper() {
recorder_.Elapse("cost");
SERVER_LOG_TRACE << func_name_ << " finished";
//SERVER_LOG_TRACE << func_name_ << " finished";
}
private:
......@@ -98,9 +97,9 @@ namespace {
void
VecServiceHandler::add_group(const VecGroup &group) {
TimeRecordWrapper rc("add_group()");
SERVER_LOG_TRACE << "group.id = " << group.id << ", group.dimension = " << group.dimension
<< ", group.index_type = " << group.index_type;
std::string info = "add_group() " + group.id + " dimension = " + std::to_string(group.dimension)
+ " index_type = " + std::to_string(group.index_type);
TimeRecordWrapper rc(info);
BaseTaskPtr task_ptr = AddGroupTask::Create(group.dimension, group.id);
ExecTask(task_ptr);
......@@ -108,8 +107,7 @@ VecServiceHandler::add_group(const VecGroup &group) {
void
VecServiceHandler::get_group(VecGroup &_return, const std::string &group_id) {
TimeRecordWrapper rc("get_group()");
SERVER_LOG_TRACE << "group_id = " << group_id;
TimeRecordWrapper rc("get_group() " + group_id);
_return.id = group_id;
BaseTaskPtr task_ptr = GetGroupTask::Create(group_id, _return.dimension);
......@@ -118,8 +116,7 @@ VecServiceHandler::get_group(VecGroup &_return, const std::string &group_id) {
void
VecServiceHandler::del_group(const std::string &group_id) {
TimeRecordWrapper rc("del_group()");
SERVER_LOG_TRACE << "group_id = " << group_id;
TimeRecordWrapper rc("del_group() " + group_id);
BaseTaskPtr task_ptr = DeleteGroupTask::Create(group_id);
ExecTask(task_ptr);
......@@ -128,8 +125,7 @@ VecServiceHandler::del_group(const std::string &group_id) {
void
VecServiceHandler::add_vector(std::string& _return, const std::string &group_id, const VecTensor &tensor) {
TimeRecordWrapper rc("add_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size();
TimeRecordWrapper rc("add_vector() to " + group_id);
BaseTaskPtr task_ptr = AddVectorTask::Create(group_id, &tensor, _return);
ExecTask(task_ptr);
......@@ -139,9 +135,7 @@ void
VecServiceHandler::add_vector_batch(std::vector<std::string> & _return,
const std::string &group_id,
const VecTensorList &tensor_list) {
TimeRecordWrapper rc("add_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector list size = "
<< tensor_list.tensor_list.size();
TimeRecordWrapper rc("add_vector_batch() to " + group_id);
BaseTaskPtr task_ptr = AddBatchVectorTask::Create(group_id, &tensor_list, _return);
ExecTask(task_ptr);
......@@ -151,8 +145,7 @@ void
VecServiceHandler::add_binary_vector(std::string& _return,
const std::string& group_id,
const VecBinaryTensor& tensor) {
TimeRecordWrapper rc("add_binary_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size()/4;
TimeRecordWrapper rc("add_binary_vector() to " + group_id);
BaseTaskPtr task_ptr = AddVectorTask::Create(group_id, &tensor, _return);
ExecTask(task_ptr);
......@@ -162,9 +155,7 @@ void
VecServiceHandler::add_binary_vector_batch(std::vector<std::string> & _return,
const std::string& group_id,
const VecBinaryTensorList& tensor_list) {
TimeRecordWrapper rc("add_binary_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector list size = "
<< tensor_list.tensor_list.size();
TimeRecordWrapper rc("add_binary_vector_batch() to " + group_id);
BaseTaskPtr task_ptr = AddBatchVectorTask::Create(group_id, &tensor_list, _return);
ExecTask(task_ptr);
......@@ -176,9 +167,7 @@ VecServiceHandler::search_vector(VecSearchResult &_return,
const int64_t top_k,
const VecTensor &tensor,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector dimension = " << tensor.tensor.size();
TimeRecordWrapper rc("search_vector() in " + group_id);
VecTensorList tensor_list;
tensor_list.tensor_list.push_back(tensor);
......@@ -199,9 +188,7 @@ VecServiceHandler::search_vector_batch(VecSearchResultList &_return,
const int64_t top_k,
const VecTensorList &tensor_list,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector list size = " << tensor_list.tensor_list.size();
TimeRecordWrapper rc("search_vector_batch() in " + group_id);
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, _return);
ExecTask(task_ptr);
......@@ -213,9 +200,7 @@ VecServiceHandler::search_binary_vector(VecSearchResult& _return,
const int64_t top_k,
const VecBinaryTensor& tensor,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_binary_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector dimension = " << tensor.tensor.size();
TimeRecordWrapper rc("search_binary_vector() in " + group_id);
VecBinaryTensorList tensor_list;
tensor_list.tensor_list.push_back(tensor);
......@@ -236,9 +221,7 @@ VecServiceHandler::search_binary_vector_batch(VecSearchResultList& _return,
const int64_t top_k,
const VecBinaryTensorList& tensor_list,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_binary_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector list size = " << tensor_list.tensor_list.size();
TimeRecordWrapper rc("search_binary_vector_batch() in " + group_id);
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, _return);
ExecTask(task_ptr);
......
......@@ -91,11 +91,10 @@ ServerError AddGroupTask::OnExecute() {
group_info.dimension = (size_t)dimension_;
group_info.group_id = group_id_;
engine::Status stat = DB()->add_group(group_info);
if(!stat.ok()) {
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
if(!stat.ok()) {//could exist
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
return SERVER_SUCCESS;
}
} catch (std::exception& ex) {
......@@ -161,6 +160,9 @@ ServerError DeleteGroupTask::OnExecute() {
error_code_ = SERVER_NOT_IMPLEMENT;
error_msg_ = "delete group not implemented";
SERVER_LOG_ERROR << error_msg_;
//IVecIdMapper::GetInstance()->DeleteGroup(group_id_);
return SERVER_NOT_IMPLEMENT;
}
......@@ -241,27 +243,7 @@ const AttribMap& AddVectorTask::GetVecAttrib() const {
ServerError AddVectorTask::OnExecute() {
try {
engine::meta::GroupSchema group_info;
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
uint64_t group_dim = group_info.dimension;
uint64_t vec_dim = GetVecDimension();
if(group_dim != vec_dim) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_dim;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
std::vector<float> vec_f;
vec_f.resize(vec_dim);
const double* d_p = GetVecData();
......@@ -270,7 +252,7 @@ ServerError AddVectorTask::OnExecute() {
}
engine::IDNumbers vector_ids;
stat = DB()->add_vectors(group_id_, 1, vec_f.data(), vector_ids);
engine::Status stat = DB()->add_vectors(group_id_, 1, vec_f.data(), vector_ids);
if(!stat.ok()) {
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
......@@ -278,7 +260,8 @@ ServerError AddVectorTask::OnExecute() {
return error_code_;
} else {
if(vector_ids.empty()) {
SERVER_LOG_ERROR << "Vector ID not returned";
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
} else {
std::string uid = GetVecID();
......@@ -294,8 +277,8 @@ ServerError AddVectorTask::OnExecute() {
attrib[VECTOR_UID] = tensor_id_;
std::string attrib_str;
AttributeSerializer::Encode(attrib, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str);
SERVER_LOG_TRACE << "nid = " << vector_ids[0] << ", uid = " << uid;
IVecIdMapper::GetInstance()->Put(nid, attrib_str, group_id_);
//SERVER_LOG_TRACE << "nid = " << vector_ids[0] << ", uid = " << uid;
}
}
......@@ -429,7 +412,7 @@ void AddBatchVectorTask::ProcessIdMapping(engine::IDNumbers& vector_ids,
attrib[VECTOR_UID] = uid;
std::string attrib_str;
AttributeSerializer::Encode(attrib, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str, group_id_);
}
}
......@@ -464,7 +447,6 @@ ServerError AddBatchVectorTask::OnExecute() {
<< " vs. group dimension:" << group_dim;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
......@@ -645,7 +627,9 @@ ServerError SearchVectorTask::OnExecute() {
if(vec_dim != group_info.dimension) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_info.dimension;
return SERVER_INVALID_ARGUMENT;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
}
rc.Record("check group dimension");
......@@ -679,18 +663,30 @@ ServerError SearchVectorTask::OnExecute() {
for(auto id : res) {
std::string attrib_str;
std::string nid = nid_prefix + std::to_string(id);
IVecIdMapper::GetInstance()->Get(nid, attrib_str);
IVecIdMapper::GetInstance()->Get(nid, attrib_str, group_id_);
AttribMap attrib_map;
AttributeSerializer::Decode(attrib_str, attrib_map);
AttribMap attrib_return;
VecSearchResultItem item;
item.__set_attrib(attrib_map);
item.uid = item.attrib[VECTOR_UID];
item.uid = attrib_map[VECTOR_UID];
if(filter_.return_attribs.empty()) {//return all attributes
attrib_return.swap(attrib_map);
} else {//filter attributes
for(auto& name : filter_.return_attribs) {
if(attrib_map.count(name) == 0)
continue;
attrib_return[name] = attrib_map[name];
}
}
item.__set_attrib(attrib_return);
item.distance = 0.0;////TODO: return distance
v_res.result_list.emplace_back(item);
SERVER_LOG_TRACE << "nid = " << nid << ", uid = " << item.uid;
//SERVER_LOG_TRACE << "nid = " << nid << ", uid = " << item.uid;
}
result_.result_list.push_back(v_res);
......
......@@ -10,7 +10,7 @@
#include "utils/AttributeSerializer.h"
#include "db/Types.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include <condition_variable>
#include <memory>
......
......@@ -10,8 +10,8 @@
#include "utils/Log.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/VectorService_constants.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include "thrift/gen-cpp/megasearch_constants.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
......
......@@ -9,7 +9,7 @@
#include <thrift/TDispatchProcessor.h>
#include <thrift/async/TConcurrentClientSyncInfo.h>
#include "VectorService_types.h"
#include "megasearch_types.h"
namespace zilliz {
......
......@@ -4,13 +4,13 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "VectorService_constants.h"
#include "megasearch_constants.h"
namespace zilliz {
const VectorServiceConstants g_VectorService_constants;
const megasearchConstants g_megasearch_constants;
VectorServiceConstants::VectorServiceConstants() {
megasearchConstants::megasearchConstants() {
}
} // namespace
......
......@@ -4,20 +4,20 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef VectorService_CONSTANTS_H
#define VectorService_CONSTANTS_H
#ifndef megasearch_CONSTANTS_H
#define megasearch_CONSTANTS_H
#include "VectorService_types.h"
#include "megasearch_types.h"
namespace zilliz {
class VectorServiceConstants {
class megasearchConstants {
public:
VectorServiceConstants();
megasearchConstants();
};
extern const VectorServiceConstants g_VectorService_constants;
extern const megasearchConstants g_megasearch_constants;
} // namespace
......
......@@ -4,7 +4,7 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "VectorService_types.h"
#include "megasearch_types.h"
#include <algorithm>
#include <ostream>
......
......@@ -4,8 +4,8 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef VectorService_TYPES_H
#define VectorService_TYPES_H
#ifndef megasearch_TYPES_H
#define megasearch_TYPES_H
#include <iosfwd>
......
......@@ -743,9 +743,9 @@ class VecSearchResultList(object):
class VecDateTime(object):
"""
second; Seconds. [0-60] (1 leap second)
minute; Minutes. [0-59]
hour; Hours. [0-23]
second; Seconds. [0-59] reserved
minute; Minutes. [0-59] reserved
hour; Hours. [0-23] reserved
day; Day. [1-31]
month; Month. [0-11]
year; Year - 1900.
......@@ -874,9 +874,9 @@ class VecDateTime(object):
class VecTimeRange(object):
"""
time_begin; time range begin
begine_closed; true means '[', false means '('
begine_closed; true means '[', false means '(' reserved
time_end; set to true to return tensor double array
end_closed; time range end
end_closed; time range end reserved
Attributes:
- time_begin
......@@ -977,7 +977,7 @@ class VecTimeRange(object):
class VecSearchFilter(object):
"""
attrib_filter; search condition, for example: "color=red"
attrib_filter; reserved
time_ranges; search condition, for example: "date between 1999-02-12 and 2008-10-14"
return_attribs; specify required attribute names
......
......@@ -69,9 +69,9 @@ struct VecSearchResultList {
}
/**
* second; Seconds. [0-60] (1 leap second)
* minute; Minutes. [0-59]
* hour; Hours. [0-23]
* second; Seconds. [0-59] reserved
* minute; Minutes. [0-59] reserved
* hour; Hours. [0-23] reserved
* day; Day. [1-31]
* month; Month. [0-11]
* year; Year - 1900.
......@@ -87,9 +87,9 @@ struct VecDateTime {
/**
* time_begin; time range begin
* begine_closed; true means '[', false means '('
* begine_closed; true means '[', false means '(' reserved
* time_end; set to true to return tensor double array
* end_closed; time range end
* end_closed; time range end reserved
*/
struct VecTimeRange {
1: required VecDateTime time_begin;
......@@ -99,7 +99,7 @@ struct VecTimeRange {
}
/**
* attrib_filter; search condition, for example: "color=red"
* attrib_filter; reserved
* time_ranges; search condition, for example: "date between 1999-02-12 and 2008-10-14"
* return_attribs; specify required attribute names
*/
......
......@@ -16,8 +16,8 @@ set(util_files
set(service_files
../src/thrift/gen-cpp/VecService.cpp
../src/thrift/gen-cpp/VectorService_constants.cpp
../src/thrift/gen-cpp/VectorService_types.cpp)
../src/thrift/gen-cpp/megasearch_constants.cpp
../src/thrift/gen-cpp/megasearch_types.cpp)
......
......@@ -6,8 +6,8 @@
#include "ClientSession.h"
#include "Log.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/VectorService_constants.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include "thrift/gen-cpp/megasearch_constants.h"
#include <exception>
......
......@@ -21,7 +21,7 @@ using namespace zilliz::vecwise::client;
namespace {
static const int32_t VEC_DIMENSION = 256;
static const int64_t BATCH_COUNT = 1000;
static const int64_t BATCH_COUNT = 10000;
static const int64_t REPEAT_COUNT = 1;
static const int64_t TOP_K = 10;
......@@ -130,15 +130,18 @@ TEST(AddVector, CLIENT_TEST) {
try {
std::string id;
zilliz::VecTensor tensor;
for(int32_t i = 0; i < VEC_DIMENSION; i++) {
tensor.tensor.push_back(0.5);
}
session.interface()->add_vector(id, GetGroupID(), tensor);
} catch (zilliz::VecException& ex) {
ASSERT_EQ(ex.code, zilliz::VecErrCode::GROUP_NOT_EXISTS);
ASSERT_EQ(ex.code, zilliz::VecErrCode::ILLEGAL_ARGUMENT);
}
try {
VecGroup temp_group;
session.interface()->get_group(temp_group, GetGroupID());
ASSERT_TRUE(temp_group.id.empty());
//ASSERT_TRUE(temp_group.id.empty());
} catch (zilliz::VecException& ex) {
ASSERT_EQ(ex.code, zilliz::VecErrCode::GROUP_NOT_EXISTS);
}
......@@ -231,7 +234,6 @@ TEST(SearchVector, CLIENT_TEST) {
//search vector
{
const int32_t anchor_index = 100;
server::TimeRecorder rc("Search top_k");
VecTensor tensor;
for (int32_t i = 0; i < VEC_DIMENSION; i++) {
tensor.tensor.push_back((double) (i + anchor_index));
......@@ -249,35 +251,60 @@ TEST(SearchVector, CLIENT_TEST) {
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
//do search
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
//normal search
{
server::TimeRecorder rc("Search top_k");
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
rc.Elapse("done!");
//build result
std::cout << "Search result: " << std::endl;
for(VecSearchResultItem& item : res.result_list) {
std::cout << "\t" << item.uid << std::endl;
//build result
std::cout << "Search result: " << std::endl;
for (VecSearchResultItem &item : res.result_list) {
std::cout << "\t" << item.uid << std::endl;
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_NUM) != 0);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_NUM) != 0);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
}
ASSERT_EQ(res.result_list.size(), (uint64_t) TOP_K);
if (!res.result_list.empty()) {
ASSERT_TRUE(!res.result_list[0].uid.empty());
}
}
rc.Elapse("done!");
ASSERT_EQ(res.result_list.size(), (uint64_t)TOP_K);
if(!res.result_list.empty()) {
ASSERT_TRUE(!res.result_list[0].uid.empty());
//filter attribute search
{
std::vector<std::string> require_attributes = {TEST_ATTRIB_COMMENT};
filter.__set_return_attribs(require_attributes);
server::TimeRecorder rc("Search top_k with attribute filter");
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
rc.Elapse("done!");
//build result
std::cout << "Search result attributes: " << std::endl;
for (VecSearchResultItem &item : res.result_list) {
ASSERT_EQ(item.attrib.size(), 1UL);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
std::cout << "\t" << item.uid << ":" << item.attrib[TEST_ATTRIB_COMMENT] << std::endl;
}
ASSERT_EQ(res.result_list.size(), (uint64_t) TOP_K);
}
//empty search
date.day > 0 ? date.day -= 1 : date.day += 1;
range.time_begin = date;
range.time_end = date;
time_ranges.clear();
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
ASSERT_EQ(res.result_list.size(), 0);
{
date.day > 0 ? date.day -= 1 : date.day += 1;
range.time_begin = date;
range.time_end = date;
time_ranges.clear();
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
ASSERT_EQ(res.result_list.size(), 0);
}
}
//search binary vector
......
......@@ -14,6 +14,7 @@ link_directories("/usr/local/cuda/lib64")
set(require_files
../../src/server/VecIdMapper.cpp
../../src/server/RocksIdMapper.cpp
../../src/server/ServerConfig.cpp
../../src/utils/CommonUtil.cpp
../../src/utils/TimeRecorder.cpp
......
......@@ -16,6 +16,7 @@ aux_source_directory(./ test_srcs)
set(require_files
../../src/server/VecIdMapper.cpp
../../src/server/RocksIdMapper.cpp
../../src/server/ServerConfig.cpp
../../src/utils/CommonUtil.cpp
../../src/utils/TimeRecorder.cpp
......
......@@ -11,12 +11,13 @@ using namespace zilliz::vecwise;
TEST(CommonTest, COMMON_TEST) {
std::string path1 = "/tmp/vecwise_test/common_test_12345/";
std::string path2 = path1 + "abcdef";
server::ServerError err = server::CommonUtil::CreateDirectory(path2);
std::string path1 = "/tmp/vecwise_test/";
std::string path2 = path1 + "common_test_12345/";
std::string path3 = path2 + "abcdef";
server::ServerError err = server::CommonUtil::CreateDirectory(path3);
ASSERT_EQ(err, server::SERVER_SUCCESS);
ASSERT_TRUE(server::CommonUtil::IsDirectoryExit(path2));
ASSERT_TRUE(server::CommonUtil::IsDirectoryExit(path3));
err = server::CommonUtil::DeleteDirectory(path1);
ASSERT_EQ(err, server::SERVER_SUCCESS);
......
......@@ -9,44 +9,93 @@
#include "utils/TimeRecorder.h"
#include "utils/CommonUtil.h"
#include <time.h>
using namespace zilliz::vecwise;
namespace {
std::string CurrentTime() {
time_t tt;
time(&tt);
tt = tt + 8 * 3600;
tm *t = gmtime(&tt);
std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1)
+ "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour)
+ "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec);
return str;
}
std::string GetGroupID() {
static std::string s_id(CurrentTime());
return s_id;
}
}
TEST(IdMapperTest, IDMAPPER_TEST) {
server::IVecIdMapper* mapper = server::IVecIdMapper::GetInstance();
std::string group_id = GetGroupID();
std::vector<std::string> nid = {"1", "50", "900", "10000"};
std::vector<std::string> sid = {"one", "fifty", "nine zero zero", "many"};
server::ServerError err = mapper->Put(nid, sid);
server::ServerError err = mapper->Put(nid, sid, group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Put(nid, std::vector<std::string>());
err = mapper->Put(nid, std::vector<std::string>(), group_id);
ASSERT_NE(err, server::SERVER_SUCCESS);
std::vector<std::string> res;
err = mapper->Get(nid, res);
err = mapper->Get(nid, res, group_id);
ASSERT_EQ(res.size(), nid.size());
for(size_t i = 0; i < res.size(); i++) {
ASSERT_EQ(res[i], sid[i]);
}
std::string str_id;
err = mapper->Get(nid[1], str_id);
err = mapper->Get(nid[1], str_id, group_id);
ASSERT_EQ(str_id, "fifty");
err = mapper->Delete(nid[2]);
err = mapper->Get(nid[1], str_id);
ASSERT_EQ(str_id, "");
err = mapper->Get(nid[2], str_id, group_id);
ASSERT_EQ(str_id, "nine zero zero");
err = mapper->Delete(nid[2], group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[2], str_id);
ASSERT_NE(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[2], str_id, group_id);
ASSERT_EQ(str_id, "");
err = mapper->Get(nid[3], str_id, group_id);
ASSERT_EQ(str_id, "many");
err = mapper->DeleteGroup(group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[3], str_id, group_id);
ASSERT_EQ(str_id, "");
std::string ct = CurrentTime();
err = mapper->Put("current_time", ct, "time");
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get("current_time", str_id, "time");
ASSERT_EQ(str_id, ct);
//test performance
nid.clear();
sid.clear();
const int64_t count = 1000000;
for(int64_t i = 0; i < count; i++) {
nid.push_back(std::to_string(i+100000));
sid.push_back("val_" + std::to_string(i));
{
server::TimeRecorder rc("prepare id data");
for (int64_t i = 0; i < count; i++) {
nid.push_back(std::to_string(i + 100000));
sid.push_back("val_" + std::to_string(i));
}
rc.Record("done!");
}
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册