提交 56291b21 编写于 作者: G groot

use rocksdb column family


Former-commit-id: 8cbc5d91e02e6bbc0eff1e329b4c76687d2c1f74
上级 354f68a9
server_config: server_config:
address: 0.0.0.0 address: 127.0.0.1
port: 33001 port: 33001
transfer_protocol: json #optional: binary, compact, json, debug transfer_protocol: json #optional: binary, compact, json, debug
server_mode: thread_pool #optional: simple, thread_pool server_mode: thread_pool #optional: simple, thread_pool
db_config: db_config:
db_path: /var/vecwise db_path: /tmp/vecwise
db_backend_url: http://127.0.0.1 db_backend_url: http://127.0.0.1
db_flush_interval: 5 #unit: second db_flush_interval: 5 #unit: second
idmapper_max_open_file: 128 idmapper_max_open_file: 128
...@@ -13,7 +13,7 @@ db_config: ...@@ -13,7 +13,7 @@ db_config:
log_config: log_config:
global: global:
format: "%datetime | %level | %logger | %msg" format: "%datetime | %level | %logger | %msg"
filename: "/var/vecwise/logs/vecwise_engine-%datetime{%h:%m}-global.log" filename: "/tmp/vecwise/logs/vecwise_engine-%datetime{%h:%m}-global.log"
enabled: true enabled: true
to_file: true to_file: true
to_standard_output: true to_standard_output: true
...@@ -21,22 +21,22 @@ log_config: ...@@ -21,22 +21,22 @@ log_config:
performance_tracking: false performance_tracking: false
max_log_file_size: 2097152 # throw log files away after 2mb max_log_file_size: 2097152 # throw log files away after 2mb
debug: debug:
filename: "/var/vecwise/logs/vecwise_engine-%datetime{%h:%m}-debug.log" filename: "/tmp/vecwise/logs/vecwise_engine-%datetime{%h:%m}-debug.log"
enabled: true enabled: true
warning: warning:
filename: "/var/vecwise/logs/vecwise_engine-%datetime{%h:%m}-warning.log" filename: "/tmp/vecwise/logs/vecwise_engine-%datetime{%h:%m}-warning.log"
trace: trace:
filename: "/var/vecwise/logs/vecwise_engine-%datetime{%h:%m}-trace.log" filename: "/tmp/vecwise/logs/vecwise_engine-%datetime{%h:%m}-trace.log"
verbose: verbose:
format: "%datetime{%d/%m/%y} | %level-%vlevel | %msg" format: "%datetime{%d/%m/%y} | %level-%vlevel | %msg"
to_file: false to_file: false
to_standard_output: true to_standard_output: true
error: error:
enabled: false enabled: false
filename: "/var/vecwise/logs/vecwise_engine-%datetime{%h:%m}-error.log" filename: "/tmp/vecwise/logs/vecwise_engine-%datetime{%h:%m}-error.log"
fatal: fatal:
enabled: false enabled: false
filename: "/var/vecwise/logs/vecwise_engine-%datetime{%h:%m}-fatal.log" filename: "/tmp/vecwise/logs/vecwise_engine-%datetime{%h:%m}-fatal.log"
cache_config: cache_config:
cpu_cache_capacity: 16 # unit: GB cpu_cache_capacity: 16 # unit: GB
......
* GLOBAL: * GLOBAL:
FORMAT = "%datetime | %level | %logger | %msg" FORMAT = "%datetime | %level | %logger | %msg"
FILENAME = "/var/vecwise/logs/vecwise_engine-%datetime{%H:%m}-global.log" FILENAME = "/tmp/vecwise/logs/vecwise_engine-%datetime{%H:%m}-global.log"
ENABLED = true ENABLED = true
TO_FILE = true TO_FILE = true
TO_STANDARD_OUTPUT = true TO_STANDARD_OUTPUT = true
...@@ -8,12 +8,12 @@ ...@@ -8,12 +8,12 @@
PERFORMANCE_TRACKING = false PERFORMANCE_TRACKING = false
MAX_LOG_FILE_SIZE = 2097152 ## Throw log files away after 2MB MAX_LOG_FILE_SIZE = 2097152 ## Throw log files away after 2MB
* DEBUG: * DEBUG:
FILENAME = "/var/vecwise/logs/vecwise_engine-%datetime{%H:%m}-debug.log" FILENAME = "/tmp/vecwise/logs/vecwise_engine-%datetime{%H:%m}-debug.log"
ENABLED = true ENABLED = true
* WARNING: * WARNING:
FILENAME = "/var/vecwise/logs/vecwise_engine-%datetime{%H:%m}-warning.log" FILENAME = "/tmp/vecwise/logs/vecwise_engine-%datetime{%H:%m}-warning.log"
* TRACE: * TRACE:
FILENAME = "/var/vecwise/logs/vecwise_engine-%datetime{%H:%m}-trace.log" FILENAME = "/tmp/vecwise/logs/vecwise_engine-%datetime{%H:%m}-trace.log"
* VERBOSE: * VERBOSE:
FORMAT = "%datetime{%d/%M/%y} | %level-%vlevel | %msg" FORMAT = "%datetime{%d/%M/%y} | %level-%vlevel | %msg"
TO_FILE = false TO_FILE = false
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
## Error logs ## Error logs
* ERROR: * ERROR:
ENABLED = false ENABLED = false
FILENAME = "/var/vecwise/logs/vecwise_engine-%datetime{%H:%m}-error.log" FILENAME = "/tmp/vecwise/logs/vecwise_engine-%datetime{%H:%m}-error.log"
* FATAL: * FATAL:
ENABLED = false ENABLED = false
FILENAME = "/var/vecwise/logs/vecwise_engine-%datetime{%H:%m}-fatal.log" FILENAME = "/tmp/vecwise/logs/vecwise_engine-%datetime{%H:%m}-fatal.log"
\ No newline at end of file \ No newline at end of file
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include <exception>
namespace zilliz { namespace zilliz {
namespace vecwise { namespace vecwise {
namespace server { namespace server {
...@@ -36,26 +38,33 @@ SimpleIdMapper::~SimpleIdMapper() { ...@@ -36,26 +38,33 @@ SimpleIdMapper::~SimpleIdMapper() {
} }
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid) { //not thread-safe
ids_[nid] = sid; ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
mapping[nid] = sid;
return SERVER_SUCCESS; return SERVER_SUCCESS;
} }
ServerError SimpleIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) { //not thread-safe
ServerError SimpleIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) { if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT; return SERVER_INVALID_ARGUMENT;
} }
ID_MAPPING& mapping = id_groups_[group];
for(size_t i = 0; i < nid.size(); i++) { for(size_t i = 0; i < nid.size(); i++) {
ids_[nid[i]] = sid[i]; mapping[nid[i]] = sid[i];
} }
return SERVER_SUCCESS; return SERVER_SUCCESS;
} }
ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const { //not thread-safe
auto iter = ids_.find(nid); ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
if(iter == ids_.end()) { ID_MAPPING& mapping = id_groups_[group];
auto iter = mapping.find(nid);
if(iter == mapping.end()) {
return SERVER_INVALID_ARGUMENT; return SERVER_INVALID_ARGUMENT;
} }
...@@ -64,13 +73,16 @@ ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const ...@@ -64,13 +73,16 @@ ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const
return SERVER_SUCCESS; return SERVER_SUCCESS;
} }
ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const { //not thread-safe
ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear(); sid.clear();
ID_MAPPING& mapping = id_groups_[group];
ServerError err = SERVER_SUCCESS; ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) { for(size_t i = 0; i < nid.size(); i++) {
auto iter = ids_.find(nid[i]); auto iter = mapping.find(nid[i]);
if(iter == ids_.end()) { if(iter == mapping.end()) {
sid.push_back(""); sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to find id: " << nid[i]; SERVER_LOG_ERROR << "ID mapper failed to find id: " << nid[i];
err = SERVER_INVALID_ARGUMENT; err = SERVER_INVALID_ARGUMENT;
...@@ -83,8 +95,10 @@ ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector ...@@ -83,8 +95,10 @@ ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector
return err; return err;
} }
ServerError SimpleIdMapper::Delete(const std::string& nid) { //not thread-safe
ids_.erase(nid); ServerError SimpleIdMapper::Delete(const std::string& nid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
mapping.erase(nid);
return SERVER_SUCCESS; return SERVER_SUCCESS;
} }
...@@ -104,12 +118,43 @@ RocksIdMapper::RocksIdMapper() { ...@@ -104,12 +118,43 @@ RocksIdMapper::RocksIdMapper() {
options.create_if_missing = true; options.create_if_missing = true;
options.max_open_files = config.GetInt32Value(CONFIG_DB_IDMAPPER_MAX_FILE, 128); options.max_open_files = config.GetInt32Value(CONFIG_DB_IDMAPPER_MAX_FILE, 128);
//load column families
std::vector<std::string> column_names;
rocksdb::Status s = rocksdb::DB::ListColumnFamilies(options, db_path, &column_names);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
}
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
for(auto& column_name : column_names) {
rocksdb::ColumnFamilyDescriptor desc;
desc.name = column_name;
column_families.emplace_back(desc);
}
// open DB // open DB
rocksdb::Status s = rocksdb::DB::Open(options, db_path, &db_); std::vector<rocksdb::ColumnFamilyHandle*> column_handles;
s = rocksdb::DB::Open(options, db_path, column_families, &column_handles, &db_);
if(!s.ok()) { if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString(); SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
db_ = nullptr; db_ = nullptr;
} }
try {
rocksdb::ColumnFamilyHandle *cf;
s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "222", &cf);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
}
std::vector<std::string> column_families;
s = db_->ListColumnFamilies(options, db_path, &column_families);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
}
} catch(std::exception& ex) {
std::cout << ex.what() << std::endl;
}
} }
RocksIdMapper::~RocksIdMapper() { RocksIdMapper::~RocksIdMapper() {
if(db_) { if(db_) {
...@@ -118,7 +163,7 @@ RocksIdMapper::~RocksIdMapper() { ...@@ -118,7 +163,7 @@ RocksIdMapper::~RocksIdMapper() {
} }
} }
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid) { ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
if(db_ == nullptr) { if(db_ == nullptr) {
return SERVER_NULL_POINTER; return SERVER_NULL_POINTER;
} }
...@@ -134,7 +179,7 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid) { ...@@ -134,7 +179,7 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid) {
return SERVER_SUCCESS; return SERVER_SUCCESS;
} }
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) { ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) { if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT; return SERVER_INVALID_ARGUMENT;
} }
...@@ -150,7 +195,7 @@ ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::v ...@@ -150,7 +195,7 @@ ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::v
return err; return err;
} }
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid) const { ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
if(db_ == nullptr) { if(db_ == nullptr) {
return SERVER_NULL_POINTER; return SERVER_NULL_POINTER;
} }
...@@ -165,7 +210,7 @@ ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid) const { ...@@ -165,7 +210,7 @@ ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid) const {
return SERVER_SUCCESS; return SERVER_SUCCESS;
} }
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const { ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear(); sid.clear();
ServerError err = SERVER_SUCCESS; ServerError err = SERVER_SUCCESS;
...@@ -185,7 +230,7 @@ ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector< ...@@ -185,7 +230,7 @@ ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<
return err; return err;
} }
ServerError RocksIdMapper::Delete(const std::string& nid) { ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
if(db_ == nullptr) { if(db_ == nullptr) {
return SERVER_NULL_POINTER; return SERVER_NULL_POINTER;
} }
......
...@@ -25,14 +25,14 @@ public: ...@@ -25,14 +25,14 @@ public:
virtual ~IVecIdMapper(){} virtual ~IVecIdMapper(){}
virtual ServerError Put(const std::string& nid, const std::string& sid) = 0; virtual ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) = 0; virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") = 0;
virtual ServerError Get(const std::string& nid, std::string& sid) const = 0; virtual ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const = 0;
//NOTE: the 'sid' will be cleared at begin of the function //NOTE: the 'sid' will be cleared at begin of the function
virtual ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const = 0; virtual ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const = 0;
virtual ServerError Delete(const std::string& nid) = 0; virtual ServerError Delete(const std::string& nid, const std::string& group = "") = 0;
}; };
class SimpleIdMapper : public IVecIdMapper{ class SimpleIdMapper : public IVecIdMapper{
...@@ -40,16 +40,17 @@ public: ...@@ -40,16 +40,17 @@ public:
SimpleIdMapper(); SimpleIdMapper();
~SimpleIdMapper(); ~SimpleIdMapper();
ServerError Put(const std::string& nid, const std::string& sid) override; ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) override; ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
ServerError Get(const std::string& nid, std::string& sid) const override; ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const override; ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;
ServerError Delete(const std::string& nid) override; ServerError Delete(const std::string& nid, const std::string& group = "") override;
private: private:
std::unordered_map<std::string, std::string> ids_; using ID_MAPPING = std::unordered_map<std::string, std::string>;
mutable std::unordered_map<std::string, ID_MAPPING> id_groups_;
}; };
class RocksIdMapper : public IVecIdMapper{ class RocksIdMapper : public IVecIdMapper{
...@@ -57,13 +58,13 @@ public: ...@@ -57,13 +58,13 @@ public:
RocksIdMapper(); RocksIdMapper();
~RocksIdMapper(); ~RocksIdMapper();
ServerError Put(const std::string& nid, const std::string& sid) override; ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) override; ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
ServerError Get(const std::string& nid, std::string& sid) const override; ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const override; ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;
ServerError Delete(const std::string& nid) override; ServerError Delete(const std::string& nid, const std::string& group = "") override;
private: private:
rocksdb::DB* db_; rocksdb::DB* db_;
......
...@@ -44,9 +44,13 @@ TEST(IdMapperTest, IDMAPPER_TEST) { ...@@ -44,9 +44,13 @@ TEST(IdMapperTest, IDMAPPER_TEST) {
nid.clear(); nid.clear();
sid.clear(); sid.clear();
const int64_t count = 1000000; const int64_t count = 1000000;
for(int64_t i = 0; i < count; i++) { {
nid.push_back(std::to_string(i+100000)); server::TimeRecorder rc("prepare id data");
sid.push_back("val_" + std::to_string(i)); for (int64_t i = 0; i < count; i++) {
nid.push_back(std::to_string(i + 100000));
sid.push_back("val_" + std::to_string(i));
}
rc.Record("done!");
} }
{ {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册