提交 1b61a95d 编写于 作者: J jinhai

Merge branch 'branch-0.3.1-yuncong' into 'branch-0.3.1-yuncong'

MS-204 support multi db path

See merge request megasearch/milvus!188

Former-commit-id: 4e064ae63452262f7f22b91ed1941313de14e7da
...@@ -14,8 +14,8 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -14,8 +14,8 @@ Please mark all change in change log and use the ticket from JIRA.
## Improvement ## Improvement
- MS-156 - Add unittest for merge result functions - MS-156 - Add unittest for merge result functions
- MS-152 - Delete assert in MySQLMetaImpl and change MySQLConnectionPool impl - MS-152 - Delete assert in MySQLMetaImpl and change MySQLConnectionPool impl
- MS-204 - Support multi db_path
## New Feature ## New Feature
- MS-195 - Add nlist and use_blas_threshold conf - MS-195 - Add nlist and use_blas_threshold conf
......
...@@ -6,6 +6,7 @@ server_config: ...@@ -6,6 +6,7 @@ server_config:
db_config: db_config:
db_path: @MILVUS_DB_PATH@ # milvus data storage path db_path: @MILVUS_DB_PATH@ # milvus data storage path
db_slave_path: # secondry data storage path, split by semicolon
# URI format: dialect://username:password@host:port/database # URI format: dialect://username:password@host:port/database
# All parts except dialect are optional, but you MUST include the delimiters # All parts except dialect are optional, but you MUST include the delimiters
......
...@@ -461,6 +461,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { ...@@ -461,6 +461,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
meta::TableFileSchema table_file; meta::TableFileSchema table_file;
table_file.table_id_ = file.table_id_; table_file.table_id_ = file.table_id_;
table_file.date_ = file.date_; table_file.date_ = file.date_;
table_file.file_type_ = meta::TableFileSchema::INDEX; //for multi-db-path, distribute index file averagely to each path
Status status = meta_ptr_->CreateTableFile(table_file); Status status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
......
...@@ -83,26 +83,6 @@ using ConnectorT = decltype(StoragePrototype("")); ...@@ -83,26 +83,6 @@ using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr; static std::unique_ptr<ConnectorT> ConnectorPtr;
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL); using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
std::string DBMetaImpl::GetTablePath(const std::string &table_id) {
return options_.path + "/tables/" + table_id;
}
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
std::stringstream ss;
ss << GetTablePath(table_id) << "/" << date;
return ss.str();
}
void DBMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
if (group_file.date_ == EmptyDate) {
group_file.date_ = Meta::GetDate();
}
std::stringstream ss;
ss << GetTableDatePartitionPath(group_file.table_id_, group_file.date_)
<< "/" << group_file.file_id_;
group_file.location_ = ss.str();
}
Status DBMetaImpl::NextTableId(std::string &table_id) { Status DBMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss; std::stringstream ss;
SimpleIDGenerator g; SimpleIDGenerator g;
...@@ -213,15 +193,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { ...@@ -213,15 +193,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
return Status::DBTransactionError("Add Table Error"); return Status::DBTransactionError("Add Table Error");
} }
auto table_path = GetTablePath(table_schema.table_id_); return utils::CreateTablePath(options_, table_schema.table_id_);
table_schema.location_ = table_path;
if (!boost::filesystem::is_directory(table_path)) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
} catch (std::exception &e) { } catch (std::exception &e) {
return HandleException("Encounter exception when create table", e); return HandleException("Encounter exception when create table", e);
...@@ -307,9 +279,6 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) { ...@@ -307,9 +279,6 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
return Status::NotFound("Table " + table_schema.table_id_ + " not found"); return Status::NotFound("Table " + table_schema.table_id_ + " not found");
} }
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
} catch (std::exception &e) { } catch (std::exception &e) {
return HandleException("Encounter exception when describe table", e); return HandleException("Encounter exception when describe table", e);
} }
...@@ -411,20 +380,11 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) { ...@@ -411,20 +380,11 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
file_schema.created_on_ = utils::GetMicroSecTimeStamp(); file_schema.created_on_ = utils::GetMicroSecTimeStamp();
file_schema.updated_time_ = file_schema.created_on_; file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_; file_schema.engine_type_ = table_schema.engine_type_;
GetTableFilePath(file_schema);
auto id = ConnectorPtr->insert(file_schema); auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id; file_schema.id_ = id;
auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); return utils::CreateTableFilePath(options_, file_schema);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
}
} catch (std::exception& ex) { } catch (std::exception& ex) {
return HandleException("Encounter exception when create table file", ex); return HandleException("Encounter exception when create table file", ex);
...@@ -461,7 +421,7 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) { ...@@ -461,7 +421,7 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
table_file.date_ = std::get<5>(file); table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file); table_file.engine_type_ = std::get<6>(file);
GetTableFilePath(table_file); utils::GetTableFilePath(options_, table_file);
auto groupItr = groups.find(table_file.table_id_); auto groupItr = groups.find(table_file.table_id_);
if (groupItr == groups.end()) { if (groupItr == groups.end()) {
TableSchema table_schema; TableSchema table_schema;
...@@ -524,7 +484,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id, ...@@ -524,7 +484,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
table_file.date_ = std::get<5>(file); table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file); table_file.engine_type_ = std::get<6>(file);
table_file.dimension_ = table_schema.dimension_; table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file); utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_); auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) { if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema(); files[table_file.date_] = TableFilesSchema();
...@@ -566,7 +526,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id, ...@@ -566,7 +526,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
table_file.date_ = std::get<5>(file); table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file); table_file.engine_type_ = std::get<6>(file);
table_file.dimension_ = table_schema.dimension_; table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file); utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_); auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) { if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema(); files[table_file.date_] = TableFilesSchema();
...@@ -616,7 +576,7 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id, ...@@ -616,7 +576,7 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
table_file.size_ = std::get<4>(file); table_file.size_ = std::get<4>(file);
table_file.date_ = std::get<5>(file); table_file.date_ = std::get<5>(file);
table_file.dimension_ = table_schema.dimension_; table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file); utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_); auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) { if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema(); files[table_file.date_] = TableFilesSchema();
...@@ -662,7 +622,7 @@ Status DBMetaImpl::GetTableFiles(const std::string& table_id, ...@@ -662,7 +622,7 @@ Status DBMetaImpl::GetTableFiles(const std::string& table_id,
file_schema.date_ = std::get<4>(file); file_schema.date_ = std::get<4>(file);
file_schema.engine_type_ = std::get<5>(file); file_schema.engine_type_ = std::get<5>(file);
file_schema.dimension_ = table_schema.dimension_; file_schema.dimension_ = table_schema.dimension_;
GetTableFilePath(file_schema); utils::GetTableFilePath(options_, file_schema);
table_files.emplace_back(file_schema); table_files.emplace_back(file_schema);
} }
...@@ -895,10 +855,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -895,10 +855,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
table_file.table_id_ = std::get<1>(file); table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file); table_file.file_id_ = std::get<2>(file);
table_file.date_ = std::get<3>(file); table_file.date_ = std::get<3>(file);
GetTableFilePath(table_file);
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " << table_file.location_ << std::endl; ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " << table_file.location_ << std::endl;
boost::filesystem::remove(table_file.location_);
ConnectorPtr->remove<TableFileSchema>(table_file.id_); ConnectorPtr->remove<TableFileSchema>(table_file.id_);
} }
...@@ -922,10 +881,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -922,10 +881,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
auto commited = ConnectorPtr->transaction([&]() mutable { auto commited = ConnectorPtr->transaction([&]() mutable {
for (auto &table : tables) { for (auto &table : tables) {
auto table_path = GetTablePath(std::get<1>(table)); utils::DeleteTablePath(options_, std::get<1>(table));
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
boost::filesystem::remove_all(table_path);
ConnectorPtr->remove<TableSchema>(std::get<0>(table)); ConnectorPtr->remove<TableSchema>(std::get<0>(table));
} }
......
...@@ -70,9 +70,6 @@ private: ...@@ -70,9 +70,6 @@ private:
Status NextFileId(std::string& file_id); Status NextFileId(std::string& file_id);
Status NextTableId(std::string& table_id); Status NextTableId(std::string& table_id);
Status DiscardFiles(long to_discard_size); Status DiscardFiles(long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize(); Status Initialize();
const DBMetaOptions options_; const DBMetaOptions options_;
......
...@@ -31,7 +31,6 @@ struct TableSchema { ...@@ -31,7 +31,6 @@ struct TableSchema {
int state_ = (int)NORMAL; int state_ = (int)NORMAL;
size_t files_cnt_ = 0; size_t files_cnt_ = 0;
uint16_t dimension_ = 0; uint16_t dimension_ = 0;
std::string location_;
long created_on_ = 0; long created_on_ = 0;
int engine_type_ = (int)EngineType::FAISS_IDMAP; int engine_type_ = (int)EngineType::FAISS_IDMAP;
bool store_raw_data_ = false; bool store_raw_data_ = false;
......
...@@ -67,26 +67,6 @@ namespace meta { ...@@ -67,26 +67,6 @@ namespace meta {
} }
std::string MySQLMetaImpl::GetTablePath(const std::string &table_id) {
return options_.path + "/tables/" + table_id;
}
std::string MySQLMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
std::stringstream ss;
ss << GetTablePath(table_id) << "/" << date;
return ss.str();
}
void MySQLMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
if (group_file.date_ == EmptyDate) {
group_file.date_ = Meta::GetDate();
}
std::stringstream ss;
ss << GetTableDatePartitionPath(group_file.table_id_, group_file.date_)
<< "/" << group_file.file_id_;
group_file.location_ = ss.str();
}
Status MySQLMetaImpl::NextTableId(std::string &table_id) { Status MySQLMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss; std::stringstream ss;
SimpleIDGenerator g; SimpleIDGenerator g;
...@@ -412,15 +392,8 @@ namespace meta { ...@@ -412,15 +392,8 @@ namespace meta {
// auto total_time = METRICS_MICROSECONDS(start_time, end_time); // auto total_time = METRICS_MICROSECONDS(start_time, end_time);
// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); // server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
auto table_path = GetTablePath(table_schema.table_id_); return utils::CreateTablePath(options_, table_schema.table_id_);
table_schema.location_ = table_path;
if (!boost::filesystem::is_directory(table_path)) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
} catch (const BadQuery& er) { } catch (const BadQuery& er) {
// Handle any query errors // Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE" << ": " << er.what(); ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE" << ": " << er.what();
...@@ -588,9 +561,6 @@ namespace meta { ...@@ -588,9 +561,6 @@ namespace meta {
return Status::NotFound("Table " + table_schema.table_id_ + " not found"); return Status::NotFound("Table " + table_schema.table_id_ + " not found");
} }
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
} catch (const BadQuery& er) { } catch (const BadQuery& er) {
// Handle any query errors // Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING TABLE" << ": " << er.what(); ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING TABLE" << ": " << er.what();
...@@ -743,7 +713,7 @@ namespace meta { ...@@ -743,7 +713,7 @@ namespace meta {
file_schema.created_on_ = utils::GetMicroSecTimeStamp(); file_schema.created_on_ = utils::GetMicroSecTimeStamp();
file_schema.updated_time_ = file_schema.created_on_; file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_; file_schema.engine_type_ = table_schema.engine_type_;
GetTableFilePath(file_schema); utils::GetTableFilePath(options_, file_schema);
std::string id = "NULL"; //auto-increment std::string id = "NULL"; //auto-increment
std::string table_id = file_schema.table_id_; std::string table_id = file_schema.table_id_;
...@@ -788,15 +758,7 @@ namespace meta { ...@@ -788,15 +758,7 @@ namespace meta {
} }
} // Scoped Connection } // Scoped Connection
auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); return utils::CreateTableFilePath(options_, file_schema);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
}
} catch (const BadQuery& er) { } catch (const BadQuery& er) {
// Handle any query errors // Handle any query errors
...@@ -881,7 +843,7 @@ namespace meta { ...@@ -881,7 +843,7 @@ namespace meta {
} }
table_file.dimension_ = groups[table_file.table_id_].dimension_; table_file.dimension_ = groups[table_file.table_id_].dimension_;
GetTableFilePath(table_file); utils::GetTableFilePath(options_, table_file);
files.push_back(table_file); files.push_back(table_file);
} }
...@@ -992,7 +954,7 @@ namespace meta { ...@@ -992,7 +954,7 @@ namespace meta {
table_file.dimension_ = table_schema.dimension_; table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file); utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_); auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) { if (dateItr == files.end()) {
...@@ -1078,7 +1040,7 @@ namespace meta { ...@@ -1078,7 +1040,7 @@ namespace meta {
table_file.dimension_ = table_schema.dimension_; table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file); utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_); auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) { if (dateItr == files.end()) {
...@@ -1173,7 +1135,7 @@ namespace meta { ...@@ -1173,7 +1135,7 @@ namespace meta {
file_schema.dimension_ = table_schema.dimension_; file_schema.dimension_ = table_schema.dimension_;
GetTableFilePath(file_schema); utils::GetTableFilePath(options_, file_schema);
table_files.emplace_back(file_schema); table_files.emplace_back(file_schema);
} }
...@@ -1633,11 +1595,10 @@ namespace meta { ...@@ -1633,11 +1595,10 @@ namespace meta {
table_file.date_ = resRow["date"]; table_file.date_ = resRow["date"];
GetTableFilePath(table_file); utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = "
<< table_file.location_ << std::endl; << table_file.location_ << std::endl;
boost::filesystem::remove(table_file.location_);
idsToDelete.emplace_back(std::to_string(table_file.id_)); idsToDelete.emplace_back(std::to_string(table_file.id_));
} }
...@@ -1710,10 +1671,7 @@ namespace meta { ...@@ -1710,10 +1671,7 @@ namespace meta {
std::string table_id; std::string table_id;
resRow["table_id"].to_string(table_id); resRow["table_id"].to_string(table_id);
auto table_path = GetTablePath(table_id); utils::DeleteTablePath(options_, table_id);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
boost::filesystem::remove_all(table_path);
idsToDeleteSS << "id = " << std::to_string(id) << " OR "; idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
} }
......
...@@ -75,9 +75,6 @@ namespace meta { ...@@ -75,9 +75,6 @@ namespace meta {
Status NextFileId(std::string& file_id); Status NextFileId(std::string& file_id);
Status NextTableId(std::string& table_id); Status NextTableId(std::string& table_id);
Status DiscardFiles(long long to_discard_size); Status DiscardFiles(long long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize(); Status Initialize();
const DBMetaOptions options_; const DBMetaOptions options_;
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <string> #include <string>
#include <memory> #include <memory>
#include <map> #include <map>
#include <vector>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -43,6 +44,7 @@ private: ...@@ -43,6 +44,7 @@ private:
struct DBMetaOptions { struct DBMetaOptions {
std::string path; std::string path;
std::vector<std::string> slave_paths;
std::string backend_uri; std::string backend_uri;
ArchiveConf archive_conf = ArchiveConf("delete"); ArchiveConf archive_conf = ArchiveConf("delete");
}; // DBMetaOptions }; // DBMetaOptions
......
...@@ -4,14 +4,58 @@ ...@@ -4,14 +4,58 @@
* Proprietary and confidential. * Proprietary and confidential.
******************************************************************************/ ******************************************************************************/
#include "Utils.h" #include "Utils.h"
#include "utils/CommonUtil.h"
#include "Log.h"
#include <mutex>
#include <chrono> #include <chrono>
#include <boost/filesystem.hpp>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
namespace utils { namespace utils {
namespace {
static const std::string TABLES_FOLDER = "/tables/";
static uint64_t index_file_counter = 0;
static std::mutex index_file_counter_mutex;
std::string ConstructParentFolder(const std::string& db_path, const meta::TableFileSchema& table_file) {
std::string table_path = db_path + TABLES_FOLDER + table_file.table_id_;
std::string partition_path = table_path + "/" + std::to_string(table_file.date_);
return partition_path;
}
std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::TableFileSchema& table_file) {
uint64_t path_count = options.slave_paths.size() + 1;
std::string target_path = options.path;
uint64_t index = 0;
if(meta::TableFileSchema::INDEX == table_file.file_type_) {
// index file is large file and to be persisted permanently
// we need to distribute index files to each db_path averagely
// round robin according to a file counter
std::lock_guard<std::mutex> lock(index_file_counter_mutex);
index = index_file_counter % path_count;
index_file_counter++;
} else {
// for other type files, they could be merged or deleted
// so we round robin according to their file id
index = table_file.id_ % path_count;
}
if (index > 0) {
target_path = options.slave_paths[index - 1];
}
return ConstructParentFolder(target_path, table_file);
}
}
long GetMicroSecTimeStamp() { long GetMicroSecTimeStamp() {
auto now = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>( auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
...@@ -20,6 +64,82 @@ long GetMicroSecTimeStamp() { ...@@ -20,6 +64,82 @@ long GetMicroSecTimeStamp() {
return micros; return micros;
} }
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id) {
std::string db_path = options.path;
std::string table_path = db_path + TABLES_FOLDER + table_id;
auto status = server::CommonUtil::CreateDirectory(table_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
for(auto& path : options.slave_paths) {
table_path = path + TABLES_FOLDER + table_id;
status = server::CommonUtil::CreateDirectory(table_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
return Status::OK();
}
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id) {
std::string db_path = options.path;
std::string table_path = db_path + TABLES_FOLDER + table_id;
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
for(auto& path : options.slave_paths) {
table_path = path + TABLES_FOLDER + table_id;
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
}
return Status::OK();
}
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
std::string parent_path = GetTableFileParentFolder(options, table_file);
auto status = server::CommonUtil::CreateDirectory(parent_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << parent_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
table_file.location_ = parent_path + "/" + table_file.file_id_;
return Status::OK();
}
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
std::string parent_path = ConstructParentFolder(options.path, table_file);
std::string file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) {
table_file.location_ = file_path;
return Status::OK();
} else {
for(auto& path : options.slave_paths) {
parent_path = ConstructParentFolder(path, table_file);
file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) {
table_file.location_ = file_path;
return Status::OK();
}
}
}
return Status::Error("Table file doesn't exist: " + table_file.file_id_);
}
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
utils::GetTableFilePath(options, table_file);
boost::filesystem::remove(table_file.location_);
return Status::OK();
}
} // namespace utils } // namespace utils
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
......
...@@ -5,6 +5,10 @@ ...@@ -5,6 +5,10 @@
******************************************************************************/ ******************************************************************************/
#pragma once #pragma once
#include "Options.h"
#include "MetaTypes.h"
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -13,6 +17,13 @@ namespace utils { ...@@ -13,6 +17,13 @@ namespace utils {
long GetMicroSecTimeStamp(); long GetMicroSecTimeStamp();
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id);
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id);
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
} // namespace utils } // namespace utils
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "ServerConfig.h" #include "ServerConfig.h"
#include "utils/CommonUtil.h" #include "utils/CommonUtil.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -19,6 +20,10 @@ DBWrapper::DBWrapper() { ...@@ -19,6 +20,10 @@ DBWrapper::DBWrapper() {
opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL); opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
std::string db_path = config.GetValue(CONFIG_DB_PATH); std::string db_path = config.GetValue(CONFIG_DB_PATH);
opt.meta.path = db_path + "/db"; opt.meta.path = db_path + "/db";
std::string db_slave_path = config.GetValue(CONFIG_DB_SLAVE_PATH);
StringHelpFunctions::SplitStringByDelimeter(db_slave_path, ";", opt.meta.slave_paths);
int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE); int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
if(index_size > 0) {//ensure larger than zero, unit is MB if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB; opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
...@@ -59,6 +64,14 @@ DBWrapper::DBWrapper() { ...@@ -59,6 +64,14 @@ DBWrapper::DBWrapper() {
kill(0, SIGUSR1); kill(0, SIGUSR1);
} }
for(auto& path : opt.meta.slave_paths) {
err = CommonUtil::CreateDirectory(path);
if(err != SERVER_SUCCESS) {
std::cout << "ERROR! Failed to create database slave path: " << path << std::endl;
kill(0, SIGUSR1);
}
}
std::string msg = opt.meta.path; std::string msg = opt.meta.path;
try { try {
zilliz::milvus::engine::DB::Open(opt, &db_); zilliz::milvus::engine::DB::Open(opt, &db_);
......
...@@ -23,6 +23,7 @@ static const std::string CONFIG_CLUSTER_MODE = "mode"; ...@@ -23,6 +23,7 @@ static const std::string CONFIG_CLUSTER_MODE = "mode";
static const std::string CONFIG_DB = "db_config"; static const std::string CONFIG_DB = "db_config";
static const std::string CONFIG_DB_URL = "db_backend_url"; static const std::string CONFIG_DB_URL = "db_backend_url";
static const std::string CONFIG_DB_PATH = "db_path"; static const std::string CONFIG_DB_PATH = "db_path";
static const std::string CONFIG_DB_SLAVE_PATH = "db_slave_path";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold"; static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold"; static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold";
static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold"; static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold";
......
...@@ -51,8 +51,7 @@ bool CommonUtil::GetSystemAvailableThreads(unsigned int &threadCnt) { ...@@ -51,8 +51,7 @@ bool CommonUtil::GetSystemAvailableThreads(unsigned int &threadCnt) {
return true; return true;
} }
bool CommonUtil::IsDirectoryExist(const std::string &path) bool CommonUtil::IsDirectoryExist(const std::string &path) {
{
DIR *dp = nullptr; DIR *dp = nullptr;
if ((dp = opendir(path.c_str())) == nullptr) { if ((dp = opendir(path.c_str())) == nullptr) {
return false; return false;
...@@ -63,6 +62,10 @@ bool CommonUtil::IsDirectoryExist(const std::string &path) ...@@ -63,6 +62,10 @@ bool CommonUtil::IsDirectoryExist(const std::string &path)
} }
ServerError CommonUtil::CreateDirectory(const std::string &path) { ServerError CommonUtil::CreateDirectory(const std::string &path) {
if(path.empty()) {
return SERVER_SUCCESS;
}
struct stat directoryStat; struct stat directoryStat;
int statOK = stat(path.c_str(), &directoryStat); int statOK = stat(path.c_str(), &directoryStat);
if (statOK == 0) { if (statOK == 0) {
...@@ -120,6 +123,10 @@ namespace { ...@@ -120,6 +123,10 @@ namespace {
} }
ServerError CommonUtil::DeleteDirectory(const std::string &path) { ServerError CommonUtil::DeleteDirectory(const std::string &path) {
if(path.empty()) {
return SERVER_SUCCESS;
}
struct stat directoryStat; struct stat directoryStat;
int statOK = stat(path.c_str(), &directoryStat); int statOK = stat(path.c_str(), &directoryStat);
if (statOK != 0) if (statOK != 0)
......
...@@ -27,6 +27,10 @@ void StringHelpFunctions::TrimStringQuote(std::string &string, const std::string ...@@ -27,6 +27,10 @@ void StringHelpFunctions::TrimStringQuote(std::string &string, const std::string
ServerError StringHelpFunctions::SplitStringByDelimeter(const std::string &str, ServerError StringHelpFunctions::SplitStringByDelimeter(const std::string &str,
const std::string &delimeter, const std::string &delimeter,
std::vector<std::string> &result) { std::vector<std::string> &result) {
if(str.empty()) {
return SERVER_SUCCESS;
}
size_t last = 0; size_t last = 0;
size_t index = str.find_first_of(delimeter, last); size_t index = str.find_first_of(delimeter, last);
while (index != std::string::npos) { while (index != std::string::npos) {
......
...@@ -272,7 +272,9 @@ TEST_F(DBTest2, DELETE_TEST) { ...@@ -272,7 +272,9 @@ TEST_F(DBTest2, DELETE_TEST) {
stat = db_->DescribeTable(table_info_get); stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat); ASSERT_STATS(stat);
ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_)); bool has_table = false;
db_->HasTable(TABLE_NAME, has_table);
ASSERT_TRUE(has_table);
engine::IDNumbers vector_ids; engine::IDNumbers vector_ids;
...@@ -293,5 +295,7 @@ TEST_F(DBTest2, DELETE_TEST) { ...@@ -293,5 +295,7 @@ TEST_F(DBTest2, DELETE_TEST) {
stat = db_->DeleteTable(TABLE_NAME, dates); stat = db_->DeleteTable(TABLE_NAME, dates);
std::this_thread::sleep_for(std::chrono::seconds(2)); std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
db_->HasTable(TABLE_NAME, has_table);
ASSERT_FALSE(has_table);
}; };
...@@ -38,7 +38,7 @@ TEST_F(MetaTest, TABLE_TEST) { ...@@ -38,7 +38,7 @@ TEST_F(MetaTest, TABLE_TEST) {
table.table_id_ = table_id; table.table_id_ = table_id;
status = impl_->CreateTable(table); status = impl_->CreateTable(table);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.IsAlreadyExist());
table.table_id_ = ""; table.table_id_ = "";
status = impl_->CreateTable(table); status = impl_->CreateTable(table);
......
...@@ -271,8 +271,9 @@ TEST_F(MySQLDBTest, DELETE_TEST) { ...@@ -271,8 +271,9 @@ TEST_F(MySQLDBTest, DELETE_TEST) {
stat = db_->DescribeTable(table_info_get); stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat); ASSERT_STATS(stat);
// std::cout << "location: " << table_info_get.location_ << std::endl; bool has_table = false;
ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_)); db_->HasTable(TABLE_NAME, has_table);
ASSERT_TRUE(has_table);
engine::IDNumbers vector_ids; engine::IDNumbers vector_ids;
...@@ -295,7 +296,9 @@ TEST_F(MySQLDBTest, DELETE_TEST) { ...@@ -295,7 +296,9 @@ TEST_F(MySQLDBTest, DELETE_TEST) {
std::this_thread::sleep_for(std::chrono::seconds(5)); std::this_thread::sleep_for(std::chrono::seconds(5));
// std::cout << "5 sec finish" << std::endl; // std::cout << "5 sec finish" << std::endl;
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
// ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
db_->HasTable(TABLE_NAME, has_table);
ASSERT_FALSE(has_table);
delete db_; delete db_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册