提交 a95b29aa 编写于 作者: Y yu yunfeng

add metrics without prometheus


Former-commit-id: e158ea586681abb5ec44bdc9025ec27e3aa21567
上级 7839bada
......@@ -38,7 +38,7 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) {
if(cache_ == nullptr) {
return nullptr;
}
METRICS_INSTANCE.CacheAccessTotalIncrement();
server::Metrics::GetInstance().CacheAccessTotalIncrement();
return cache_->get(key);
}
......@@ -57,7 +57,7 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
}
cache_->insert(key, data);
METRICS_INSTANCE.CacheAccessTotalIncrement();
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index) {
......@@ -67,7 +67,7 @@ void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index
DataObjPtr obj = std::make_shared<DataObj>(index);
cache_->insert(key, obj);
METRICS_INSTANCE.CacheAccessTotalIncrement();
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::EraseItem(const std::string& key) {
......@@ -76,7 +76,7 @@ void CacheMgr::EraseItem(const std::string& key) {
}
cache_->erase(key);
METRICS_INSTANCE.CacheAccessTotalIncrement();
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::PrintInfo() {
......
......@@ -112,16 +112,16 @@ Status DBImpl<EngineT>::add_vectors(const std::string& group_id_,
double total_time = METRICS_MICROSECONDS(start_time,end_time);
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
METRICS_INSTANCE.AddVectorsDurationHistogramOberve(avg_time);
server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (!status.ok()) {
METRICS_INSTANCE.AddVectorsFailTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
return status;
}
METRICS_INSTANCE.AddVectorsSuccessTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
}
template<typename EngineT>
......@@ -202,22 +202,22 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
if(file.file_type == meta::GroupFileSchema::RAW) {
METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time);
METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024);
METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024);
METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024);
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024);
} else if(file.file_type == meta::GroupFileSchema::TO_INDEX) {
METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time);
METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024);
METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024);
METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024);
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024);
} else {
METRICS_INSTANCE.SearchIndexDataDurationSecondsHistogramObserve(total_time);
METRICS_INSTANCE.IndexFileSizeHistogramObserve(file_size*1024*1024);
METRICS_INSTANCE.IndexFileSizeTotalIncrement(file_size*1024*1024);
METRICS_INSTANCE.IndexFileSizeGaugeSet(file_size*1024*1024);
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size*1024*1024);
}
cluster(output_ids, output_distence, inner_k); // cluster to each query
memset(output_distence, 0, k * nq * sizeof(float));
......@@ -357,7 +357,7 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
auto file_schema = file;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MemTableMergeDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
updated.push_back(file_schema);
......@@ -435,7 +435,7 @@ Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
auto index = to_index.BuildIndex(group_file.location);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
METRICS_INSTANCE.BuildIndexDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
group_file.file_type = meta::GroupFileSchema::INDEX;
group_file.size = index->Size();
......
......@@ -150,14 +150,14 @@ Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
}
Status DBMetaImpl::add_group(GroupSchema& group_info) {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
if (group_info.group_id == "") {
NextGroupId(group_info.group_id);
}
group_info.files_cnt = 0;
group_info.id = -1;
group_info.created_on = utils::GetMicroSecTimeStamp();
auto start_time = MERTICS_NOW_TIME;
auto start_time = METRICS_NOW_TIME;
{
try {
auto id = ConnectorPtr->insert(group_info);
......@@ -168,7 +168,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
auto group_path = GetGroupPath(group_info.group_id);
......@@ -189,7 +189,7 @@ Status DBMetaImpl::get_group(GroupSchema& group_info) {
Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto groups = ConnectorPtr->select(columns(&GroupSchema::id,
&GroupSchema::group_id,
......@@ -198,7 +198,7 @@ Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
where(c(&GroupSchema::group_id) == group_info.group_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
assert(groups.size() <= 1);
if (groups.size() == 1) {
group_info.id = std::get<0>(groups[0]);
......@@ -217,13 +217,13 @@ Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto groups = ConnectorPtr->select(columns(&GroupSchema::id),
where(c(&GroupSchema::group_id) == group_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
assert(groups.size() <= 1);
if (groups.size() == 1) {
has_or_not = true;
......@@ -258,12 +258,12 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
{
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto id = ConnectorPtr->insert(group_file);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
group_file.id = id;
} catch (...) {
return Status::DBTransactionError("Add file Error");
......@@ -287,7 +287,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
files.clear();
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time =METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
......@@ -298,7 +298,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
std::map<std::string, GroupSchema> groups;
......@@ -340,7 +340,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
const DatesT& dates = (partition.empty() == true) ? today : partition;
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
......@@ -355,7 +355,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX)));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
......@@ -392,7 +392,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
files.clear();
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
......@@ -404,7 +404,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
c(&GroupFileSchema::group_id) == group_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
......@@ -592,12 +592,12 @@ Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
group_file.updated_time = utils::GetMicroSecTimeStamp();
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
ConnectorPtr->update(group_file);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
......@@ -608,7 +608,7 @@ Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
Status DBMetaImpl::update_files(GroupFilesSchema& files) {
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto commited = ConnectorPtr->transaction([&] () mutable {
for (auto& file : files) {
......@@ -617,7 +617,7 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) {
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
return true;
});
if (!commited) {
......@@ -706,7 +706,7 @@ Status DBMetaImpl::cleanup() {
Status DBMetaImpl::count(const std::string& group_id, long& result) {
try {
METRICS_INSTANCE.MetaAccessTotalIncrement();
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size,
&GroupFileSchema::date),
......@@ -716,7 +716,7 @@ Status DBMetaImpl::count(const std::string& group_id, long& result) {
c(&GroupFileSchema::group_id) == group_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
......
......@@ -79,12 +79,12 @@ Status FaissExecutionEngine<IndexTrait>::Load() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
METRICS_INSTANCE.FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double total_size = (pIndex_->d) * (pIndex_->ntotal) * 4;
METRICS_INSTANCE.FaissDiskLoadSizeBytesHistogramObserve(total_size);
METRICS_INSTANCE.FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time));
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(total_size);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time));
}
return Status::OK();
......
......@@ -10,22 +10,27 @@ namespace zilliz {
namespace vecwise {
namespace server {
ServerError
PrometheusMetrics::Init() {
ConfigNode& configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true:false;
// Following should be read from config file.
const std::string bind_address = "8080";
const std::string uri = std::string("/metrics");
const std::size_t num_threads = 2;
// Init Exposer
exposer_ptr_ = std::make_shared<prometheus::Exposer>(bind_address, uri, num_threads);
// Exposer Registry
exposer_ptr_->RegisterCollectable(registry_);
MetricsBase &
Metrics::CreateMetricsCollector(MetricCollectorType collector_type) {
switch (collector_type) {
case MetricCollectorType::PROMETHEUS:
// static PrometheusMetrics instance = PrometheusMetrics::GetInstance();
return MetricsBase::GetInstance();
default:return MetricsBase::GetInstance();
}
}
return SERVER_SUCCESS;
MetricsBase &
Metrics::GetInstance() {
ConfigNode &config = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
std::string collector_typr_str = config.GetValue(CONFIG_METRIC_COLLECTOR);
if (collector_typr_str == "prometheus") {
return CreateMetricsCollector(MetricCollectorType::PROMETHEUS);
} else if (collector_typr_str == "zabbix") {
return CreateMetricsCollector(MetricCollectorType::ZABBIX);
} else {
return CreateMetricsCollector(MetricCollectorType::INVALID);
}
}
}
......
此差异已折叠。
......@@ -17,7 +17,7 @@
#define METRICS_NOW_TIME std::chrono::system_clock::now()
#define METRICS_INSTANCE server::GetInstance()
#define server::Metrics::GetInstance() server::GetInstance()
#define METRICS_MICROSECONDS(a,b) (std::chrono::duration_cast<std::chrono::microseconds> (b-a)).count();
......
......@@ -18,8 +18,10 @@ namespace zilliz {
namespace vecwise {
namespace server {
static const std::string ROCKSDB_DEFAULT_GROUP = "default";
RocksIdMapper::RocksIdMapper()
: db_(nullptr) {
: db_(nullptr) {
OpenDb();
}
......@@ -28,6 +30,8 @@ RocksIdMapper::~RocksIdMapper() {
}
void RocksIdMapper::OpenDb() {
std::lock_guard<std::mutex> lck(db_mutex_);
if(db_) {
return;
}
......@@ -79,6 +83,8 @@ void RocksIdMapper::OpenDb() {
}
void RocksIdMapper::CloseDb() {
std::lock_guard<std::mutex> lck(db_mutex_);
for(auto& iter : column_handles_) {
delete iter.second;
}
......@@ -90,7 +96,117 @@ void RocksIdMapper::CloseDb() {
}
}
ServerError RocksIdMapper::AddGroup(const std::string& group) {
std::lock_guard<std::mutex> lck(db_mutex_);
return AddGroupInternal(group);
}
bool RocksIdMapper::IsGroupExist(const std::string& group) const {
std::lock_guard<std::mutex> lck(db_mutex_);
return IsGroupExistInternal(group);
}
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
std::lock_guard<std::mutex> lck(db_mutex_);
return PutInternal(nid, sid, group);
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
std::lock_guard<std::mutex> lck(db_mutex_);
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = PutInternal(nid[i], sid[i], group);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
std::lock_guard<std::mutex> lck(db_mutex_);
return GetInternal(nid, sid, group);
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
std::lock_guard<std::mutex> lck(db_mutex_);
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = GetInternal(nid[i], str_id, group);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
std::lock_guard<std::mutex> lck(db_mutex_);
return DeleteInternal(nid, group);
}
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
std::lock_guard<std::mutex> lck(db_mutex_);
return DeleteGroupInternal(group);
}
//internal methods(whitout lock)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ServerError RocksIdMapper::AddGroupInternal(const std::string& group) {
if(!IsGroupExistInternal(group)) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
try {//add group
rocksdb::ColumnFamilyHandle *cfh = nullptr;
rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
} else {
column_handles_.insert(std::make_pair(group, cfh));
}
} catch(std::exception& ex) {
SERVER_LOG_ERROR << "ID mapper failed to create group: " << ex.what();
return SERVER_UNEXPECTED_ERROR;
}
}
return SERVER_SUCCESS;
}
bool RocksIdMapper::IsGroupExistInternal(const std::string& group) const {
std::string group_name = group;
if(group_name.empty()){
group_name = ROCKSDB_DEFAULT_GROUP;
}
return (column_handles_.count(group_name) > 0 && column_handles_[group_name] != nullptr);
}
ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string& sid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
......@@ -104,22 +220,12 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, c
return SERVER_UNEXPECTED_ERROR;
}
} else {
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) == 0) {
try {//add group
rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString();
} else {
column_handles_.insert(std::make_pair(group, cfh));
}
} catch(std::exception& ex) {
std::cout << ex.what() << std::endl;
}
} else {
cfh = column_handles_[group];
//try create group
if(AddGroupInternal(group) != SERVER_SUCCESS){
return SERVER_UNEXPECTED_ERROR;
}
rocksdb::ColumnFamilyHandle *cfh = column_handles_[group];
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), cfh, key, value);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
......@@ -130,23 +236,7 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, c
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = Put(nid[i], sid[i], group);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid, const std::string& group) const {
sid = "";
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
......@@ -173,27 +263,7 @@ ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const s
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = Get(nid[i], str_id, group);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
......@@ -218,7 +288,7 @@ ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& gro
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
ServerError RocksIdMapper::DeleteGroupInternal(const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
......@@ -237,6 +307,7 @@ ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
return SERVER_SUCCESS;
}
}
}
}
\ No newline at end of file
......@@ -13,16 +13,20 @@
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
namespace zilliz {
namespace vecwise {
namespace server {
class RocksIdMapper : public IVecIdMapper{
public:
public:
RocksIdMapper();
~RocksIdMapper();
ServerError AddGroup(const std::string& group) override;
bool IsGroupExist(const std::string& group) const override;
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
......@@ -32,15 +36,29 @@ public:
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;
private:
private:
void OpenDb();
void CloseDb();
private:
ServerError AddGroupInternal(const std::string& group);
bool IsGroupExistInternal(const std::string& group) const;
ServerError PutInternal(const std::string& nid, const std::string& sid, const std::string& group);
ServerError GetInternal(const std::string& nid, std::string& sid, const std::string& group) const;
ServerError DeleteInternal(const std::string& nid, const std::string& group);
ServerError DeleteGroupInternal(const std::string& group);
private:
rocksdb::DB* db_;
std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
mutable std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
mutable std::mutex db_mutex_;
};
}
}
}
......@@ -39,6 +39,20 @@ SimpleIdMapper::~SimpleIdMapper() {
}
ServerError
SimpleIdMapper::AddGroup(const std::string& group) {
if(id_groups_.count(group) == 0) {
id_groups_.insert(std::make_pair(group, ID_MAPPING()));
}
}
//not thread-safe
bool
SimpleIdMapper::IsGroupExist(const std::string& group) const {
return id_groups_.count(group) > 0;
}
//not thread-safe
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
......
......@@ -25,6 +25,9 @@ public:
virtual ~IVecIdMapper(){}
virtual ServerError AddGroup(const std::string& group) = 0;
virtual bool IsGroupExist(const std::string& group) const = 0;
virtual ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") = 0;
......@@ -41,6 +44,9 @@ public:
SimpleIdMapper();
~SimpleIdMapper();
ServerError AddGroup(const std::string& group) override;
bool IsGroupExist(const std::string& group) const override;
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册