提交 907d8c4d 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

fix index size bug

See merge request megasearch/milvus!404

Former-commit-id: f8b332e59b07ceeeb062e20b2294af864f8e24ce
......@@ -5,19 +5,25 @@
******************************************************************************/
#pragma once
#include <stdint.h>
namespace zilliz {
namespace milvus {
namespace engine {
constexpr size_t K = 1024UL;
constexpr size_t M = K * K;
constexpr size_t G = K * M;
constexpr size_t T = K * G;
constexpr uint64_t K = 1024UL;
constexpr uint64_t M = K * K;
constexpr uint64_t G = K * M;
constexpr uint64_t T = K * G;
constexpr size_t MAX_TABLE_FILE_MEM = 128 * M;
constexpr uint64_t MAX_TABLE_FILE_MEM = 128 * M;
constexpr int VECTOR_TYPE_SIZE = sizeof(float);
static constexpr uint64_t ONE_KB = K;
static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB;
static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -60,27 +60,6 @@ void CollectQueryMetrics(double total_time, size_t nq) {
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
#if 0
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
switch(file_type) {
case meta::TableFileSchema::RAW:
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
break;
}
}
}
#endif
}
......@@ -473,11 +452,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
}
//step 4: update table files state
if (index_size >= options_.index_trigger_size) {
table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
} else {
table_file.file_type_ = meta::TableFileSchema::RAW;
}
table_file.file_type_ = meta::TableFileSchema::RAW;
table_file.file_size_ = index->PhysicalSize();
table_file.row_count_ = index->Count();
updated.push_back(table_file);
......
......@@ -5,6 +5,8 @@
******************************************************************************/
#pragma once
#include "Constants.h"
#include <string>
#include <memory>
#include <map>
......@@ -16,10 +18,6 @@ namespace engine {
class Env;
static constexpr uint64_t ONE_KB = 1024;
static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB;
static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
static const char* ARCHIVE_CONF_DISK = "disk";
static const char* ARCHIVE_CONF_DAYS = "days";
......
......@@ -6,6 +6,7 @@
#pragma once
#include "db/engine/ExecutionEngine.h"
#include "db/Constants.h"
#include <vector>
#include <map>
......@@ -33,7 +34,7 @@ struct TableSchema {
int64_t created_on_ = 0;
int32_t engine_type_ = (int)EngineType::FAISS_IDMAP;
int32_t nlist_ = 16384;
int32_t index_file_size_ = 1024; //MB
int32_t index_file_size_ = 1024*ONE_MB;
int32_t metric_type_ = (int)MetricType::L2;
}; // TableSchema
......
......@@ -424,7 +424,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
"created_on = " << created_on << ", " <<
"engine_type_ = " << index.engine_type_ << ", " <<
"nlist = " << index.nlist_ << ", " <<
"index_file_size = " << index.index_file_size_ << ", " <<
"index_file_size = " << index.index_file_size_*ONE_MB << ", " <<
"metric_type = " << index.metric_type_ << ", " <<
"WHERE id = " << quote << table_id << ";";
......@@ -481,7 +481,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
index.engine_type_ = resRow["engine_type"];
index.nlist_ = resRow["nlist"];
index.index_file_size_ = resRow["index_file_size"];
index.index_file_size_ = resRow["index_file_size"]/ONE_MB;
index.metric_type_ = resRow["metric_type"];
} else {
return Status::NotFound("Table " + table_id + " not found");
......@@ -652,7 +652,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
}
Query describeTableQuery = connectionPtr->query();
describeTableQuery << "SELECT id, dimension, engine_type " <<
describeTableQuery << "SELECT id, state, dimension, engine_type, nlist, index_file_size, metric_type " <<
"FROM Tables " <<
"WHERE table_id = " << quote << table_schema.table_id_ << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
......@@ -667,9 +667,17 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
table_schema.id_ = resRow["id"]; //implicit conversion
table_schema.state_ = resRow["state"];
table_schema.dimension_ = resRow["dimension"];
table_schema.engine_type_ = resRow["engine_type"];
table_schema.nlist_ = resRow["nlist"];
table_schema.index_file_size_ = resRow["index_file_size"];
table_schema.metric_type_ = resRow["metric_type"];
} else {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
......@@ -1152,6 +1160,15 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
try {
MetricCollector metric;
//check table existence
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1172,16 +1189,12 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
res = filesToMergeQuery.store();
} //Scoped Connection
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto &resRow : res) {
TableFileSchema table_file;
table_file.file_size_ = resRow["file_size"];
if(table_file.file_size_ >= table_schema.index_file_size_) {
continue;//skip large file
}
table_file.id_ = resRow["id"]; //implicit conversion
......@@ -1195,8 +1208,6 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
table_file.file_type_ = resRow["file_type"];
table_file.file_size_ = resRow["file_size"];
table_file.row_count_ = resRow["row_count"];
table_file.date_ = resRow["date"];
......
......@@ -271,15 +271,25 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_,
&TableSchema::dimension_,
&TableSchema::engine_type_),
&TableSchema::created_on_,
&TableSchema::engine_type_,
&TableSchema::nlist_,
&TableSchema::index_file_size_,
&TableSchema::metric_type_),
where(c(&TableSchema::table_id_) == table_schema.table_id_
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
if (groups.size() == 1) {
table_schema.id_ = std::get<0>(groups[0]);
table_schema.dimension_ = std::get<1>(groups[0]);
table_schema.engine_type_ = std::get<2>(groups[0]);
table_schema.state_ = std::get<1>(groups[0]);
table_schema.dimension_ = std::get<2>(groups[0]);
table_schema.created_on_ = std::get<3>(groups[0]);
table_schema.engine_type_ = std::get<4>(groups[0]);
table_schema.nlist_ = std::get<5>(groups[0]);
table_schema.index_file_size_ = std::get<6>(groups[0]);
table_schema.metric_type_ = std::get<7>(groups[0]);
} else {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
......@@ -368,7 +378,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
table_schema.created_on_ = std::get<3>(tables[0]);
table_schema.engine_type_ = index.engine_type_;
table_schema.nlist_ = index.nlist_;
table_schema.index_file_size_ = index.index_file_size_;
table_schema.index_file_size_ = index.index_file_size_*ONE_MB;
table_schema.metric_type_ = index.metric_type_;
ConnectorPtr->update(table_schema);
......@@ -408,7 +418,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
if (groups.size() == 1) {
index.engine_type_ = std::get<0>(groups[0]);
index.nlist_ = std::get<1>(groups[0]);
index.index_file_size_ = std::get<2>(groups[0]);
index.index_file_size_ = std::get<2>(groups[0])/ONE_MB;
index.metric_type_ = std::get<3>(groups[0]);
} else {
return Status::NotFound("Table " + table_id + " not found");
......@@ -774,6 +784,15 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
try {
MetricCollector metric;
//check table existence
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
//get files to merge
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
......@@ -786,21 +805,17 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
c(&TableFileSchema::table_id_) == table_id),
order_by(&TableFileSchema::file_size_).desc());
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto &file : selected) {
TableFileSchema table_file;
table_file.file_size_ = std::get<4>(file);
if(table_file.file_size_ >= table_schema.index_file_size_) {
continue;//skip large file
}
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.file_type_ = std::get<3>(file);
table_file.file_size_ = std::get<4>(file);
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.created_on_ = std::get<7>(file);
......
......@@ -26,7 +26,7 @@ namespace {
constexpr int64_t NQ = 10;
constexpr int64_t TOP_K = 10;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t ADD_VECTOR_LOOP = 10;
constexpr int64_t SECONDS_EACH_HOUR = 3600;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
......
#-------------------------------------------------------------------------------
# Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
# Unauthorized copying of this file, via any medium is strictly prohibited.
# Proprietary and confidential.
#-------------------------------------------------------------------------------
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
set(util_files
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp)
# Make sure that your call to link_directories takes place before your call to the relevant add_executable.
include_directories(/usr/local/cuda/include)
link_directories("/usr/local/cuda/lib64")
set(wrapper_test_src
${unittest_srcs}
${wrapper_src}
${config_files}
${util_files}
${require_files}
wrapper_test.cpp
)
add_executable(wrapper_test ${wrapper_test_src})
set(wrapper_libs
stdc++
boost_system_static
boost_filesystem_static
faiss
cudart
cublas
sqlite
snappy
bz2
z
zstd
lz4
)
if(${BUILD_FAISS_WITH_MKL} STREQUAL "ON")
set(wrapper_libs ${wrapper_libs} ${MKL_LIBS} ${MKL_LIBS})
else()
set(wrapper_libs ${wrapper_libs}
lapack
openblas)
endif()
target_link_libraries(wrapper_test ${wrapper_libs} ${unittest_libs})
add_definitions("-DUNITTEST_ONLY")
set(topk_test_src
topk_test.cpp
${CMAKE_SOURCE_DIR}/src/wrapper/gpu/Topk.cu)
install(TARGETS wrapper_test DESTINATION bin)
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "wrapper/Operand.h"
#include "wrapper/Index.h"
#include "wrapper/IndexBuilder.h"
#include "wrapper/FaissGpuResources.h"
#include "server/ServerConfig.h"
#include <gtest/gtest.h>
#include <random>
#include <src/wrapper/FaissGpuResources.h>
using namespace zilliz::milvus;
using namespace zilliz::milvus::engine;
TEST(operand_test, Wrapper_Test) {
using std::cout;
using std::endl;
auto opd = std::make_shared<Operand>();
opd->index_type = "IVF";
opd->preproc = "OPQ";
opd->postproc = "PQ";
opd->metric_type = "L2";
opd->d = 64;
auto opd_str = operand_to_str(opd);
auto new_opd = str_to_operand(opd_str);
// TODO: fix all place where using opd to build index.
assert(new_opd->get_index_type(10000) == opd->get_index_type(10000));
auto opd_sq8 = std::make_shared<Operand>();
opd_sq8->index_type = "IVFSQ8";
opd_sq8->preproc = "OPQ";
opd_sq8->postproc = "PQ";
opd_sq8->metric_type = "L2";
opd_sq8->d = 64;
auto opd_str_sq8 = operand_to_str(opd_sq8);
auto new_opd_sq8 = str_to_operand(opd_str_sq8);
assert(new_opd_sq8->get_index_type(10000) == opd_sq8->get_index_type(10000));
}
TEST(build_test, Wrapper_Test) {
// dimension of the vectors to index
int d = 3;
// make a set of nt training vectors in the unit cube
size_t nt = 10000;
// a reasonable number of cetroids to index nb vectors
int ncentroids = 16;
std::random_device rd;
std::mt19937 gen(rd());
std::vector<float> xb;
std::vector<long> ids;
//prepare train data
std::uniform_real_distribution<> dis_xt(-1.0, 1.0);
std::vector<float> xt(nt * d);
for (size_t i = 0; i < nt * d; i++) {
xt[i] = dis_xt(gen);
}
//train the index
auto opd = std::make_shared<Operand>();
opd->index_type = "IVF";
opd->d = d;
opd->ncent = ncentroids;
IndexBuilderPtr index_builder_1 = GetIndexBuilder(opd);
auto index_1 = index_builder_1->build_all(0, xb, ids, nt, xt);
ASSERT_TRUE(index_1 != nullptr);
// size of the database we plan to index
size_t nb = 100000;
//prepare raw data
xb.resize(nb);
ids.resize(nb);
for (size_t i = 0; i < nb; i++) {
xb[i] = dis_xt(gen);
ids[i] = i;
}
index_1->add_with_ids(nb, xb.data(), ids.data());
//search in first quadrant
int nq = 1, k = 10;
std::vector<float> xq = {0.5, 0.5, 0.5};
float *result_dists = new float[k];
long *result_ids = new long[k];
index_1->search(nq, xq.data(), k, result_dists, result_ids);
for (int i = 0; i < k; i++) {
if (result_ids[i] < 0) {
ASSERT_TRUE(false);
break;
}
long id = result_ids[i];
std::cout << "No." << id << " [" << xb[id * 3] << ", " << xb[id * 3 + 1] << ", "
<< xb[id * 3 + 2] << "] distance = " << result_dists[i] << std::endl;
//makesure result vector is in first quadrant
ASSERT_TRUE(xb[id * 3] > 0.0);
ASSERT_TRUE(xb[id * 3 + 1] > 0.0);
ASSERT_TRUE(xb[id * 3 + 2] > 0.0);
}
delete[] result_dists;
delete[] result_ids;
}
TEST(gpu_build_test, Wrapper_Test) {
using std::vector;
int d = 256;
int nb = 3 * 1000 * 100;
int nq = 100;
vector<float> xb(d * nb);
vector<float> xq(d * nq);
vector<long> ids(nb);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_real_distribution<> dis_xt(-1.0, 1.0);
for (auto &e : xb) { e = float(dis_xt(gen)); }
for (auto &e : xq) { e = float(dis_xt(gen)); }
for (int i = 0; i < nb; ++i) { ids[i] = i; }
auto opd = std::make_shared<Operand>();
opd->index_type = "IVF";
opd->d = d;
opd->ncent = 256;
IndexBuilderPtr index_builder_1 = GetIndexBuilder(opd);
auto index_1 = index_builder_1->build_all(nb, xb.data(), ids.data());
assert(index_1->ntotal == nb);
assert(index_1->dim == d);
// sanity check: search 5 first vectors of xb
int k = 1;
vector<long> I(5 * k);
vector<float> D(5 * k);
index_1->search(5, xb.data(), k, D.data(), I.data());
for (int i = 0; i < 5; ++i) { assert(i == I[i]); }
}
TEST(gpu_resource_test, Wrapper_Test) {
FaissGpuResources res_mgr;
FaissGpuResources::Ptr& res = res_mgr.GetGpuResources(0);
ASSERT_NE(res, nullptr);
res = res_mgr.GetGpuResources(0);
ASSERT_NE(res, nullptr);
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode& server_config = config.GetConfig(server::CONFIG_SERVER);
server_config.SetValue(server::CONFIG_GPU_INDEX, "0");
res_mgr.SelectGpu();
int32_t gpu_num = res_mgr.GetGpu();
ASSERT_EQ(gpu_num, 0);
}
TEST(index_test, Wrapper_Test) {
std::vector<float> data;
std::vector<long> ids;
long vec_count = 10000;
for(long i = 0; i < vec_count; i++) {
data.push_back(i/3);
data.push_back(i/9);
ids.push_back(i);
}
faiss::Index* faiss_index = faiss::index_factory(2, "IVF128,SQ8");
faiss_index->train(vec_count, data.data());
std::shared_ptr<faiss::Index> raw_index(faiss_index);
engine::Index_ptr index = std::make_shared<engine::Index>(raw_index);
index->add_with_ids(vec_count, data.data(), ids.data());
ASSERT_EQ(index->ntotal, vec_count);
std::string file_name = "/tmp/index_test.t";
write_index(index, file_name);
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode& engine_config = config.GetConfig(server::CONFIG_ENGINE);
engine_config.SetValue(server::CONFIG_USE_HYBRID_INDEX, "true");
Index_ptr index_out = read_index(file_name);
ASSERT_NE(index_out, nullptr);
bool res = index_out->reset();
ASSERT_TRUE(res);
}
......@@ -199,12 +199,22 @@ TEST(UtilTest, VALIDATE_DIMENSIONTEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableDimension(1), server::SERVER_SUCCESS);
}
TEST(UtilTest, VALIDATE_INDEXTYPE_TEST) {
TEST(UtilTest, VALIDATE_INDEX_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType((int)engine::EngineType::INVALID), server::SERVER_INVALID_INDEX_TYPE);
for(int i = 1; i <= (int)engine::EngineType::MAX_VALUE; i++) {
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType(i), server::SERVER_SUCCESS);
}
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType((int)engine::EngineType::MAX_VALUE + 1), server::SERVER_INVALID_INDEX_TYPE);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexNlist(0), server::SERVER_INVALID_INDEX_NLIST);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexNlist(100), server::SERVER_SUCCESS);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexFileSize(0), server::SERVER_INVALID_INDEX_FILE_SIZE);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexFileSize(100), server::SERVER_SUCCESS);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexMetricType(0), server::SERVER_INVALID_INDEX_METRIC_TYPE);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexMetricType(1), server::SERVER_SUCCESS);
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexMetricType(2), server::SERVER_SUCCESS);
}
TEST(ValidationUtilTest, ValidateGpuTest) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册