You need to sign in or sign up before continuing.
提交 7ba890b9 编写于 作者: G groot

format db code


Former-commit-id: b6756f8b981219f72e0291f7a1e5b08ef7327d4c
上级 292331ad
...@@ -33,8 +33,8 @@ constexpr uint64_t MAX_TABLE_FILE_MEM = 128 * M; ...@@ -33,8 +33,8 @@ constexpr uint64_t MAX_TABLE_FILE_MEM = 128 * M;
constexpr int VECTOR_TYPE_SIZE = sizeof(float); constexpr int VECTOR_TYPE_SIZE = sizeof(float);
static constexpr uint64_t ONE_KB = K; static constexpr uint64_t ONE_KB = K;
static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB; static constexpr uint64_t ONE_MB = ONE_KB * ONE_KB;
static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB; static constexpr uint64_t ONE_GB = ONE_KB * ONE_MB;
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <string> #include <string>
#include <memory> #include <memory>
#include <vector>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -33,46 +34,45 @@ namespace engine { ...@@ -33,46 +34,45 @@ namespace engine {
class Env; class Env;
class DB { class DB {
public: public:
DB() = default; DB() = default;
DB(const DB&) = delete; DB(const DB &) = delete;
DB& operator=(const DB&) = delete; DB &operator=(const DB &) = delete;
virtual ~DB() = default; virtual ~DB() = default;
virtual Status Start() = 0; virtual Status Start() = 0;
virtual Status Stop() = 0; virtual Status Stop() = 0;
virtual Status CreateTable(meta::TableSchema& table_schema_) = 0; virtual Status CreateTable(meta::TableSchema &table_schema_) = 0;
virtual Status DeleteTable(const std::string& table_id, const meta::DatesT& dates) = 0; virtual Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) = 0;
virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0; virtual Status DescribeTable(meta::TableSchema &table_schema_) = 0;
virtual Status HasTable(const std::string& table_id, bool& has_or_not_) = 0; virtual Status HasTable(const std::string &table_id, bool &has_or_not_) = 0;
virtual Status AllTables(std::vector<meta::TableSchema>& table_schema_array) = 0; virtual Status AllTables(std::vector<meta::TableSchema> &table_schema_array) = 0;
virtual Status GetTableRowCount(const std::string& table_id, uint64_t& row_count) = 0; virtual Status GetTableRowCount(const std::string &table_id, uint64_t &row_count) = 0;
virtual Status PreloadTable(const std::string& table_id) = 0; virtual Status PreloadTable(const std::string &table_id) = 0;
virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0; virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
virtual Status InsertVectors(const std::string& table_id_, virtual Status InsertVectors(const std::string &table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) = 0; uint64_t n, const float *vectors, IDNumbers &vector_ids_) = 0;
virtual Status Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, virtual Status Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, QueryResults& results) = 0; const float *vectors, QueryResults &results) = 0;
virtual Status Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, virtual Status Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, QueryResults& results) = 0; const float *vectors, const meta::DatesT &dates, QueryResults &results) = 0;
virtual Status Query(const std::string& table_id, const std::vector<std::string>& file_ids, virtual Status Query(const std::string &table_id, const std::vector<std::string> &file_ids,
uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
const meta::DatesT& dates, QueryResults& results) = 0; const meta::DatesT &dates, QueryResults &results) = 0;
virtual Status Size(uint64_t& result) = 0; virtual Status Size(uint64_t &result) = 0;
virtual Status CreateIndex(const std::string& table_id, const TableIndex& index) = 0; virtual Status CreateIndex(const std::string &table_id, const TableIndex &index) = 0;
virtual Status DescribeIndex(const std::string& table_id, TableIndex& index) = 0; virtual Status DescribeIndex(const std::string &table_id, TableIndex &index) = 0;
virtual Status DropIndex(const std::string& table_id) = 0; virtual Status DropIndex(const std::string &table_id) = 0;
virtual Status DropAll() = 0; virtual Status DropAll() = 0;
}; // DB }; // DB
using DBPtr = std::shared_ptr<DB>; using DBPtr = std::shared_ptr<DB>;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
// under the License. // under the License.
#include "DBFactory.h" #include "db/DBFactory.h"
#include "DBImpl.h" #include "DBImpl.h"
#include "utils/Exception.h" #include "utils/Exception.h"
#include "meta/MetaFactory.h" #include "meta/MetaFactory.h"
...@@ -33,14 +33,16 @@ namespace zilliz { ...@@ -33,14 +33,16 @@ namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
DBOptions DBFactory::BuildOption() { DBOptions
DBFactory::BuildOption() {
auto meta = MetaFactory::BuildOption(); auto meta = MetaFactory::BuildOption();
DBOptions options; DBOptions options;
options.meta_ = meta; options.meta_ = meta;
return options; return options;
} }
DBPtr DBFactory::Build(const DBOptions& options) { DBPtr
DBFactory::Build(const DBOptions &options) {
return std::make_shared<DBImpl>(options); return std::make_shared<DBImpl>(options);
} }
......
...@@ -28,13 +28,12 @@ namespace milvus { ...@@ -28,13 +28,12 @@ namespace milvus {
namespace engine { namespace engine {
class DBFactory { class DBFactory {
public: public:
static DBOptions BuildOption(); static DBOptions BuildOption();
static DBPtr Build(const DBOptions& options); static DBPtr Build(const DBOptions &options);
}; };
} // namespace engine
} } // namespace milvus
} } // namespace zilliz
}
此差异已折叠。
...@@ -29,7 +29,8 @@ ...@@ -29,7 +29,8 @@
#include <thread> #include <thread>
#include <list> #include <list>
#include <set> #include <set>
#include <vector>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -68,11 +69,11 @@ class DBImpl : public DB { ...@@ -68,11 +69,11 @@ class DBImpl : public DB {
Status InsertVectors(const std::string &table_id, uint64_t n, const float *vectors, IDNumbers &vector_ids) override; Status InsertVectors(const std::string &table_id, uint64_t n, const float *vectors, IDNumbers &vector_ids) override;
Status CreateIndex(const std::string& table_id, const TableIndex& index) override; Status CreateIndex(const std::string &table_id, const TableIndex &index) override;
Status DescribeIndex(const std::string& table_id, TableIndex& index) override; Status DescribeIndex(const std::string &table_id, TableIndex &index) override;
Status DropIndex(const std::string& table_id) override; Status DropIndex(const std::string &table_id) override;
Status Query(const std::string &table_id, Status Query(const std::string &table_id,
uint64_t k, uint64_t k,
...@@ -123,7 +124,7 @@ class DBImpl : public DB { ...@@ -123,7 +124,7 @@ class DBImpl : public DB {
Status BackgroundMergeFiles(const std::string &table_id); Status BackgroundMergeFiles(const std::string &table_id);
void BackgroundCompaction(std::set<std::string> table_ids); void BackgroundCompaction(std::set<std::string> table_ids);
void StartBuildIndexTask(bool force=false); void StartBuildIndexTask(bool force = false);
void BackgroundBuildIndex(); void BackgroundBuildIndex();
Status BuildIndex(const meta::TableFileSchema &); Status BuildIndex(const meta::TableFileSchema &);
...@@ -151,7 +152,6 @@ class DBImpl : public DB { ...@@ -151,7 +152,6 @@ class DBImpl : public DB {
std::list<std::future<void>> index_thread_results_; std::list<std::future<void>> index_thread_results_;
std::mutex build_index_mutex_; std::mutex build_index_mutex_;
}; // DBImpl }; // DBImpl
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "IDGenerator.h" #include "db/IDGenerator.h"
#include <chrono> #include <chrono>
#include <assert.h> #include <assert.h>
...@@ -29,16 +29,18 @@ IDGenerator::~IDGenerator() = default; ...@@ -29,16 +29,18 @@ IDGenerator::~IDGenerator() = default;
constexpr size_t SimpleIDGenerator::MAX_IDS_PER_MICRO; constexpr size_t SimpleIDGenerator::MAX_IDS_PER_MICRO;
IDNumber SimpleIDGenerator::GetNextIDNumber() { IDNumber
SimpleIDGenerator::GetNextIDNumber() {
auto now = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>( auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count(); now.time_since_epoch()).count();
return micros * MAX_IDS_PER_MICRO; return micros * MAX_IDS_PER_MICRO;
} }
void SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) { void
SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers &ids) {
if (n > MAX_IDS_PER_MICRO) { if (n > MAX_IDS_PER_MICRO) {
NextIDNumbers(n-MAX_IDS_PER_MICRO, ids); NextIDNumbers(n - MAX_IDS_PER_MICRO, ids);
NextIDNumbers(MAX_IDS_PER_MICRO, ids); NextIDNumbers(MAX_IDS_PER_MICRO, ids);
return; return;
} }
...@@ -48,20 +50,20 @@ void SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) { ...@@ -48,20 +50,20 @@ void SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) {
auto now = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>( auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count(); now.time_since_epoch()).count();
micros *= MAX_IDS_PER_MICRO; micros *= MAX_IDS_PER_MICRO;
for (int pos=0; pos<n; ++pos) { for (int pos = 0; pos < n; ++pos) {
ids.push_back(micros+pos); ids.push_back(micros + pos);
} }
} }
void SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) { void
SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers &ids) {
ids.clear(); ids.clear();
NextIDNumbers(n, ids); NextIDNumbers(n, ids);
} }
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#include <cstddef> #include <cstddef>
#include <vector> #include <vector>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
...@@ -55,7 +54,6 @@ class SimpleIDGenerator : public IDGenerator { ...@@ -55,7 +54,6 @@ class SimpleIDGenerator : public IDGenerator {
NextIDNumbers(size_t n, IDNumbers &ids); NextIDNumbers(size_t n, IDNumbers &ids);
static constexpr size_t MAX_IDS_PER_MICRO = 1000; static constexpr size_t MAX_IDS_PER_MICRO = 1000;
}; // SimpleIDGenerator }; // SimpleIDGenerator
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "Options.h" #include "db/Options.h"
#include "utils/Exception.h" #include "utils/Exception.h"
#include "utils/Log.h" #include "utils/Log.h"
...@@ -27,18 +27,20 @@ namespace zilliz { ...@@ -27,18 +27,20 @@ namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) { ArchiveConf::ArchiveConf(const std::string &type, const std::string &criterias) {
ParseType(type); ParseType(type);
ParseCritirias(criterias); ParseCritirias(criterias);
} }
void ArchiveConf::SetCriterias(const ArchiveConf::CriteriaT& criterial) { void
for(auto& pair : criterial) { ArchiveConf::SetCriterias(const ArchiveConf::CriteriaT &criterial) {
for (auto &pair : criterial) {
criterias_[pair.first] = pair.second; criterias_[pair.first] = pair.second;
} }
} }
void ArchiveConf::ParseCritirias(const std::string& criterias) { void
ArchiveConf::ParseCritirias(const std::string &criterias) {
std::stringstream ss(criterias); std::stringstream ss(criterias);
std::vector<std::string> tokens; std::vector<std::string> tokens;
...@@ -48,8 +50,8 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) { ...@@ -48,8 +50,8 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) {
return; return;
} }
for (auto& token : tokens) { for (auto &token : tokens) {
if(token.empty()) { if (token.empty()) {
continue; continue;
} }
...@@ -67,12 +69,12 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) { ...@@ -67,12 +69,12 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) {
auto value = std::stoi(kv[1]); auto value = std::stoi(kv[1]);
criterias_[kv[0]] = value; criterias_[kv[0]] = value;
} }
catch (std::out_of_range&){ catch (std::out_of_range &) {
std::string msg = "Out of range: '" + kv[1] + "'"; std::string msg = "Out of range: '" + kv[1] + "'";
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
throw InvalidArgumentException(msg); throw InvalidArgumentException(msg);
} }
catch (...){ catch (...) {
std::string msg = "Invalid argument: '" + kv[1] + "'"; std::string msg = "Invalid argument: '" + kv[1] + "'";
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
throw InvalidArgumentException(msg); throw InvalidArgumentException(msg);
...@@ -80,7 +82,8 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) { ...@@ -80,7 +82,8 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) {
} }
} }
void ArchiveConf::ParseType(const std::string& type) { void
ArchiveConf::ParseType(const std::string &type) {
if (type != "delete" && type != "swap") { if (type != "delete" && type != "swap") {
std::string msg = "Invalid argument: type='" + type + "'"; std::string msg = "Invalid argument: type='" + type + "'";
throw InvalidArgumentException(msg); throw InvalidArgumentException(msg);
......
...@@ -30,22 +30,27 @@ namespace engine { ...@@ -30,22 +30,27 @@ namespace engine {
class Env; class Env;
static const char* ARCHIVE_CONF_DISK = "disk"; static const char *ARCHIVE_CONF_DISK = "disk";
static const char* ARCHIVE_CONF_DAYS = "days"; static const char *ARCHIVE_CONF_DAYS = "days";
struct ArchiveConf { struct ArchiveConf {
using CriteriaT = std::map<std::string, int>; using CriteriaT = std::map<std::string, int>;
ArchiveConf(const std::string& type, const std::string& criterias = std::string()); explicit ArchiveConf(const std::string &type, const std::string &criterias = std::string());
const std::string& GetType() const { return type_; } const std::string &GetType() const {
const CriteriaT GetCriterias() const { return criterias_; } return type_;
}
void SetCriterias(const ArchiveConf::CriteriaT& criterial); const CriteriaT GetCriterias() const {
return criterias_;
}
private: void SetCriterias(const ArchiveConf::CriteriaT &criterial);
void ParseCritirias(const std::string& type);
void ParseType(const std::string& criterias); private:
void ParseCritirias(const std::string &type);
void ParseType(const std::string &criterias);
std::string type_; std::string type_;
CriteriaT criterias_; CriteriaT criterias_;
...@@ -65,7 +70,7 @@ struct DBOptions { ...@@ -65,7 +70,7 @@ struct DBOptions {
CLUSTER_WRITABLE CLUSTER_WRITABLE
} MODE; } MODE;
uint16_t merge_trigger_number_ = 2; uint16_t merge_trigger_number_ = 2;
DBMetaOptions meta_; DBMetaOptions meta_;
int mode_ = MODE::SINGLE; int mode_ = MODE::SINGLE;
......
...@@ -21,22 +21,23 @@ ...@@ -21,22 +21,23 @@
#include <vector> #include <vector>
#include <stdint.h> #include <stdint.h>
#include <utility>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
typedef long IDNumber; typedef int64_t IDNumber;
typedef IDNumber* IDNumberPtr; typedef IDNumber *IDNumberPtr;
typedef std::vector<IDNumber> IDNumbers; typedef std::vector<IDNumber> IDNumbers;
typedef std::vector<std::pair<IDNumber, double>> QueryResult; typedef std::vector<std::pair<IDNumber, double>> QueryResult;
typedef std::vector<QueryResult> QueryResults; typedef std::vector<QueryResult> QueryResults;
struct TableIndex { struct TableIndex {
int32_t engine_type_ = (int)EngineType::FAISS_IDMAP; int32_t engine_type_ = (int) EngineType::FAISS_IDMAP;
int32_t nlist_ = 16384; int32_t nlist_ = 16384;
int32_t metric_type_ = (int)MetricType::L2; int32_t metric_type_ = (int) MetricType::L2;
}; };
} // namespace engine } // namespace engine
......
...@@ -15,13 +15,14 @@ ...@@ -15,13 +15,14 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "Utils.h" #include "db/Utils.h"
#include "utils/CommonUtil.h" #include "utils/CommonUtil.h"
#include "utils/Log.h" #include "utils/Log.h"
#include <mutex> #include <mutex>
#include <chrono> #include <chrono>
#include <regex> #include <regex>
#include <vector>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
namespace zilliz { namespace zilliz {
...@@ -31,23 +32,25 @@ namespace utils { ...@@ -31,23 +32,25 @@ namespace utils {
namespace { namespace {
const char* TABLES_FOLDER = "/tables/"; const char *TABLES_FOLDER = "/tables/";
uint64_t index_file_counter = 0; uint64_t index_file_counter = 0;
std::mutex index_file_counter_mutex; std::mutex index_file_counter_mutex;
std::string ConstructParentFolder(const std::string& db_path, const meta::TableFileSchema& table_file) { std::string
ConstructParentFolder(const std::string &db_path, const meta::TableFileSchema &table_file) {
std::string table_path = db_path + TABLES_FOLDER + table_file.table_id_; std::string table_path = db_path + TABLES_FOLDER + table_file.table_id_;
std::string partition_path = table_path + "/" + std::to_string(table_file.date_); std::string partition_path = table_path + "/" + std::to_string(table_file.date_);
return partition_path; return partition_path;
} }
std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::TableFileSchema& table_file) { std::string
GetTableFileParentFolder(const DBMetaOptions &options, const meta::TableFileSchema &table_file) {
uint64_t path_count = options.slave_paths_.size() + 1; uint64_t path_count = options.slave_paths_.size() + 1;
std::string target_path = options.path_; std::string target_path = options.path_;
uint64_t index = 0; uint64_t index = 0;
if(meta::TableFileSchema::NEW_INDEX == table_file.file_type_) { if (meta::TableFileSchema::NEW_INDEX == table_file.file_type_) {
// index file is large file and to be persisted permanently // index file is large file and to be persisted permanently
// we need to distribute index files to each db_path averagely // we need to distribute index files to each db_path averagely
// round robin according to a file counter // round robin according to a file counter
...@@ -67,17 +70,19 @@ std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::T ...@@ -67,17 +70,19 @@ std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::T
return ConstructParentFolder(target_path, table_file); return ConstructParentFolder(target_path, table_file);
} }
} } // namespace
long GetMicroSecTimeStamp() { int64_t
GetMicroSecTimeStamp() {
auto now = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>( auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count(); now.time_since_epoch()).count();
return micros; return micros;
} }
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id) { Status
CreateTablePath(const DBMetaOptions &options, const std::string &table_id) {
std::string db_path = options.path_; std::string db_path = options.path_;
std::string table_path = db_path + TABLES_FOLDER + table_id; std::string table_path = db_path + TABLES_FOLDER + table_id;
auto status = server::CommonUtil::CreateDirectory(table_path); auto status = server::CommonUtil::CreateDirectory(table_path);
...@@ -86,7 +91,7 @@ Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id ...@@ -86,7 +91,7 @@ Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id
return status; return status;
} }
for(auto& path : options.slave_paths_) { for (auto &path : options.slave_paths_) {
table_path = path + TABLES_FOLDER + table_id; table_path = path + TABLES_FOLDER + table_id;
status = server::CommonUtil::CreateDirectory(table_path); status = server::CommonUtil::CreateDirectory(table_path);
if (!status.ok()) { if (!status.ok()) {
...@@ -98,17 +103,18 @@ Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id ...@@ -98,17 +103,18 @@ Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id
return Status::OK(); return Status::OK();
} }
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id, bool force) { Status
DeleteTablePath(const DBMetaOptions &options, const std::string &table_id, bool force) {
std::vector<std::string> paths = options.slave_paths_; std::vector<std::string> paths = options.slave_paths_;
paths.push_back(options.path_); paths.push_back(options.path_);
for(auto& path : paths) { for (auto &path : paths) {
std::string table_path = path + TABLES_FOLDER + table_id; std::string table_path = path + TABLES_FOLDER + table_id;
if(force) { if (force) {
boost::filesystem::remove_all(table_path); boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path; ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
} else if(boost::filesystem::exists(table_path) && } else if (boost::filesystem::exists(table_path) &&
boost::filesystem::is_empty(table_path)) { boost::filesystem::is_empty(table_path)) {
boost::filesystem::remove_all(table_path); boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path; ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
} }
...@@ -117,7 +123,8 @@ Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id ...@@ -117,7 +123,8 @@ Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id
return Status::OK(); return Status::OK();
} }
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) { Status
CreateTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file) {
std::string parent_path = GetTableFileParentFolder(options, table_file); std::string parent_path = GetTableFileParentFolder(options, table_file);
auto status = server::CommonUtil::CreateDirectory(parent_path); auto status = server::CommonUtil::CreateDirectory(parent_path);
...@@ -131,17 +138,18 @@ Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& ...@@ -131,17 +138,18 @@ Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
return Status::OK(); return Status::OK();
} }
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) { Status
GetTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file) {
std::string parent_path = ConstructParentFolder(options.path_, table_file); std::string parent_path = ConstructParentFolder(options.path_, table_file);
std::string file_path = parent_path + "/" + table_file.file_id_; std::string file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) { if (boost::filesystem::exists(file_path)) {
table_file.location_ = file_path; table_file.location_ = file_path;
return Status::OK(); return Status::OK();
} else { } else {
for(auto& path : options.slave_paths_) { for (auto &path : options.slave_paths_) {
parent_path = ConstructParentFolder(path, table_file); parent_path = ConstructParentFolder(path, table_file);
file_path = parent_path + "/" + table_file.file_id_; file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) { if (boost::filesystem::exists(file_path)) {
table_file.location_ = file_path; table_file.location_ = file_path;
return Status::OK(); return Status::OK();
} }
...@@ -155,49 +163,55 @@ Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& tab ...@@ -155,49 +163,55 @@ Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& tab
return Status(DB_ERROR, msg); return Status(DB_ERROR, msg);
} }
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) { Status
DeleteTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file) {
utils::GetTableFilePath(options, table_file); utils::GetTableFilePath(options, table_file);
boost::filesystem::remove(table_file.location_); boost::filesystem::remove(table_file.location_);
return Status::OK(); return Status::OK();
} }
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) { bool
IsSameIndex(const TableIndex &index1, const TableIndex &index2) {
return index1.engine_type_ == index2.engine_type_ return index1.engine_type_ == index2.engine_type_
&& index1.nlist_ == index2.nlist_ && index1.nlist_ == index2.nlist_
&& index1.metric_type_ == index2.metric_type_; && index1.metric_type_ == index2.metric_type_;
} }
meta::DateT GetDate(const std::time_t& t, int day_delta) { meta::DateT
GetDate(const std::time_t &t, int day_delta) {
struct tm ltm; struct tm ltm;
localtime_r(&t, &ltm); localtime_r(&t, &ltm);
if (day_delta > 0) { if (day_delta > 0) {
do { do {
++ltm.tm_mday; ++ltm.tm_mday;
--day_delta; --day_delta;
} while(day_delta > 0); } while (day_delta > 0);
mktime(&ltm); mktime(&ltm);
} else if (day_delta < 0) { } else if (day_delta < 0) {
do { do {
--ltm.tm_mday; --ltm.tm_mday;
++day_delta; ++day_delta;
} while(day_delta < 0); } while (day_delta < 0);
mktime(&ltm); mktime(&ltm);
} else { } else {
ltm.tm_mday; ltm.tm_mday;
} }
return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday; return ltm.tm_year * 10000 + ltm.tm_mon * 100 + ltm.tm_mday;
} }
meta::DateT GetDateWithDelta(int day_delta) { meta::DateT
GetDateWithDelta(int day_delta) {
return GetDate(std::time(nullptr), day_delta); return GetDate(std::time(nullptr), day_delta);
} }
meta::DateT GetDate() { meta::DateT
GetDate() {
return GetDate(std::time(nullptr), 0); return GetDate(std::time(nullptr), 0);
} }
// URI format: dialect://username:password@host:port/database // URI format: dialect://username:password@host:port/database
Status ParseMetaUri(const std::string& uri, MetaUriInfo& info) { Status
ParseMetaUri(const std::string &uri, MetaUriInfo &info) {
std::string dialect_regex = "(.*)"; std::string dialect_regex = "(.*)";
std::string username_tegex = "(.*)"; std::string username_tegex = "(.*)";
std::string password_regex = "(.*)"; std::string password_regex = "(.*)";
...@@ -205,7 +219,7 @@ Status ParseMetaUri(const std::string& uri, MetaUriInfo& info) { ...@@ -205,7 +219,7 @@ Status ParseMetaUri(const std::string& uri, MetaUriInfo& info) {
std::string port_regex = "(.*)"; std::string port_regex = "(.*)";
std::string db_name_regex = "(.*)"; std::string db_name_regex = "(.*)";
std::string uri_regex_str = std::string uri_regex_str =
dialect_regex + "\\:\\/\\/" + dialect_regex + "\\:\\/\\/" +
username_tegex + "\\:" + username_tegex + "\\:" +
password_regex + "\\@" + password_regex + "\\@" +
host_regex + "\\:" + host_regex + "\\:" +
......
...@@ -29,20 +29,30 @@ namespace milvus { ...@@ -29,20 +29,30 @@ namespace milvus {
namespace engine { namespace engine {
namespace utils { namespace utils {
long GetMicroSecTimeStamp(); int64_t
GetMicroSecTimeStamp();
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id); Status
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id, bool force = true); CreateTablePath(const DBMetaOptions &options, const std::string &table_id);
Status
DeleteTablePath(const DBMetaOptions &options, const std::string &table_id, bool force = true);
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file); Status
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file); CreateTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file);
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file); Status
GetTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file);
Status
DeleteTableFilePath(const DBMetaOptions &options, meta::TableFileSchema &table_file);
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2); bool
IsSameIndex(const TableIndex &index1, const TableIndex &index2);
meta::DateT GetDate(const std::time_t &t, int day_delta = 0); meta::DateT
meta::DateT GetDate(); GetDate(const std::time_t &t, int day_delta = 0);
meta::DateT GetDateWithDelta(int day_delta); meta::DateT
GetDate();
meta::DateT
GetDateWithDelta(int day_delta);
struct MetaUriInfo { struct MetaUriInfo {
std::string dialect_; std::string dialect_;
...@@ -53,7 +63,8 @@ struct MetaUriInfo { ...@@ -53,7 +63,8 @@ struct MetaUriInfo {
std::string db_name_; std::string db_name_;
}; };
Status ParseMetaUri(const std::string& uri, MetaUriInfo& info); Status
ParseMetaUri(const std::string &uri, MetaUriInfo &info);
} // namespace utils } // namespace utils
} // namespace engine } // namespace engine
......
...@@ -15,10 +15,12 @@ ...@@ -15,10 +15,12 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "EngineFactory.h" #include "db/engine/EngineFactory.h"
#include "ExecutionEngineImpl.h" #include "db/engine/ExecutionEngineImpl.h"
#include "utils/Log.h" #include "utils/Log.h"
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
...@@ -29,20 +31,19 @@ EngineFactory::Build(uint16_t dimension, ...@@ -29,20 +31,19 @@ EngineFactory::Build(uint16_t dimension,
EngineType index_type, EngineType index_type,
MetricType metric_type, MetricType metric_type,
int32_t nlist) { int32_t nlist) {
if (index_type == EngineType::INVALID) {
if(index_type == EngineType::INVALID) {
ENGINE_LOG_ERROR << "Unsupported engine type"; ENGINE_LOG_ERROR << "Unsupported engine type";
return nullptr; return nullptr;
} }
ENGINE_LOG_DEBUG << "EngineFactory index type: " << (int)index_type; ENGINE_LOG_DEBUG << "EngineFactory index type: " << (int) index_type;
ExecutionEnginePtr execution_engine_ptr = ExecutionEnginePtr execution_engine_ptr =
std::make_shared<ExecutionEngineImpl>(dimension, location, index_type, metric_type, nlist); std::make_shared<ExecutionEngineImpl>(dimension, location, index_type, metric_type, nlist);
execution_engine_ptr->Init(); execution_engine_ptr->Init();
return execution_engine_ptr; return execution_engine_ptr;
} }
} } // namespace engine
} } // namespace milvus
} } // namespace zilliz
\ No newline at end of file
...@@ -21,19 +21,22 @@ ...@@ -21,19 +21,22 @@
#include "ExecutionEngine.h" #include "ExecutionEngine.h"
#include "utils/Status.h" #include "utils/Status.h"
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
class EngineFactory { class EngineFactory {
public: public:
static ExecutionEnginePtr Build(uint16_t dimension, static ExecutionEnginePtr Build(uint16_t dimension,
const std::string& location, const std::string &location,
EngineType index_type, EngineType index_type,
MetricType metric_type, MetricType metric_type,
int32_t nlist); int32_t nlist);
}; };
} } // namespace engine
} } // namespace milvus
} } // namespace zilliz
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -41,9 +42,8 @@ enum class MetricType { ...@@ -41,9 +42,8 @@ enum class MetricType {
}; };
class ExecutionEngine { class ExecutionEngine {
public: public:
virtual Status AddWithIds(int64_t n, const float *xdata, const int64_t *xids) = 0;
virtual Status AddWithIds(long n, const float *xdata, const long *xids) = 0;
virtual size_t Count() const = 0; virtual size_t Count() const = 0;
...@@ -63,16 +63,16 @@ public: ...@@ -63,16 +63,16 @@ public:
virtual std::shared_ptr<ExecutionEngine> Clone() = 0; virtual std::shared_ptr<ExecutionEngine> Clone() = 0;
virtual Status Merge(const std::string& location) = 0; virtual Status Merge(const std::string &location) = 0;
virtual Status Search(long n, virtual Status Search(int64_t n,
const float *data, const float *data,
long k, int64_t k,
long nprobe, int64_t nprobe,
float *distances, float *distances,
long *labels) const = 0; int64_t *labels) const = 0;
virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string& location, EngineType engine_type) = 0; virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string &location, EngineType engine_type) = 0;
virtual Status Cache() = 0; virtual Status Cache() = 0;
...@@ -89,7 +89,6 @@ public: ...@@ -89,7 +89,6 @@ public:
using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>; using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>;
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "ExecutionEngineImpl.h" #include "db/engine/ExecutionEngineImpl.h"
#include "cache/GpuCacheMgr.h" #include "cache/GpuCacheMgr.h"
#include "cache/CpuCacheMgr.h" #include "cache/CpuCacheMgr.h"
#include "metrics/Metrics.h" #include "metrics/Metrics.h"
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include "server/Config.h" #include "server/Config.h"
#include <stdexcept> #include <stdexcept>
#include <utility>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -72,7 +73,8 @@ ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index, ...@@ -72,7 +73,8 @@ ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index,
nlist_(nlist) { nlist_(nlist) {
} }
VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) { VecIndexPtr
ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
std::shared_ptr<VecIndex> index; std::shared_ptr<VecIndex> index;
switch (type) { switch (type) {
case EngineType::FAISS_IDMAP: { case EngineType::FAISS_IDMAP: {
...@@ -99,41 +101,48 @@ VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) { ...@@ -99,41 +101,48 @@ VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
return index; return index;
} }
Status ExecutionEngineImpl::AddWithIds(long n, const float *xdata, const long *xids) { Status
ExecutionEngineImpl::AddWithIds(int64_t n, const float *xdata, const int64_t *xids) {
auto status = index_->Add(n, xdata, xids); auto status = index_->Add(n, xdata, xids);
return status; return status;
} }
size_t ExecutionEngineImpl::Count() const { size_t
if(index_ == nullptr) { ExecutionEngineImpl::Count() const {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0";
return 0; return 0;
} }
return index_->Count(); return index_->Count();
} }
size_t ExecutionEngineImpl::Size() const { size_t
ExecutionEngineImpl::Size() const {
return (size_t) (Count() * Dimension()) * sizeof(float); return (size_t) (Count() * Dimension()) * sizeof(float);
} }
size_t ExecutionEngineImpl::Dimension() const { size_t
if(index_ == nullptr) { ExecutionEngineImpl::Dimension() const {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_;
return dim_; return dim_;
} }
return index_->Dimension(); return index_->Dimension();
} }
size_t ExecutionEngineImpl::PhysicalSize() const { size_t
ExecutionEngineImpl::PhysicalSize() const {
return server::CommonUtil::GetFileSize(location_); return server::CommonUtil::GetFileSize(location_);
} }
Status ExecutionEngineImpl::Serialize() { Status
ExecutionEngineImpl::Serialize() {
auto status = write_index(index_, location_); auto status = write_index(index_, location_);
return status; return status;
} }
Status ExecutionEngineImpl::Load(bool to_cache) { Status
ExecutionEngineImpl::Load(bool to_cache) {
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr); bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) { if (!already_in_cache) {
...@@ -141,7 +150,7 @@ Status ExecutionEngineImpl::Load(bool to_cache) { ...@@ -141,7 +150,7 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
double physical_size = PhysicalSize(); double physical_size = PhysicalSize();
server::CollectExecutionEngineMetrics metrics(physical_size); server::CollectExecutionEngineMetrics metrics(physical_size);
index_ = read_index(location_); index_ = read_index(location_);
if(index_ == nullptr) { if (index_ == nullptr) {
std::string msg = "Failed to load index from " + location_; std::string msg = "Failed to load index from " + location_;
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg); return Status(DB_ERROR, msg);
...@@ -160,13 +169,14 @@ Status ExecutionEngineImpl::Load(bool to_cache) { ...@@ -160,13 +169,14 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
return Status::OK(); return Status::OK();
} }
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr); bool already_in_cache = (index != nullptr);
if (already_in_cache) { if (already_in_cache) {
index_ = index; index_ = index;
} else { } else {
if(index_ == nullptr) { if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
return Status(DB_ERROR, "index is null"); return Status(DB_ERROR, "index is null");
} }
...@@ -187,13 +197,14 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { ...@@ -187,13 +197,14 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
return Status::OK(); return Status::OK();
} }
Status ExecutionEngineImpl::CopyToCpu() { Status
ExecutionEngineImpl::CopyToCpu() {
auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index != nullptr); bool already_in_cache = (index != nullptr);
if (already_in_cache) { if (already_in_cache) {
index_ = index; index_ = index;
} else { } else {
if(index_ == nullptr) { if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu";
return Status(DB_ERROR, "index is null"); return Status(DB_ERROR, "index is null");
} }
...@@ -213,8 +224,9 @@ Status ExecutionEngineImpl::CopyToCpu() { ...@@ -213,8 +224,9 @@ Status ExecutionEngineImpl::CopyToCpu() {
return Status::OK(); return Status::OK();
} }
ExecutionEnginePtr ExecutionEngineImpl::Clone() { ExecutionEnginePtr
if(index_ == nullptr) { ExecutionEngineImpl::Clone() {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone";
return nullptr; return nullptr;
} }
...@@ -225,7 +237,8 @@ ExecutionEnginePtr ExecutionEngineImpl::Clone() { ...@@ -225,7 +237,8 @@ ExecutionEnginePtr ExecutionEngineImpl::Clone() {
return ret; return ret;
} }
Status ExecutionEngineImpl::Merge(const std::string &location) { Status
ExecutionEngineImpl::Merge(const std::string &location) {
if (location == location_) { if (location == location_) {
return Status(DB_ERROR, "Cannot Merge Self"); return Status(DB_ERROR, "Cannot Merge Self");
} }
...@@ -243,7 +256,7 @@ Status ExecutionEngineImpl::Merge(const std::string &location) { ...@@ -243,7 +256,7 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
} }
} }
if(index_ == nullptr) { if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge";
return Status(DB_ERROR, "index is null"); return Status(DB_ERROR, "index is null");
} }
...@@ -264,7 +277,7 @@ ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_t ...@@ -264,7 +277,7 @@ ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_t
ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_; ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;
auto from_index = std::dynamic_pointer_cast<BFIndex>(index_); auto from_index = std::dynamic_pointer_cast<BFIndex>(index_);
if(from_index == nullptr) { if (from_index == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index";
return nullptr; return nullptr;
} }
...@@ -282,21 +295,22 @@ ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_t ...@@ -282,21 +295,22 @@ ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_t
AutoGenParams(to_index->GetType(), Count(), build_cfg); AutoGenParams(to_index->GetType(), Count(), build_cfg);
auto status = to_index->BuildAll(Count(), auto status = to_index->BuildAll(Count(),
from_index->GetRawVectors(), from_index->GetRawVectors(),
from_index->GetRawIds(), from_index->GetRawIds(),
build_cfg); build_cfg);
if (!status.ok()) { throw Exception(DB_ERROR, status.message()); } if (!status.ok()) { throw Exception(DB_ERROR, status.message()); }
return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, nlist_); return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, nlist_);
} }
Status ExecutionEngineImpl::Search(long n, Status
const float *data, ExecutionEngineImpl::Search(int64_t n,
long k, const float *data,
long nprobe, int64_t k,
float *distances, int64_t nprobe,
long *labels) const { float *distances,
if(index_ == nullptr) { int64_t *labels) const {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search"; ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
return Status(DB_ERROR, "index is null"); return Status(DB_ERROR, "index is null");
} }
...@@ -310,14 +324,16 @@ Status ExecutionEngineImpl::Search(long n, ...@@ -310,14 +324,16 @@ Status ExecutionEngineImpl::Search(long n,
return status; return status;
} }
Status ExecutionEngineImpl::Cache() { Status
ExecutionEngineImpl::Cache() {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize()); cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize());
zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj); zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
return Status::OK(); return Status::OK();
} }
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) { Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize()); cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize());
zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj); zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
...@@ -325,8 +341,8 @@ Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) { ...@@ -325,8 +341,8 @@ Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
} }
// TODO(linxj): remove. // TODO(linxj): remove.
Status ExecutionEngineImpl::Init() { Status
using namespace zilliz::milvus::server; ExecutionEngineImpl::Init() {
server::Config &config = server::Config::GetInstance(); server::Config &config = server::Config::GetInstance();
Status s = config.GetDBConfigBuildIndexGPU(gpu_num_); Status s = config.GetDBConfigBuildIndexGPU(gpu_num_);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -334,7 +350,6 @@ Status ExecutionEngineImpl::Init() { ...@@ -334,7 +350,6 @@ Status ExecutionEngineImpl::Init() {
return Status::OK(); return Status::OK();
} }
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
...@@ -23,15 +23,12 @@ ...@@ -23,15 +23,12 @@
#include <memory> #include <memory>
#include <string> #include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
class ExecutionEngineImpl : public ExecutionEngine { class ExecutionEngineImpl : public ExecutionEngine {
public: public:
ExecutionEngineImpl(uint16_t dimension, ExecutionEngineImpl(uint16_t dimension,
const std::string &location, const std::string &location,
EngineType index_type, EngineType index_type,
...@@ -44,7 +41,7 @@ public: ...@@ -44,7 +41,7 @@ public:
MetricType metric_type, MetricType metric_type,
int32_t nlist); int32_t nlist);
Status AddWithIds(long n, const float *xdata, const long *xids) override; Status AddWithIds(int64_t n, const float *xdata, const int64_t *xids) override;
size_t Count() const override; size_t Count() const override;
...@@ -66,12 +63,12 @@ public: ...@@ -66,12 +63,12 @@ public:
Status Merge(const std::string &location) override; Status Merge(const std::string &location) override;
Status Search(long n, Status Search(int64_t n,
const float *data, const float *data,
long k, int64_t k,
long nprobe, int64_t nprobe,
float *distances, float *distances,
long *labels) const override; int64_t *labels) const override;
ExecutionEnginePtr BuildIndex(const std::string &location, EngineType engine_type) override; ExecutionEnginePtr BuildIndex(const std::string &location, EngineType engine_type) override;
...@@ -81,18 +78,24 @@ public: ...@@ -81,18 +78,24 @@ public:
Status Init() override; Status Init() override;
EngineType IndexEngineType() const override { return index_type_; } EngineType IndexEngineType() const override {
return index_type_;
}
MetricType IndexMetricType() const override { return metric_type_; } MetricType IndexMetricType() const override {
return metric_type_;
}
std::string GetLocation() const override { return location_; } std::string GetLocation() const override {
return location_;
}
private: private:
VecIndexPtr CreatetVecIndex(EngineType type); VecIndexPtr CreatetVecIndex(EngineType type);
VecIndexPtr Load(const std::string &location); VecIndexPtr Load(const std::string &location);
protected: protected:
VecIndexPtr index_ = nullptr; VecIndexPtr index_ = nullptr;
EngineType index_type_; EngineType index_type_;
MetricType metric_type_; MetricType metric_type_;
...@@ -104,7 +107,6 @@ protected: ...@@ -104,7 +107,6 @@ protected:
int32_t gpu_num_ = 0; int32_t gpu_num_ = 0;
}; };
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <set> #include <set>
#include <memory> #include <memory>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -30,7 +31,6 @@ namespace engine { ...@@ -30,7 +31,6 @@ namespace engine {
class MemManager { class MemManager {
public: public:
virtual Status InsertVectors(const std::string &table_id, virtual Status InsertVectors(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids) = 0; size_t n, const float *vectors, IDNumbers &vector_ids) = 0;
...@@ -43,11 +43,10 @@ class MemManager { ...@@ -43,11 +43,10 @@ class MemManager {
virtual size_t GetCurrentImmutableMem() = 0; virtual size_t GetCurrentImmutableMem() = 0;
virtual size_t GetCurrentMem() = 0; virtual size_t GetCurrentMem() = 0;
}; // MemManagerAbstract }; // MemManagerAbstract
using MemManagerPtr = std::shared_ptr<MemManager>; using MemManagerPtr = std::shared_ptr<MemManager>;
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -16,19 +16,19 @@ ...@@ -16,19 +16,19 @@
// under the License. // under the License.
#include "MemManagerImpl.h" #include "db/insert/MemManagerImpl.h"
#include "VectorSource.h" #include "VectorSource.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "db/Constants.h" #include "db/Constants.h"
#include <thread> #include <thread>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
MemTablePtr MemManagerImpl::GetMemByTable(const std::string &table_id) { MemTablePtr
MemManagerImpl::GetMemByTable(const std::string &table_id) {
auto memIt = mem_id_map_.find(table_id); auto memIt = mem_id_map_.find(table_id);
if (memIt != mem_id_map_.end()) { if (memIt != mem_id_map_.end()) {
return memIt->second; return memIt->second;
...@@ -38,11 +38,11 @@ MemTablePtr MemManagerImpl::GetMemByTable(const std::string &table_id) { ...@@ -38,11 +38,11 @@ MemTablePtr MemManagerImpl::GetMemByTable(const std::string &table_id) {
return mem_id_map_[table_id]; return mem_id_map_[table_id];
} }
Status MemManagerImpl::InsertVectors(const std::string &table_id_, Status
size_t n_, MemManagerImpl::InsertVectors(const std::string &table_id_,
const float *vectors_, size_t n_,
IDNumbers &vector_ids_) { const float *vectors_,
IDNumbers &vector_ids_) {
while (GetCurrentMem() > options_.insert_buffer_size_) { while (GetCurrentMem() > options_.insert_buffer_size_) {
std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(1));
} }
...@@ -52,11 +52,11 @@ Status MemManagerImpl::InsertVectors(const std::string &table_id_, ...@@ -52,11 +52,11 @@ Status MemManagerImpl::InsertVectors(const std::string &table_id_,
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_); return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
} }
Status MemManagerImpl::InsertVectorsNoLock(const std::string &table_id, Status
size_t n, MemManagerImpl::InsertVectorsNoLock(const std::string &table_id,
const float *vectors, size_t n,
IDNumbers &vector_ids) { const float *vectors,
IDNumbers &vector_ids) {
MemTablePtr mem = GetMemByTable(table_id); MemTablePtr mem = GetMemByTable(table_id);
VectorSourcePtr source = std::make_shared<VectorSource>(n, vectors); VectorSourcePtr source = std::make_shared<VectorSource>(n, vectors);
...@@ -69,10 +69,11 @@ Status MemManagerImpl::InsertVectorsNoLock(const std::string &table_id, ...@@ -69,10 +69,11 @@ Status MemManagerImpl::InsertVectorsNoLock(const std::string &table_id,
return status; return status;
} }
Status MemManagerImpl::ToImmutable() { Status
MemManagerImpl::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
MemIdMap temp_map; MemIdMap temp_map;
for (auto &kv: mem_id_map_) { for (auto &kv : mem_id_map_) {
if (kv.second->Empty()) { if (kv.second->Empty()) {
//empty table, no need to serialize //empty table, no need to serialize
temp_map.insert(kv); temp_map.insert(kv);
...@@ -85,7 +86,8 @@ Status MemManagerImpl::ToImmutable() { ...@@ -85,7 +86,8 @@ Status MemManagerImpl::ToImmutable() {
return Status::OK(); return Status::OK();
} }
Status MemManagerImpl::Serialize(std::set<std::string> &table_ids) { Status
MemManagerImpl::Serialize(std::set<std::string> &table_ids) {
ToImmutable(); ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_); std::unique_lock<std::mutex> lock(serialization_mtx_);
table_ids.clear(); table_ids.clear();
...@@ -97,7 +99,8 @@ Status MemManagerImpl::Serialize(std::set<std::string> &table_ids) { ...@@ -97,7 +99,8 @@ Status MemManagerImpl::Serialize(std::set<std::string> &table_ids) {
return Status::OK(); return Status::OK();
} }
Status MemManagerImpl::EraseMemVector(const std::string &table_id) { Status
MemManagerImpl::EraseMemVector(const std::string &table_id) {
{//erase MemVector from rapid-insert cache {//erase MemVector from rapid-insert cache
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
mem_id_map_.erase(table_id); mem_id_map_.erase(table_id);
...@@ -117,7 +120,8 @@ Status MemManagerImpl::EraseMemVector(const std::string &table_id) { ...@@ -117,7 +120,8 @@ Status MemManagerImpl::EraseMemVector(const std::string &table_id) {
return Status::OK(); return Status::OK();
} }
size_t MemManagerImpl::GetCurrentMutableMem() { size_t
MemManagerImpl::GetCurrentMutableMem() {
size_t total_mem = 0; size_t total_mem = 0;
for (auto &kv : mem_id_map_) { for (auto &kv : mem_id_map_) {
auto memTable = kv.second; auto memTable = kv.second;
...@@ -126,7 +130,8 @@ size_t MemManagerImpl::GetCurrentMutableMem() { ...@@ -126,7 +130,8 @@ size_t MemManagerImpl::GetCurrentMutableMem() {
return total_mem; return total_mem;
} }
size_t MemManagerImpl::GetCurrentImmutableMem() { size_t
MemManagerImpl::GetCurrentImmutableMem() {
size_t total_mem = 0; size_t total_mem = 0;
for (auto &mem_table : immu_mem_list_) { for (auto &mem_table : immu_mem_list_) {
total_mem += mem_table->GetCurrentMem(); total_mem += mem_table->GetCurrentMem();
...@@ -134,10 +139,11 @@ size_t MemManagerImpl::GetCurrentImmutableMem() { ...@@ -134,10 +139,11 @@ size_t MemManagerImpl::GetCurrentImmutableMem() {
return total_mem; return total_mem;
} }
size_t MemManagerImpl::GetCurrentMem() { size_t
MemManagerImpl::GetCurrentMem() {
return GetCurrentMutableMem() + GetCurrentImmutableMem(); return GetCurrentMutableMem() + GetCurrentImmutableMem();
} }
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -24,12 +24,13 @@ ...@@ -24,12 +24,13 @@
#include "utils/Status.h" #include "utils/Status.h"
#include <map> #include <map>
#include <set>
#include <vector>
#include <string> #include <string>
#include <ctime> #include <ctime>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
...@@ -39,7 +40,8 @@ class MemManagerImpl : public MemManager { ...@@ -39,7 +40,8 @@ class MemManagerImpl : public MemManager {
using Ptr = std::shared_ptr<MemManagerImpl>; using Ptr = std::shared_ptr<MemManagerImpl>;
MemManagerImpl(const meta::MetaPtr &meta, const DBOptions &options) MemManagerImpl(const meta::MetaPtr &meta, const DBOptions &options)
: meta_(meta), options_(options) {} : meta_(meta), options_(options) {
}
Status InsertVectors(const std::string &table_id, Status InsertVectors(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids) override; size_t n, const float *vectors, IDNumbers &vector_ids) override;
...@@ -71,7 +73,6 @@ class MemManagerImpl : public MemManager { ...@@ -71,7 +73,6 @@ class MemManagerImpl : public MemManager {
std::mutex serialization_mtx_; std::mutex serialization_mtx_;
}; // NewMemManager }; // NewMemManager
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "MemMenagerFactory.h" #include "db/insert/MemMenagerFactory.h"
#include "MemManagerImpl.h" #include "MemManagerImpl.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/Exception.h" #include "utils/Exception.h"
...@@ -26,12 +26,14 @@ ...@@ -26,12 +26,14 @@
#include <cstdlib> #include <cstdlib>
#include <string> #include <string>
#include <regex> #include <regex>
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
MemManagerPtr MemManagerFactory::Build(const std::shared_ptr<meta::Meta>& meta, const DBOptions& options) { MemManagerPtr
MemManagerFactory::Build(const std::shared_ptr<meta::Meta> &meta, const DBOptions &options) {
return std::make_shared<MemManagerImpl>(meta, options); return std::make_shared<MemManagerImpl>(meta, options);
} }
......
...@@ -20,15 +20,17 @@ ...@@ -20,15 +20,17 @@
#include "MemManager.h" #include "MemManager.h"
#include "db/meta/Meta.h" #include "db/meta/Meta.h"
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
class MemManagerFactory { class MemManagerFactory {
public: public:
static MemManagerPtr Build(const std::shared_ptr<meta::Meta> &meta, const DBOptions &options); static MemManagerPtr Build(const std::shared_ptr<meta::Meta> &meta, const DBOptions &options);
}; };
} } // namespace engine
} } // namespace milvus
} } // namespace zilliz
...@@ -16,9 +16,11 @@ ...@@ -16,9 +16,11 @@
// under the License. // under the License.
#include "MemTable.h" #include "db/insert/MemTable.h"
#include "utils/Log.h" #include "utils/Log.h"
#include <memory>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -30,13 +32,11 @@ MemTable::MemTable(const std::string &table_id, ...@@ -30,13 +32,11 @@ MemTable::MemTable(const std::string &table_id,
table_id_(table_id), table_id_(table_id),
meta_(meta), meta_(meta),
options_(options) { options_(options) {
} }
Status MemTable::Add(VectorSourcePtr &source, IDNumbers &vector_ids) { Status
MemTable::Add(VectorSourcePtr &source, IDNumbers &vector_ids) {
while (!source->AllAdded()) { while (!source->AllAdded()) {
MemTableFilePtr current_mem_table_file; MemTableFilePtr current_mem_table_file;
if (!mem_table_file_list_.empty()) { if (!mem_table_file_list_.empty()) {
current_mem_table_file = mem_table_file_list_.back(); current_mem_table_file = mem_table_file_list_.back();
...@@ -62,15 +62,18 @@ Status MemTable::Add(VectorSourcePtr &source, IDNumbers &vector_ids) { ...@@ -62,15 +62,18 @@ Status MemTable::Add(VectorSourcePtr &source, IDNumbers &vector_ids) {
return Status::OK(); return Status::OK();
} }
void MemTable::GetCurrentMemTableFile(MemTableFilePtr &mem_table_file) { void
MemTable::GetCurrentMemTableFile(MemTableFilePtr &mem_table_file) {
mem_table_file = mem_table_file_list_.back(); mem_table_file = mem_table_file_list_.back();
} }
size_t MemTable::GetTableFileCount() { size_t
MemTable::GetTableFileCount() {
return mem_table_file_list_.size(); return mem_table_file_list_.size();
} }
Status MemTable::Serialize() { Status
MemTable::Serialize() {
for (auto mem_table_file = mem_table_file_list_.begin(); mem_table_file != mem_table_file_list_.end();) { for (auto mem_table_file = mem_table_file_list_.begin(); mem_table_file != mem_table_file_list_.end();) {
auto status = (*mem_table_file)->Serialize(); auto status = (*mem_table_file)->Serialize();
if (!status.ok()) { if (!status.ok()) {
...@@ -84,15 +87,18 @@ Status MemTable::Serialize() { ...@@ -84,15 +87,18 @@ Status MemTable::Serialize() {
return Status::OK(); return Status::OK();
} }
bool MemTable::Empty() { bool
MemTable::Empty() {
return mem_table_file_list_.empty(); return mem_table_file_list_.empty();
} }
const std::string &MemTable::GetTableId() const { const std::string &
MemTable::GetTableId() const {
return table_id_; return table_id_;
} }
size_t MemTable::GetCurrentMem() { size_t
MemTable::GetCurrentMem() {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
size_t total_mem = 0; size_t total_mem = 0;
for (auto &mem_table_file : mem_table_file_list_) { for (auto &mem_table_file : mem_table_file_list_) {
...@@ -103,4 +109,4 @@ size_t MemTable::GetCurrentMem() { ...@@ -103,4 +109,4 @@ size_t MemTable::GetCurrentMem() {
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -23,7 +23,9 @@ ...@@ -23,7 +23,9 @@
#include "utils/Status.h" #include "utils/Status.h"
#include <mutex> #include <mutex>
#include <vector>
#include <memory>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -59,11 +61,10 @@ class MemTable { ...@@ -59,11 +61,10 @@ class MemTable {
DBOptions options_; DBOptions options_;
std::mutex mutex_; std::mutex mutex_;
}; //MemTable }; //MemTable
using MemTablePtr = std::shared_ptr<MemTable>; using MemTablePtr = std::shared_ptr<MemTable>;
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -16,14 +16,14 @@ ...@@ -16,14 +16,14 @@
// under the License. // under the License.
#include "MemTableFile.h" #include "db/insert/MemTableFile.h"
#include "db/Constants.h" #include "db/Constants.h"
#include "db/engine/EngineFactory.h" #include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h" #include "metrics/Metrics.h"
#include "utils/Log.h" #include "utils/Log.h"
#include <cmath> #include <cmath>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -35,20 +35,19 @@ MemTableFile::MemTableFile(const std::string &table_id, ...@@ -35,20 +35,19 @@ MemTableFile::MemTableFile(const std::string &table_id,
table_id_(table_id), table_id_(table_id),
meta_(meta), meta_(meta),
options_(options) { options_(options) {
current_mem_ = 0; current_mem_ = 0;
auto status = CreateTableFile(); auto status = CreateTableFile();
if (status.ok()) { if (status.ok()) {
execution_engine_ = EngineFactory::Build(table_file_schema_.dimension_, execution_engine_ = EngineFactory::Build(table_file_schema_.dimension_,
table_file_schema_.location_, table_file_schema_.location_,
(EngineType) table_file_schema_.engine_type_, (EngineType) table_file_schema_.engine_type_,
(MetricType)table_file_schema_.metric_type_, (MetricType) table_file_schema_.metric_type_,
table_file_schema_.nlist_); table_file_schema_.nlist_);
} }
} }
Status MemTableFile::CreateTableFile() { Status
MemTableFile::CreateTableFile() {
meta::TableFileSchema table_file_schema; meta::TableFileSchema table_file_schema;
table_file_schema.table_id_ = table_id_; table_file_schema.table_id_ = table_id_;
auto status = meta_->CreateTableFile(table_file_schema); auto status = meta_->CreateTableFile(table_file_schema);
...@@ -61,8 +60,8 @@ Status MemTableFile::CreateTableFile() { ...@@ -61,8 +60,8 @@ Status MemTableFile::CreateTableFile() {
return status; return status;
} }
Status MemTableFile::Add(const VectorSourcePtr &source, IDNumbers& vector_ids) { Status
MemTableFile::Add(const VectorSourcePtr &source, IDNumbers &vector_ids) {
if (table_file_schema_.dimension_ <= 0) { if (table_file_schema_.dimension_ <= 0) {
std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " + std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " +
std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_; std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_;
...@@ -75,7 +74,8 @@ Status MemTableFile::Add(const VectorSourcePtr &source, IDNumbers& vector_ids) { ...@@ -75,7 +74,8 @@ Status MemTableFile::Add(const VectorSourcePtr &source, IDNumbers& vector_ids) {
if (mem_left >= single_vector_mem_size) { if (mem_left >= single_vector_mem_size) {
size_t num_vectors_to_add = std::ceil(mem_left / single_vector_mem_size); size_t num_vectors_to_add = std::ceil(mem_left / single_vector_mem_size);
size_t num_vectors_added; size_t num_vectors_added;
auto status = source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added, vector_ids); auto status =
source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added, vector_ids);
if (status.ok()) { if (status.ok()) {
current_mem_ += (num_vectors_added * single_vector_mem_size); current_mem_ += (num_vectors_added * single_vector_mem_size);
} }
...@@ -84,20 +84,24 @@ Status MemTableFile::Add(const VectorSourcePtr &source, IDNumbers& vector_ids) { ...@@ -84,20 +84,24 @@ Status MemTableFile::Add(const VectorSourcePtr &source, IDNumbers& vector_ids) {
return Status::OK(); return Status::OK();
} }
size_t MemTableFile::GetCurrentMem() { size_t
MemTableFile::GetCurrentMem() {
return current_mem_; return current_mem_;
} }
size_t MemTableFile::GetMemLeft() { size_t
MemTableFile::GetMemLeft() {
return (MAX_TABLE_FILE_MEM - current_mem_); return (MAX_TABLE_FILE_MEM - current_mem_);
} }
bool MemTableFile::IsFull() { bool
MemTableFile::IsFull() {
size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
return (GetMemLeft() < single_vector_mem_size); return (GetMemLeft() < single_vector_mem_size);
} }
Status MemTableFile::Serialize() { Status
MemTableFile::Serialize() {
size_t size = GetCurrentMem(); size_t size = GetCurrentMem();
server::CollectSerializeMetrics metrics(size); server::CollectSerializeMetrics metrics(size);
...@@ -107,7 +111,7 @@ Status MemTableFile::Serialize() { ...@@ -107,7 +111,7 @@ Status MemTableFile::Serialize() {
//if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size //if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size
//else set file type to RAW, no need to build index //else set file type to RAW, no need to build index
if (table_file_schema_.engine_type_ != (int)EngineType::FAISS_IDMAP) { if (table_file_schema_.engine_type_ != (int) EngineType::FAISS_IDMAP) {
table_file_schema_.file_type_ = (size >= table_file_schema_.index_file_size_) ? table_file_schema_.file_type_ = (size >= table_file_schema_.index_file_size_) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
} else { } else {
...@@ -117,9 +121,9 @@ Status MemTableFile::Serialize() { ...@@ -117,9 +121,9 @@ Status MemTableFile::Serialize() {
auto status = meta_->UpdateTableFile(table_file_schema_); auto status = meta_->UpdateTableFile(table_file_schema_);
ENGINE_LOG_DEBUG << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index") ENGINE_LOG_DEBUG << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << table_file_schema_.file_id_ << " of size " << size << " bytes"; << " file " << table_file_schema_.file_id_ << " of size " << size << " bytes";
if(options_.insert_cache_immediately_) { if (options_.insert_cache_immediately_) {
execution_engine_->Cache(); execution_engine_->Cache();
} }
...@@ -128,4 +132,4 @@ Status MemTableFile::Serialize() { ...@@ -128,4 +132,4 @@ Status MemTableFile::Serialize() {
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -23,17 +23,18 @@ ...@@ -23,17 +23,18 @@
#include "db/engine/ExecutionEngine.h" #include "db/engine/ExecutionEngine.h"
#include "utils/Status.h" #include "utils/Status.h"
#include <string>
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
class MemTableFile { class MemTableFile {
public: public:
MemTableFile(const std::string &table_id, const meta::MetaPtr &meta, const DBOptions &options); MemTableFile(const std::string &table_id, const meta::MetaPtr &meta, const DBOptions &options);
Status Add(const VectorSourcePtr &source, IDNumbers& vector_ids); Status Add(const VectorSourcePtr &source, IDNumbers &vector_ids);
size_t GetCurrentMem(); size_t GetCurrentMem();
...@@ -44,25 +45,20 @@ class MemTableFile { ...@@ -44,25 +45,20 @@ class MemTableFile {
Status Serialize(); Status Serialize();
private: private:
Status CreateTableFile(); Status CreateTableFile();
private:
const std::string table_id_; const std::string table_id_;
meta::TableFileSchema table_file_schema_; meta::TableFileSchema table_file_schema_;
meta::MetaPtr meta_; meta::MetaPtr meta_;
DBOptions options_; DBOptions options_;
size_t current_mem_; size_t current_mem_;
ExecutionEnginePtr execution_engine_; ExecutionEnginePtr execution_engine_;
}; //MemTableFile }; //MemTableFile
using MemTableFilePtr = std::shared_ptr<MemTableFile>; using MemTableFilePtr = std::shared_ptr<MemTableFile>;
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -16,32 +16,30 @@ ...@@ -16,32 +16,30 @@
// under the License. // under the License.
#include "VectorSource.h" #include "db/insert/VectorSource.h"
#include "db/engine/ExecutionEngine.h" #include "db/engine/ExecutionEngine.h"
#include "db/engine/EngineFactory.h" #include "db/engine/EngineFactory.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "metrics/Metrics.h" #include "metrics/Metrics.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
VectorSource::VectorSource(const size_t &n, VectorSource::VectorSource(const size_t &n,
const float *vectors) : const float *vectors) :
n_(n), n_(n),
vectors_(vectors), vectors_(vectors),
id_generator_(std::make_shared<SimpleIDGenerator>()) { id_generator_(std::make_shared<SimpleIDGenerator>()) {
current_num_vectors_added = 0; current_num_vectors_added = 0;
} }
Status VectorSource::Add(const ExecutionEnginePtr &execution_engine, Status
const meta::TableFileSchema &table_file_schema, VectorSource::Add(const ExecutionEnginePtr &execution_engine,
const size_t &num_vectors_to_add, const meta::TableFileSchema &table_file_schema,
size_t &num_vectors_added, const size_t &num_vectors_to_add,
IDNumbers &vector_ids) { size_t &num_vectors_added,
IDNumbers &vector_ids) {
server::CollectAddMetrics metrics(n_, table_file_schema.dimension_); server::CollectAddMetrics metrics(n_, table_file_schema.dimension_);
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
...@@ -52,7 +50,7 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine, ...@@ -52,7 +50,7 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
} else { } else {
vector_ids_to_add.resize(num_vectors_added); vector_ids_to_add.resize(num_vectors_added);
for (int pos = current_num_vectors_added; pos < current_num_vectors_added + num_vectors_added; pos++) { for (int pos = current_num_vectors_added; pos < current_num_vectors_added + num_vectors_added; pos++) {
vector_ids_to_add[pos-current_num_vectors_added] = vector_ids[pos]; vector_ids_to_add[pos - current_num_vectors_added] = vector_ids[pos];
} }
} }
Status status = execution_engine->AddWithIds(num_vectors_added, Status status = execution_engine->AddWithIds(num_vectors_added,
...@@ -70,18 +68,21 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine, ...@@ -70,18 +68,21 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
return status; return status;
} }
size_t VectorSource::GetNumVectorsAdded() { size_t
VectorSource::GetNumVectorsAdded() {
return current_num_vectors_added; return current_num_vectors_added;
} }
bool VectorSource::AllAdded() { bool
VectorSource::AllAdded() {
return (current_num_vectors_added == n_); return (current_num_vectors_added == n_);
} }
IDNumbers VectorSource::GetVectorIds() { IDNumbers
VectorSource::GetVectorIds() {
return vector_ids_; return vector_ids_;
} }
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "db/engine/ExecutionEngine.h" #include "db/engine/ExecutionEngine.h"
#include "utils/Status.h" #include "utils/Status.h"
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -45,7 +46,6 @@ class VectorSource { ...@@ -45,7 +46,6 @@ class VectorSource {
IDNumbers GetVectorIds(); IDNumbers GetVectorIds();
private: private:
const size_t n_; const size_t n_;
const float *vectors_; const float *vectors_;
IDNumbers vector_ids_; IDNumbers vector_ids_;
...@@ -53,11 +53,10 @@ class VectorSource { ...@@ -53,11 +53,10 @@ class VectorSource {
size_t current_num_vectors_added; size_t current_num_vectors_added;
std::shared_ptr<IDGenerator> id_generator_; std::shared_ptr<IDGenerator> id_generator_;
}; //VectorSource }; //VectorSource
using VectorSourcePtr = std::shared_ptr<VectorSource>; using VectorSourcePtr = std::shared_ptr<VectorSource>;
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -25,14 +25,16 @@ ...@@ -25,14 +25,16 @@
#include <cstddef> #include <cstddef>
#include <memory> #include <memory>
#include <vector>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
namespace meta { namespace meta {
static const char* META_TABLES = "Tables"; static const char *META_TABLES = "Tables";
static const char* META_TABLEFILES = "TableFiles"; static const char *META_TABLEFILES = "TableFiles";
class Meta { class Meta {
public: public:
...@@ -46,7 +48,7 @@ class Meta { ...@@ -46,7 +48,7 @@ class Meta {
virtual Status AllTables(std::vector<TableSchema> &table_schema_array) = 0; virtual Status AllTables(std::vector<TableSchema> &table_schema_array) = 0;
virtual Status UpdateTableIndex(const std::string &table_id, const TableIndex& index) = 0; virtual Status UpdateTableIndex(const std::string &table_id, const TableIndex &index) = 0;
virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0; virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
...@@ -83,9 +85,9 @@ class Meta { ...@@ -83,9 +85,9 @@ class Meta {
virtual Status FilesByType(const std::string &table_id, virtual Status FilesByType(const std::string &table_id,
const std::vector<int> &file_types, const std::vector<int> &file_types,
std::vector<std::string>& file_ids) = 0; std::vector<std::string> &file_ids) = 0;
virtual Status DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0; virtual Status DescribeTableIndex(const std::string &table_id, TableIndex &index) = 0;
virtual Status DropTableIndex(const std::string &table_id) = 0; virtual Status DropTableIndex(const std::string &table_id) = 0;
...@@ -96,7 +98,6 @@ class Meta { ...@@ -96,7 +98,6 @@ class Meta {
virtual Status DropAll() = 0; virtual Status DropAll() = 0;
virtual Status Count(const std::string &table_id, uint64_t &result) = 0; virtual Status Count(const std::string &table_id, uint64_t &result) = 0;
}; // MetaData }; // MetaData
using MetaPtr = std::shared_ptr<Meta>; using MetaPtr = std::shared_ptr<Meta>;
......
...@@ -23,20 +23,20 @@ namespace engine { ...@@ -23,20 +23,20 @@ namespace engine {
namespace meta { namespace meta {
const size_t K = 1024UL; const size_t K = 1024UL;
const size_t M = K*K; const size_t M = K * K;
const size_t G = K*M; const size_t G = K * M;
const size_t T = K*G; const size_t T = K * G;
const size_t S_PS = 1UL; const size_t S_PS = 1UL;
const size_t MS_PS = 1000*S_PS; const size_t MS_PS = 1000 * S_PS;
const size_t US_PS = 1000*MS_PS; const size_t US_PS = 1000 * MS_PS;
const size_t NS_PS = 1000*US_PS; const size_t NS_PS = 1000 * US_PS;
const size_t SECOND = 1UL; const size_t SECOND = 1UL;
const size_t M_SEC = 60*SECOND; const size_t M_SEC = 60 * SECOND;
const size_t H_SEC = 60*M_SEC; const size_t H_SEC = 60 * M_SEC;
const size_t D_SEC = 24*H_SEC; const size_t D_SEC = 24 * H_SEC;
const size_t W_SEC = 7*D_SEC; const size_t W_SEC = 7 * D_SEC;
} // namespace meta } // namespace meta
} // namespace engine } // namespace engine
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "MetaFactory.h" #include "db/meta/MetaFactory.h"
#include "SqliteMetaImpl.h" #include "SqliteMetaImpl.h"
#include "MySQLMetaImpl.h" #include "MySQLMetaImpl.h"
#include "utils/Log.h" #include "utils/Log.h"
...@@ -28,46 +28,51 @@ ...@@ -28,46 +28,51 @@
#include <cstdlib> #include <cstdlib>
#include <string> #include <string>
#include <string.h> #include <string.h>
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
DBMetaOptions MetaFactory::BuildOption(const std::string &path) { DBMetaOptions
auto p = path; MetaFactory::BuildOption(const std::string &path) {
if(p == "") { auto p = path;
srand(time(nullptr)); if (p == "") {
std::stringstream ss; srand(time(nullptr));
ss << "/tmp/" << rand(); std::stringstream ss;
p = ss.str(); uint32_t rd = 0;
} rand_r(&rd);
ss << "/tmp/" << rd;
DBMetaOptions meta; p = ss.str();
meta.path_ = p;
return meta;
} }
meta::MetaPtr MetaFactory::Build(const DBMetaOptions &metaOptions, const int &mode) { DBMetaOptions meta;
std::string uri = metaOptions.backend_uri_; meta.path_ = p;
return meta;
}
meta::MetaPtr
MetaFactory::Build(const DBMetaOptions &metaOptions, const int &mode) {
std::string uri = metaOptions.backend_uri_;
utils::MetaUriInfo uri_info; utils::MetaUriInfo uri_info;
auto status = utils::ParseMetaUri(uri, uri_info); auto status = utils::ParseMetaUri(uri, uri_info);
if(!status.ok()) { if (!status.ok()) {
ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri; ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri;
throw InvalidArgumentException("Wrong URI format "); throw InvalidArgumentException("Wrong URI format ");
} }
if (strcasecmp(uri_info.dialect_.c_str(), "mysql") == 0) { if (strcasecmp(uri_info.dialect_.c_str(), "mysql") == 0) {
ENGINE_LOG_INFO << "Using MySQL"; ENGINE_LOG_INFO << "Using MySQL";
return std::make_shared<meta::MySQLMetaImpl>(metaOptions, mode); return std::make_shared<meta::MySQLMetaImpl>(metaOptions, mode);
} else if (strcasecmp(uri_info.dialect_.c_str(), "sqlite") == 0) { } else if (strcasecmp(uri_info.dialect_.c_str(), "sqlite") == 0) {
ENGINE_LOG_INFO << "Using SQLite"; ENGINE_LOG_INFO << "Using SQLite";
return std::make_shared<meta::SqliteMetaImpl>(metaOptions); return std::make_shared<meta::SqliteMetaImpl>(metaOptions);
} else { } else {
ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << uri_info.dialect_; ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << uri_info.dialect_;
throw InvalidArgumentException("URI dialect is not mysql / sqlite"); throw InvalidArgumentException("URI dialect is not mysql / sqlite");
}
} }
}
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
......
...@@ -20,18 +20,19 @@ ...@@ -20,18 +20,19 @@
#include "Meta.h" #include "Meta.h"
#include "db/Options.h" #include "db/Options.h"
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
class MetaFactory { class MetaFactory {
public: public:
static DBMetaOptions BuildOption(const std::string &path = ""); static DBMetaOptions BuildOption(const std::string &path = "");
static meta::MetaPtr Build(const DBMetaOptions &metaOptions, const int &mode); static meta::MetaPtr Build(const DBMetaOptions &metaOptions, const int &mode);
}; };
} // namespace engine
} } // namespace milvus
} } // namespace zilliz
}
...@@ -23,21 +23,22 @@ ...@@ -23,21 +23,22 @@
#include <vector> #include <vector>
#include <map> #include <map>
#include <string> #include <string>
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
namespace meta { namespace meta {
constexpr int32_t DEFAULT_ENGINE_TYPE = (int)EngineType::FAISS_IDMAP; constexpr int32_t DEFAULT_ENGINE_TYPE = (int) EngineType::FAISS_IDMAP;
constexpr int32_t DEFAULT_NLIST = 16384; constexpr int32_t DEFAULT_NLIST = 16384;
constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2; constexpr int32_t DEFAULT_METRIC_TYPE = (int) MetricType::L2;
constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB; constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB;
constexpr int64_t FLAG_MASK_NO_USERID = 0x1; constexpr int64_t FLAG_MASK_NO_USERID = 0x1;
constexpr int64_t FLAG_MASK_HAS_USERID = 0x1<<1; constexpr int64_t FLAG_MASK_HAS_USERID = 0x1 << 1;
using DateT = int ; using DateT = int;
const DateT EmptyDate = -1; const DateT EmptyDate = -1;
using DatesT = std::vector<DateT>; using DatesT = std::vector<DateT>;
...@@ -49,7 +50,7 @@ struct TableSchema { ...@@ -49,7 +50,7 @@ struct TableSchema {
size_t id_ = 0; size_t id_ = 0;
std::string table_id_; std::string table_id_;
int32_t state_ = (int)NORMAL; int32_t state_ = (int) NORMAL;
uint16_t dimension_ = 0; uint16_t dimension_ = 0;
int64_t created_on_ = 0; int64_t created_on_ = 0;
int64_t flag_ = 0; int64_t flag_ = 0;
......
...@@ -16,37 +16,39 @@ ...@@ -16,37 +16,39 @@
// under the License. // under the License.
#include "MySQLConnectionPool.h" #include "db/meta/MySQLConnectionPool.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
namespace meta { namespace meta {
// Do a simple form of in-use connection limiting: wait to return // Do a simple form of in-use connection limiting: wait to return
// a connection until there are a reasonably low number in use // a connection until there are a reasonably low number in use
// already. Can't do this in create() because we're interested in // already. Can't do this in create() because we're interested in
// connections actually in use, not those created. Also note that // connections actually in use, not those created. Also note that
// we keep our own count; ConnectionPool::size() isn't the same! // we keep our own count; ConnectionPool::size() isn't the same!
mysqlpp::Connection *MySQLConnectionPool::grab() { mysqlpp::Connection *
while (conns_in_use_ > max_pool_size_) { MySQLConnectionPool::grab() {
sleep(1); while (conns_in_use_ > max_pool_size_) {
} sleep(1);
++conns_in_use_;
return mysqlpp::ConnectionPool::grab();
} }
// Other half of in-use conn count limit ++conns_in_use_;
void MySQLConnectionPool::release(const mysqlpp::Connection *pc) { return mysqlpp::ConnectionPool::grab();
mysqlpp::ConnectionPool::release(pc); }
if (conns_in_use_ <= 0) { // Other half of in-use conn count limit
ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_; void
} else { MySQLConnectionPool::release(const mysqlpp::Connection *pc) {
--conns_in_use_; mysqlpp::ConnectionPool::release(pc);
} if (conns_in_use_ <= 0) {
ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = "
<< conns_in_use_;
} else {
--conns_in_use_;
} }
}
// int MySQLConnectionPool::getConnectionsInUse() { // int MySQLConnectionPool::getConnectionsInUse() {
// return conns_in_use_; // return conns_in_use_;
...@@ -56,39 +58,42 @@ namespace meta { ...@@ -56,39 +58,42 @@ namespace meta {
// max_idle_time_ = max_idle; // max_idle_time_ = max_idle;
// } // }
std::string MySQLConnectionPool::getDB() { std::string
return db_; MySQLConnectionPool::getDB() {
} return db_;
}
// Superclass overrides
mysqlpp::Connection *MySQLConnectionPool::create() {
try { // Superclass overrides
// Create connection using the parameters we were passed upon mysqlpp::Connection *
// creation. MySQLConnectionPool::create() {
mysqlpp::Connection *conn = new mysqlpp::Connection(); try {
conn->set_option(new mysqlpp::ReconnectOption(true)); // Create connection using the parameters we were passed upon
conn->connect(db_.empty() ? 0 : db_.c_str(), // creation.
server_.empty() ? 0 : server_.c_str(), mysqlpp::Connection *conn = new mysqlpp::Connection();
user_.empty() ? 0 : user_.c_str(), conn->set_option(new mysqlpp::ReconnectOption(true));
password_.empty() ? 0 : password_.c_str(), conn->connect(db_.empty() ? 0 : db_.c_str(),
port_); server_.empty() ? 0 : server_.c_str(),
return conn; user_.empty() ? 0 : user_.c_str(),
} catch (const mysqlpp::ConnectionFailed& er) { password_.empty() ? 0 : password_.c_str(),
ENGINE_LOG_ERROR << "Failed to connect to database server" << ": " << er.what(); port_);
return nullptr; return conn;
} } catch (const mysqlpp::ConnectionFailed &er) {
ENGINE_LOG_ERROR << "Failed to connect to database server" << ": " << er.what();
return nullptr;
} }
}
void MySQLConnectionPool::destroy(mysqlpp::Connection *cp) { void
// Our superclass can't know how we created the Connection, so MySQLConnectionPool::destroy(mysqlpp::Connection *cp) {
// it delegates destruction to us, to be safe. // Our superclass can't know how we created the Connection, so
delete cp; // it delegates destruction to us, to be safe.
} delete cp;
}
unsigned int MySQLConnectionPool::max_idle_time() { unsigned int
return max_idle_time_; MySQLConnectionPool::max_idle_time() {
} return max_idle_time_;
}
} // namespace meta } // namespace meta
} // namespace engine } // namespace engine
......
...@@ -30,8 +30,7 @@ namespace engine { ...@@ -30,8 +30,7 @@ namespace engine {
namespace meta { namespace meta {
class MySQLConnectionPool : public mysqlpp::ConnectionPool { class MySQLConnectionPool : public mysqlpp::ConnectionPool {
public:
public:
// The object's only constructor // The object's only constructor
MySQLConnectionPool(std::string dbName, MySQLConnectionPool(std::string dbName,
std::string userName, std::string userName,
...@@ -39,15 +38,13 @@ public: ...@@ -39,15 +38,13 @@ public:
std::string serverIp, std::string serverIp,
int port = 0, int port = 0,
int maxPoolSize = 8) : int maxPoolSize = 8) :
db_(dbName), db_(dbName),
user_(userName), user_(userName),
password_(passWord), password_(passWord),
server_(serverIp), server_(serverIp),
port_(port), port_(port),
max_pool_size_(maxPoolSize) { max_pool_size_(maxPoolSize) {
conns_in_use_ = 0; conns_in_use_ = 0;
max_idle_time_ = 10; //10 seconds max_idle_time_ = 10; //10 seconds
} }
...@@ -68,8 +65,7 @@ public: ...@@ -68,8 +65,7 @@ public:
std::string getDB(); std::string getDB();
protected: protected:
// Superclass overrides // Superclass overrides
mysqlpp::Connection *create() override; mysqlpp::Connection *create() override;
...@@ -77,7 +73,7 @@ protected: ...@@ -77,7 +73,7 @@ protected:
unsigned int max_idle_time() override; unsigned int max_idle_time() override;
private: private:
// Number of connections currently in use // Number of connections currently in use
std::atomic<int> conns_in_use_; std::atomic<int> conns_in_use_;
...@@ -93,4 +89,4 @@ private: ...@@ -93,4 +89,4 @@ private:
} // namespace meta } // namespace meta
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
此差异已折叠。
...@@ -21,26 +21,25 @@ ...@@ -21,26 +21,25 @@
#include "db/Options.h" #include "db/Options.h"
#include "MySQLConnectionPool.h" #include "MySQLConnectionPool.h"
#include "mysql++/mysql++.h" #include <mysql++/mysql++.h>
#include <mutex> #include <mutex>
#include <vector>
#include <string>
#include <memory>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
namespace meta { namespace meta {
// auto StoragePrototype(const std::string& path);
using namespace mysqlpp;
class MySQLMetaImpl : public Meta { class MySQLMetaImpl : public Meta {
public: public:
MySQLMetaImpl(const DBMetaOptions &options_, const int &mode); MySQLMetaImpl(const DBMetaOptions &options, const int &mode);
~MySQLMetaImpl(); ~MySQLMetaImpl();
Status CreateTable(TableSchema &table_schema) override; Status CreateTable(TableSchema &table_schema) override;
Status DescribeTable(TableSchema &group_info_) override; Status DescribeTable(TableSchema &table_schema) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override; Status HasTable(const std::string &table_id, bool &has_or_not) override;
...@@ -63,11 +62,11 @@ class MySQLMetaImpl : public Meta { ...@@ -63,11 +62,11 @@ class MySQLMetaImpl : public Meta {
const std::vector<int> &file_types, const std::vector<int> &file_types,
std::vector<std::string> &file_ids) override; std::vector<std::string> &file_ids) override;
Status UpdateTableIndex(const std::string &table_id, const TableIndex& index) override; Status UpdateTableIndex(const std::string &table_id, const TableIndex &index) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag) override; Status UpdateTableFlag(const std::string &table_id, int64_t flag) override;
Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override; Status DescribeTableIndex(const std::string &table_id, TableIndex &index) override;
Status DropTableIndex(const std::string &table_id) override; Status DropTableIndex(const std::string &table_id) override;
...@@ -102,12 +101,12 @@ class MySQLMetaImpl : public Meta { ...@@ -102,12 +101,12 @@ class MySQLMetaImpl : public Meta {
private: private:
Status NextFileId(std::string &file_id); Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id); Status NextTableId(std::string &table_id);
Status DiscardFiles(long long to_discard_size); Status DiscardFiles(int64_t to_discard_size);
void ValidateMetaSchema(); void ValidateMetaSchema();
Status Initialize(); Status Initialize();
private: private:
const DBMetaOptions options_; const DBMetaOptions options_;
const int mode_; const int mode_;
......
此差异已折叠。
...@@ -21,22 +21,25 @@ ...@@ -21,22 +21,25 @@
#include "db/Options.h" #include "db/Options.h"
#include <mutex> #include <mutex>
#include <vector>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
namespace meta { namespace meta {
auto StoragePrototype(const std::string &path); auto
StoragePrototype(const std::string &path);
class SqliteMetaImpl : public Meta { class SqliteMetaImpl : public Meta {
public: public:
explicit SqliteMetaImpl(const DBMetaOptions &options_); explicit SqliteMetaImpl(const DBMetaOptions &options);
~SqliteMetaImpl(); ~SqliteMetaImpl();
Status CreateTable(TableSchema &table_schema) override; Status CreateTable(TableSchema &table_schema) override;
Status DescribeTable(TableSchema &group_info_) override; Status DescribeTable(TableSchema &table_schema) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override; Status HasTable(const std::string &table_id, bool &has_or_not) override;
...@@ -58,11 +61,11 @@ class SqliteMetaImpl : public Meta { ...@@ -58,11 +61,11 @@ class SqliteMetaImpl : public Meta {
const std::vector<int> &file_types, const std::vector<int> &file_types,
std::vector<std::string> &file_ids) override; std::vector<std::string> &file_ids) override;
Status UpdateTableIndex(const std::string &table_id, const TableIndex& index) override; Status UpdateTableIndex(const std::string &table_id, const TableIndex &index) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag) override; Status UpdateTableFlag(const std::string &table_id, int64_t flag) override;
Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override; Status DescribeTableIndex(const std::string &table_id, TableIndex &index) override;
Status DropTableIndex(const std::string &table_id) override; Status DropTableIndex(const std::string &table_id) override;
...@@ -96,12 +99,12 @@ class SqliteMetaImpl : public Meta { ...@@ -96,12 +99,12 @@ class SqliteMetaImpl : public Meta {
private: private:
Status NextFileId(std::string &file_id); Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id); Status NextTableId(std::string &table_id);
Status DiscardFiles(long to_discard_size); Status DiscardFiles(int64_t to_discard_size);
void ValidateMetaSchema(); void ValidateMetaSchema();
Status Initialize(); Status Initialize();
private: private:
const DBMetaOptions options_; const DBMetaOptions options_;
std::mutex meta_mutex_; std::mutex meta_mutex_;
}; // DBMetaImpl }; // DBMetaImpl
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册