未验证 提交 300fef44 编写于 作者: Y yah01 提交者: GitHub

Enable mmap for vector index (#25877)

Signed-off-by: Nyah01 <yah2er0ne@outlook.com>
上级 3421956a
......@@ -164,7 +164,7 @@ build-cpp-with-unittest: generated-proto
build-cpp-with-coverage: generated-proto
@echo "Building Milvus cpp library with coverage and unittest ..."
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -a ${useasan} -c -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd})
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -c -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd})
check-proto-product: generated-proto
@(env bash $(PWD)/scripts/check_proto_product.sh)
......
......@@ -18,13 +18,18 @@
#include <memory>
#include <boost/dynamic_bitset.hpp>
#include "exceptions/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "knowhere/dataset.h"
#include "common/Types.h"
// Config key under which the local file path for mmap-based index loading is
// passed to Load().
const std::string kMmapFilepath{"mmap_filepath"};
namespace milvus::index {
class IndexBase {
public:
IndexBase() = default;
virtual ~IndexBase() = default;
virtual BinarySet
......@@ -53,7 +58,28 @@ class IndexBase {
virtual BinarySet
Upload(const Config& config = {}) = 0;
bool
IsMmapSupported() const {
return index_type_ == knowhere::IndexEnum::INDEX_HNSW ||
// index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || IVF_FLAT is not supported as it doesn't stores the vectors
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFPQ ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8 ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IDMAP ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP;
}
// Returns a reference to the index type held by this index.
const IndexType&
Type() const {
    return this->index_type_;
}
protected:
explicit IndexBase(IndexType index_type)
: index_type_(std::move(index_type)) {
}
IndexType index_type_ = "";
};
......
......@@ -15,6 +15,9 @@
// limitations under the License.
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <filesystem>
#include <string>
#include <tuple>
#include <unordered_map>
......@@ -25,11 +28,13 @@
#include "index/Utils.h"
#include "index/Meta.h"
#include <google/protobuf/text_format.h>
#include <unistd.h>
#include "exceptions/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "common/Slice.h"
#include "storage/FieldData.h"
#include "storage/Util.h"
#include "utils/File.h"
namespace milvus::index {
......
......@@ -35,7 +35,7 @@ class VectorIndex : public IndexBase {
public:
explicit VectorIndex(const IndexType& index_type,
const MetricType& metric_type)
: index_type_(index_type), metric_type_(metric_type) {
: IndexBase(index_type), metric_type_(metric_type) {
}
public:
......@@ -87,7 +87,6 @@ class VectorIndex : public IndexBase {
}
private:
IndexType index_type_;
MetricType metric_type_;
int64_t dim_;
};
......
......@@ -16,17 +16,21 @@
#include "index/VectorMemIndex.h"
#include <unistd.h>
#include <cmath>
#include <filesystem>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include "fmt/format.h"
#include "index/Index.h"
#include "index/Meta.h"
#include "index/Utils.h"
#include "exceptions/EasyAssert.h"
#include "config/ConfigKnowhere.h"
#include "knowhere/factory.h"
#include "knowhere/comp/time_recorder.h"
#include "common/BitsetView.h"
......@@ -39,6 +43,7 @@
#include "storage/MemFileManagerImpl.h"
#include "storage/ThreadPools.h"
#include "storage/Util.h"
#include "utils/File.h"
namespace milvus::index {
......@@ -101,6 +106,10 @@ VectorMemIndex::Load(const BinarySet& binary_set, const Config& config) {
void
VectorMemIndex::Load(const Config& config) {
if (config.contains(kMmapFilepath)) {
return LoadFromFile(config);
}
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
......@@ -368,5 +377,120 @@ VectorMemIndex::GetVector(const DatasetPtr dataset) const {
memcpy(raw_data.data(), tensor, data_size);
return raw_data;
}
// Loads the index through a local file instead of from an in-memory binary set.
//
// The files listed under "index_files" in `config` are fetched through
// file_manager_ and appended to the file named by config[kMmapFilepath]. That
// file is then passed to index_.DeserializeFromFile() and unlinked afterwards.
//
// Fails through AssertInfo / PanicCodeInfo when a required config entry is
// missing, an expected slice is missing, a write comes up short, Knowhere
// cannot deserialize the file, or the file cannot be unlinked.
void
VectorMemIndex::LoadFromFile(const Config& config) {
    auto filepath = GetValueFromConfig<std::string>(config, kMmapFilepath);
    AssertInfo(filepath.has_value(), "mmap filepath is empty when load index");

    std::filesystem::create_directories(
        std::filesystem::path(filepath.value()).parent_path());

    auto file = File::Open(filepath.value(), O_CREAT | O_TRUNC | O_RDWR);

    auto index_files =
        GetValueFromConfig<std::vector<std::string>>(config, "index_files");
    AssertInfo(index_files.has_value(),
               "index file paths is empty when load index");

    std::unordered_set<std::string> pending_index_files(index_files->begin(),
                                                        index_files->end());

    LOG_SEGCORE_INFO_ << "load index files: " << index_files.value().size();

    auto parallel_degree =
        static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);

    // Appends one loaded piece of index data to the local file. Every write is
    // checked so that a short write can never hand a truncated file to
    // Knowhere.
    auto AppendToFile = [&](const auto& data) {
        auto written = file.Write(data->Data(), data->Size());
        AssertInfo(written == data->Size(),
                   fmt::format("failed to write index data to disk {}: {}",
                               filepath->data(),
                               strerror(errno)));
    };

    // try to read slice meta first
    std::string slice_meta_filepath;
    for (auto it = pending_index_files.begin();
         it != pending_index_files.end();
         ++it) {
        auto file_name = it->substr(it->find_last_of('/') + 1);
        if (file_name == INDEX_FILE_SLICE_META) {
            // Copy the path before erasing: erase invalidates the element.
            slice_meta_filepath = *it;
            pending_index_files.erase(it);
            break;
        }
    }

    LOG_SEGCORE_INFO_ << "load with slice meta: "
                      << !slice_meta_filepath.empty();

    if (!slice_meta_filepath.empty()) {
        // load with the slice meta info, then we can load batch by batch
        std::string index_file_prefix = slice_meta_filepath.substr(
            0, slice_meta_filepath.find_last_of('/') + 1);
        std::vector<std::string> batch{};
        batch.reserve(parallel_degree);

        auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
        auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
        Config meta_data = Config::parse(
            std::string(static_cast<const char*>(raw_slice_meta->Data()),
                        raw_slice_meta->Size()));

        for (auto& item : meta_data[META]) {
            std::string prefix = item[NAME];
            int slice_num = item[SLICE_NUM];

            // Loads the queued batch and appends its slices in slice order.
            // `index` is the slice number of the last file in the batch.
            auto HandleBatch = [&](int index) {
                auto batch_data = file_manager_->LoadIndexToMemory(batch);
                const int first_slice =
                    index - static_cast<int>(batch.size()) + 1;
                for (int j = first_slice; j <= index; j++) {
                    std::string file_name = GenSlicedFileName(prefix, j);
                    AssertInfo(batch_data.find(file_name) != batch_data.end(),
                               "lost index slice data");
                    AppendToFile(batch_data[file_name]);
                }
                for (auto& loaded_path : batch) {
                    pending_index_files.erase(loaded_path);
                }
                batch.clear();
            };

            for (auto i = 0; i < slice_num; ++i) {
                std::string file_name = GenSlicedFileName(prefix, i);
                batch.push_back(index_file_prefix + file_name);
                if (batch.size() >= parallel_degree) {
                    HandleBatch(i);
                }
            }
            // Flush whatever is left over for this prefix.
            if (batch.size() > 0) {
                HandleBatch(slice_num - 1);
            }
        }
    } else {
        // No slice meta: load every listed file in one go and append them in
        // the iteration order of the returned container.
        auto result = file_manager_->LoadIndexToMemory(std::vector<std::string>(
            pending_index_files.begin(), pending_index_files.end()));
        for (auto& [_, index_data] : result) {
            AppendToFile(index_data);
        }
    }
    file.Close();

    LOG_SEGCORE_INFO_ << "load index into Knowhere...";
    auto conf = config;
    // The mmap path is consumed here and is not forwarded to Knowhere.
    conf.erase(kMmapFilepath);
    auto stat = index_.DeserializeFromFile(filepath.value(), conf);
    if (stat != knowhere::Status::success) {
        // Best-effort cleanup so a failed load does not leave the file behind.
        (void)unlink(filepath->data());
        PanicCodeInfo(ErrorCodeEnum::UnexpectedError,
                      fmt::format("failed to Deserialize index: {}",
                                  KnowhereStatusString(stat)));
    }

    this->SetDim(index_.Dim());

    auto ok = unlink(filepath->data());
    AssertInfo(ok == 0,
               fmt::format("failed to unlink mmap index file {}: {}",
                           filepath.value(),
                           strerror(errno)));
    LOG_SEGCORE_INFO_ << "load vector index done";
}
} // namespace milvus::index
......@@ -75,6 +75,10 @@ class VectorMemIndex : public VectorIndex {
virtual void
LoadWithoutAssemble(const BinarySet& binary_set, const Config& config);
private:
void
LoadFromFile(const Config& config);
protected:
Config config_;
knowhere::Index<knowhere::IndexNode> index_;
......
......@@ -26,13 +26,14 @@
#include "exceptions/EasyAssert.h"
#include "fmt/format.h"
#include "mmap/Utils.h"
#include "utils/File.h"
namespace milvus {
#ifdef MAP_POPULATE
static int mmap_flags = MAP_PRIVATE | MAP_POPULATE;
static int mmap_flags = MAP_SHARED | MAP_POPULATE;
#else
static int mmap_flags = MAP_PRIVATE;
static int mmap_flags = MAP_SHARED;
#endif
class ColumnBase {
......@@ -64,7 +65,7 @@ class ColumnBase {
}
// mmap mode ctor
ColumnBase(int fd, size_t size, const FieldMeta& field_meta) {
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta) {
padding_ = field_meta.get_data_type() == DataType::JSON
? simdjson::SIMDJSON_PADDING
: 0;
......@@ -72,7 +73,7 @@ class ColumnBase {
len_ = size;
size_ = size + padding_;
data_ = static_cast<char*>(
mmap(nullptr, size_, PROT_READ, mmap_flags, fd, 0));
mmap(nullptr, size_, PROT_READ, mmap_flags, file.Descriptor(), 0));
#ifndef MAP_POPULATE
// Manually access the mapping to populate it
const size_t page_size = getpagesize();
......@@ -164,8 +165,8 @@ class Column : public ColumnBase {
}
// mmap mode ctor
Column(int fd, size_t size, const FieldMeta& field_meta)
: ColumnBase(fd, size, field_meta),
Column(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta),
num_rows_(size / field_meta.get_sizeof()) {
}
......@@ -202,8 +203,8 @@ class VariableColumn : public ColumnBase {
}
// mmap mode ctor
VariableColumn(int fd, size_t size, const FieldMeta& field_meta)
: ColumnBase(fd, size, field_meta) {
VariableColumn(const File& file, size_t size, const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta) {
}
VariableColumn(VariableColumn&& column) noexcept
......
......@@ -27,6 +27,7 @@
#include "common/FieldMeta.h"
#include "mmap/Types.h"
#include "storage/Util.h"
#include "utils/File.h"
namespace milvus {
......@@ -77,7 +78,9 @@ FillField(DataType data_type, const storage::FieldDataPtr data, void* dst) {
}
inline size_t
WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
WriteFieldData(File& file,
DataType data_type,
const storage::FieldDataPtr& data) {
size_t total_written{0};
if (datatype_is_variable(data_type)) {
switch (data_type) {
......@@ -86,7 +89,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
for (auto i = 0; i < data->get_num_rows(); ++i) {
auto str =
static_cast<const std::string*>(data->RawValue(i));
ssize_t written = write(fd, str->data(), str->size());
ssize_t written = file.Write(str->data(), str->size());
if (written < str->size()) {
break;
}
......@@ -99,7 +102,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
auto padded_string =
static_cast<const Json*>(data->RawValue(i))->data();
ssize_t written =
write(fd, padded_string.data(), padded_string.size());
file.Write(padded_string.data(), padded_string.size());
if (written < padded_string.size()) {
break;
}
......@@ -112,7 +115,7 @@ WriteFieldData(int fd, DataType data_type, const storage::FieldDataPtr& data) {
datatype_name(data_type)));
}
} else {
total_written += write(fd, data->Data(), data->Size());
total_written += file.Write(data->Data(), data->Size());
}
return total_written;
......
......@@ -13,6 +13,7 @@
#include <string>
#include "common/QueryInfo.h"
#include "common/Types.h"
#include "query/SearchBruteForce.h"
#include "query/SearchOnSealed.h"
#include "query/helper.h"
......
......@@ -35,6 +35,7 @@
#include "storage/FieldData.h"
#include "storage/Util.h"
#include "storage/ThreadPools.h"
#include "utils/File.h"
namespace milvus::segcore {
......@@ -335,10 +336,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
auto dir = filepath.parent_path();
std::filesystem::create_directories(dir);
int fd =
open(filepath.c_str(), O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR);
AssertInfo(fd != -1,
fmt::format("failed to create mmap file {}", filepath.c_str()));
auto file = File::Open(filepath.string(), O_CREAT | O_TRUNC | O_RDWR);
auto& field_meta = (*schema_)[field_id];
auto data_type = field_meta.get_data_type();
......@@ -350,7 +348,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
storage::FieldDataPtr field_data;
while (data.channel->pop(field_data)) {
data_size += field_data->Size();
auto written = WriteFieldData(fd, data_type, field_data);
auto written = WriteFieldData(file, data_type, field_data);
if (written != field_data->Size()) {
break;
}
......@@ -369,11 +367,6 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
total_written,
data_size,
strerror(errno)));
int ok = fsync(fd);
AssertInfo(ok == 0,
fmt::format("failed to fsync mmap data file {}, err: {}",
filepath.c_str(),
strerror(errno)));
auto num_rows = data.row_count;
std::shared_ptr<ColumnBase> column{};
......@@ -382,7 +375,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
case milvus::DataType::STRING:
case milvus::DataType::VARCHAR: {
auto var_column = std::make_shared<VariableColumn<std::string>>(
fd, total_written, field_meta);
file, total_written, field_meta);
var_column->Seal(std::move(indices));
column = std::move(var_column);
break;
......@@ -390,7 +383,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
case milvus::DataType::JSON: {
auto var_column =
std::make_shared<VariableColumn<milvus::Json>>(
fd, total_written, field_meta);
file, total_written, field_meta);
var_column->Seal(std::move(indices));
column = std::move(var_column);
break;
......@@ -399,7 +392,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
}
}
} else {
column = std::make_shared<Column>(fd, total_written, field_meta);
column = std::make_shared<Column>(file, total_written, field_meta);
}
{
......@@ -407,16 +400,11 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
fields_.emplace(field_id, column);
}
ok = unlink(filepath.c_str());
auto ok = unlink(filepath.c_str());
AssertInfo(ok == 0,
fmt::format("failed to unlink mmap data file {}, err: {}",
filepath.c_str(),
strerror(errno)));
ok = close(fd);
AssertInfo(ok == 0,
fmt::format("failed to close data file {}, err: {}",
filepath.c_str(),
strerror(errno)));
// set pks to offset
if (schema_->get_primary_field_id() == field_id) {
......
......@@ -35,6 +35,7 @@ struct LoadIndexInfo {
int64_t segment_id;
int64_t field_id;
DataType field_type;
std::string mmap_dir_path;
int64_t index_id;
int64_t index_build_id;
int64_t index_version;
......
......@@ -12,9 +12,12 @@
#include "segcore/load_index_c.h"
#include "common/FieldMeta.h"
#include "exceptions/EasyAssert.h"
#include "index/Index.h"
#include "index/IndexFactory.h"
#include "index/Meta.h"
#include "index/Utils.h"
#include "log/Log.h"
#include "segcore/Types.h"
#include "storage/Util.h"
#include "storage/RemoteChunkManagerSingleton.h"
......@@ -74,7 +77,8 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info,
int64_t partition_id,
int64_t segment_id,
int64_t field_id,
enum CDataType field_type) {
enum CDataType field_type,
const char* mmap_dir_path) {
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
......@@ -83,6 +87,7 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info,
load_index_info->segment_id = segment_id;
load_index_info->field_id = field_id;
load_index_info->field_type = milvus::DataType(field_type);
load_index_info->mmap_dir_path = std::string(mmap_dir_path);
auto status = CStatus();
status.error_code = Success;
......@@ -250,6 +255,18 @@ AppendIndexV2(CLoadIndexInfo c_load_index_info) {
load_index_info->index =
milvus::index::IndexFactory::GetInstance().CreateIndex(
index_info, file_manager);
if (!load_index_info->mmap_dir_path.empty() &&
load_index_info->index->IsMmapSupported()) {
auto filepath =
std::filesystem::path(load_index_info->mmap_dir_path) /
std::to_string(load_index_info->segment_id) /
std::to_string(load_index_info->field_id) /
std::to_string(load_index_info->index_id);
config[kMmapFilepath] = filepath.string();
}
load_index_info->index->Load(config);
auto status = CStatus();
status.error_code = Success;
......
......@@ -40,7 +40,8 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info,
int64_t partition_id,
int64_t segment_id,
int64_t field_id,
enum CDataType field_type);
enum CDataType field_type,
const char* mmap_dir_path);
CStatus
AppendIndexInfo(CLoadIndexInfo c_load_index_info,
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>
#include <string>
#include <string_view>

#include "exceptions/EasyAssert.h"
#include "fmt/core.h"
namespace milvus {
// Move-only owner of a POSIX file descriptor.
//
// Instances are created with File::Open(). The descriptor is closed by the
// destructor unless Close() was already called or the descriptor was moved
// into another File.
class File {
 public:
    File(const File& file) = delete;
    File&
    operator=(const File& file) = delete;

    // Takes over the descriptor and leaves `file` without one.
    File(File&& file) noexcept : fd_(file.fd_) {
        file.fd_ = -1;
    }

    // Closes the currently held descriptor (if any) and takes over the one
    // held by `file`.
    File&
    operator=(File&& file) noexcept {
        if (this != &file) {
            Close();
            fd_ = file.fd_;
            file.fd_ = -1;
        }
        return *this;
    }

    ~File() {
        Close();
    }

    // Opens `filepath` with the given open(2) flags; files created by the call
    // get owner read/write permission. Fails through AssertInfo when open()
    // returns -1.
    static File
    Open(const std::string_view filepath, int flags) {
        // A string_view is not guaranteed to be NUL-terminated, so copy it
        // before handing it to the C API.
        const std::string path(filepath);
        int fd = open(path.c_str(), flags, S_IRUSR | S_IWUSR);
        AssertInfo(fd != -1,
                   fmt::format("failed to create mmap file {}: {}",
                               filepath,
                               strerror(errno)));
        return File(fd);
    }

    // Returns the raw descriptor, or -1 when none is held.
    int
    Descriptor() const {
        return fd_;
    }

    // Writes `size` bytes from `buf`, retrying after short writes and EINTR.
    //
    // Returns `size` on success. If an error occurs before anything was
    // written, returns -1; if it occurs later, returns the number of bytes
    // written so far. In both failure cases errno is the one left by write(2),
    // so callers can keep treating "result != size" as failure.
    ssize_t
    Write(const void* buf, size_t size) {
        const char* cursor = static_cast<const char*>(buf);
        size_t remaining = size;
        while (remaining > 0) {
            const ssize_t n = write(fd_, cursor, remaining);
            if (n < 0) {
                if (errno == EINTR) {
                    continue;
                }
                if (remaining == size) {
                    return -1;
                }
                break;
            }
            if (n == 0) {
                // No progress; stop instead of spinning forever.
                break;
            }
            cursor += n;
            remaining -= static_cast<size_t>(n);
        }
        return static_cast<ssize_t>(size - remaining);
    }

    // Closes the descriptor if one is held; calling it again is a no-op.
    void
    Close() {
        if (fd_ >= 0) {
            close(fd_);
            fd_ = -1;
        }
    }

 private:
    explicit File(int fd) : fd_(fd) {
    }
    int fd_{-1};
};
} // namespace milvus
\ No newline at end of file
......@@ -1690,8 +1690,8 @@ TEST(CApiTest, LoadIndexInfo) {
c_load_index_info, index_param_key2.data(), index_param_value2.data());
ASSERT_EQ(status.error_code, Success);
std::string field_name = "field0";
status =
AppendFieldInfo(c_load_index_info, 0, 0, 0, 0, CDataType::FloatVector);
status = AppendFieldInfo(
c_load_index_info, 0, 0, 0, 0, CDataType::FloatVector, "");
ASSERT_EQ(status.error_code, Success);
status = AppendIndex(c_load_index_info, c_binary_set);
ASSERT_EQ(status.error_code, Success);
......@@ -1874,7 +1874,8 @@ TEST(CApiTest, Indexing_Without_Predicate) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -2021,7 +2022,8 @@ TEST(CApiTest, Indexing_Expr_Without_Predicate) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -2225,7 +2227,8 @@ TEST(CApiTest, Indexing_With_float_Predicate_Range) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -2401,7 +2404,8 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -2596,7 +2600,8 @@ TEST(CApiTest, Indexing_With_float_Predicate_Term) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -2765,7 +2770,8 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Term) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -2967,7 +2973,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Range) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -3142,7 +3149,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Range) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -3338,7 +3346,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -3529,7 +3538,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::BinaryVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load index for vec field, load raw data for scalar field
......@@ -3730,7 +3740,8 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
auto load_index_info = (LoadIndexInfo*)c_load_index_info;
......@@ -3974,7 +3985,8 @@ TEST(CApiTest, SealedSegment_search_float_With_Expr_Predicate_Range) {
c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
AppendIndexParam(
c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
AppendFieldInfo(c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector);
AppendFieldInfo(
c_load_index_info, 0, 0, 0, 100, CDataType::FloatVector, "");
AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
// load vec index
......
......@@ -200,7 +200,7 @@ TEST(CBoolIndexTest, All) {
{ DeleteBinarySet(binary_set); }
}
delete[](char*)(half_ds->GetTensor());
delete[] (char*)(half_ds->GetTensor());
}
// TODO: more scalar type.
......@@ -317,6 +317,6 @@ TEST(CStringIndexTest, All) {
{ DeleteBinarySet(binary_set); }
}
delete[](char*)(str_ds->GetTensor());
delete[] (char*)(str_ds->GetTensor());
}
#endif
......@@ -16,10 +16,12 @@
#include <string>
#include <vector>
#include "knowhere/comp/index_param.h"
#include "query/SearchBruteForce.h"
#include "segcore/Reduce.h"
#include "index/IndexFactory.h"
#include "common/QueryResult.h"
#include "segcore/Types.h"
#include "test_utils/indexbuilder_test_utils.h"
#include "test_utils/DataGen.h"
#include "test_utils/Timer.h"
......@@ -290,6 +292,10 @@ class IndexTest : public ::testing::TestWithParam<Param> {
auto param = GetParam();
index_type = param.first;
metric_type = param.second;
NB = 10000;
if (index_type == knowhere::IndexEnum::INDEX_HNSW) {
NB = 270000;
}
build_conf = generate_build_conf(index_type, metric_type);
load_conf = generate_load_conf(index_type, metric_type, NB);
search_conf = generate_search_conf(index_type, metric_type);
......@@ -418,6 +424,61 @@ TEST_P(IndexTest, BuildAndQuery) {
vec_index->Query(xq_dataset, search_info, nullptr);
}
// Builds an index, uploads it, then loads a freshly created index of the same
// type through the "mmap_filepath" load option and checks its row count,
// dimension and search results. Index types that report no mmap support are
// skipped by returning early.
TEST_P(IndexTest, Mmap) {
    milvus::index::CreateIndexInfo create_index_info;
    create_index_info.index_type = index_type;
    create_index_info.metric_type = metric_type;
    create_index_info.field_type = vec_field_data_type;
    index::IndexBasePtr index;

    milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100};
    milvus::storage::IndexMeta index_meta{3, 100, 1000, 1};
    auto chunk_manager = milvus::storage::CreateChunkManager(storage_config_);
    auto file_manager = milvus::storage::CreateFileManager(
        index_type, field_data_meta, index_meta, chunk_manager);
    index = milvus::index::IndexFactory::GetInstance().CreateIndex(
        create_index_info, file_manager);

    ASSERT_NO_THROW(index->BuildWithDataset(xb_dataset, build_conf));

    milvus::index::IndexBasePtr new_index;
    milvus::index::VectorIndex* vec_index = nullptr;

    auto binary_set = index->Upload();
    index.reset();

    new_index = milvus::index::IndexFactory::GetInstance().CreateIndex(
        create_index_info, file_manager);
    if (!new_index->IsMmapSupported()) {
        return;
    }
    vec_index = dynamic_cast<milvus::index::VectorIndex*>(new_index.get());
    // Stop here instead of dereferencing a null pointer if the cast fails.
    ASSERT_TRUE(vec_index != nullptr);

    // The uploaded binary names are passed to Load() as the index file list.
    std::vector<std::string> index_files;
    for (auto& binary : binary_set.binary_map_) {
        index_files.emplace_back(binary.first);
    }
    load_conf["index_files"] = index_files;
    load_conf["mmap_filepath"] = "mmap/test_index_mmap_" + index_type;
    vec_index->Load(load_conf);
    EXPECT_EQ(vec_index->Count(), NB);
    EXPECT_EQ(vec_index->GetDim(), DIM);

    milvus::SearchInfo search_info;
    search_info.topk_ = K;
    search_info.metric_type_ = metric_type;
    search_info.search_params_ = search_conf;
    auto result = vec_index->Query(xq_dataset, search_info, nullptr);
    EXPECT_EQ(result->total_nq_, NQ);
    EXPECT_EQ(result->unity_topK_, K);
    EXPECT_EQ(result->distances_.size(), NQ * K);
    EXPECT_EQ(result->seg_offsets_.size(), NQ * K);
    if (!is_binary) {
        EXPECT_EQ(result->seg_offsets_[0], query_offset);
    }
    // Run the same query once more with the range-search parameters; the
    // result is not inspected.
    search_info.search_params_ = range_search_conf;
    vec_index->Query(xq_dataset, search_info, nullptr);
}
TEST_P(IndexTest, GetVector) {
milvus::index::CreateIndexInfo create_index_info;
create_index_info.index_type = index_type;
......
......@@ -60,7 +60,8 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo,
fieldID := indexInfo.FieldID
indexPaths := indexInfo.IndexFilePaths
err := li.appendFieldInfo(collectionID, partitionID, segmentID, fieldID, fieldType)
mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
err := li.appendFieldInfo(collectionID, partitionID, segmentID, fieldID, fieldType, mmapDirPath)
if err != nil {
return err
}
......@@ -123,13 +124,15 @@ func (li *LoadIndexInfo) appendIndexFile(filePath string) error {
}
// appendFieldInfo appends fieldID & fieldType to index
func (li *LoadIndexInfo) appendFieldInfo(collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType) error {
func (li *LoadIndexInfo) appendFieldInfo(collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, mmapDirPath string) error {
cColID := C.int64_t(collectionID)
cParID := C.int64_t(partitionID)
cSegID := C.int64_t(segmentID)
cFieldID := C.int64_t(fieldID)
cintDType := uint32(fieldType)
status := C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType)
cMmapDirPath := C.CString(mmapDirPath)
defer C.free(unsafe.Pointer(cMmapDirPath))
status := C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType, cMmapDirPath)
return HandleCStatus(&status, "AppendFieldInfo failed")
}
......
......@@ -311,7 +311,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
loader.mut.Lock()
defer loader.mut.Unlock()
poolCap := runtime.NumCPU() * paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
poolCap := runtime.NumCPU() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt()
if loader.committedResource.WorkNum >= poolCap {
return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap))
} else if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
......
......@@ -223,11 +223,6 @@ func (node *QueryNode) InitSegcore() error {
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)
mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
if len(mmapDirPath) > 0 {
log.Info("mmap enabled", zap.String("dir", mmapDirPath))
}
initcore.InitTraceConfig(paramtable.Get())
return initcore.InitRemoteChunkManager(paramtable.Get())
}
......@@ -366,12 +361,6 @@ func (node *QueryNode) Start() error {
paramtable.SetUpdateTime(time.Now())
mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
mmapEnabled := len(mmapDirPath) > 0
if mmapEnabled {
err := os.MkdirAll(mmapDirPath, 0666)
if err != nil {
panic(err)
}
}
node.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("query node start successfully",
zap.Int64("queryNodeID", paramtable.GetNodeID()),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册