提交 af45efd5 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-406 add table flag for meta

See merge request megasearch/milvus!413

Former-commit-id: 6213d8e45b38ede615793bf26a378c56123fbf3f
...@@ -40,6 +40,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -40,6 +40,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-394 - Update scheduler unittest - MS-394 - Update scheduler unittest
- MS-400 - Add timestamp record in task state change function - MS-400 - Add timestamp record in task state change function
- MS-402 - Add dump implementation for TaskTableItem - MS-402 - Add dump implementation for TaskTableItem
- MS-406 - Add table flag for meta
- MS-403 - Add GpuCacheMgr - MS-403 - Add GpuCacheMgr
- MS-404 - Release index after search task done avoid memory increment continues - MS-404 - Release index after search task done avoid memory increment continues
- MS-405 - Add delete task support - MS-405 - Add delete task support
......
...@@ -29,6 +29,7 @@ public: ...@@ -29,6 +29,7 @@ public:
virtual Status AllTables(std::vector<meta::TableSchema>& table_schema_array) = 0; virtual Status AllTables(std::vector<meta::TableSchema>& table_schema_array) = 0;
virtual Status GetTableRowCount(const std::string& table_id, uint64_t& row_count) = 0; virtual Status GetTableRowCount(const std::string& table_id, uint64_t& row_count) = 0;
virtual Status PreloadTable(const std::string& table_id) = 0; virtual Status PreloadTable(const std::string& table_id) = 0;
virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
virtual Status InsertVectors(const std::string& table_id_, virtual Status InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) = 0; uint64_t n, const float* vectors, IDNumbers& vector_ids_) = 0;
......
...@@ -154,6 +154,10 @@ Status DBImpl::PreloadTable(const std::string &table_id) { ...@@ -154,6 +154,10 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
return Status::OK(); return Status::OK();
} }
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
return meta_ptr_->UpdateTableFlag(table_id, flag);
}
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) { Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
return meta_ptr_->Count(table_id, row_count); return meta_ptr_->Count(table_id, row_count);
} }
......
...@@ -36,40 +36,32 @@ class DBImpl : public DB { ...@@ -36,40 +36,32 @@ class DBImpl : public DB {
explicit DBImpl(const Options &options); explicit DBImpl(const Options &options);
Status Status CreateTable(meta::TableSchema &table_schema) override;
CreateTable(meta::TableSchema &table_schema) override;
Status Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override;
DeleteTable(const std::string &table_id, const meta::DatesT &dates) override;
Status Status DescribeTable(meta::TableSchema &table_schema) override;
DescribeTable(meta::TableSchema &table_schema) override;
Status Status HasTable(const std::string &table_id, bool &has_or_not) override;
HasTable(const std::string &table_id, bool &has_or_not) override;
Status Status AllTables(std::vector<meta::TableSchema> &table_schema_array) override;
AllTables(std::vector<meta::TableSchema> &table_schema_array) override;
Status Status PreloadTable(const std::string &table_id) override;
PreloadTable(const std::string &table_id) override;
Status Status UpdateTableFlag(const std::string &table_id, int64_t flag);
GetTableRowCount(const std::string &table_id, uint64_t &row_count) override;
Status Status GetTableRowCount(const std::string &table_id, uint64_t &row_count) override;
InsertVectors(const std::string &table_id, uint64_t n, const float *vectors, IDNumbers &vector_ids) override;
Status Status InsertVectors(const std::string &table_id, uint64_t n, const float *vectors, IDNumbers &vector_ids) override;
Query(const std::string &table_id,
Status Query(const std::string &table_id,
uint64_t k, uint64_t k,
uint64_t nq, uint64_t nq,
uint64_t nprobe, uint64_t nprobe,
const float *vectors, const float *vectors,
QueryResults &results) override; QueryResults &results) override;
Status Status Query(const std::string &table_id,
Query(const std::string &table_id,
uint64_t k, uint64_t k,
uint64_t nq, uint64_t nq,
uint64_t nprobe, uint64_t nprobe,
...@@ -77,8 +69,7 @@ class DBImpl : public DB { ...@@ -77,8 +69,7 @@ class DBImpl : public DB {
const meta::DatesT &dates, const meta::DatesT &dates,
QueryResults &results) override; QueryResults &results) override;
Status Status Query(const std::string &table_id,
Query(const std::string &table_id,
const std::vector<std::string> &file_ids, const std::vector<std::string> &file_ids,
uint64_t k, uint64_t k,
uint64_t nq, uint64_t nq,
......
...@@ -153,6 +153,10 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) { ...@@ -153,6 +153,10 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
&& index1.metric_type_ == index2.metric_type_; && index1.metric_type_ == index2.metric_type_;
} }
bool UserDefinedId(int64_t flag) {
return flag & meta::FLAG_MASK_USERID;
}
} // namespace utils } // namespace utils
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
......
...@@ -27,6 +27,8 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& ...@@ -27,6 +27,8 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2); bool IsSameIndex(const TableIndex& index1, const TableIndex& index2);
bool UserDefinedId(int64_t flag);
} // namespace utils } // namespace utils
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
......
...@@ -42,6 +42,9 @@ class Meta { ...@@ -42,6 +42,9 @@ class Meta {
virtual Status virtual Status
UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0; UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0;
virtual Status
UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
virtual Status virtual Status
DeleteTable(const std::string &table_id) = 0; DeleteTable(const std::string &table_id) = 0;
......
...@@ -22,6 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384; ...@@ -22,6 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384;
constexpr int32_t DEFAULT_INDEX_FILE_SIZE = 1024*ONE_MB; constexpr int32_t DEFAULT_INDEX_FILE_SIZE = 1024*ONE_MB;
constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2; constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2;
constexpr int64_t FLAG_MASK_USERID = 1;
typedef int DateT; typedef int DateT;
const DateT EmptyDate = -1; const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT; typedef std::vector<DateT> DatesT;
...@@ -37,6 +39,7 @@ struct TableSchema { ...@@ -37,6 +39,7 @@ struct TableSchema {
int32_t state_ = (int)NORMAL; int32_t state_ = (int)NORMAL;
uint16_t dimension_ = 0; uint16_t dimension_ = 0;
int64_t created_on_ = 0; int64_t created_on_ = 0;
int64_t flag_ = 0;
int32_t engine_type_ = DEFAULT_ENGINE_TYPE; int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
int32_t nlist_ = DEFAULT_NLIST; int32_t nlist_ = DEFAULT_NLIST;
int32_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE; int32_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE;
......
...@@ -155,6 +155,7 @@ Status MySQLMetaImpl::Initialize() { ...@@ -155,6 +155,7 @@ Status MySQLMetaImpl::Initialize() {
"state INT NOT NULL, " << "state INT NOT NULL, " <<
"dimension SMALLINT NOT NULL, " << "dimension SMALLINT NOT NULL, " <<
"created_on BIGINT NOT NULL, " << "created_on BIGINT NOT NULL, " <<
"flag BIGINT DEFAULT 0 NOT NULL, " <<
"engine_type INT DEFAULT 1 NOT NULL, " << "engine_type INT DEFAULT 1 NOT NULL, " <<
"nlist INT DEFAULT 16384 NOT NULL, " << "nlist INT DEFAULT 16384 NOT NULL, " <<
"index_file_size INT DEFAULT 1024 NOT NULL, " << "index_file_size INT DEFAULT 1024 NOT NULL, " <<
...@@ -425,7 +426,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T ...@@ -425,7 +426,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
"engine_type_ = " << index.engine_type_ << ", " << "engine_type_ = " << index.engine_type_ << ", " <<
"nlist = " << index.nlist_ << ", " << "nlist = " << index.nlist_ << ", " <<
"index_file_size = " << index.index_file_size_*ONE_MB << ", " << "index_file_size = " << index.index_file_size_*ONE_MB << ", " <<
"metric_type = " << index.metric_type_ << ", " << "metric_type = " << index.metric_type_ << " " <<
"WHERE id = " << quote << table_id << ";"; "WHERE id = " << quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str(); ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str();
...@@ -455,6 +456,46 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T ...@@ -455,6 +456,46 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
return Status::OK(); return Status::OK();
} }
Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
Query updateTableFlagQuery = connectionPtr->query();
updateTableFlagQuery << "UPDATE Tables " <<
"SET flag = " << flag << " " <<
"WHERE id = " << quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();
if (!updateTableFlagQuery.exec()) {
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FLAG";
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FLAG",
updateTableFlagQuery.error());
}
} //Scoped Connection
} catch (const BadQuery &er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FLAG" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FLAG", er.what());
} catch (const Exception &er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FLAG" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FLAG", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) { Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try { try {
MetricCollector metric; MetricCollector metric;
......
...@@ -26,14 +26,19 @@ class MySQLMetaImpl : public Meta { ...@@ -26,14 +26,19 @@ class MySQLMetaImpl : public Meta {
MySQLMetaImpl(const DBMetaOptions &options_, const int &mode); MySQLMetaImpl(const DBMetaOptions &options_, const int &mode);
Status CreateTable(TableSchema &table_schema) override; Status CreateTable(TableSchema &table_schema) override;
Status DescribeTable(TableSchema &group_info_) override; Status DescribeTable(TableSchema &group_info_) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override; Status HasTable(const std::string &table_id, bool &has_or_not) override;
Status AllTables(std::vector<TableSchema> &table_schema_array) override; Status AllTables(std::vector<TableSchema> &table_schema_array) override;
Status DeleteTable(const std::string &table_id) override; Status DeleteTable(const std::string &table_id) override;
Status DeleteTableFiles(const std::string &table_id) override; Status DeleteTableFiles(const std::string &table_id) override;
Status CreateTableFile(TableFileSchema &file_schema) override; Status CreateTableFile(TableFileSchema &file_schema) override;
Status DropPartitionsByDates(const std::string &table_id, Status DropPartitionsByDates(const std::string &table_id,
const DatesT &dates) override; const DatesT &dates) override;
...@@ -45,6 +50,8 @@ class MySQLMetaImpl : public Meta { ...@@ -45,6 +50,8 @@ class MySQLMetaImpl : public Meta {
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override; Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag);
Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override; Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
Status DropTableIndex(const std::string &table_id) override; Status DropTableIndex(const std::string &table_id) override;
......
...@@ -62,6 +62,7 @@ inline auto StoragePrototype(const std::string &path) { ...@@ -62,6 +62,7 @@ inline auto StoragePrototype(const std::string &path) {
make_column("state", &TableSchema::state_), make_column("state", &TableSchema::state_),
make_column("dimension", &TableSchema::dimension_), make_column("dimension", &TableSchema::dimension_),
make_column("created_on", &TableSchema::created_on_), make_column("created_on", &TableSchema::created_on_),
make_column("flag", &TableSchema::flag_, default_value(0)),
make_column("engine_type", &TableSchema::engine_type_), make_column("engine_type", &TableSchema::engine_type_),
make_column("nlist", &TableSchema::nlist_), make_column("nlist", &TableSchema::nlist_),
make_column("index_file_size", &TableSchema::index_file_size_), make_column("index_file_size", &TableSchema::index_file_size_),
...@@ -267,6 +268,7 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { ...@@ -267,6 +268,7 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
&TableSchema::state_, &TableSchema::state_,
&TableSchema::dimension_, &TableSchema::dimension_,
&TableSchema::created_on_, &TableSchema::created_on_,
&TableSchema::flag_,
&TableSchema::engine_type_, &TableSchema::engine_type_,
&TableSchema::nlist_, &TableSchema::nlist_,
&TableSchema::index_file_size_, &TableSchema::index_file_size_,
...@@ -279,10 +281,11 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { ...@@ -279,10 +281,11 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
table_schema.state_ = std::get<1>(groups[0]); table_schema.state_ = std::get<1>(groups[0]);
table_schema.dimension_ = std::get<2>(groups[0]); table_schema.dimension_ = std::get<2>(groups[0]);
table_schema.created_on_ = std::get<3>(groups[0]); table_schema.created_on_ = std::get<3>(groups[0]);
table_schema.engine_type_ = std::get<4>(groups[0]); table_schema.flag_ = std::get<4>(groups[0]);
table_schema.nlist_ = std::get<5>(groups[0]); table_schema.engine_type_ = std::get<5>(groups[0]);
table_schema.index_file_size_ = std::get<6>(groups[0]); table_schema.nlist_ = std::get<6>(groups[0]);
table_schema.metric_type_ = std::get<7>(groups[0]); table_schema.index_file_size_ = std::get<7>(groups[0]);
table_schema.metric_type_ = std::get<8>(groups[0]);
} else { } else {
return Status::NotFound("Table " + table_schema.table_id_ + " not found"); return Status::NotFound("Table " + table_schema.table_id_ + " not found");
} }
...@@ -358,7 +361,8 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const ...@@ -358,7 +361,8 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
auto tables = ConnectorPtr->select(columns(&TableSchema::id_, auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_, &TableSchema::state_,
&TableSchema::dimension_, &TableSchema::dimension_,
&TableSchema::created_on_), &TableSchema::created_on_,
&TableSchema::flag_),
where(c(&TableSchema::table_id_) == table_id where(c(&TableSchema::table_id_) == table_id
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
...@@ -369,6 +373,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const ...@@ -369,6 +373,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
table_schema.state_ = std::get<1>(tables[0]); table_schema.state_ = std::get<1>(tables[0]);
table_schema.dimension_ = std::get<2>(tables[0]); table_schema.dimension_ = std::get<2>(tables[0]);
table_schema.created_on_ = std::get<3>(tables[0]); table_schema.created_on_ = std::get<3>(tables[0]);
table_schema.flag_ = std::get<4>(tables[0]);
table_schema.engine_type_ = index.engine_type_; table_schema.engine_type_ = index.engine_type_;
table_schema.nlist_ = index.nlist_; table_schema.nlist_ = index.nlist_;
table_schema.index_file_size_ = index.index_file_size_*ONE_MB; table_schema.index_file_size_ = index.index_file_size_*ONE_MB;
...@@ -394,6 +399,28 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const ...@@ -394,6 +399,28 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
std::string msg = "Encounter exception when update table index: table_id = " + table_id; std::string msg = "Encounter exception when update table index: table_id = " + table_id;
return HandleException(msg, e); return HandleException(msg, e);
} }
return Status::OK();
}
Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
MetricCollector metric;
//set all backup file to raw
ConnectorPtr->update_all(
set(
c(&TableSchema::flag_) = flag
),
where(
c(&TableSchema::table_id_) == table_id
));
} catch (std::exception &e) {
std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
return HandleException(msg, e);
}
return Status::OK(); return Status::OK();
} }
...@@ -489,6 +516,7 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) { ...@@ -489,6 +516,7 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
&TableSchema::table_id_, &TableSchema::table_id_,
&TableSchema::dimension_, &TableSchema::dimension_,
&TableSchema::created_on_, &TableSchema::created_on_,
&TableSchema::flag_,
&TableSchema::engine_type_, &TableSchema::engine_type_,
&TableSchema::nlist_, &TableSchema::nlist_,
&TableSchema::index_file_size_, &TableSchema::index_file_size_,
...@@ -498,12 +526,13 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) { ...@@ -498,12 +526,13 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
TableSchema schema; TableSchema schema;
schema.id_ = std::get<0>(table); schema.id_ = std::get<0>(table);
schema.table_id_ = std::get<1>(table); schema.table_id_ = std::get<1>(table);
schema.created_on_ = std::get<2>(table); schema.dimension_ = std::get<2>(table);
schema.dimension_ = std::get<3>(table); schema.created_on_ = std::get<3>(table);
schema.engine_type_ = std::get<4>(table); schema.flag_ = std::get<4>(table);
schema.nlist_ = std::get<5>(table); schema.engine_type_ = std::get<5>(table);
schema.index_file_size_ = std::get<6>(table); schema.nlist_ = std::get<6>(table);
schema.metric_type_ = std::get<7>(table); schema.index_file_size_ = std::get<7>(table);
schema.metric_type_ = std::get<8>(table);
table_schema_array.emplace_back(schema); table_schema_array.emplace_back(schema);
} }
......
...@@ -21,82 +21,64 @@ class SqliteMetaImpl : public Meta { ...@@ -21,82 +21,64 @@ class SqliteMetaImpl : public Meta {
public: public:
explicit SqliteMetaImpl(const DBMetaOptions &options_); explicit SqliteMetaImpl(const DBMetaOptions &options_);
Status Status CreateTable(TableSchema &table_schema) override;
CreateTable(TableSchema &table_schema) override;
Status Status DescribeTable(TableSchema &group_info_) override;
DescribeTable(TableSchema &group_info_) override;
Status Status HasTable(const std::string &table_id, bool &has_or_not) override;
HasTable(const std::string &table_id, bool &has_or_not) override;
Status Status AllTables(std::vector<TableSchema> &table_schema_array) override;
AllTables(std::vector<TableSchema> &table_schema_array) override;
Status Status DeleteTable(const std::string &table_id) override;
DeleteTable(const std::string &table_id) override;
Status Status DeleteTableFiles(const std::string &table_id) override;
DeleteTableFiles(const std::string &table_id) override;
Status Status CreateTableFile(TableFileSchema &file_schema) override;
CreateTableFile(TableFileSchema &file_schema) override;
Status Status DropPartitionsByDates(const std::string &table_id, const DatesT &dates) override;
DropPartitionsByDates(const std::string &table_id, const DatesT &dates) override;
Status Status GetTableFiles(const std::string &table_id,
GetTableFiles(const std::string &table_id, const std::vector<size_t> &ids, TableFilesSchema &table_files) override; const std::vector<size_t> &ids,
TableFilesSchema &table_files) override;
Status HasNonIndexFiles(const std::string &table_id, bool &has) override;
Status Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
HasNonIndexFiles(const std::string &table_id, bool &has) override;
Status Status UpdateTableFlag(const std::string &table_id, int64_t flag) override;
UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
Status Status DropTableIndex(const std::string &table_id) override;
DropTableIndex(const std::string &table_id) override;
Status Status UpdateTableFilesToIndex(const std::string &table_id) override;
UpdateTableFilesToIndex(const std::string &table_id) override;
Status Status UpdateTableFile(TableFileSchema &file_schema) override;
UpdateTableFile(TableFileSchema &file_schema) override;
Status Status UpdateTableFiles(TableFilesSchema &files) override;
UpdateTableFiles(TableFilesSchema &files) override;
Status Status FilesToSearch(const std::string &table_id,
FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) override; const DatesT &partition,
DatePartionedTableFilesSchema &files) override;
Status FilesToSearch(const std::string &table_id, Status FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids, const std::vector<size_t> &ids,
const DatesT &partition, const DatesT &partition,
DatePartionedTableFilesSchema &files) override; DatePartionedTableFilesSchema &files) override;
Status Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override;
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override;
Status Status FilesToIndex(TableFilesSchema &) override;
FilesToIndex(TableFilesSchema &) override;
Status Status Archive() override;
Archive() override;
Status Status Size(uint64_t &result) override;
Size(uint64_t &result) override;
Status Status CleanUp() override;
CleanUp() override;
Status Status CleanUpFilesWithTTL(uint16_t seconds) override;
CleanUpFilesWithTTL(uint16_t seconds) override;
Status Status DropAll() override;
DropAll() override;
Status Count(const std::string &table_id, uint64_t &result) override; Status Count(const std::string &table_id, uint64_t &result) override;
......
...@@ -16,7 +16,7 @@ namespace engine { ...@@ -16,7 +16,7 @@ namespace engine {
namespace { namespace {
static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000; static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 1000000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000; static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
bool NeedParallelReduce(uint64_t nq, uint64_t topk) { bool NeedParallelReduce(uint64_t nq, uint64_t topk) {
......
...@@ -236,7 +236,6 @@ ClientTest::Test(const std::string& address, const std::string& port) { ...@@ -236,7 +236,6 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::vector<std::pair<int64_t, RowRecord>> search_record_array; std::vector<std::pair<int64_t, RowRecord>> search_record_array;
{//insert vectors {//insert vectors
std::vector<int64_t> record_ids;
for (int i = 0; i < ADD_VECTOR_LOOP; i++) {//add vectors for (int i = 0; i < ADD_VECTOR_LOOP; i++) {//add vectors
std::vector<RowRecord> record_array; std::vector<RowRecord> record_array;
int64_t begin_index = i * BATCH_ROW_COUNT; int64_t begin_index = i * BATCH_ROW_COUNT;
...@@ -249,6 +248,12 @@ ClientTest::Test(const std::string& address, const std::string& port) { ...@@ -249,6 +248,12 @@ ClientTest::Test(const std::string& address, const std::string& port) {
} }
#endif #endif
std::vector<int64_t> record_ids;
//generate user defined ids
for(int k = 0; k < BATCH_ROW_COUNT; k++) {
record_ids.push_back(i*BATCH_ROW_COUNT+k);
}
auto start = std::chrono::high_resolution_clock::now(); auto start = std::chrono::high_resolution_clock::now();
Status stat = conn->Insert(TABLE_NAME, record_array, record_ids); Status stat = conn->Insert(TABLE_NAME, record_array, record_ids);
......
...@@ -74,12 +74,6 @@ DBWrapper::DBWrapper() { ...@@ -74,12 +74,6 @@ DBWrapper::DBWrapper() {
} }
} }
std::string metric_type = engine_config.GetValue(CONFIG_METRICTYPE, "L2");
if(metric_type != "L2" && metric_type != "IP") {
std::cout << "ERROR! Illegal metric type: " << metric_type << ", available options: L2 or IP" << std::endl;
kill(0, SIGUSR1);
}
//set archive config //set archive config
engine::ArchiveConf::CriteriaT criterial; engine::ArchiveConf::CriteriaT criterial;
int64_t disk = db_config.GetInt64Value(CONFIG_DB_ARCHIVE_DISK, 0); int64_t disk = db_config.GetInt64Value(CONFIG_DB_ARCHIVE_DISK, 0);
......
...@@ -17,7 +17,6 @@ namespace server { ...@@ -17,7 +17,6 @@ namespace server {
static const char* CONFIG_SERVER = "server_config"; static const char* CONFIG_SERVER = "server_config";
static const char* CONFIG_SERVER_ADDRESS = "address"; static const char* CONFIG_SERVER_ADDRESS = "address";
static const char* CONFIG_SERVER_PORT = "port"; static const char* CONFIG_SERVER_PORT = "port";
static const char* CONFIG_SERVER_PROTOCOL = "transfer_protocol";
static const char* CONFIG_CLUSTER_MODE = "mode"; static const char* CONFIG_CLUSTER_MODE = "mode";
static const char* CONFIG_GPU_INDEX = "gpu_index"; static const char* CONFIG_GPU_INDEX = "gpu_index";
...@@ -41,9 +40,6 @@ static const char* CONFIG_INSERT_CACHE_IMMEDIATELY = "insert_cache_immediately"; ...@@ -41,9 +40,6 @@ static const char* CONFIG_INSERT_CACHE_IMMEDIATELY = "insert_cache_immediately";
static const char* CONFIG_GPU_IDS = "gpu_ids"; static const char* CONFIG_GPU_IDS = "gpu_ids";
static const char *GPU_CACHE_FREE_PERCENT = "gpu_cache_free_percent"; static const char *GPU_CACHE_FREE_PERCENT = "gpu_cache_free_percent";
static const char* CONFIG_LICENSE = "license_config";
static const char* CONFIG_LICENSE_PATH = "license_path";
static const char* CONFIG_METRIC = "metric_config"; static const char* CONFIG_METRIC = "metric_config";
static const char* CONFIG_METRIC_IS_STARTUP = "is_startup"; static const char* CONFIG_METRIC_IS_STARTUP = "is_startup";
static const char* CONFIG_METRIC_COLLECTOR = "collector"; static const char* CONFIG_METRIC_COLLECTOR = "collector";
...@@ -51,13 +47,8 @@ static const char* CONFIG_PROMETHEUS = "prometheus_config"; ...@@ -51,13 +47,8 @@ static const char* CONFIG_PROMETHEUS = "prometheus_config";
static const char* CONFIG_METRIC_PROMETHEUS_PORT = "port"; static const char* CONFIG_METRIC_PROMETHEUS_PORT = "port";
static const std::string CONFIG_ENGINE = "engine_config"; static const std::string CONFIG_ENGINE = "engine_config";
static const std::string CONFIG_NPROBE = "nprobe";
static const std::string CONFIG_NLIST = "nlist";
static const std::string CONFIG_DCBT = "use_blas_threshold"; static const std::string CONFIG_DCBT = "use_blas_threshold";
static const std::string CONFIG_METRICTYPE = "metric_type";
static const std::string CONFIG_OMP_THREAD_NUM = "omp_thread_num"; static const std::string CONFIG_OMP_THREAD_NUM = "omp_thread_num";
static const std::string CONFIG_USE_HYBRID_INDEX = "use_hybrid_index";
static const std::string CONFIG_HYBRID_INDEX_GPU = "hybrid_index_gpu";
class ServerConfig { class ServerConfig {
public: public:
......
...@@ -12,9 +12,12 @@ ...@@ -12,9 +12,12 @@
#include "../DBWrapper.h" #include "../DBWrapper.h"
#include "version.h" #include "version.h"
#include "GrpcMilvusServer.h" #include "GrpcMilvusServer.h"
#include "db/Utils.h"
#include "src/server/Server.h" #include "src/server/Server.h"
#include <string.h>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
...@@ -435,6 +438,23 @@ InsertTask::OnExecute() { ...@@ -435,6 +438,23 @@ InsertTask::OnExecute() {
} }
} }
//all user provide id, or all internal id
uint64_t row_count = 0;
DBWrapper::DB()->GetTableRowCount(table_info.table_id_, row_count);
bool empty_table = (row_count == 0);
bool user_provide_ids = !insert_param_.row_id_array().empty();
if(!empty_table) {
//user already provided id before, all insert action require user id
if(engine::utils::UserDefinedId(table_info.flag_) && !user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch");
}
//user didn't provided id before, no need to provide user id
if(!engine::utils::UserDefinedId(table_info.flag_) && user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch");
}
}
rc.RecordSection("check validation"); rc.RecordSection("check validation");
#ifdef MILVUS_ENABLE_PROFILING #ifdef MILVUS_ENABLE_PROFILING
...@@ -469,8 +489,10 @@ InsertTask::OnExecute() { ...@@ -469,8 +489,10 @@ InsertTask::OnExecute() {
//step 4: insert vectors //step 4: insert vectors
auto vec_count = (uint64_t) insert_param_.row_record_array_size(); auto vec_count = (uint64_t) insert_param_.row_record_array_size();
std::vector<int64_t> vec_ids(insert_param_.row_id_array_size(), 0); std::vector<int64_t> vec_ids(insert_param_.row_id_array_size(), 0);
for (auto i = 0; i < insert_param_.row_id_array_size(); i++) { if(!insert_param_.row_id_array().empty()) {
vec_ids[i] = insert_param_.row_id_array(i); const int64_t* src_data = insert_param_.row_id_array().data();
int64_t* target_data = vec_ids.data();
memcpy(target_data, src_data, (size_t)(sizeof(int64_t)*insert_param_.row_id_array_size()));
} }
stat = DBWrapper::DB()->InsertVectors(insert_param_.table_name(), vec_count, vec_f.data(), vec_ids); stat = DBWrapper::DB()->InsertVectors(insert_param_.table_name(), vec_count, vec_f.data(), vec_ids);
...@@ -489,6 +511,12 @@ InsertTask::OnExecute() { ...@@ -489,6 +511,12 @@ InsertTask::OnExecute() {
return SetError(SERVER_ILLEGAL_VECTOR_ID, msg); return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
} }
//step 5: update table flag
if(empty_table && user_provide_ids) {
stat = DBWrapper::DB()->UpdateTableFlag(insert_param_.table_name(),
table_info.flag_ | engine::meta::FLAG_MASK_USERID);
}
#ifdef MILVUS_ENABLE_PROFILING #ifdef MILVUS_ENABLE_PROFILING
ProfilerStop(); ProfilerStop();
#endif #endif
......
...@@ -21,7 +21,8 @@ TEST(MetricbaseTest, METRICBASE_TEST){ ...@@ -21,7 +21,8 @@ TEST(MetricbaseTest, METRICBASE_TEST){
instance.RawFileSizeHistogramObserve(1.0); instance.RawFileSizeHistogramObserve(1.0);
instance.IndexFileSizeHistogramObserve(1.0); instance.IndexFileSizeHistogramObserve(1.0);
instance.BuildIndexDurationSecondsHistogramObserve(1.0); instance.BuildIndexDurationSecondsHistogramObserve(1.0);
instance.CacheUsageGaugeSet(1.0); instance.CpuCacheUsageGaugeSet(1.0);
instance.GpuCacheUsageGaugeSet(1.0);
instance.MetaAccessTotalIncrement(); instance.MetaAccessTotalIncrement();
instance.MetaAccessDurationSecondsHistogramObserve(1.0); instance.MetaAccessDurationSecondsHistogramObserve(1.0);
instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0); instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0);
......
...@@ -22,7 +22,8 @@ TEST(PrometheusTest, PROMETHEUS_TEST){ ...@@ -22,7 +22,8 @@ TEST(PrometheusTest, PROMETHEUS_TEST){
instance.RawFileSizeHistogramObserve(1.0); instance.RawFileSizeHistogramObserve(1.0);
instance.IndexFileSizeHistogramObserve(1.0); instance.IndexFileSizeHistogramObserve(1.0);
instance.BuildIndexDurationSecondsHistogramObserve(1.0); instance.BuildIndexDurationSecondsHistogramObserve(1.0);
instance.CacheUsageGaugeSet(1.0); instance.CpuCacheUsageGaugeSet(1.0);
instance.GpuCacheUsageGaugeSet(1.0);
instance.MetaAccessTotalIncrement(); instance.MetaAccessTotalIncrement();
instance.MetaAccessDurationSecondsHistogramObserve(1.0); instance.MetaAccessDurationSecondsHistogramObserve(1.0);
instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0); instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册