提交 a9f84375 编写于 作者: G groot

support multi db path


Former-commit-id: 83d2772763dbd9effa4149e0f77a8b481753122b
上级 00311ead
......@@ -14,8 +14,8 @@ Please mark all change in change log and use the ticket from JIRA.
## Improvement
- MS-156 - Add unittest for merge result functions
- MS-152 - Delete assert in MySQLMetaImpl and change MySQLConnectionPool impl
- MS-204 - Support multi db_path
## New Feature
- MS-195 - Add nlist and use_blas_threshold conf
......
......@@ -6,6 +6,7 @@ server_config:
db_config:
db_path: @MILVUS_DB_PATH@ # milvus data storage path
db_slave_path: # secondry data storage path, split by semicolon
# URI format: dialect://username:password@host:port/database
# All parts except dialect are optional, but you MUST include the delimiters
......
......@@ -83,26 +83,6 @@ using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
std::string DBMetaImpl::GetTablePath(const std::string &table_id) {
return options_.path + "/tables/" + table_id;
}
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
std::stringstream ss;
ss << GetTablePath(table_id) << "/" << date;
return ss.str();
}
void DBMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
if (group_file.date_ == EmptyDate) {
group_file.date_ = Meta::GetDate();
}
std::stringstream ss;
ss << GetTableDatePartitionPath(group_file.table_id_, group_file.date_)
<< "/" << group_file.file_id_;
group_file.location_ = ss.str();
}
Status DBMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss;
SimpleIDGenerator g;
......@@ -213,15 +193,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
return Status::DBTransactionError("Add Table Error");
}
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
if (!boost::filesystem::is_directory(table_path)) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
return utils::CreateTablePath(options_, table_schema.table_id_);
} catch (std::exception &e) {
return HandleException("Encounter exception when create table", e);
......@@ -307,9 +279,6 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
} catch (std::exception &e) {
return HandleException("Encounter exception when describe table", e);
}
......@@ -411,20 +380,11 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
GetTableFilePath(file_schema);
auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id;
auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
}
return utils::CreateTableFilePath(options_, file_schema);
} catch (std::exception& ex) {
return HandleException("Encounter exception when create table file", ex);
......@@ -461,7 +421,7 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto groupItr = groups.find(table_file.table_id_);
if (groupItr == groups.end()) {
TableSchema table_schema;
......@@ -524,7 +484,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
......@@ -566,7 +526,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
......@@ -616,7 +576,7 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
table_file.size_ = std::get<4>(file);
table_file.date_ = std::get<5>(file);
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
......@@ -662,7 +622,7 @@ Status DBMetaImpl::GetTableFiles(const std::string& table_id,
file_schema.date_ = std::get<4>(file);
file_schema.engine_type_ = std::get<5>(file);
file_schema.dimension_ = table_schema.dimension_;
GetTableFilePath(file_schema);
utils::GetTableFilePath(options_, file_schema);
table_files.emplace_back(file_schema);
}
......@@ -895,10 +855,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.date_ = std::get<3>(file);
GetTableFilePath(table_file);
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " << table_file.location_ << std::endl;
boost::filesystem::remove(table_file.location_);
ConnectorPtr->remove<TableFileSchema>(table_file.id_);
}
......@@ -922,10 +881,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
auto commited = ConnectorPtr->transaction([&]() mutable {
for (auto &table : tables) {
auto table_path = GetTablePath(std::get<1>(table));
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
boost::filesystem::remove_all(table_path);
utils::DeleteTablePath(options_, std::get<1>(table));
ConnectorPtr->remove<TableSchema>(std::get<0>(table));
}
......
......@@ -70,9 +70,6 @@ private:
Status NextFileId(std::string& file_id);
Status NextTableId(std::string& table_id);
Status DiscardFiles(long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize();
const DBMetaOptions options_;
......
......@@ -31,7 +31,6 @@ struct TableSchema {
int state_ = (int)NORMAL;
size_t files_cnt_ = 0;
uint16_t dimension_ = 0;
std::string location_;
long created_on_ = 0;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
bool store_raw_data_ = false;
......
......@@ -67,26 +67,6 @@ namespace meta {
}
std::string MySQLMetaImpl::GetTablePath(const std::string &table_id) {
return options_.path + "/tables/" + table_id;
}
std::string MySQLMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
std::stringstream ss;
ss << GetTablePath(table_id) << "/" << date;
return ss.str();
}
void MySQLMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
if (group_file.date_ == EmptyDate) {
group_file.date_ = Meta::GetDate();
}
std::stringstream ss;
ss << GetTableDatePartitionPath(group_file.table_id_, group_file.date_)
<< "/" << group_file.file_id_;
group_file.location_ = ss.str();
}
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss;
SimpleIDGenerator g;
......@@ -412,15 +392,8 @@ namespace meta {
// auto total_time = METRICS_MICROSECONDS(start_time, end_time);
// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
if (!boost::filesystem::is_directory(table_path)) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
return utils::CreateTablePath(options_, table_schema.table_id_);
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE" << ": " << er.what();
......@@ -588,9 +561,6 @@ namespace meta {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
} catch (const BadQuery& er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING TABLE" << ": " << er.what();
......@@ -743,7 +713,7 @@ namespace meta {
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
GetTableFilePath(file_schema);
utils::GetTableFilePath(options_, file_schema);
std::string id = "NULL"; //auto-increment
std::string table_id = file_schema.table_id_;
......@@ -788,15 +758,7 @@ namespace meta {
}
} // Scoped Connection
auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
}
return utils::CreateTableFilePath(options_, file_schema);
} catch (const BadQuery& er) {
// Handle any query errors
......@@ -881,7 +843,7 @@ namespace meta {
}
table_file.dimension_ = groups[table_file.table_id_].dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
files.push_back(table_file);
}
......@@ -992,7 +954,7 @@ namespace meta {
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
......@@ -1078,7 +1040,7 @@ namespace meta {
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
......@@ -1173,7 +1135,7 @@ namespace meta {
file_schema.dimension_ = table_schema.dimension_;
GetTableFilePath(file_schema);
utils::GetTableFilePath(options_, file_schema);
table_files.emplace_back(file_schema);
}
......@@ -1633,11 +1595,10 @@ namespace meta {
table_file.date_ = resRow["date"];
GetTableFilePath(table_file);
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = "
<< table_file.location_ << std::endl;
boost::filesystem::remove(table_file.location_);
idsToDelete.emplace_back(std::to_string(table_file.id_));
}
......@@ -1710,10 +1671,7 @@ namespace meta {
std::string table_id;
resRow["table_id"].to_string(table_id);
auto table_path = GetTablePath(table_id);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
boost::filesystem::remove_all(table_path);
utils::DeleteTablePath(options_, table_id);
idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
}
......
......@@ -75,9 +75,6 @@ namespace meta {
Status NextFileId(std::string& file_id);
Status NextTableId(std::string& table_id);
Status DiscardFiles(long long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize();
const DBMetaOptions options_;
......
......@@ -8,6 +8,7 @@
#include <string>
#include <memory>
#include <map>
#include <vector>
namespace zilliz {
namespace milvus {
......@@ -43,6 +44,7 @@ private:
struct DBMetaOptions {
std::string path;
std::vector<std::string> slave_paths;
std::string backend_uri;
ArchiveConf archive_conf = ArchiveConf("delete");
}; // DBMetaOptions
......
......@@ -4,14 +4,40 @@
* Proprietary and confidential.
******************************************************************************/
#include "Utils.h"
#include "utils/CommonUtil.h"
#include "Log.h"
#include <chrono>
#include <boost/filesystem.hpp>
namespace zilliz {
namespace milvus {
namespace engine {
namespace utils {
namespace {
static const std::string TABLES_FOLDER = "/tables/";
std::string ConstructParentFolder(const std::string& db_path, const meta::TableFileSchema& table_file) {
std::string table_path = db_path + TABLES_FOLDER + table_file.table_id_;
std::string partition_path = table_path + "/" + std::to_string(table_file.date_);
return partition_path;
}
std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::TableFileSchema& table_file) {
uint64_t path_count = options.slave_paths.size() + 1;
std::string target_path = options.path;
uint64_t index = table_file.id_%path_count;
if(index > 0) {
target_path = options.slave_paths[index - 1];
}
return ConstructParentFolder(target_path, table_file);
}
}
long GetMicroSecTimeStamp() {
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
......@@ -20,6 +46,87 @@ long GetMicroSecTimeStamp() {
return micros;
}
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id) {
std::string db_path = options.path;
std::string table_path = db_path + TABLES_FOLDER + table_id;
auto status = server::CommonUtil::CreateDirectory(table_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
for(auto& path : options.slave_paths) {
table_path = path + TABLES_FOLDER + table_id;
status = server::CommonUtil::CreateDirectory(table_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
return Status::OK();
}
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id) {
std::string db_path = options.path;
std::string table_path = db_path + TABLES_FOLDER + table_id;
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
for(auto& path : options.slave_paths) {
table_path = path + TABLES_FOLDER + table_id;
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
}
return Status::OK();
}
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
std::string parent_path = GetTableFileParentFolder(options, table_file);
auto status = server::CommonUtil::CreateDirectory(parent_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << parent_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
table_file.location_ = parent_path + "/" + table_file.file_id_;
return Status::OK();
}
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
#if 0
std::string parent_path = GetTableFileParentFolder(options, table_file);
table_file.location_ = parent_path + "/" + table_file.file_id_;
#else
std::string parent_path = ConstructParentFolder(options.path, table_file);
std::string file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) {
table_file.location_ = file_path;
return Status::OK();
} else {
for(auto& path : options.slave_paths) {
parent_path = ConstructParentFolder(path, table_file);
file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) {
table_file.location_ = file_path;
return Status::OK();
}
}
}
#endif
return Status::Error("Table file doesn't exist: " + table_file.file_id_);
}
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
utils::GetTableFilePath(options, table_file);
boost::filesystem::remove(table_file.location_);
return Status::OK();
}
} // namespace utils
} // namespace engine
} // namespace milvus
......
......@@ -5,6 +5,10 @@
******************************************************************************/
#pragma once
#include "Options.h"
#include "MetaTypes.h"
#include <string>
namespace zilliz {
namespace milvus {
......@@ -13,6 +17,13 @@ namespace utils {
long GetMicroSecTimeStamp();
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id);
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id);
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
} // namespace utils
} // namespace engine
} // namespace milvus
......
......@@ -8,6 +8,7 @@
#include "ServerConfig.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
namespace zilliz {
namespace milvus {
......@@ -19,6 +20,10 @@ DBWrapper::DBWrapper() {
opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
opt.meta.path = db_path + "/db";
std::string db_slave_path = config.GetValue(CONFIG_DB_SLAVE_PATH);
StringHelpFunctions::SplitStringByDelimeter(db_slave_path, ";", opt.meta.slave_paths);
int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
......@@ -59,6 +64,14 @@ DBWrapper::DBWrapper() {
kill(0, SIGUSR1);
}
for(auto& path : opt.meta.slave_paths) {
err = CommonUtil::CreateDirectory(path);
if(err != SERVER_SUCCESS) {
std::cout << "ERROR! Failed to create database slave path: " << path << std::endl;
kill(0, SIGUSR1);
}
}
std::string msg = opt.meta.path;
try {
zilliz::milvus::engine::DB::Open(opt, &db_);
......
......@@ -23,6 +23,7 @@ static const std::string CONFIG_CLUSTER_MODE = "mode";
static const std::string CONFIG_DB = "db_config";
static const std::string CONFIG_DB_URL = "db_backend_url";
static const std::string CONFIG_DB_PATH = "db_path";
static const std::string CONFIG_DB_SLAVE_PATH = "db_slave_path";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold";
static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold";
......
......@@ -51,8 +51,7 @@ bool CommonUtil::GetSystemAvailableThreads(unsigned int &threadCnt) {
return true;
}
bool CommonUtil::IsDirectoryExist(const std::string &path)
{
bool CommonUtil::IsDirectoryExist(const std::string &path) {
DIR *dp = nullptr;
if ((dp = opendir(path.c_str())) == nullptr) {
return false;
......@@ -63,6 +62,10 @@ bool CommonUtil::IsDirectoryExist(const std::string &path)
}
ServerError CommonUtil::CreateDirectory(const std::string &path) {
if(path.empty()) {
return SERVER_SUCCESS;
}
struct stat directoryStat;
int statOK = stat(path.c_str(), &directoryStat);
if (statOK == 0) {
......@@ -120,6 +123,10 @@ namespace {
}
ServerError CommonUtil::DeleteDirectory(const std::string &path) {
if(path.empty()) {
return SERVER_SUCCESS;
}
struct stat directoryStat;
int statOK = stat(path.c_str(), &directoryStat);
if (statOK != 0)
......
......@@ -27,6 +27,10 @@ void StringHelpFunctions::TrimStringQuote(std::string &string, const std::string
ServerError StringHelpFunctions::SplitStringByDelimeter(const std::string &str,
const std::string &delimeter,
std::vector<std::string> &result) {
if(str.empty()) {
return SERVER_SUCCESS;
}
size_t last = 0;
size_t index = str.find_first_of(delimeter, last);
while (index != std::string::npos) {
......
......@@ -272,7 +272,9 @@ TEST_F(DBTest2, DELETE_TEST) {
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_));
bool has_table = false;
db_->HasTable(TABLE_NAME, has_table);
ASSERT_TRUE(has_table);
engine::IDNumbers vector_ids;
......@@ -293,5 +295,7 @@ TEST_F(DBTest2, DELETE_TEST) {
stat = db_->DeleteTable(TABLE_NAME, dates);
std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_TRUE(stat.ok());
ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
db_->HasTable(TABLE_NAME, has_table);
ASSERT_FALSE(has_table);
};
......@@ -38,7 +38,7 @@ TEST_F(MetaTest, TABLE_TEST) {
table.table_id_ = table_id;
status = impl_->CreateTable(table);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(status.IsAlreadyExist());
table.table_id_ = "";
status = impl_->CreateTable(table);
......
......@@ -271,8 +271,9 @@ TEST_F(MySQLDBTest, DELETE_TEST) {
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
// std::cout << "location: " << table_info_get.location_ << std::endl;
ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_));
bool has_table = false;
db_->HasTable(TABLE_NAME, has_table);
ASSERT_TRUE(has_table);
engine::IDNumbers vector_ids;
......@@ -295,7 +296,9 @@ TEST_F(MySQLDBTest, DELETE_TEST) {
std::this_thread::sleep_for(std::chrono::seconds(5));
// std::cout << "5 sec finish" << std::endl;
ASSERT_TRUE(stat.ok());
// ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
db_->HasTable(TABLE_NAME, has_table);
ASSERT_FALSE(has_table);
delete db_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册