提交 4b9fbe96 编写于 作者: J jinhai

Merge branch 'branch-0.5.0' into 'branch-0.5.0'

refine code

See merge request megasearch/milvus!602

Former-commit-id: e42d6d32ace2c6887837a6831adfbff06052b967
......@@ -34,78 +34,78 @@ public:
typedef typename std::list<key_value_pair_t>::iterator list_iterator_t;
typedef typename std::list<key_value_pair_t>::reverse_iterator reverse_list_iterator_t;
LRU(size_t max_size) : _max_size(max_size) {}
LRU(size_t max_size) : max_size_(max_size) {}
void put(const key_t& key, const value_t& value) {
auto it = _cache_items_map.find(key);
_cache_items_list.push_front(key_value_pair_t(key, value));
if (it != _cache_items_map.end()) {
_cache_items_list.erase(it->second);
_cache_items_map.erase(it);
auto it = cache_items_map_.find(key);
cache_items_list_.push_front(key_value_pair_t(key, value));
if (it != cache_items_map_.end()) {
cache_items_list_.erase(it->second);
cache_items_map_.erase(it);
}
_cache_items_map[key] = _cache_items_list.begin();
cache_items_map_[key] = cache_items_list_.begin();
if (_cache_items_map.size() > _max_size) {
auto last = _cache_items_list.end();
if (cache_items_map_.size() > max_size_) {
auto last = cache_items_list_.end();
last--;
_cache_items_map.erase(last->first);
_cache_items_list.pop_back();
cache_items_map_.erase(last->first);
cache_items_list_.pop_back();
}
}
const value_t& get(const key_t& key) {
auto it = _cache_items_map.find(key);
if (it == _cache_items_map.end()) {
auto it = cache_items_map_.find(key);
if (it == cache_items_map_.end()) {
throw std::range_error("There is no such key in cache");
} else {
_cache_items_list.splice(_cache_items_list.begin(), _cache_items_list, it->second);
cache_items_list_.splice(cache_items_list_.begin(), cache_items_list_, it->second);
return it->second->second;
}
}
void erase(const key_t& key) {
auto it = _cache_items_map.find(key);
if (it != _cache_items_map.end()) {
_cache_items_list.erase(it->second);
_cache_items_map.erase(it);
auto it = cache_items_map_.find(key);
if (it != cache_items_map_.end()) {
cache_items_list_.erase(it->second);
cache_items_map_.erase(it);
}
}
bool exists(const key_t& key) const {
return _cache_items_map.find(key) != _cache_items_map.end();
return cache_items_map_.find(key) != cache_items_map_.end();
}
size_t size() const {
return _cache_items_map.size();
return cache_items_map_.size();
}
list_iterator_t begin() {
_iter = _cache_items_list.begin();
return _iter;
iter_ = cache_items_list_.begin();
return iter_;
}
list_iterator_t end() {
return _cache_items_list.end();
return cache_items_list_.end();
}
reverse_list_iterator_t rbegin() {
return _cache_items_list.rbegin();
return cache_items_list_.rbegin();
}
reverse_list_iterator_t rend() {
return _cache_items_list.rend();
return cache_items_list_.rend();
}
void clear() {
_cache_items_list.clear();
_cache_items_map.clear();
cache_items_list_.clear();
cache_items_map_.clear();
}
private:
std::list<key_value_pair_t> _cache_items_list;
std::unordered_map<key_t, list_iterator_t> _cache_items_map;
size_t _max_size;
list_iterator_t _iter;
std::list<key_value_pair_t> cache_items_list_;
std::unordered_map<key_t, list_iterator_t> cache_items_map_;
size_t max_size_;
list_iterator_t iter_;
};
} // cache
......
......@@ -36,7 +36,7 @@ namespace engine {
DBOptions DBFactory::BuildOption() {
auto meta = MetaFactory::BuildOption();
DBOptions options;
options.meta = meta;
options.meta_ = meta;
return options;
}
......
......@@ -56,7 +56,7 @@ DBImpl::DBImpl(const DBOptions& options)
shutting_down_(true),
compact_thread_pool_(1, 1),
index_thread_pool_(1, 1) {
meta_ptr_ = MetaFactory::Build(options.meta, options.mode);
meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
Start();
}
......@@ -77,7 +77,7 @@ Status DBImpl::Start() {
shutting_down_.store(false, std::memory_order_release);
//for distribute version, some nodes are read only
if (options_.mode != DBOptions::MODE::READ_ONLY) {
if (options_.mode_ != DBOptions::MODE::READ_ONLY) {
ENGINE_LOG_TRACE << "StartTimerTasks";
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
......@@ -98,7 +98,7 @@ Status DBImpl::Stop() {
//wait compaction/buildindex finish
bg_timer_thread_.join();
if (options_.mode != DBOptions::MODE::READ_ONLY) {
if (options_.mode_ != DBOptions::MODE::READ_ONLY) {
meta_ptr_->CleanUp();
}
......@@ -649,7 +649,7 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
bool has_merge = false;
for (auto& kv : raw_files) {
auto files = kv.second;
if (files.size() < options_.merge_trigger_number) {
if (files.size() < options_.merge_trigger_number_) {
ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
continue;
}
......@@ -684,7 +684,7 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
meta_ptr_->Archive();
int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
if (options_.mode == DBOptions::MODE::CLUSTER) {
if (options_.mode_ == DBOptions::MODE::CLUSTER) {
ttl = meta::D_SEC;
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
......
......@@ -52,10 +52,10 @@ private:
};
struct DBMetaOptions {
std::string path;
std::vector<std::string> slave_paths;
std::string backend_uri;
ArchiveConf archive_conf = ArchiveConf("delete");
std::string path_;
std::vector<std::string> slave_paths_;
std::string backend_uri_;
ArchiveConf archive_conf_ = ArchiveConf("delete");
}; // DBMetaOptions
struct DBOptions {
......@@ -65,11 +65,11 @@ struct DBOptions {
READ_ONLY
} MODE;
uint16_t merge_trigger_number = 2;
DBMetaOptions meta;
int mode = MODE::SINGLE;
uint16_t merge_trigger_number_ = 2;
DBMetaOptions meta_;
int mode_ = MODE::SINGLE;
size_t insert_buffer_size = 4 * ONE_GB;
size_t insert_buffer_size_ = 4 * ONE_GB;
bool insert_cache_immediately_ = false;
}; // Options
......
......@@ -43,8 +43,8 @@ std::string ConstructParentFolder(const std::string& db_path, const meta::TableF
}
std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::TableFileSchema& table_file) {
uint64_t path_count = options.slave_paths.size() + 1;
std::string target_path = options.path;
uint64_t path_count = options.slave_paths_.size() + 1;
std::string target_path = options.path_;
uint64_t index = 0;
if(meta::TableFileSchema::NEW_INDEX == table_file.file_type_) {
......@@ -61,7 +61,7 @@ std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::T
}
if (index > 0) {
target_path = options.slave_paths[index - 1];
target_path = options.slave_paths_[index - 1];
}
return ConstructParentFolder(target_path, table_file);
......@@ -78,7 +78,7 @@ long GetMicroSecTimeStamp() {
}
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id) {
std::string db_path = options.path;
std::string db_path = options.path_;
std::string table_path = db_path + TABLES_FOLDER + table_id;
auto status = server::CommonUtil::CreateDirectory(table_path);
if (!status.ok()) {
......@@ -86,7 +86,7 @@ Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id
return status;
}
for(auto& path : options.slave_paths) {
for(auto& path : options.slave_paths_) {
table_path = path + TABLES_FOLDER + table_id;
status = server::CommonUtil::CreateDirectory(table_path);
if (!status.ok()) {
......@@ -99,8 +99,8 @@ Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id
}
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id, bool force) {
std::vector<std::string> paths = options.slave_paths;
paths.push_back(options.path);
std::vector<std::string> paths = options.slave_paths_;
paths.push_back(options.path_);
for(auto& path : paths) {
std::string table_path = path + TABLES_FOLDER + table_id;
......@@ -132,13 +132,13 @@ Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
}
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
std::string parent_path = ConstructParentFolder(options.path, table_file);
std::string parent_path = ConstructParentFolder(options.path_, table_file);
std::string file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) {
table_file.location_ = file_path;
return Status::OK();
} else {
for(auto& path : options.slave_paths) {
for(auto& path : options.slave_paths_) {
parent_path = ConstructParentFolder(path, table_file);
file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) {
......@@ -149,7 +149,7 @@ Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& tab
}
std::string msg = "Table file doesn't exist: " + file_path;
ENGINE_LOG_ERROR << msg << " in path: " << options.path
ENGINE_LOG_ERROR << msg << " in path: " << options.path_
<< " for table: " << table_file.table_id_;
return Status(DB_ERROR, msg);
......
......@@ -43,7 +43,7 @@ Status MemManagerImpl::InsertVectors(const std::string &table_id_,
const float *vectors_,
IDNumbers &vector_ids_) {
while (GetCurrentMem() > options_.insert_buffer_size) {
while (GetCurrentMem() > options_.insert_buffer_size_) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
......
......@@ -43,12 +43,12 @@ namespace engine {
}
DBMetaOptions meta;
meta.path = p;
meta.path_ = p;
return meta;
}
meta::MetaPtr MetaFactory::Build(const DBMetaOptions &metaOptions, const int &mode) {
std::string uri = metaOptions.backend_uri;
std::string uri = metaOptions.backend_uri_;
utils::MetaUriInfo uri_info;
auto status = utils::ParseMetaUri(uri, uri_info);
......
......@@ -194,7 +194,7 @@ void MySQLMetaImpl::ValidateMetaSchema() {
return;
}
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return;
}
......@@ -239,16 +239,16 @@ void MySQLMetaImpl::ValidateMetaSchema() {
Status MySQLMetaImpl::Initialize() {
//step 1: create db root path
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
if (!boost::filesystem::is_directory(options_.path_)) {
auto ret = boost::filesystem::create_directory(options_.path_);
if (!ret) {
std::string msg = "Failed to create db directory " + options_.path;
std::string msg = "Failed to create db directory " + options_.path_;
ENGINE_LOG_ERROR << msg;
return Status(DB_META_TRANSACTION_FAILED, msg);
}
}
std::string uri = options_.backend_uri;
std::string uri = options_.backend_uri_;
//step 2: parse and check meta uri
utils::MetaUriInfo uri_info;
......@@ -289,7 +289,7 @@ Status MySQLMetaImpl::Initialize() {
}
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -351,7 +351,7 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", "
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -383,7 +383,7 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
try {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -462,7 +462,7 @@ Status MySQLMetaImpl::FilesByType(const std::string &table_id,
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -542,7 +542,7 @@ Status MySQLMetaImpl::UpdateTableIndex(const std::string &table_id, const TableI
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -601,7 +601,7 @@ Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag)
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -633,7 +633,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -673,7 +673,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -736,7 +736,7 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
try {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -773,7 +773,7 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
try {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -807,7 +807,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -860,7 +860,7 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -895,7 +895,7 @@ Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -976,7 +976,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
std::string date = std::to_string(file_schema.date_);
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1015,7 +1015,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1099,7 +1099,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1218,7 +1218,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1309,7 +1309,7 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
try {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1378,7 +1378,7 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
// PXU TODO: Support Swap
Status MySQLMetaImpl::Archive() {
auto &criterias = options_.archive_conf.GetCriterias();
auto &criterias = options_.archive_conf_.GetCriterias();
if (criterias.empty()) {
return Status::OK();
}
......@@ -1391,7 +1391,7 @@ Status MySQLMetaImpl::Archive() {
long now = utils::GetMicroSecTimeStamp();
try {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1432,7 +1432,7 @@ Status MySQLMetaImpl::Size(uint64_t &result) {
try {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1472,7 +1472,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
server::MetricCollector metric;
bool status;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1537,7 +1537,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
try {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1605,7 +1605,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
try {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1636,7 +1636,7 @@ Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1721,7 +1721,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1793,7 +1793,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1844,7 +1844,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1874,7 +1874,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status MySQLMetaImpl::CleanUp() {
try {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1925,7 +1925,7 @@ Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......@@ -1961,7 +1961,7 @@ Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
Status MySQLMetaImpl::DropAll() {
try {
ENGINE_LOG_DEBUG << "Drop all mysql meta";
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
......
......@@ -112,7 +112,7 @@ private:
const int mode_;
std::shared_ptr<MySQLConnectionPool> mysql_connection_pool_;
bool safe_grab = false;
bool safe_grab_ = false;
// std::mutex connectionMutex_;
}; // DBMetaImpl
......
......@@ -128,16 +128,16 @@ void SqliteMetaImpl::ValidateMetaSchema() {
}
Status SqliteMetaImpl::Initialize() {
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
if (!boost::filesystem::is_directory(options_.path_)) {
auto ret = boost::filesystem::create_directory(options_.path_);
if (!ret) {
std::string msg = "Failed to create db directory " + options_.path;
std::string msg = "Failed to create db directory " + options_.path_;
ENGINE_LOG_ERROR << msg;
return Status(DB_INVALID_PATH, msg);
}
}
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path_ + "/meta.sqlite"));
ValidateMetaSchema();
......@@ -886,7 +886,7 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
// PXU TODO: Support Swap
Status SqliteMetaImpl::Archive() {
auto &criterias = options_.archive_conf.GetCriterias();
auto &criterias = options_.archive_conf_.GetCriterias();
if (criterias.size() == 0) {
return Status::OK();
}
......
......@@ -38,12 +38,12 @@ Status DBWrapper::StartService() {
//db config
engine::DBOptions opt;
ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
opt.meta.backend_uri = db_config.GetValue(CONFIG_DB_URL);
opt.meta_.backend_uri_ = db_config.GetValue(CONFIG_DB_URL);
std::string db_path = db_config.GetValue(CONFIG_DB_PATH);
opt.meta.path = db_path + "/db";
opt.meta_.path_ = db_path + "/db";
std::string db_slave_path = db_config.GetValue(CONFIG_DB_SLAVE_PATH);
StringHelpFunctions::SplitStringByDelimeter(db_slave_path, ";", opt.meta.slave_paths);
StringHelpFunctions::SplitStringByDelimeter(db_slave_path, ";", opt.meta_.slave_paths_);
// cache config
ConfigNode& cache_config = ServerConfig::GetInstance().GetConfig(CONFIG_CACHE);
......@@ -52,13 +52,13 @@ Status DBWrapper::StartService() {
ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
if (mode == "single") {
opt.mode = engine::DBOptions::MODE::SINGLE;
opt.mode_ = engine::DBOptions::MODE::SINGLE;
}
else if (mode == "cluster") {
opt.mode = engine::DBOptions::MODE::CLUSTER;
opt.mode_ = engine::DBOptions::MODE::CLUSTER;
}
else if (mode == "read_only") {
opt.mode = engine::DBOptions::MODE::READ_ONLY;
opt.mode_ = engine::DBOptions::MODE::READ_ONLY;
}
else {
std::cerr << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']" << std::endl;
......@@ -91,16 +91,16 @@ Status DBWrapper::StartService() {
if(days > 0) {
criterial[engine::ARCHIVE_CONF_DAYS] = days;
}
opt.meta.archive_conf.SetCriterias(criterial);
opt.meta_.archive_conf_.SetCriterias(criterial);
//create db root folder
Status status = CommonUtil::CreateDirectory(opt.meta.path);
Status status = CommonUtil::CreateDirectory(opt.meta_.path_);
if(!status.ok()) {
std::cerr << "ERROR! Failed to create database root path: " << opt.meta.path << std::endl;
std::cerr << "ERROR! Failed to create database root path: " << opt.meta_.path_ << std::endl;
kill(0, SIGUSR1);
}
for(auto& path : opt.meta.slave_paths) {
for(auto& path : opt.meta_.slave_paths_) {
status = CommonUtil::CreateDirectory(path);
if(!status.ok()) {
std::cerr << "ERROR! Failed to create database slave path: " << path << std::endl;
......
......@@ -136,18 +136,18 @@ Server::Daemonize() {
stderr = fopen("/dev/null", "w+");
// Try to write PID of daemon to lockfile
if (!pid_filename_.empty()) {
pid_fd = open(pid_filename_.c_str(), O_RDWR | O_CREAT, 0640);
if (pid_fd < 0) {
pid_fd_ = open(pid_filename_.c_str(), O_RDWR | O_CREAT, 0640);
if (pid_fd_ < 0) {
std::cerr << "Can't open filename: " + pid_filename_ + ", Error: " + strerror(errno);
exit(EXIT_FAILURE);
}
if (lockf(pid_fd, F_TLOCK, 0) < 0) {
if (lockf(pid_fd_, F_TLOCK, 0) < 0) {
std::cerr << "Can't lock filename: " + pid_filename_ + ", Error: " + strerror(errno);
exit(EXIT_FAILURE);
}
std::string pid_file_context = std::to_string(getpid());
ssize_t res = write(pid_fd, pid_file_context.c_str(), pid_file_context.size());
ssize_t res = write(pid_fd_, pid_file_context.c_str(), pid_file_context.size());
if (res != 0) {
return;
}
......@@ -209,13 +209,13 @@ Server::Stop() {
std::cerr << "Milvus server is going to shutdown ..." << std::endl;
/* Unlock and close lockfile */
if (pid_fd != -1) {
int ret = lockf(pid_fd, F_ULOCK, 0);
if (pid_fd_ != -1) {
int ret = lockf(pid_fd_, F_ULOCK, 0);
if (ret != 0) {
std::cerr << "Can't lock file: " << strerror(errno) << std::endl;
exit(0);
}
ret = close(pid_fd);
ret = close(pid_fd_);
if (ret != 0) {
std::cerr << "Can't close file: " << strerror(errno) << std::endl;
exit(0);
......
......@@ -52,7 +52,7 @@ class Server {
private:
int64_t daemonized_ = 0;
int pid_fd = -1;
int pid_fd_ = -1;
std::string pid_filename_;
std::string config_filename_;
std::string log_config_file_;
......
......@@ -45,17 +45,17 @@ public:
private:
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
std::vector<std::thread> workers_;
// the task queue
std::queue<std::function<void()> > tasks;
std::queue<std::function<void()> > tasks_;
size_t max_queue_size;
size_t max_queue_size_;
// synchronization
std::mutex queue_mutex;
std::mutex queue_mutex_;
std::condition_variable condition;
std::condition_variable condition_;
bool stop;
};
......@@ -63,23 +63,23 @@ private:
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads, size_t queue_size)
: max_queue_size(queue_size), stop(false) {
: max_queue_size_(queue_size), stop(false) {
for (size_t i = 0; i < threads; ++i)
workers.emplace_back(
workers_.emplace_back(
[this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
std::unique_lock<std::mutex> lock(this->queue_mutex_);
this->condition_.wait(lock,
[this] { return this->stop || !this->tasks_.empty(); });
if (this->stop && this->tasks_.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
this->condition.notify_all();
this->condition_.notify_all();
task();
}
......@@ -99,27 +99,27 @@ auto ThreadPool::enqueue(F &&f, Args &&... args)
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
this->condition.wait(lock,
[this] { return this->tasks.size() < max_queue_size; });
std::unique_lock<std::mutex> lock(queue_mutex_);
this->condition_.wait(lock,
[this] { return this->tasks_.size() < max_queue_size_; });
// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
tasks_.emplace([task]() { (*task)(); });
}
condition.notify_all();
condition_.notify_all();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
std::unique_lock<std::mutex> lock(queue_mutex_);
stop = true;
}
condition.notify_all();
for (std::thread &worker: workers)
condition_.notify_all();
for (std::thread &worker: workers_)
worker.join();
}
......
......@@ -117,11 +117,11 @@ TEST_F(MetaTest, TABLE_FILE_TEST) {
TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
srand(time(0));
DBMetaOptions options;
options.path = "/tmp/milvus_test";
options.path_ = "/tmp/milvus_test";
int days_num = rand() % 100;
std::stringstream ss;
ss << "days:" << days_num;
options.archive_conf = ArchiveConf("delete", ss.str());
options.archive_conf_ = ArchiveConf("delete", ss.str());
meta::SqliteMetaImpl impl(options);
auto table_id = "meta_test_table";
......@@ -168,8 +168,8 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
DBMetaOptions options;
options.path = "/tmp/milvus_test";
options.archive_conf = ArchiveConf("delete", "disk:11");
options.path_ = "/tmp/milvus_test";
options.archive_conf_ = ArchiveConf("delete", "disk:11");
meta::SqliteMetaImpl impl(options);
auto table_id = "meta_test_group";
......
......@@ -77,7 +77,7 @@ TEST(DBMiscTest, OPTIONS_TEST) {
TEST(DBMiscTest, META_TEST) {
engine::DBMetaOptions options;
options.path = "/tmp/milvus_test";
options.path_ = "/tmp/milvus_test";
engine::meta::SqliteMetaImpl impl(options);
time_t tt;
......@@ -89,15 +89,15 @@ TEST(DBMiscTest, META_TEST) {
TEST(DBMiscTest, UTILS_TEST) {
engine::DBMetaOptions options;
options.path = "/tmp/milvus_test/main";
options.slave_paths.push_back("/tmp/milvus_test/slave_1");
options.slave_paths.push_back("/tmp/milvus_test/slave_2");
options.path_ = "/tmp/milvus_test/main";
options.slave_paths_.push_back("/tmp/milvus_test/slave_1");
options.slave_paths_.push_back("/tmp/milvus_test/slave_2");
const std::string TABLE_NAME = "test_tbl";
auto status = engine::utils::CreateTablePath(options, TABLE_NAME);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(boost::filesystem::exists(options.path));
for(auto& path : options.slave_paths) {
ASSERT_TRUE(boost::filesystem::exists(options.path_));
for(auto& path : options.slave_paths_) {
ASSERT_TRUE(boost::filesystem::exists(path));
}
......
......@@ -121,12 +121,12 @@ TEST_F(MySqlMetaTest, TABLE_FILE_TEST) {
TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
srand(time(0));
DBMetaOptions options = GetOptions().meta;
DBMetaOptions options = GetOptions().meta_;
int days_num = rand() % 100;
std::stringstream ss;
ss << "days:" << days_num;
options.archive_conf = ArchiveConf("delete", ss.str());
options.archive_conf_ = ArchiveConf("delete", ss.str());
int mode = DBOptions::MODE::SINGLE;
meta::MySQLMetaImpl impl(options, mode);
......@@ -184,9 +184,9 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
}
TEST_F(MySqlMetaTest, ARCHIVE_TEST_DISK) {
DBMetaOptions options = GetOptions().meta;
DBMetaOptions options = GetOptions().meta_;
options.archive_conf = ArchiveConf("delete", "disk:11");
options.archive_conf_ = ArchiveConf("delete", "disk:11");
int mode = DBOptions::MODE::SINGLE;
auto impl = meta::MySQLMetaImpl(options, mode);
auto table_id = "meta_test_group";
......
......@@ -71,8 +71,8 @@ void BaseTest::TearDown() {
engine::DBOptions BaseTest::GetOptions() {
auto options = engine::DBFactory::BuildOption();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = "sqlite://:@:/";
options.meta_.path_ = "/tmp/milvus_test";
options.meta_.backend_uri_ = "sqlite://:@:/";
return options;
}
......@@ -110,15 +110,15 @@ void DBTest::TearDown() {
BaseTest::TearDown();
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
boost::filesystem::remove_all(options.meta_.path_);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
engine::DBOptions DBTest2::GetOptions() {
auto options = engine::DBFactory::BuildOption();
options.meta.path = "/tmp/milvus_test";
options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1");
options.meta.backend_uri = "sqlite://:@:/";
options.meta_.path_ = "/tmp/milvus_test";
options.meta_.archive_conf_ = engine::ArchiveConf("delete", "disk:1");
options.meta_.backend_uri_ = "sqlite://:@:/";
return options;
}
......@@ -127,7 +127,7 @@ void MetaTest::SetUp() {
BaseTest::SetUp();
auto options = GetOptions();
impl_ = std::make_shared<engine::meta::SqliteMetaImpl>(options.meta);
impl_ = std::make_shared<engine::meta::SqliteMetaImpl>(options.meta_);
}
void MetaTest::TearDown() {
......@@ -136,17 +136,17 @@ void MetaTest::TearDown() {
BaseTest::TearDown();
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
boost::filesystem::remove_all(options.meta_.path_);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
engine::DBOptions MySqlDBTest::GetOptions() {
auto options = engine::DBFactory::BuildOption();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = DBTestEnvironment::getURI();
options.meta_.path_ = "/tmp/milvus_test";
options.meta_.backend_uri_ = DBTestEnvironment::getURI();
if(options.meta.backend_uri.empty()) {
options.meta.backend_uri = "mysql://root:Fantast1c@192.168.1.194:3306/";
if(options.meta_.backend_uri_.empty()) {
options.meta_.backend_uri_ = "mysql://root:Fantast1c@192.168.1.194:3306/";
}
return options;
......@@ -157,7 +157,7 @@ void MySqlMetaTest::SetUp() {
BaseTest::SetUp();
auto options = GetOptions();
impl_ = std::make_shared<engine::meta::MySQLMetaImpl>(options.meta, options.mode);
impl_ = std::make_shared<engine::meta::MySQLMetaImpl>(options.meta_, options.mode_);
}
void MySqlMetaTest::TearDown() {
......@@ -166,16 +166,16 @@ void MySqlMetaTest::TearDown() {
BaseTest::TearDown();
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
boost::filesystem::remove_all(options.meta_.path_);
}
engine::DBOptions MySqlMetaTest::GetOptions() {
auto options = engine::DBFactory::BuildOption();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = DBTestEnvironment::getURI();
options.meta_.path_ = "/tmp/milvus_test";
options.meta_.backend_uri_ = DBTestEnvironment::getURI();
if(options.meta.backend_uri.empty()) {
options.meta.backend_uri = "mysql://root:Fantast1c@192.168.1.194:3306/";
if(options.meta_.backend_uri_.empty()) {
options.meta_.backend_uri_ = "mysql://root:Fantast1c@192.168.1.194:3306/";
}
return options;
......
......@@ -54,8 +54,8 @@ void MetricTest::InitLog() {
engine::DBOptions MetricTest::GetOptions() {
auto options = engine::DBFactory::BuildOption();
options.meta.path = "/tmp/milvus_test";
options.meta.backend_uri = "sqlite://:@:/";
options.meta_.path_ = "/tmp/milvus_test";
options.meta_.backend_uri_ = "sqlite://:@:/";
return options;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册