diff --git a/cpp/src/server/RocksIdMapper.cpp b/cpp/src/server/RocksIdMapper.cpp index 1f24f6c9da254d009cf521d6a7b9c628fd779416..386058f00e8ca24d72b0d02c942379b241be5ce2 100644 --- a/cpp/src/server/RocksIdMapper.cpp +++ b/cpp/src/server/RocksIdMapper.cpp @@ -30,6 +30,8 @@ RocksIdMapper::~RocksIdMapper() { } void RocksIdMapper::OpenDb() { + std::lock_guard lck(db_mutex_); + if(db_) { return; } @@ -81,6 +83,8 @@ void RocksIdMapper::OpenDb() { } void RocksIdMapper::CloseDb() { + std::lock_guard lck(db_mutex_); + for(auto& iter : column_handles_) { delete iter.second; } @@ -92,9 +96,86 @@ void RocksIdMapper::CloseDb() { } } -//not thread-safe ServerError RocksIdMapper::AddGroup(const std::string& group) { - if(!IsGroupExist(group)) { + std::lock_guard lck(db_mutex_); + + return AddGroupInternal(group); +} + +bool RocksIdMapper::IsGroupExist(const std::string& group) const { + std::lock_guard lck(db_mutex_); + + return IsGroupExistInternal(group); +} + + +ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) { + std::lock_guard lck(db_mutex_); + + return PutInternal(nid, sid, group); +} + +ServerError RocksIdMapper::Put(const std::vector& nid, const std::vector& sid, const std::string& group) { + if(nid.size() != sid.size()) { + return SERVER_INVALID_ARGUMENT; + } + + std::lock_guard lck(db_mutex_); + ServerError err = SERVER_SUCCESS; + for(size_t i = 0; i < nid.size(); i++) { + err = PutInternal(nid[i], sid[i], group); + if(err != SERVER_SUCCESS) { + return err; + } + } + + return err; +} + +ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const { + std::lock_guard lck(db_mutex_); + + return GetInternal(nid, sid, group); +} + +ServerError RocksIdMapper::Get(const std::vector& nid, std::vector& sid, const std::string& group) const { + sid.clear(); + + std::lock_guard lck(db_mutex_); + + ServerError err = SERVER_SUCCESS; + for(size_t i = 0; i < nid.size(); i++) { + std::string str_id; + ServerError temp_err = GetInternal(nid[i], str_id, group); + if(temp_err != SERVER_SUCCESS) { + sid.push_back(""); + SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i]; + err = temp_err; + continue; + } + + sid.push_back(str_id); + } + + return err; +} + +ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) { + std::lock_guard lck(db_mutex_); + + return DeleteInternal(nid, group); +} + +ServerError RocksIdMapper::DeleteGroup(const std::string& group) { + std::lock_guard lck(db_mutex_); + + return DeleteGroupInternal(group); +} + +//internal methods(whitout lock) +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +ServerError RocksIdMapper::AddGroupInternal(const std::string& group) { + if(!IsGroupExistInternal(group)) { if(db_ == nullptr) { return SERVER_NULL_POINTER; } @@ -117,8 +198,7 @@ ServerError RocksIdMapper::AddGroup(const std::string& group) { return SERVER_SUCCESS; } -//not thread-safe -bool RocksIdMapper::IsGroupExist(const std::string& group) const { +bool RocksIdMapper::IsGroupExistInternal(const std::string& group) const { std::string group_name = group; if(group_name.empty()){ group_name = ROCKSDB_DEFAULT_GROUP; @@ -126,7 +206,7 @@ bool RocksIdMapper::IsGroupExist(const std::string& group) const { return (column_handles_.count(group_name) > 0 && column_handles_[group_name] != nullptr); } -ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) { +ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string& sid, const std::string& group) { if(db_ == nullptr) { return SERVER_NULL_POINTER; } @@ -141,7 +221,7 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, c } } else { //try create group - if(AddGroup(group) != SERVER_SUCCESS){ + if(AddGroupInternal(group) != SERVER_SUCCESS){ return SERVER_UNEXPECTED_ERROR; } @@ -156,23 +236,7 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, c return SERVER_SUCCESS; } -ServerError RocksIdMapper::Put(const std::vector& nid, const std::vector& sid, const std::string& group) { - if(nid.size() != sid.size()) { - return SERVER_INVALID_ARGUMENT; - } - - ServerError err = SERVER_SUCCESS; - for(size_t i = 0; i < nid.size(); i++) { - err = Put(nid[i], sid[i], group); - if(err != SERVER_SUCCESS) { - return err; - } - } - - return err; -} - -ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const { +ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid, const std::string& group) const { sid = ""; if(db_ == nullptr) { return SERVER_NULL_POINTER; @@ -199,28 +263,8 @@ ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const s return SERVER_SUCCESS; } -ServerError RocksIdMapper::Get(const std::vector& nid, std::vector& sid, const std::string& group) const { - sid.clear(); - - ServerError err = SERVER_SUCCESS; - for(size_t i = 0; i < nid.size(); i++) { - std::string str_id; - ServerError temp_err = Get(nid[i], str_id, group); - if(temp_err != SERVER_SUCCESS) { - sid.push_back(""); - SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i]; - err = temp_err; - continue; - } - - sid.push_back(str_id); - } - - return err; -} - -ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) { - if(db_ == nullptr) { +ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::string& group) { + if(db_ == nullptr) { return SERVER_NULL_POINTER; } @@ -244,7 +288,7 @@ ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& gro return SERVER_SUCCESS; } -ServerError RocksIdMapper::DeleteGroup(const std::string& group) { +ServerError RocksIdMapper::DeleteGroupInternal(const std::string& group) { if(db_ == nullptr) { return SERVER_NULL_POINTER; } diff --git a/cpp/src/server/RocksIdMapper.h b/cpp/src/server/RocksIdMapper.h index 70b77eee04b1986726af2277982fc64f5c97a5df..5fc4667e75875866dd06d8bfad3d823628d2f9ed 100644 --- a/cpp/src/server/RocksIdMapper.h +++ b/cpp/src/server/RocksIdMapper.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace zilliz { namespace vecwise { @@ -39,9 +40,22 @@ private: void OpenDb(); void CloseDb(); + ServerError AddGroupInternal(const std::string& group); + + bool IsGroupExistInternal(const std::string& group) const; + + ServerError PutInternal(const std::string& nid, const std::string& sid, const std::string& group); + + ServerError GetInternal(const std::string& nid, std::string& sid, const std::string& group) const; + + ServerError DeleteInternal(const std::string& nid, const std::string& group); + + ServerError DeleteGroupInternal(const std::string& group); + private: rocksdb::DB* db_; mutable std::unordered_map column_handles_; + mutable std::mutex db_mutex_; }; }