Commit 58070586 authored by Yu Kun

Merge remote-tracking branch 'upstream/branch-0.4.0' into branch-0.4.0


Former-commit-id: dbf69efc70adfafa11b40c3d470cdb4d39aae8f4
......@@ -39,6 +39,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-391 - Add PushTaskToNeighbourHasExecutor action
- MS-394 - Update scheduler unittest
- MS-400 - Add timestamp record in task state change function
- MS-402 - Add dump implementation for TaskTableItem
## New Feature
- MS-343 - Implement ResourceMgr
......
# Define a function that check last file modification
function(Check_Last_Modify cache_ignore_file_path working_dir last_modified_commit_id)
function(Check_Last_Modify cache_check_lists_file_path working_dir last_modified_commit_id)
if(EXISTS "${working_dir}")
if(EXISTS "${cache_ignore_file_path}")
if(EXISTS "${cache_check_lists_file_path}")
set(GIT_LOG_SKIP_NUM 0)
set(_MATCH_ALL ON CACHE BOOL "Match all")
set(_LOOP_STATUS ON CACHE BOOL "Whether out of loop")
file(STRINGS ${cache_ignore_file_path} CACHE_IGNORE_TXT)
file(STRINGS ${cache_check_lists_file_path} CACHE_IGNORE_TXT)
while(_LOOP_STATUS)
foreach(_IGNORE_ENTRY ${CACHE_IGNORE_TXT})
if(NOT _IGNORE_ENTRY MATCHES "^[^#]+")
......
......@@ -5,19 +5,25 @@
******************************************************************************/
#pragma once
#include <stdint.h>
namespace zilliz {
namespace milvus {
namespace engine {
constexpr size_t K = 1024UL;
constexpr size_t M = K * K;
constexpr size_t G = K * M;
constexpr size_t T = K * G;
constexpr uint64_t K = 1024UL;
constexpr uint64_t M = K * K;
constexpr uint64_t G = K * M;
constexpr uint64_t T = K * G;
constexpr size_t MAX_TABLE_FILE_MEM = 128 * M;
constexpr uint64_t MAX_TABLE_FILE_MEM = 128 * M;
constexpr int VECTOR_TYPE_SIZE = sizeof(float);
static constexpr uint64_t ONE_KB = K;
static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB;
static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -60,27 +60,6 @@ void CollectQueryMetrics(double total_time, size_t nq) {
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
#if 0
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
switch(file_type) {
case meta::TableFileSchema::RAW:
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
break;
}
}
}
#endif
}
......@@ -473,11 +452,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
}
//step 4: update table files state
if (index_size >= options_.index_trigger_size) {
table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
} else {
table_file.file_type_ = meta::TableFileSchema::RAW;
}
table_file.file_type_ = meta::TableFileSchema::RAW;
table_file.file_size_ = index->PhysicalSize();
table_file.row_count_ = index->Count();
updated.push_back(table_file);
......
......@@ -5,6 +5,8 @@
******************************************************************************/
#pragma once
#include "Constants.h"
#include <string>
#include <memory>
#include <map>
......@@ -16,10 +18,6 @@ namespace engine {
class Env;
static constexpr uint64_t ONE_KB = 1024;
static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB;
static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
static const char* ARCHIVE_CONF_DISK = "disk";
static const char* ARCHIVE_CONF_DAYS = "days";
......
......@@ -86,6 +86,8 @@ Status MemTableFile::Serialize() {
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.file_size_ = execution_engine_->PhysicalSize();
table_file_schema_.row_count_ = execution_engine_->Count();
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time);
......
......@@ -6,6 +6,7 @@
#pragma once
#include "db/engine/ExecutionEngine.h"
#include "db/Constants.h"
#include <vector>
#include <map>
......@@ -33,7 +34,7 @@ struct TableSchema {
int64_t created_on_ = 0;
int32_t engine_type_ = (int)EngineType::FAISS_IDMAP;
int32_t nlist_ = 16384;
int32_t index_file_size_ = 1024; //MB
int32_t index_file_size_ = 1024*ONE_MB;
int32_t metric_type_ = (int)MetricType::L2;
}; // TableSchema
......
......@@ -424,7 +424,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
"created_on = " << created_on << ", " <<
"engine_type_ = " << index.engine_type_ << ", " <<
"nlist = " << index.nlist_ << ", " <<
"index_file_size = " << index.index_file_size_ << ", " <<
"index_file_size = " << index.index_file_size_*ONE_MB << ", " <<
"metric_type = " << index.metric_type_ << ", " <<
"WHERE id = " << quote << table_id << ";";
......@@ -481,7 +481,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
index.engine_type_ = resRow["engine_type"];
index.nlist_ = resRow["nlist"];
index.index_file_size_ = resRow["index_file_size"];
index.index_file_size_ = resRow["index_file_size"]/ONE_MB;
index.metric_type_ = resRow["metric_type"];
} else {
return Status::NotFound("Table " + table_id + " not found");
......@@ -652,7 +652,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
}
Query describeTableQuery = connectionPtr->query();
describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " <<
describeTableQuery << "SELECT id, state, dimension, engine_type, nlist, index_file_size, metric_type " <<
"FROM Tables " <<
"WHERE table_id = " << quote << table_schema.table_id_ << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
......@@ -667,9 +667,17 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
table_schema.id_ = resRow["id"]; //implicit conversion
table_schema.state_ = resRow["state"];
table_schema.dimension_ = resRow["dimension"];
table_schema.engine_type_ = resRow["engine_type"];
table_schema.nlist_ = resRow["nlist"];
table_schema.index_file_size_ = resRow["index_file_size"];
table_schema.metric_type_ = resRow["metric_type"];
} else {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
......@@ -739,7 +747,7 @@ Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
}
Query allTablesQuery = connectionPtr->query();
allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " <<
allTablesQuery << "SELECT id, table_id, dimension, engine_type " <<
"FROM Tables " <<
"WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
......@@ -864,7 +872,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
}
Query filesToIndexQuery = connectionPtr->query();
filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, row_count, date " <<
filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date, created_on " <<
"FROM TableFiles " <<
"WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";";
......@@ -891,10 +899,14 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
table_file.file_type_ = resRow["file_type"];
table_file.file_size_ = resRow["file_size"];
table_file.row_count_ = resRow["row_count"];
table_file.date_ = resRow["date"];
table_file.created_on_ = resRow["created_on"];
auto groupItr = groups.find(table_file.table_id_);
if (groupItr == groups.end()) {
TableSchema table_schema;
......@@ -943,7 +955,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
if (partition.empty()) {
Query filesToSearchQuery = connectionPtr->query();
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, row_count, date " <<
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
......@@ -965,7 +977,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
std::string partitionListStr = partitionListSS.str();
partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", "
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, row_count, date " <<
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"date IN (" << partitionListStr << ") AND " <<
......@@ -1004,6 +1016,8 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
table_file.file_type_ = resRow["file_type"];
table_file.file_size_ = resRow["file_size"];
table_file.row_count_ = resRow["row_count"];
table_file.date_ = resRow["date"];
......@@ -1049,7 +1063,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
}
Query filesToSearchQuery = connectionPtr->query();
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, row_count, date " <<
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id;
......@@ -1110,6 +1124,8 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
table_file.file_type_ = resRow["file_type"];
table_file.file_size_ = resRow["file_size"];
table_file.row_count_ = resRow["row_count"];
table_file.date_ = resRow["date"];
......@@ -1144,6 +1160,15 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
try {
MetricCollector metric;
//check table existence
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1153,7 +1178,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
}
Query filesToMergeQuery = connectionPtr->query();
filesToMergeQuery << "SELECT id, table_id, file_id, file_type, file_size, date " <<
filesToMergeQuery << "SELECT id, table_id, file_id, file_type, file_size, row_count, date, engine_type, create_on " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"file_type = " << std::to_string(TableFileSchema::RAW) << " " <<
......@@ -1164,16 +1189,12 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
res = filesToMergeQuery.store();
} //Scoped Connection
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto &resRow : res) {
TableFileSchema table_file;
table_file.file_size_ = resRow["file_size"];
if(table_file.file_size_ >= table_schema.index_file_size_) {
continue;//skip large file
}
table_file.id_ = resRow["id"]; //implicit conversion
......@@ -1187,10 +1208,14 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
table_file.file_type_ = resRow["file_type"];
table_file.file_size_ = resRow["file_size"];
table_file.row_count_ = resRow["row_count"];
table_file.date_ = resRow["date"];
table_file.engine_type_ = resRow["engine_type"];
table_file.created_on_ = resRow["created_on"];
table_file.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, table_file);
......@@ -1241,7 +1266,7 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
Query getTableFileQuery = connectionPtr->query();
getTableFileQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date " <<
getTableFileQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(" << idStr << ");";
......@@ -1280,6 +1305,8 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
file_schema.date_ = resRow["date"];
file_schema.created_on_ = resRow["created_on"];
file_schema.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, file_schema);
......
......@@ -271,15 +271,25 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_,
&TableSchema::dimension_,
&TableSchema::engine_type_),
&TableSchema::created_on_,
&TableSchema::engine_type_,
&TableSchema::nlist_,
&TableSchema::index_file_size_,
&TableSchema::metric_type_),
where(c(&TableSchema::table_id_) == table_schema.table_id_
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
if (groups.size() == 1) {
table_schema.id_ = std::get<0>(groups[0]);
table_schema.dimension_ = std::get<1>(groups[0]);
table_schema.engine_type_ = std::get<2>(groups[0]);
table_schema.state_ = std::get<1>(groups[0]);
table_schema.dimension_ = std::get<2>(groups[0]);
table_schema.created_on_ = std::get<3>(groups[0]);
table_schema.engine_type_ = std::get<4>(groups[0]);
table_schema.nlist_ = std::get<5>(groups[0]);
table_schema.index_file_size_ = std::get<6>(groups[0]);
table_schema.metric_type_ = std::get<7>(groups[0]);
} else {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
......@@ -368,7 +378,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
table_schema.created_on_ = std::get<3>(tables[0]);
table_schema.engine_type_ = index.engine_type_;
table_schema.nlist_ = index.nlist_;
table_schema.index_file_size_ = index.index_file_size_;
table_schema.index_file_size_ = index.index_file_size_*ONE_MB;
table_schema.metric_type_ = index.metric_type_;
ConnectorPtr->update(table_schema);
......@@ -408,7 +418,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
if (groups.size() == 1) {
index.engine_type_ = std::get<0>(groups[0]);
index.nlist_ = std::get<1>(groups[0]);
index.index_file_size_ = std::get<2>(groups[0]);
index.index_file_size_ = std::get<2>(groups[0])/ONE_MB;
index.metric_type_ = std::get<3>(groups[0]);
} else {
return Status::NotFound("Table " + table_id + " not found");
......@@ -551,9 +561,11 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::file_size_,
&TableFileSchema::row_count_,
&TableFileSchema::date_,
&TableFileSchema::engine_type_),
&TableFileSchema::engine_type_,
&TableFileSchema::created_on_),
where(c(&TableFileSchema::file_type_)
== (int) TableFileSchema::TO_INDEX));
......@@ -565,9 +577,11 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.file_type_ = std::get<3>(file);
table_file.row_count_ = std::get<4>(file);
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
table_file.file_size_ = std::get<4>(file);
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.engine_type_ = std::get<7>(file);
table_file.created_on_ = std::get<8>(file);
utils::GetTableFilePath(options_, table_file);
auto groupItr = groups.find(table_file.table_id_);
......@@ -605,6 +619,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::file_size_,
&TableFileSchema::row_count_,
&TableFileSchema::date_,
&TableFileSchema::engine_type_),
......@@ -625,9 +640,10 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.file_type_ = std::get<3>(file);
table_file.row_count_ = std::get<4>(file);
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
table_file.file_size_ = std::get<4>(file);
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.engine_type_ = std::get<7>(file);
table_file.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
......@@ -643,6 +659,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::file_size_,
&TableFileSchema::row_count_,
&TableFileSchema::date_,
&TableFileSchema::engine_type_),
......@@ -664,9 +681,10 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.file_type_ = std::get<3>(file);
table_file.row_count_ = std::get<4>(file);
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
table_file.file_size_ = std::get<4>(file);
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.engine_type_ = std::get<7>(file);
table_file.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
......@@ -696,6 +714,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::file_size_,
&TableFileSchema::row_count_,
&TableFileSchema::date_,
&TableFileSchema::engine_type_);
......@@ -738,9 +757,10 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.file_type_ = std::get<3>(file);
table_file.row_count_ = std::get<4>(file);
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
table_file.file_size_ = std::get<4>(file);
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.engine_type_ = std::get<7>(file);
table_file.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
......@@ -764,32 +784,41 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
try {
MetricCollector metric;
//check table existence
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
//get files to merge
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::file_size_,
&TableFileSchema::date_),
&TableFileSchema::row_count_,
&TableFileSchema::date_,
&TableFileSchema::created_on_),
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
c(&TableFileSchema::table_id_) == table_id),
order_by(&TableFileSchema::file_size_).desc());
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto &file : selected) {
TableFileSchema table_file;
table_file.file_size_ = std::get<4>(file);
if(table_file.file_size_ >= table_schema.index_file_size_) {
continue;//skip large file
}
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.file_type_ = std::get<3>(file);
table_file.file_size_ = std::get<4>(file);
table_file.date_ = std::get<5>(file);
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.created_on_ = std::get<7>(file);
table_file.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
......@@ -816,7 +845,8 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
&TableFileSchema::file_size_,
&TableFileSchema::row_count_,
&TableFileSchema::date_,
&TableFileSchema::engine_type_),
&TableFileSchema::engine_type_,
&TableFileSchema::created_on_),
where(c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::id_, ids)
));
......@@ -838,6 +868,7 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
file_schema.row_count_ = std::get<4>(file);
file_schema.date_ = std::get<5>(file);
file_schema.engine_type_ = std::get<6>(file);
file_schema.created_on_ = std::get<7>(file);
file_schema.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, file_schema);
......@@ -1218,9 +1249,6 @@ Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
result += std::get<0>(file);
}
result /= table_schema.dimension_;
result /= sizeof(float);
} catch (std::exception &e) {
return HandleException("Encounter exception when calculate table file size", e);
}
......
......@@ -99,7 +99,8 @@ ResourceMgr::DumpTaskTables() {
ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
for (auto &resource : resources_) {
ss << resource->Dump() << std::endl;
ss << resource->task_table().Dump() << std::endl;
ss << resource->task_table().Dump();
ss << resource->Dump() << std::endl << std::endl;
}
return ss.str();
}
......
......@@ -23,6 +23,35 @@ get_now_timestamp() {
return millis;
}
// Map a TaskTableItemState to its human-readable name.
// Any value outside the known set yields an empty string.
std::string
ToString(TaskTableItemState state) {
    if (state == TaskTableItemState::INVALID) return "INVALID";
    if (state == TaskTableItemState::START) return "START";
    if (state == TaskTableItemState::LOADING) return "LOADING";
    if (state == TaskTableItemState::LOADED) return "LOADED";
    if (state == TaskTableItemState::EXECUTING) return "EXECUTING";
    if (state == TaskTableItemState::EXECUTED) return "EXECUTED";
    if (state == TaskTableItemState::MOVING) return "MOVING";
    if (state == TaskTableItemState::MOVED) return "MOVED";
    return "";
}
// Render a TaskTimestamp as "<start=..., load=..., loaded=..., execute=...,
// executed=..., move=..., moved=...>" for scheduler diagnostics.
std::string
ToString(const TaskTimestamp &timestamp) {
    std::stringstream stream;
    stream << "<start=" << timestamp.start
           << ", load=" << timestamp.load
           << ", loaded=" << timestamp.loaded
           << ", execute=" << timestamp.execute
           << ", executed=" << timestamp.executed
           << ", move=" << timestamp.move
           << ", moved=" << timestamp.moved
           << ">";
    return stream.str();
}
bool
TaskTableItem::Load() {
std::unique_lock<std::mutex> lock(mutex);
......@@ -90,6 +119,16 @@ TaskTableItem::Moved() {
return false;
}
// Render this item as "<id=..., task=..., state=..., timestamp=...>".
// Used by TaskTable::Dump() to report per-task scheduler state.
std::string
TaskTableItem::Dump() {
std::stringstream ss;
ss << "<id=" << id;
// NOTE(review): streaming `task` directly assumes TaskPtr (or what it decays
// to) has an operator<< overload — confirm against the TaskPtr definition.
ss << ", task=" << task;
ss << ", state=" << ToString(state);
ss << ", timestamp=" << ToString(timestamp);
ss << ">";
return ss.str();
}
void
TaskTable::Put(TaskPtr task) {
......@@ -98,6 +137,7 @@ TaskTable::Put(TaskPtr task) {
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
item->timestamp.start = get_now_timestamp();
table_.push_back(item);
if (subscriber_) {
subscriber_();
......@@ -112,6 +152,7 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) {
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
item->timestamp.start = get_now_timestamp();
table_.push_back(item);
}
if (subscriber_) {
......@@ -135,43 +176,12 @@ TaskTable::Clear() {
// table_.erase(table_.begin(), iterator);
}
std::string
ToString(TaskTableItemState state) {
switch (state) {
case TaskTableItemState::INVALID: return "INVALID";
case TaskTableItemState::START: return "START";
case TaskTableItemState::LOADING: return "LOADING";
case TaskTableItemState::LOADED: return "LOADED";
case TaskTableItemState::EXECUTING: return "EXECUTING";
case TaskTableItemState::EXECUTED: return "EXECUTED";
case TaskTableItemState::MOVING: return "MOVING";
case TaskTableItemState::MOVED: return "MOVED";
default: return "";
}
}
std::string
ToString(const TaskTimestamp &timestamp) {
std::stringstream ss;
ss << "<start=" << timestamp.start;
ss << ", load=" << timestamp.load;
ss << ", loaded=" << timestamp.loaded;
ss << ", execute=" << timestamp.execute;
ss << ", executed=" << timestamp.executed;
ss << ", move=" << timestamp.move;
ss << ", moved=" << timestamp.moved;
ss << ">";
return ss.str();
}
// Dump every queued item, one per line, for scheduler diagnostics.
std::string
TaskTable::Dump() {
std::stringstream ss;
for (auto &item : table_) {
// NOTE(review): diff residue — the next four lines are the pre-change inline
// formatting and the line after them is the replacement that delegates to
// TaskTableItem::Dump(); the intended final body keeps only the delegation.
ss << "<id=" << item->id;
ss << ", state=" << ToString(item->state);
ss << ", timestamp=" << ToString(item->timestamp);
ss << ">" << std::endl;
ss << item->Dump() << std::endl;
}
return ss.str();
}
......
......@@ -70,6 +70,9 @@ struct TaskTableItem {
bool
Moved();
std::string
Dump();
};
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
......
......@@ -26,7 +26,7 @@ namespace {
constexpr int64_t NQ = 10;
constexpr int64_t TOP_K = 10;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t ADD_VECTOR_LOOP = 10;
constexpr int64_t SECONDS_EACH_HOUR = 3600;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
......@@ -86,9 +86,8 @@ namespace {
}
std::string GetTableName() {
// static std::string s_id(CurrentTime());
// return "tbl_" + s_id;
return "test";
static std::string s_id(CurrentTime());
return "tbl_" + s_id;
}
TableSchema BuildTableSchema() {
......@@ -272,6 +271,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
{//search vectors without index
Sleep(2);
int64_t row_count = 0;
Status stat = conn->CountTable(TABLE_NAME, row_count);
std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
DoSearch(conn, search_record_array, "Search without index");
}
......@@ -303,6 +306,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
{//delete index
Status stat = conn->DropIndex(TABLE_NAME);
std::cout << "DropIndex function call status: " << stat.ToString() << std::endl;
int64_t row_count = 0;
stat = conn->CountTable(TABLE_NAME, row_count);
std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
}
{//delete by range
......
#-------------------------------------------------------------------------------
# Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
# Unauthorized copying of this file, via any medium is strictly prohibited.
# Proprietary and confidential.
#-------------------------------------------------------------------------------
# Collect wrapper and config sources for the unit-test build.
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
# Validation helpers the wrapper tests depend on.
set(util_files
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp)
# Make sure that your call to link_directories takes place before your call to the relevant add_executable.
include_directories(/usr/local/cuda/include)
link_directories("/usr/local/cuda/lib64")
# Full source list for the wrapper_test executable.
set(wrapper_test_src
${unittest_srcs}
${wrapper_src}
${config_files}
${util_files}
${require_files}
wrapper_test.cpp
)
add_executable(wrapper_test ${wrapper_test_src})
# Libraries linked into wrapper_test: faiss + CUDA runtime plus the
# storage/compression stack (sqlite, snappy, bz2, z, zstd, lz4).
set(wrapper_libs
stdc++
boost_system_static
boost_filesystem_static
faiss
cudart
cublas
sqlite
snappy
bz2
z
zstd
lz4
)
# Pick the BLAS backend faiss was built against: MKL or lapack/openblas.
if(${BUILD_FAISS_WITH_MKL} STREQUAL "ON")
# NOTE(review): MKL_LIBS is listed twice — presumably to satisfy circular
# static-link ordering; confirm before deduplicating.
set(wrapper_libs ${wrapper_libs} ${MKL_LIBS} ${MKL_LIBS})
else()
set(wrapper_libs ${wrapper_libs}
lapack
openblas)
endif()
target_link_libraries(wrapper_test ${wrapper_libs} ${unittest_libs})
add_definitions("-DUNITTEST_ONLY")
# NOTE(review): topk_test_src is defined but no add_executable for it is
# visible in this chunk — confirm it is consumed elsewhere or remove.
set(topk_test_src
topk_test.cpp
${CMAKE_SOURCE_DIR}/src/wrapper/gpu/Topk.cu)
install(TARGETS wrapper_test DESTINATION bin)
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "wrapper/Operand.h"
#include "wrapper/Index.h"
#include "wrapper/IndexBuilder.h"
#include "wrapper/FaissGpuResources.h"
#include "server/ServerConfig.h"
#include <gtest/gtest.h>
#include <random>
#include <src/wrapper/FaissGpuResources.h>
using namespace zilliz::milvus;
using namespace zilliz::milvus::engine;
// Verify an Operand survives a serialize/deserialize round trip:
// operand_to_str() followed by str_to_operand() must preserve the resolved
// index type for both the "IVF" and "IVFSQ8" configurations.
TEST(operand_test, Wrapper_Test) {
    auto opd = std::make_shared<Operand>();
    opd->index_type = "IVF";
    opd->preproc = "OPQ";
    opd->postproc = "PQ";
    opd->metric_type = "L2";
    opd->d = 64;

    auto opd_str = operand_to_str(opd);
    auto new_opd = str_to_operand(opd_str);

    // TODO: fix all place where using opd to build index.
    // Use gtest assertions instead of <cassert> so the checks are not
    // compiled away under NDEBUG and failures are reported by the runner.
    ASSERT_EQ(new_opd->get_index_type(10000), opd->get_index_type(10000));

    auto opd_sq8 = std::make_shared<Operand>();
    opd_sq8->index_type = "IVFSQ8";
    opd_sq8->preproc = "OPQ";
    opd_sq8->postproc = "PQ";
    opd_sq8->metric_type = "L2";
    opd_sq8->d = 64;

    auto opd_str_sq8 = operand_to_str(opd_sq8);
    auto new_opd_sq8 = str_to_operand(opd_str_sq8);
    ASSERT_EQ(new_opd_sq8->get_index_type(10000), opd_sq8->get_index_type(10000));
}
// Build an IVF index from random training data, add 100k random 3-d vectors,
// then search near {0.5, 0.5, 0.5} and check every returned neighbour lies in
// the first quadrant (statistically guaranteed with this many random points).
TEST(build_test, Wrapper_Test) {
    // dimension of the vectors to index
    int d = 3;
    // make a set of nt training vectors in the unit cube
    size_t nt = 10000;
    // a reasonable number of centroids to index nb vectors
    int ncentroids = 16;

    std::random_device rd;
    std::mt19937 gen(rd());

    std::vector<float> xb;
    std::vector<long> ids;

    // prepare train data
    std::uniform_real_distribution<> dis_xt(-1.0, 1.0);
    std::vector<float> xt(nt * d);
    for (size_t i = 0; i < nt * d; i++) {
        xt[i] = dis_xt(gen);
    }

    // train the index
    auto opd = std::make_shared<Operand>();
    opd->index_type = "IVF";
    opd->d = d;
    opd->ncent = ncentroids;
    IndexBuilderPtr index_builder_1 = GetIndexBuilder(opd);
    auto index_1 = index_builder_1->build_all(0, xb, ids, nt, xt);
    ASSERT_TRUE(index_1 != nullptr);

    // size of the database we plan to index
    size_t nb = 100000;

    // prepare raw data
    // BUG FIX: the buffer must hold nb * d floats (one d-dimensional vector
    // per id); the previous xb.resize(nb) under-allocated, so add_with_ids()
    // and the xb[id * 3 + ...] reads below ran past the end of the buffer.
    xb.resize(nb * d);
    ids.resize(nb);
    for (size_t i = 0; i < nb * d; i++) {
        xb[i] = dis_xt(gen);
    }
    for (size_t i = 0; i < nb; i++) {
        ids[i] = i;
    }
    index_1->add_with_ids(nb, xb.data(), ids.data());

    // search in first quadrant
    int nq = 1, k = 10;
    std::vector<float> xq = {0.5, 0.5, 0.5};
    // RAII containers instead of raw new[]/delete[]: no leak when an
    // assertion aborts the test early.
    std::vector<float> result_dists(k);
    std::vector<long> result_ids(k);
    index_1->search(nq, xq.data(), k, result_dists.data(), result_ids.data());

    for (int i = 0; i < k; i++) {
        // a negative id means the index returned fewer than k results
        ASSERT_GE(result_ids[i], 0);
        long id = result_ids[i];
        std::cout << "No." << id << " [" << xb[id * 3] << ", " << xb[id * 3 + 1] << ", "
                  << xb[id * 3 + 2] << "] distance = " << result_dists[i] << std::endl;

        // make sure result vector is in first quadrant
        ASSERT_TRUE(xb[id * 3] > 0.0);
        ASSERT_TRUE(xb[id * 3 + 1] > 0.0);
        ASSERT_TRUE(xb[id * 3 + 2] > 0.0);
    }
}
// Build an IVF index over 300k random 256-d vectors and sanity-check that
// searching the first five database vectors returns each vector as its own
// nearest neighbour.
TEST(gpu_build_test, Wrapper_Test) {
    using std::vector;

    int d = 256;
    int nb = 3 * 1000 * 100;
    int nq = 100;
    vector<float> xb(d * nb);
    vector<float> xq(d * nq);
    vector<long> ids(nb);

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<> dis_xt(-1.0, 1.0);
    for (auto &e : xb) { e = float(dis_xt(gen)); }
    for (auto &e : xq) { e = float(dis_xt(gen)); }
    for (int i = 0; i < nb; ++i) { ids[i] = i; }

    auto opd = std::make_shared<Operand>();
    opd->index_type = "IVF";
    opd->d = d;
    opd->ncent = 256;

    IndexBuilderPtr index_builder_1 = GetIndexBuilder(opd);
    auto index_1 = index_builder_1->build_all(nb, xb.data(), ids.data());

    // Use gtest assertions instead of <cassert> so the checks survive
    // NDEBUG builds and failures are reported by the test runner.
    ASSERT_EQ(index_1->ntotal, nb);
    ASSERT_EQ(index_1->dim, d);

    // sanity check: search 5 first vectors of xb
    int k = 1;
    vector<long> I(5 * k);
    vector<float> D(5 * k);
    index_1->search(5, xb.data(), k, D.data(), I.data());
    for (int i = 0; i < 5; ++i) { ASSERT_EQ(I[i], i); }
}
// Exercise FaissGpuResources: fetching resources for device 0 twice must
// succeed both times, and after configuring gpu index "0" in the server
// config, SelectGpu() must settle on device 0.
TEST(gpu_resource_test, Wrapper_Test) {
    FaissGpuResources manager;

    // first fetch creates the resources, second fetch must also succeed
    FaissGpuResources::Ptr& resources = manager.GetGpuResources(0);
    ASSERT_NE(resources, nullptr);
    resources = manager.GetGpuResources(0);
    ASSERT_NE(resources, nullptr);

    // point the server configuration at gpu 0 and let the manager select it
    server::ServerConfig &config = server::ServerConfig::GetInstance();
    server::ConfigNode& server_config = config.GetConfig(server::CONFIG_SERVER);
    server_config.SetValue(server::CONFIG_GPU_INDEX, "0");
    manager.SelectGpu();

    int32_t selected = manager.GetGpu();
    ASSERT_EQ(selected, 0);
}
// Round-trip an engine::Index through write_index()/read_index() on disk
// (enabling hybrid-index mode before the read) and verify the reloaded
// index can be reset.
TEST(index_test, Wrapper_Test) {
    long vec_count = 10000;

    // build vec_count 2-d vectors with deterministic integer coordinates
    std::vector<float> vectors;
    std::vector<long> vector_ids;
    vectors.reserve(vec_count * 2);
    vector_ids.reserve(vec_count);
    for (long i = 0; i < vec_count; i++) {
        vectors.push_back(i / 3);
        vectors.push_back(i / 9);
        vector_ids.push_back(i);
    }

    std::shared_ptr<faiss::Index> raw_index(faiss::index_factory(2, "IVF128,SQ8"));
    raw_index->train(vec_count, vectors.data());

    engine::Index_ptr index = std::make_shared<engine::Index>(raw_index);
    index->add_with_ids(vec_count, vectors.data(), vector_ids.data());
    ASSERT_EQ(index->ntotal, vec_count);

    // persist the index, switch on hybrid-index mode, then reload it
    std::string file_name = "/tmp/index_test.t";
    write_index(index, file_name);
    server::ConfigNode& engine_config =
        server::ServerConfig::GetInstance().GetConfig(server::CONFIG_ENGINE);
    engine_config.SetValue(server::CONFIG_USE_HYBRID_INDEX, "true");

    Index_ptr index_out = read_index(file_name);
    ASSERT_NE(index_out, nullptr);
    ASSERT_TRUE(index_out->reset());
}
......@@ -199,12 +199,22 @@ TEST(UtilTest, VALIDATE_DIMENSIONTEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableDimension(1), server::SERVER_SUCCESS);
}
TEST(UtilTest, VALIDATE_INDEXTYPE_TEST) {
// Validate the four table-index parameter validators: index type must be in
// (INVALID, MAX_VALUE]; nlist and index_file_size must be positive; metric
// type accepts 1 and 2 but rejects 0.
TEST(UtilTest, VALIDATE_INDEX_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType((int)engine::EngineType::INVALID), server::SERVER_INVALID_INDEX_TYPE);
// every defined engine type in (INVALID, MAX_VALUE] is accepted
for(int i = 1; i <= (int)engine::EngineType::MAX_VALUE; i++) {
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType(i), server::SERVER_SUCCESS);
}
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType((int)engine::EngineType::MAX_VALUE + 1), server::SERVER_INVALID_INDEX_TYPE);
// nlist: zero rejected, positive accepted
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexNlist(0), server::SERVER_INVALID_INDEX_NLIST);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexNlist(100), server::SERVER_SUCCESS);
// index file size: zero rejected, positive accepted
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexFileSize(0), server::SERVER_INVALID_INDEX_FILE_SIZE);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexFileSize(100), server::SERVER_SUCCESS);
// metric type: 0 invalid; 1 and 2 (presumably L2 / IP — confirm) accepted
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexMetricType(0), server::SERVER_INVALID_INDEX_METRIC_TYPE);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexMetricType(1), server::SERVER_SUCCESS);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexMetricType(2), server::SERVER_SUCCESS);
}
TEST(ValidationUtilTest, ValidateGpuTest) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Please finish editing this message first!
To comment, please register or sign in.