未验证 提交 824ff3e1 编写于 作者: G groot 提交者: GitHub

build index (#3058)

* build index
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* fix some problem
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* build index ut
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* typo
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* refine code
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* fix ut
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* typo
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* avoid ut error
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>
上级 d91facc8
......@@ -103,7 +103,7 @@ if (BUILD_UNIT_TEST STREQUAL "ON")
if (BUILD_COVERAGE STREQUAL "ON")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage")
endif ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DELPP_DISABLE_LOGS")
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DELPP_DISABLE_LOGS")
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/unittest)
endif ()
......
......@@ -30,7 +30,7 @@ class DeletedDocsFormat {
public:
DeletedDocsFormat() = default;
std::string
static std::string
FilePostfix();
void
......
......@@ -30,7 +30,7 @@ class IdBloomFilterFormat {
public:
IdBloomFilterFormat() = default;
std::string
static std::string
FilePostfix();
void
......
......@@ -32,7 +32,7 @@ class StructuredIndexFormat {
public:
StructuredIndexFormat() = default;
std::string
static std::string
FilePostfix();
void
......
......@@ -30,7 +30,7 @@ class VectorCompressFormat {
public:
VectorCompressFormat() = default;
std::string
static std::string
FilePostfix();
void
......
......@@ -31,7 +31,7 @@ class VectorIndexFormat {
public:
VectorIndexFormat() = default;
std::string
static std::string
FilePostfix();
void
......
......@@ -947,13 +947,20 @@ DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
std::unique_lock<std::mutex> lock(build_index_mutex_);
for (auto collection_name : collection_names) {
SnapshotVisitor ss_visitor(collection_name);
snapshot::ScopedSnapshotT latest_ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
if (!status.ok()) {
return;
}
SnapshotVisitor ss_visitor(latest_ss);
snapshot::IDS_TYPE segment_ids;
ss_visitor.SegmentsToIndex("", segment_ids);
if (segment_ids.empty()) {
continue;
}
scheduler::BuildIndexJobPtr job =
std::make_shared<scheduler::BuildIndexJob>(options_, collection_name, segment_ids);
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_ids);
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitFinish();
......
......@@ -11,7 +11,6 @@
#include "db/engine/EngineFactory.h"
#include "db/engine/ExecutionEngineImpl.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Log.h"
#include <memory>
......@@ -22,42 +21,13 @@ namespace milvus {
namespace engine {
ExecutionEnginePtr
EngineFactory::Build(const std::string& dir_root, const std::string& collection_name, int64_t segment_id) {
snapshot::ScopedSnapshotT ss;
snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
auto seg_visitor = engine::SegmentVisitor::Build(ss, segment_id);
EngineFactory::Build(const engine::snapshot::ScopedSnapshotT& snapshot, const std::string& dir_root,
int64_t segment_id) {
auto seg_visitor = engine::SegmentVisitor::Build(snapshot, segment_id);
ExecutionEnginePtr execution_engine_ptr = std::make_shared<ExecutionEngineImpl>(dir_root, seg_visitor);
return execution_engine_ptr;
}
void
EngineFactory::GroupFieldsForIndex(const std::string& collection_name, TargetFieldGroups& field_groups) {
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << collection_name << " doesn't exist: " << status.message();
return;
}
std::set<std::string> structured_fields;
std::vector<std::string> field_names = ss->GetFieldNames();
for (auto& field_name : field_names) {
auto field = ss->GetField(field_name);
auto ftype = field->GetFtype();
if (ftype == meta::DataType::VECTOR_FLOAT || ftype == meta::DataType::VECTOR_BINARY) {
std::set<std::string> index_field = {field_name};
field_groups.emplace_back(index_field);
} else {
structured_fields.insert(field_name);
}
}
if (!structured_fields.empty()) {
field_groups.emplace_back(structured_fields);
}
}
} // namespace engine
} // namespace milvus
......@@ -12,6 +12,7 @@
#pragma once
#include "ExecutionEngine.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Json.h"
#include "utils/Status.h"
......@@ -23,13 +24,7 @@ namespace engine {
class EngineFactory {
public:
static ExecutionEnginePtr
Build(const std::string& dir_root, const std::string& collection_name, int64_t segment_id);
// this method distribute fields to multiple groups:
// put structured fields into one group
// each vector field as a group
static void
GroupFieldsForIndex(const std::string& collection_name, TargetFieldGroups& field_groups);
Build(const engine::snapshot::ScopedSnapshotT& snapshot, const std::string& dir_root, int64_t segment_id);
};
} // namespace engine
......
......@@ -27,7 +27,6 @@ namespace milvus {
namespace engine {
using TargetFields = std::set<std::string>;
using TargetFieldGroups = std::vector<TargetFields>;
struct ExecutionEngineContext {
query::QueryPtr query_ptr_;
......
......@@ -101,20 +101,24 @@ ExecutionEngineImpl::CreateStructuredIndex(const milvus::engine::meta::DataType
index_ptr = std::static_pointer_cast<knowhere::Index>(int32_index_ptr);
break;
}
case engine::meta::DataType::UID:
case engine::meta::DataType::INT64: {
auto int64_index_ptr = std::make_shared<knowhere::StructuredIndexSort<int64_t>>(
raw_data.size(), reinterpret_cast<const int64_t*>(raw_data.data()));
index_ptr = std::static_pointer_cast<knowhere::Index>(int64_index_ptr);
break;
}
case engine::meta::DataType::FLOAT: {
auto float_index_ptr = std::make_shared<knowhere::StructuredIndexSort<float>>(
raw_data.size(), reinterpret_cast<const float*>(raw_data.data()));
index_ptr = std::static_pointer_cast<knowhere::Index>(float_index_ptr);
break;
}
case engine::meta::DataType::DOUBLE: {
auto double_index_ptr = std::make_shared<knowhere::StructuredIndexSort<double>>(
raw_data.size(), reinterpret_cast<const double*>(raw_data.data()));
index_ptr = std::static_pointer_cast<knowhere::Index>(double_index_ptr);
break;
}
default: { return Status(DB_ERROR, "Field is not structured type"); }
}
......@@ -487,11 +491,11 @@ ExecutionEngineImpl::BuildIndex() {
auto collection = snapshot->GetCollection();
auto& segment = segment_visitor->GetSegment();
for (auto& field_name : target_fields_) {
snapshot::OperationContext context;
context.prev_partition = snapshot->GetResource<snapshot::Partition>(segment->GetPartitionId());
auto build_op = std::make_shared<snapshot::ChangeSegmentFileOperation>(context, snapshot);
snapshot::OperationContext context;
context.prev_partition = snapshot->GetResource<snapshot::Partition>(segment->GetPartitionId());
auto build_op = std::make_shared<snapshot::ChangeSegmentFileOperation>(context, snapshot);
for (auto& field_name : target_fields_) {
// create snapshot segment files
CollectionIndex index_info;
auto status = CreateSnapshotIndexFile(build_op, field_name, index_info);
......@@ -504,7 +508,9 @@ ExecutionEngineImpl::BuildIndex() {
auto& field = field_visitor->GetField();
auto root_path = segment_reader_->GetRootPath();
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(root_path, segment_visitor);
auto op_ctx = build_op->GetContext();
auto new_visitor = SegmentVisitor::Build(snapshot, segment, op_ctx.new_segment_files);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(root_path, new_visitor);
if (IsVectorField(field)) {
knowhere::VecIndexPtr new_index;
status = BuildKnowhereIndex(field_name, index_info, new_index);
......@@ -512,25 +518,32 @@ ExecutionEngineImpl::BuildIndex() {
return status;
}
segment_writer_ptr->SetVectorIndex(field_name, new_index);
rc.RecordSection("build index");
rc.RecordSection("build structured index");
// serialze index files
status = segment_writer_ptr->WriteVectorIndex(field_name);
if (!status.ok()) {
return status;
}
rc.RecordSection("serialize vector index");
} else {
knowhere::IndexPtr index_ptr;
segment_ptr->GetStructuredIndex(field_name, index_ptr);
segment_writer_ptr->SetStructuredIndex(field_name, index_ptr);
rc.RecordSection("build index");
}
rc.RecordSection("build structured index");
// serialze index files
status = segment_writer_ptr->WriteVectorIndex(field_name);
if (!status.ok()) {
return status;
// serialze index files
status = segment_writer_ptr->WriteStructuredIndex(field_name);
if (!status.ok()) {
return status;
}
rc.RecordSection("serialize structured index");
}
rc.RecordSection("serialize index");
// finish transaction
build_op->Push();
}
// finish transaction
build_op->Push();
return Status::OK();
}
......@@ -543,16 +556,26 @@ ExecutionEngineImpl::CreateSnapshotIndexFile(AddSegmentFileOperation& operation,
auto& field = field_visitor->GetField();
auto element_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
auto& index_element = element_visitor->GetElement();
if (element_visitor == nullptr) {
return Status(DB_ERROR, "Could not build index: index not specified"); // no index specified
}
auto& index_element = element_visitor->GetElement();
index_info.index_name_ = index_element->GetName();
auto params = index_element->GetParams();
if (params.find(engine::PARAM_INDEX_METRIC_TYPE) != params.end()) {
index_info.metric_name_ = params[engine::PARAM_INDEX_METRIC_TYPE];
}
if (params.find(engine::PARAM_INDEX_EXTRA_PARAMS) != params.end()) {
index_info.extra_params_ = params[engine::PARAM_INDEX_EXTRA_PARAMS];
}
snapshot::SegmentFilePtr seg_file = element_visitor->GetFile();
if (seg_file != nullptr) {
// index already build?
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(segment_reader_->GetRootPath(), seg_file);
std::string file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(
segment_reader_->GetCollectionsPath(), seg_file);
file_path += codec::VectorIndexFormat::FilePostfix();
if (CommonUtil::IsFileExist(file_path)) {
return Status(DB_ERROR, "Could not build index: Index file already exist"); // index already build
}
......@@ -563,6 +586,7 @@ ExecutionEngineImpl::CreateSnapshotIndexFile(AddSegmentFileOperation& operation,
sf_context.field_element_name = index_element->GetName();
sf_context.collection_id = segment->GetCollectionId();
sf_context.partition_id = segment->GetPartitionId();
sf_context.segment_id = segment->GetID();
auto status = operation->CommitNewSegmentFile(sf_context, seg_file);
if (!status.ok()) {
......@@ -589,6 +613,7 @@ ExecutionEngineImpl::CreateSnapshotIndexFile(AddSegmentFileOperation& operation,
sf_context.field_element_name = compress_element->GetName();
sf_context.collection_id = segment->GetCollectionId();
sf_context.partition_id = segment->GetPartitionId();
sf_context.segment_id = segment->GetID();
auto status = operation->CommitNewSegmentFile(sf_context, seg_file);
if (!status.ok()) {
......@@ -617,8 +642,8 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col
}
// build index by knowhere
auto to_index = CreateVecIndex(index_info.index_name_);
if (!to_index) {
new_index = CreateVecIndex(index_info.index_name_);
if (!new_index) {
throw Exception(DB_ERROR, "Unsupported index type");
}
......@@ -628,24 +653,20 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col
auto field_visitor = segment_visitor->GetFieldVisitor(field_name);
auto& field = field_visitor->GetField();
auto element_json = index_info.extra_params_;
auto metric_type = element_json[engine::PARAM_INDEX_METRIC_TYPE];
auto index_params = element_json[engine::PARAM_INDEX_EXTRA_PARAMS];
auto field_json = field->GetParams();
auto dimension = field_json[milvus::knowhere::meta::DIM];
auto segment_commit = snapshot->GetSegmentCommitBySegmentId(segment->GetID());
auto row_count = segment_commit->GetRowCount();
milvus::json conf = index_params;
milvus::json conf = index_info.extra_params_;
conf[knowhere::meta::DIM] = dimension;
conf[knowhere::meta::ROWS] = row_count;
conf[knowhere::meta::DEVICEID] = gpu_num_;
conf[knowhere::Metric::TYPE] = metric_type;
conf[knowhere::Metric::TYPE] = index_info.metric_name_;
LOG_ENGINE_DEBUG_ << "Index params: " << conf.dump();
auto adapter = knowhere::AdapterMgr::GetInstance().GetAdapter(to_index->index_type());
if (!adapter->CheckTrain(conf, to_index->index_mode())) {
auto adapter = knowhere::AdapterMgr::GetInstance().GetAdapter(new_index->index_type());
if (!adapter->CheckTrain(conf, new_index->index_mode())) {
throw Exception(DB_ERROR, "Illegal index params");
}
LOG_ENGINE_DEBUG_ << "Index config: " << conf.dump();
......@@ -655,28 +676,28 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col
if (from_index) {
auto dataset =
knowhere::GenDatasetWithIds(row_count, dimension, from_index->GetRawVectors(), from_index->GetRawIds());
to_index->BuildAll(dataset, conf);
new_index->BuildAll(dataset, conf);
uids = from_index->GetUids();
blacklist = from_index->GetBlacklist();
} else if (bin_from_index) {
auto dataset = knowhere::GenDatasetWithIds(row_count, dimension, bin_from_index->GetRawVectors(),
bin_from_index->GetRawIds());
to_index->BuildAll(dataset, conf);
new_index->BuildAll(dataset, conf);
uids = bin_from_index->GetUids();
blacklist = bin_from_index->GetBlacklist();
}
#ifdef MILVUS_GPU_VERSION
/* for GPU index, need copy back to CPU */
if (to_index->index_mode() == knowhere::IndexMode::MODE_GPU) {
auto device_index = std::dynamic_pointer_cast<knowhere::GPUIndex>(to_index);
to_index = device_index->CopyGpuToCpu(conf);
if (new_index->index_mode() == knowhere::IndexMode::MODE_GPU) {
auto device_index = std::dynamic_pointer_cast<knowhere::GPUIndex>(new_index);
new_index = device_index->CopyGpuToCpu(conf);
}
#endif
to_index->SetUids(uids);
new_index->SetUids(uids);
if (blacklist != nullptr) {
to_index->SetBlacklist(blacklist);
new_index->SetBlacklist(blacklist);
}
return Status::OK();
......
......@@ -19,25 +19,56 @@
namespace milvus {
namespace scheduler {
BuildIndexJob::BuildIndexJob(engine::DBOptions options, const std::string& collection_name,
const engine::snapshot::IDS_TYPE& segment_ids)
: Job(JobType::BUILD), options_(std::move(options)), collection_name_(collection_name), segment_ids_(segment_ids) {
namespace {
// Work out which fields of `snapshot` should be indexed, and how to batch them.
//
// A field is selected only if at least one of its field elements is of type
// FET_INDEX (i.e. an index has been defined on it). Selected fields are grouped as:
//   - each vector field (VECTOR_FLOAT / VECTOR_BINARY) forms a group of its own,
//     appended to `field_groups` in the order the field names are returned;
//   - all remaining (structured) fields share a single group, which is appended
//     last and only when it is not empty.
//
// `field_groups` is appended to; existing entries are left untouched.
void
WhichFieldsToBuild(const engine::snapshot::ScopedSnapshotT& snapshot, std::vector<engine::TargetFields>& field_groups) {
    engine::TargetFields scalar_group;

    auto all_names = snapshot->GetFieldNames();
    for (auto& name : all_names) {
        // Resolve the field type first, for every field, exactly as before.
        auto field_ptr = snapshot->GetField(name);
        auto type = static_cast<engine::FIELD_TYPE>(field_ptr->GetFtype());
        const bool vector_field =
            (type == engine::FIELD_TYPE::VECTOR_FLOAT) || (type == engine::FIELD_TYPE::VECTOR_BINARY);

        // Look for the first index element attached to this field.
        bool index_defined = false;
        auto field_elements = snapshot->GetFieldElementsByField(name);
        for (auto& item : field_elements) {
            if (item->GetFtype() == engine::FieldElementType::FET_INDEX) {
                index_defined = true;
                break;
            }
        }
        if (!index_defined) {
            continue;  // nothing to build for this field
        }

        if (vector_field) {
            // a vector field is always built on its own
            field_groups.emplace_back(engine::TargetFields{name});
        } else {
            scalar_group.insert(name);
        }
    }

    if (scalar_group.empty()) {
        return;
    }
    field_groups.push_back(scalar_group);
}
JobTasks
BuildIndexJob::CreateTasks() {
engine::TargetFieldGroups target_groups;
BuildIndexTask::GroupFieldsForIndex(collection_name_, target_groups);
} // namespace
// Construct a build-index job (JobType::BUILD) bound to a specific snapshot.
//
// snapshot    - snapshot handle the job works against; copied into snapshot_.
// options     - DB options; taken by value and moved into options_.
// segment_ids - ids of the segments to build index for; copied into segment_ids_.
BuildIndexJob::BuildIndexJob(const engine::snapshot::ScopedSnapshotT& snapshot, engine::DBOptions options,
const engine::snapshot::IDS_TYPE& segment_ids)
: Job(JobType::BUILD), snapshot_(snapshot), options_(std::move(options)), segment_ids_(segment_ids) {
}
std::vector<TaskPtr> tasks;
void
BuildIndexJob::OnCreateTasks(JobTasks& tasks) {
std::vector<engine::TargetFields> field_groups;
WhichFieldsToBuild(snapshot_, field_groups);
for (auto& id : segment_ids_) {
for (auto& group : target_groups) {
auto task = std::make_shared<BuildIndexTask>(options_, collection_name_, id, group, nullptr);
for (auto& group : field_groups) {
auto task = std::make_shared<BuildIndexTask>(snapshot_, options_, id, group, nullptr);
task->job_ = this;
tasks.emplace_back(task);
}
}
return tasks;
}
json
......
......@@ -31,37 +31,32 @@ namespace scheduler {
class BuildIndexJob : public Job {
public:
explicit BuildIndexJob(engine::DBOptions options, const std::string& collection_name,
explicit BuildIndexJob(const engine::snapshot::ScopedSnapshotT&, engine::DBOptions options,
const engine::snapshot::IDS_TYPE& segment_ids);
~BuildIndexJob() = default;
public:
JobTasks
CreateTasks() override;
json
Dump() const override;
public:
engine::DBOptions
options() const {
return options_;
}
const std::string&
collection_name() {
return collection_name_;
}
const engine::snapshot::IDS_TYPE&
segment_ids() {
return segment_ids_;
}
protected:
void
OnCreateTasks(JobTasks& tasks) override;
private:
engine::snapshot::ScopedSnapshotT snapshot_;
engine::DBOptions options_;
std::string collection_name_;
engine::snapshot::IDS_TYPE segment_ids_;
};
......
......@@ -33,6 +33,18 @@ Job::Dump() const {
return ret;
}
// Build (or rebuild) the task list of this job and return a copy of it.
//
// Everything happens while holding mutex_: the previous content of tasks_ is
// discarded, the subclass hook OnCreateTasks() fills tasks_, and only then is
// tasks_created_ set. When the hook produced no task at all, cv_ is notified so
// that a thread blocked on it can re-check its condition.
JobTasks
Job::CreateTasks() {
    std::lock_guard<std::mutex> guard(mutex_);

    tasks_.clear();
    OnCreateTasks(tasks_);
    tasks_created_ = true;

    const bool nothing_to_run = tasks_.empty();
    if (nothing_to_run) {
        cv_.notify_all();
    }

    return tasks_;
}
void
Job::TaskDone(Task* task) {
if (task == nullptr) {
......@@ -52,17 +64,17 @@ Job::TaskDone(Task* task) {
auto json = task->Dump();
std::string task_desc = json.dump();
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] task %s finish", "scheduler job", id(), task_desc.c_str());
LOG_SERVER_DEBUG_ << LogOut("scheduler job [%ld] task %s finish", id(), task_desc.c_str());
}
void
Job::WaitFinish() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return tasks_.empty(); });
cv_.wait(lock, [this] { return tasks_created_ && tasks_.empty(); });
auto json = Dump();
std::string job_desc = json.dump();
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] %s all done", "scheduler job", id(), job_desc.c_str());
LOG_SERVER_DEBUG_ << LogOut("scheduler job [%ld] %s all done", id(), job_desc.c_str());
}
} // namespace scheduler
......
......@@ -55,8 +55,8 @@ class Job : public interface::dumpable {
json
Dump() const override;
virtual JobTasks
CreateTasks() = 0;
JobTasks
CreateTasks();
void
TaskDone(Task* task);
......@@ -72,6 +72,9 @@ class Job : public interface::dumpable {
protected:
explicit Job(JobType type);
virtual void
OnCreateTasks(JobTasks& tasks) = 0;
protected:
Status status_;
std::mutex mutex_;
......@@ -82,6 +85,7 @@ class Job : public interface::dumpable {
JobType type_;
JobTasks tasks_;
bool tasks_created_ = false;
};
using JobPtr = std::shared_ptr<Job>;
......
......@@ -21,15 +21,13 @@ SearchJob::SearchJob(const server::ContextPtr& context, engine::DBOptions option
GetSegmentsFromQuery(query_ptr, segment_ids_);
}
JobTasks
SearchJob::CreateTasks() {
std::vector<TaskPtr> tasks;
void
SearchJob::OnCreateTasks(JobTasks& tasks) {
for (auto& id : segment_ids_) {
auto task = std::make_shared<SearchTask>(context_, options_, query_ptr_, id, nullptr);
task->job_ = this;
tasks.emplace_back(task);
}
return tasks;
}
json
......
......@@ -42,13 +42,9 @@ class SearchJob : public Job {
SearchJob(const server::ContextPtr& context, engine::DBOptions options, const query::QueryPtr& query_ptr);
public:
JobTasks
CreateTasks() override;
json
Dump() const override;
public:
const server::ContextPtr&
GetContext() const {
return context_;
......@@ -74,6 +70,10 @@ class SearchJob : public Job {
return segment_ids_;
}
protected:
void
OnCreateTasks(JobTasks& tasks) override;
private:
void
GetSegmentsFromQuery(const query::QueryPtr& query_ptr, engine::snapshot::IDS_TYPE& segment_ids);
......
......@@ -23,12 +23,12 @@
namespace milvus {
namespace scheduler {
BuildIndexTask::BuildIndexTask(const engine::DBOptions& options, const std::string& collection_name,
BuildIndexTask::BuildIndexTask(const engine::snapshot::ScopedSnapshotT& snapshot, const engine::DBOptions& options,
engine::snapshot::ID_TYPE segment_id, const engine::TargetFields& target_fields,
TaskLabelPtr label)
: Task(TaskType::BuildIndexTask, std::move(label)),
snapshot_(snapshot),
options_(options),
collection_name_(collection_name),
segment_id_(segment_id),
target_fields_(target_fields) {
CreateExecEngine();
......@@ -37,13 +37,13 @@ BuildIndexTask::BuildIndexTask(const engine::DBOptions& options, const std::stri
void
BuildIndexTask::CreateExecEngine() {
if (execution_engine_ == nullptr) {
execution_engine_ = engine::EngineFactory::Build(options_.meta_.path_, collection_name_, segment_id_);
execution_engine_ = engine::EngineFactory::Build(snapshot_, options_.meta_.path_, segment_id_);
}
}
Status
BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) {
TimeRecorder rc("BuildIndexTask::Load");
TimeRecorder rc("BuildIndexTask::OnLoad");
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
......@@ -61,7 +61,6 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) {
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
fiu_do_on("XSSBuildIndexTask.Load.throw_std_exception", throw std::exception());
} catch (std::exception& ex) {
// typical error: out of disk space or permission denied
error_msg = "Failed to load to_index file: " + std::string(ex.what());
......@@ -80,7 +79,6 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) {
}
LOG_ENGINE_ERROR_ << s.message();
return s;
}
......@@ -89,7 +87,7 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) {
Status
BuildIndexTask::OnExecute() {
TimeRecorderAuto rc("XSSBuildIndexTask::Execute " + std::to_string(segment_id_));
TimeRecorderAuto rc("BuildIndexTask::OnExecute " + std::to_string(segment_id_));
if (execution_engine_ == nullptr) {
return Status(DB_ERROR, "execution engine is null");
......@@ -97,7 +95,7 @@ BuildIndexTask::OnExecute() {
auto status = execution_engine_->BuildIndex();
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.ToString();
LOG_ENGINE_ERROR_ << "Failed to build index: " << status.ToString();
execution_engine_ = nullptr;
return status;
}
......@@ -105,10 +103,5 @@ BuildIndexTask::OnExecute() {
return Status::OK();
}
void
BuildIndexTask::GroupFieldsForIndex(const std::string& collection_name, engine::TargetFieldGroups& groups) {
engine::EngineFactory::GroupFieldsForIndex(collection_name, groups);
}
} // namespace scheduler
} // namespace milvus
......@@ -24,7 +24,7 @@ namespace scheduler {
class BuildIndexTask : public Task {
public:
explicit BuildIndexTask(const engine::DBOptions& options, const std::string& collection_name,
explicit BuildIndexTask(const engine::snapshot::ScopedSnapshotT& snapshot, const engine::DBOptions& options,
engine::snapshot::ID_TYPE segment_id, const engine::TargetFields& target_fields,
TaskLabelPtr label);
......@@ -32,7 +32,6 @@ class BuildIndexTask : public Task {
Dump() const override {
json ret{
{"type", type_},
{"collection_name", collection_name_},
{"segment_id", segment_id_},
};
return ret;
......@@ -44,16 +43,13 @@ class BuildIndexTask : public Task {
Status
OnExecute() override;
static void
GroupFieldsForIndex(const std::string& collection_name, engine::TargetFieldGroups& groups);
private:
void
CreateExecEngine();
public:
const engine::DBOptions& options_;
std::string collection_name_;
engine::snapshot::ScopedSnapshotT snapshot_;
engine::DBOptions options_;
engine::snapshot::ID_TYPE segment_id_;
// structured field could not be processed with vector field in a task
......
......@@ -42,19 +42,20 @@ SearchTask::SearchTask(const server::ContextPtr& context, const engine::DBOption
void
SearchTask::CreateExecEngine() {
if (execution_engine_ == nullptr && query_ptr_ != nullptr) {
execution_engine_ = engine::EngineFactory::Build(options_.meta_.path_, query_ptr_->collection_id, segment_id_);
engine::snapshot::ScopedSnapshotT latest_ss;
engine::snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, query_ptr_->collection_id);
execution_engine_ = engine::EngineFactory::Build(latest_ss, options_.meta_.path_, segment_id_);
}
}
Status
SearchTask::OnLoad(LoadType type, uint8_t device_id) {
TimeRecorder rc(LogOut("[%s][%ld]", "search", segment_id_));
TimeRecorder rc("SearchTask::OnLoad " + std::to_string(segment_id_));
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
try {
fiu_do_on("XSearchTask.Load.throw_std_exception", throw std::exception());
if (type == LoadType::DISK2CPU) {
engine::ExecutionEngineContext context;
context.query_ptr_ = query_ptr_;
......@@ -73,7 +74,7 @@ SearchTask::OnLoad(LoadType type, uint8_t device_id) {
} catch (std::exception& ex) {
// typical error: out of disk space or permition denied
error_msg = "Failed to load index file: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Encounter exception: %s", "search", 0, error_msg.c_str());
LOG_ENGINE_ERROR_ << LogOut("Search task encounter exception: %s", error_msg.c_str());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
......
......@@ -44,10 +44,10 @@ SegmentReader::SegmentReader(const std::string& dir_root, const engine::SegmentV
Status
SegmentReader::Initialize() {
dir_root_ += engine::COLLECTIONS_FOLDER;
dir_collections_ = dir_root_ + engine::COLLECTIONS_FOLDER;
std::string directory =
engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_root_, segment_visitor_->GetSegment());
engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_collections_, segment_visitor_->GetSegment());
storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
......@@ -113,7 +113,7 @@ SegmentReader::LoadField(const std::string& field_name, std::vector<uint8_t>& ra
auto field_visitor = segment_visitor_->GetFieldVisitor(field_name);
auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, raw_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, raw_visitor->GetFile());
auto& ss_codec = codec::Codec::instance();
ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw);
......@@ -138,8 +138,8 @@ SegmentReader::LoadFields() {
if (!status.ok() || raw_data.empty()) {
auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, element_visitor->GetFile());
std::string file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(
dir_collections_, element_visitor->GetFile());
STATUS_CHECK(LoadField(file_path, raw_data));
}
}
......@@ -154,7 +154,7 @@ SegmentReader::LoadEntities(const std::string& field_name, const std::vector<int
auto field_visitor = segment_visitor_->GetFieldVisitor(field_name);
auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, raw_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, raw_visitor->GetFile());
int64_t field_width = 0;
segment_ptr_->GetFixedFieldWidth(field_name, field_width);
......@@ -263,7 +263,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data);
} else if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW)) {
auto file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data);
}
};
......@@ -295,7 +295,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
// read index file
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, index_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, index_visitor->GetFile());
ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, file_path, index_data);
auto index_name = index_visitor->GetElement()->GetName();
......@@ -310,7 +310,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
if (index_name == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8NR ||
index_name == knowhere::IndexEnum::INDEX_HNSW_SQ8NM) {
if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8)) {
file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, visitor->GetFile());
file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data);
}
}
......@@ -347,9 +348,9 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde
// read field index
auto index_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (index_visitor) {
if (index_visitor == nullptr || index_visitor->GetFile() != nullptr) {
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, index_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, index_visitor->GetFile());
ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr);
segment_ptr_->SetStructuredIndex(field_name, index_ptr);
......@@ -376,7 +377,7 @@ SegmentReader::LoadVectorIndice() {
}
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, element_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, element_visitor->GetFile());
if (engine::IsVectorField(field)) {
knowhere::VecIndexPtr index_ptr;
STATUS_CHECK(LoadVectorIndex(name, index_ptr));
......@@ -400,7 +401,7 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
auto& ss_codec = codec::Codec::instance();
ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr);
......@@ -427,7 +428,7 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
if (!boost::filesystem::exists(file_path)) {
return Status::OK(); // file doesn't exist
}
......@@ -459,7 +460,7 @@ SegmentReader::ReadDeletedDocsSize(size_t& size) {
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
if (!boost::filesystem::exists(file_path)) {
return Status::OK(); // file doesn't exist
}
......@@ -496,7 +497,7 @@ SegmentReader::GetSegmentID(int64_t& id) {
/// Returns the resource path of the segment referenced by segment_visitor_.
///
/// The path is resolved against dir_collections_, the same base directory every
/// other resource lookup in this reader uses. Resolving it against dir_root_ and
/// discarding the dir_collections_ lookup would hand callers a path that does not
/// match where the segment files are read from.
///
/// @return the value produced by GetResPath for the visited segment.
std::string
SegmentReader::GetSegmentPath() {
    return engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_collections_, segment_visitor_->GetSegment());
}
......
......@@ -84,6 +84,11 @@ class SegmentReader {
return dir_root_;
}
/// Accessor for the base directory stored in dir_collections_.
///
/// @return a copy of dir_collections_.
std::string
GetCollectionsPath() const {
    return dir_collections_;
}
engine::SegmentVisitorPtr
GetSegmentVisitor() const {
return segment_visitor_;
......@@ -97,7 +102,9 @@ class SegmentReader {
engine::SegmentVisitorPtr segment_visitor_;
storage::FSHandlerPtr fs_ptr_;
engine::SegmentPtr segment_ptr_;
std::string dir_root_;
std::string dir_collections_;
};
using SegmentReaderPtr = std::shared_ptr<SegmentReader>;
......
......@@ -44,10 +44,10 @@ SegmentWriter::SegmentWriter(const std::string& dir_root, const engine::SegmentV
Status
SegmentWriter::Initialize() {
dir_root_ += engine::COLLECTIONS_FOLDER;
dir_collections_ = dir_root_ + engine::COLLECTIONS_FOLDER;
std::string directory =
engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_root_, segment_visitor_->GetSegment());
engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_collections_, segment_visitor_->GetSegment());
storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
......@@ -136,7 +136,7 @@ SegmentWriter::WriteFields() {
auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, element_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, element_visitor->GetFile());
STATUS_CHECK(WriteField(file_path, raw_data));
}
......@@ -158,7 +158,7 @@ SegmentWriter::WriteBloomFilter() {
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
std::string uid_blf_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_blf_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, uid_blf_visitor->GetFile());
auto& ss_codec = codec::Codec::instance();
segment::IdBloomFilterPtr bloom_filter_ptr;
......@@ -212,7 +212,7 @@ SegmentWriter::WriteDeletedDocs() {
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
auto del_doc_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, del_doc_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, del_doc_visitor->GetFile());
return WriteDeletedDocs(file_path, segment_ptr_->GetDeletedDocs());
}
......@@ -344,13 +344,13 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) {
fs_ptr_->operation_ptr_->CreateDirectory();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, element_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, element_visitor->GetFile());
ss_codec.GetVectorIndexFormat()->WriteIndex(fs_ptr_, file_path, index);
element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8);
if (element_visitor != nullptr) {
file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, element_visitor->GetFile());
file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_,
element_visitor->GetFile());
ss_codec.GetVectorIndexFormat()->WriteCompress(fs_ptr_, file_path, index);
}
} catch (std::exception& e) {
......@@ -396,7 +396,7 @@ SegmentWriter::WriteStructuredIndex(const std::string& field_name) {
segment_ptr_->GetFieldType(field_name, field_type);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, element_visitor->GetFile());
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, element_visitor->GetFile());
ss_codec.GetStructuredIndexFormat()->Write(fs_ptr_, file_path, field_type, index);
} catch (std::exception& e) {
std::string err_msg = "Failed to write vector index: " + std::string(e.what());
......@@ -431,7 +431,7 @@ SegmentWriter::GetSegmentID(int64_t& id) {
/// Returns the resource path of the segment referenced by segment_visitor_.
///
/// The path is resolved against dir_collections_, which is the base directory the
/// writer uses for every file it writes. Resolving it against dir_root_ and
/// discarding the dir_collections_ lookup would report a location different from
/// the one the segment files are actually written to.
///
/// @return the value produced by GetResPath for the visited segment.
std::string
SegmentWriter::GetSegmentPath() {
    return engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_collections_, segment_visitor_->GetSegment());
}
......
......@@ -81,6 +81,16 @@ class SegmentWriter {
std::string
GetSegmentPath();
/// Accessor for the root directory stored in dir_root_.
///
/// @return a copy of dir_root_.
std::string
GetRootPath() const {
    return dir_root_;
}
/// Accessor for the segment visitor held by this writer.
///
/// @return a copy of the segment_visitor_ handle.
engine::SegmentVisitorPtr
GetSegmentVisitor() const {
    return segment_visitor_;
}
private:
Status
Initialize();
......@@ -101,7 +111,9 @@ class SegmentWriter {
engine::SegmentVisitorPtr segment_visitor_;
storage::FSHandlerPtr fs_ptr_;
engine::SegmentPtr segment_ptr_;
std::string dir_root_;
std::string dir_collections_;
};
using SegmentWriterPtr = std::shared_ptr<SegmentWriter>;
......
......@@ -493,10 +493,23 @@ TEST_F(DBTest, IndexTest) {
status = db_->Flush();
ASSERT_TRUE(status.ok());
// milvus::engine::CollectionIndex index;
// index.index_name_ = "IVFLAT";
// index.metric_name_ = "L2";
// index.extra_params_["nlist"] = 2048;
// status = db_->CreateIndex(dummy_context_, collection_name, "vector", index);
// ASSERT_TRUE(status.ok());
{
milvus::engine::CollectionIndex index;
index.index_name_ = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
index.metric_name_ = milvus::knowhere::Metric::L2;
index.extra_params_["nlist"] = 2048;
status = db_->CreateIndex(dummy_context_, collection_name, "vector", index);
ASSERT_TRUE(status.ok());
}
{
// milvus::engine::CollectionIndex index;
// index.index_name_ = "SORTED";
// status = db_->CreateIndex(dummy_context_, collection_name, "field_0", index);
// ASSERT_TRUE(status.ok());
// status = db_->CreateIndex(dummy_context_, collection_name, "field_1", index);
// ASSERT_TRUE(status.ok());
// status = db_->CreateIndex(dummy_context_, collection_name, "field_2", index);
// ASSERT_TRUE(status.ok());
}
}
......@@ -250,6 +250,8 @@ DBTest::SetUp() {
void
DBTest::TearDown() {
db_ = nullptr; // db must be stopped before JobMgr and Snapshot
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
milvus::scheduler::SchedInst::GetInstance()->Stop();
milvus::scheduler::CPUBuilderInst::GetInstance()->Stop();
......@@ -257,7 +259,6 @@ DBTest::TearDown() {
milvus::scheduler::ResMgrInst::GetInstance()->Clear();
BaseTest::SnapshotStop();
db_ = nullptr;
auto options = GetOptions();
boost::filesystem::remove_all(options.meta_.path_);
......@@ -330,13 +331,14 @@ SchedulerTest::SetUp() {
/// Tears down the scheduler test fixture.
///
/// Order matters: db_ is cleared first, then the scheduler singletons are stopped
/// and the resource manager is cleared, and only afterwards are the snapshot
/// subsystem and the base fixture shut down.
void
SchedulerTest::TearDown() {
    db_ = nullptr;  // db must be stopped before JobMgr and Snapshot

    // Stop the scheduler components, then drop the registered resources.
    milvus::scheduler::JobMgrInst::GetInstance()->Stop();
    milvus::scheduler::SchedInst::GetInstance()->Stop();
    milvus::scheduler::CPUBuilderInst::GetInstance()->Stop();
    milvus::scheduler::ResMgrInst::GetInstance()->Stop();
    milvus::scheduler::ResMgrInst::GetInstance()->Clear();

    // db_ was already cleared above; a second reset here would be a no-op that
    // misrepresents the required shutdown order, so it is intentionally omitted.
    BaseTest::SnapshotStop();
    BaseTest::TearDown();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册