未验证 提交 45ecc9ae 编写于 作者: G groot 提交者: GitHub

return build index error message to client (#3895)

* return build index error message to client
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* typo
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* refne build index relate code
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* typo
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 6a517fff
......@@ -66,7 +66,11 @@ static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown
}
DBImpl::DBImpl(const DBOptions& options)
: options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
: options_(options),
initialized_(false),
merge_thread_pool_(1, 1),
index_thread_pool_(1, 1),
index_task_tracker_(3) {
mem_mgr_ = MemManagerFactory::Build(options_);
merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);
......@@ -206,7 +210,7 @@ DBImpl::DropCollection(const std::string& collection_name) {
ClearCollectionCache(ss, options_.meta_.path_);
// clear index failed retry map of this collection
ClearIndexFailedRecord(collection_name);
index_task_tracker_.ClearFailedRecords(collection_name);
return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
}
......@@ -355,6 +359,7 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
if (!utils::IsSameIndex(old_index, new_index)) {
DropIndex(collection_name, field_name);
WaitMergeFileFinish(); // let merge file thread finish since DropIndex start a merge task
// create field element for new index
......@@ -364,6 +369,10 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
}
}
if (engine::utils::IsFlatIndexType(index.index_type_)) {
return Status::OK(); // for IDMAP type, no need to create index
}
// step 3: merge segments before create index, since there could be some small segments just flushed
{
snapshot::ScopedSnapshotT latest_ss;
......@@ -374,7 +383,7 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
}
// clear index failed retry map of this collection
ClearIndexFailedRecord(collection_name);
index_task_tracker_.ClearFailedRecords(collection_name);
// step 4: iterate segments need to be build index, wait until all segments are built
while (true) {
......@@ -390,7 +399,7 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
break; // all segments build index finished
}
IgnoreIndexFailedSegments(collection_name, segment_ids);
index_task_tracker_.IgnoreFailedSegments(collection_name, segment_ids);
if (segment_ids.empty()) {
break; // some segments failed to build index, and ignored
}
......@@ -404,6 +413,20 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
}
}
// step 5: return error message to client if any segment failed to build index
SegmentFailedMap failed_map;
index_task_tracker_.GetFailedRecords(collection_name, failed_map);
if (!failed_map.empty()) {
auto pair = failed_map.begin();
std::string msg =
"Failed to build segment " + std::to_string(pair->first) + " for collection " + collection_name;
msg += ", reason: ";
msg += pair->second.message();
LOG_ENGINE_ERROR_ << msg;
return Status(DB_ERROR, msg);
}
return Status::OK();
}
......@@ -1044,7 +1067,7 @@ DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names, bool
}
// check index retry times
IgnoreIndexFailedSegments(collection_name, segment_ids);
index_task_tracker_.IgnoreFailedSegments(collection_name, segment_ids);
if (segment_ids.empty()) {
continue;
}
......@@ -1062,8 +1085,8 @@ DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names, bool
cache::CpuCacheMgr::GetInstance().PrintInfo(); // print cache info after build index
// record failed segments, avoid build index hang
snapshot::IDS_TYPE& failed_ids = job->FailedSegments();
MarkIndexFailedSegments(collection_name, failed_ids);
auto failed_segments = job->GetFailedSegments();
index_task_tracker_.MarkFailedSegments(collection_name, failed_segments);
if (!job->status().ok()) {
LOG_ENGINE_ERROR_ << job->status().message();
......@@ -1205,33 +1228,5 @@ DBImpl::ConfigUpdate(const std::string& name) {
}
}
void
DBImpl::MarkIndexFailedSegments(const std::string& collection_name, const snapshot::IDS_TYPE& failed_ids) {
std::lock_guard<std::mutex> lock(index_retry_mutex_);
SegmentIndexRetryMap& retry_map = index_retry_map_[collection_name];
for (auto& id : failed_ids) {
retry_map[id]++;
}
}
void
DBImpl::IgnoreIndexFailedSegments(const std::string& collection_name, snapshot::IDS_TYPE& segment_ids) {
std::lock_guard<std::mutex> lock(index_retry_mutex_);
SegmentIndexRetryMap& retry_map = index_retry_map_[collection_name];
snapshot::IDS_TYPE segment_ids_to_build;
for (auto id : segment_ids) {
if (retry_map[id] < BUILD_INEDX_RETRY_TIMES) {
segment_ids_to_build.push_back(id);
}
}
segment_ids.swap(segment_ids_to_build);
}
void
DBImpl::ClearIndexFailedRecord(const std::string& collection_name) {
std::lock_guard<std::mutex> lock(index_retry_mutex_);
index_retry_map_.erase(collection_name);
}
} // namespace engine
} // namespace milvus
......@@ -22,6 +22,7 @@
#include <vector>
#include "db/DB.h"
#include "db/SegmentTaskTracker.h"
#include "config/ConfigMgr.h"
#include "utils/ThreadPool.h"
......@@ -172,15 +173,6 @@ class DBImpl : public DB, public ConfigObserver {
void
DecreaseLiveBuildTaskNum();
void
MarkIndexFailedSegments(const std::string& collection_name, const snapshot::IDS_TYPE& failed_ids);
void
IgnoreIndexFailedSegments(const std::string& collection_name, snapshot::IDS_TYPE& segment_ids);
void
ClearIndexFailedRecord(const std::string& collection_name);
private:
DBOptions options_;
std::atomic<bool> initialized_;
......@@ -207,10 +199,7 @@ class DBImpl : public DB, public ConfigObserver {
std::mutex index_result_mutex_;
std::list<std::future<void>> index_thread_results_;
using SegmentIndexRetryMap = std::unordered_map<snapshot::ID_TYPE, int64_t>;
using CollectionIndexRetryMap = std::unordered_map<std::string, SegmentIndexRetryMap>;
CollectionIndexRetryMap index_retry_map_;
std::mutex index_retry_mutex_;
SegmentTaskTracker index_task_tracker_;
std::mutex build_index_mutex_;
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/SegmentTaskTracker.h"
#include <utility>
namespace milvus {
namespace engine {
SegmentTaskTracker::SegmentTaskTracker(int64_t retry_times) : retry_times_(retry_times) {
if (retry_times_ <= 0) {
retry_times_ = 1;
}
}
void
SegmentTaskTracker::MarkFailedSegment(const std::string& collection_name, int64_t segment_id, const Status& status) {
std::lock_guard<std::mutex> lock(retry_map_mutex_);
SegmentRetryMap& retry_map = retry_map_[collection_name];
FailedRecord& record = retry_map[segment_id];
++record.failed_count_;
record.error_status = status;
}
void
SegmentTaskTracker::MarkFailedSegments(const std::string& collection_name, const SegmentFailedMap& failed_segments) {
std::lock_guard<std::mutex> lock(retry_map_mutex_);
SegmentRetryMap& retry_map = retry_map_[collection_name];
for (auto& pair : failed_segments) {
FailedRecord& record = retry_map[pair.first];
++record.failed_count_;
record.error_status = pair.second;
}
}
void
SegmentTaskTracker::IgnoreFailedSegments(const std::string& collection_name, std::vector<int64_t>& segment_ids) {
std::lock_guard<std::mutex> lock(retry_map_mutex_);
SegmentRetryMap& retry_map = retry_map_[collection_name];
if (retry_map.empty()) {
return;
}
std::vector<int64_t> temp_segment_ids;
for (auto id : segment_ids) {
if (retry_map.find(id) == retry_map.end()) {
temp_segment_ids.push_back(id);
continue;
}
FailedRecord& record = retry_map[id];
if (record.failed_count_ < retry_times_) {
temp_segment_ids.push_back(id);
}
}
segment_ids.swap(temp_segment_ids);
}
void
SegmentTaskTracker::ClearFailedRecords(const std::string& collection_name) {
std::lock_guard<std::mutex> lock(retry_map_mutex_);
retry_map_.erase(collection_name);
}
void
SegmentTaskTracker::GetFailedRecords(const std::string& collection_name, SegmentFailedMap& failed_map) {
std::lock_guard<std::mutex> lock(retry_map_mutex_);
SegmentRetryMap& retry_map = retry_map_[collection_name];
for (auto& pair : retry_map) {
FailedRecord& record = pair.second;
failed_map.insert(std::make_pair(pair.first, record.error_status));
}
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include "utils/Status.h"
namespace milvus {
namespace engine {
using SegmentFailedMap = std::unordered_map<int64_t, Status>;
// this class is to track segment tasks status such as build index and compact, for these purpose:
// record failed segment id and error status
// ignore failed segments in case of necessery
// return error status to client
class SegmentTaskTracker {
public:
explicit SegmentTaskTracker(int64_t retry_times);
void
MarkFailedSegment(const std::string& collection_name, int64_t segment_id, const Status& status);
void
MarkFailedSegments(const std::string& collection_name, const SegmentFailedMap& failed_segments);
void
IgnoreFailedSegments(const std::string& collection_name, std::vector<int64_t>& segment_ids);
void
ClearFailedRecords(const std::string& collection_name);
void
GetFailedRecords(const std::string& collection_name, SegmentFailedMap& failed_map);
private:
int64_t retry_times_ = 3;
struct FailedRecord {
int64_t failed_count_ = 0;
Status error_status;
};
using SegmentRetryMap = std::unordered_map<int64_t, FailedRecord>;
using CollectionRetryMap = std::unordered_map<std::string, SegmentRetryMap>;
CollectionRetryMap retry_map_;
std::mutex retry_map_mutex_;
};
} // namespace engine
} // namespace milvus
......@@ -14,6 +14,7 @@
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
#include "db/Types.h"
#include "db/Utils.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/Resources.h"
#include "db/snapshot/Snapshot.h"
......@@ -51,19 +52,16 @@ SegmentsToIndexCollector::Handle(const snapshot::SegmentCommitPtr& segment_commi
auto segment_visitor = engine::SegmentVisitor::Build(ss_, segment_commit->GetSegmentId());
if (field_name_.empty()) {
auto field_visitors = segment_visitor->GetFieldVisitors();
auto& field_visitors = segment_visitor->GetFieldVisitors();
for (auto& pair : field_visitors) {
auto& field_visitor = pair.second;
auto element_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor != nullptr && element_visitor->GetFile() == nullptr) {
if (FieldRequireBuildIndex(pair.second)) {
segment_ids_.push_back(segment_commit->GetSegmentId());
break;
}
}
} else {
auto field_visitor = segment_visitor->GetFieldVisitor(field_name_);
auto element_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor != nullptr && element_visitor->GetFile() == nullptr) {
if (FieldRequireBuildIndex(field_visitor)) {
segment_ids_.push_back(segment_commit->GetSegmentId());
}
}
......@@ -85,7 +83,7 @@ SegmentsToMergeCollector::Handle(const snapshot::SegmentCommitPtr& segment_commi
// if any field has build index, don't merge this segment
auto segment_visitor = engine::SegmentVisitor::Build(ss_, segment_commit->GetSegmentId());
auto field_visitors = segment_visitor->GetFieldVisitors();
auto& field_visitors = segment_visitor->GetFieldVisitors();
bool has_index = false;
for (auto& kv : field_visitors) {
auto element_visitor = kv.second->GetElementVisitor(engine::FieldElementType::FET_INDEX);
......@@ -248,7 +246,7 @@ LoadCollectionHandler::Handle(const snapshot::SegmentPtr& segment) {
segment_ptr->GetFieldType(field_name, ftype);
knowhere::IndexPtr index_ptr;
if (IsVectorType(ftype)) {
if (utils::IsVectorType(ftype)) {
knowhere::VecIndexPtr vec_index_ptr;
segment_reader->LoadVectorIndex(field_name, vec_index_ptr);
index_ptr = vec_index_ptr;
......
......@@ -158,12 +158,7 @@ IsVectorField(const engine::snapshot::FieldPtr& field) {
}
auto ftype = static_cast<engine::DataType>(field->GetFtype());
return IsVectorType(ftype);
}
bool
IsVectorType(engine::DataType type) {
return type == engine::DataType::VECTOR_FLOAT || type == engine::DataType::VECTOR_BINARY;
return utils::IsVectorType(ftype);
}
bool
......@@ -173,12 +168,7 @@ IsBinaryVectorField(const engine::snapshot::FieldPtr& field) {
}
auto ftype = static_cast<engine::DataType>(field->GetFtype());
return IsBinaryVectorType(ftype);
}
bool
IsBinaryVectorType(engine::DataType type) {
return type == engine::DataType::VECTOR_BINARY;
return utils::IsBinaryVectorType(ftype);
}
Status
......@@ -382,5 +372,22 @@ DropSegment(snapshot::ScopedSnapshotT& ss, snapshot::ID_TYPE segment_id) {
return drop_op->Push();
}
bool
FieldRequireBuildIndex(const engine::SegmentFieldVisitorPtr& field_visitor) {
auto element_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor == nullptr) {
return false; // index undefined
}
if (element_visitor->GetFile() != nullptr) {
return false; // index already build
}
auto element = element_visitor->GetElement();
if (utils::IsFlatIndexType(element->GetTypeName())) {
return false; // no need to build IDMAP
}
return true;
}
} // namespace engine
} // namespace milvus
......@@ -11,6 +11,7 @@
#pragma once
#include "db/SnapshotVisitor.h"
#include "db/Types.h"
#include "db/snapshot/Resources.h"
#include "db/snapshot/Snapshot.h"
......@@ -47,15 +48,9 @@ DeleteSnapshotIndex(const std::string& collection_name, const std::string& field
bool
IsVectorField(const engine::snapshot::FieldPtr& field);
bool
IsVectorType(engine::DataType type);
bool
IsBinaryVectorField(const engine::snapshot::FieldPtr& field);
bool
IsBinaryVectorType(engine::DataType type);
Status
GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info);
......@@ -80,5 +75,8 @@ ClearIndexCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root, cons
Status
DropSegment(snapshot::ScopedSnapshotT& ss, snapshot::ID_TYPE segment_id);
bool
FieldRequireBuildIndex(const engine::SegmentFieldVisitorPtr& field_visitor);
} // namespace engine
} // namespace milvus
......@@ -27,8 +27,11 @@
#include "db/Types.h"
#ifdef MILVUS_GPU_VERSION
#include "cache/GpuCacheMgr.h"
#endif
#include "config/ServerConfig.h"
//#include "storage/s3/S3ClientWrapper.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
......@@ -62,6 +65,22 @@ IsBinaryMetricType(const std::string& metric_type) {
(metric_type == knowhere::Metric::TANIMOTO);
}
bool
IsFlatIndexType(const std::string& index_type) {
return (index_type == knowhere::IndexEnum::INDEX_FAISS_IDMAP ||
index_type == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP);
}
bool
IsVectorType(engine::DataType type) {
return type == engine::DataType::VECTOR_FLOAT || type == engine::DataType::VECTOR_BINARY;
}
bool
IsBinaryVectorType(engine::DataType type) {
return type == engine::DataType::VECTOR_BINARY;
}
engine::date_t
GetDate(const std::time_t& t, int day_delta) {
struct tm ltm;
......
......@@ -31,6 +31,15 @@ IsSameIndex(const CollectionIndex& index1, const CollectionIndex& index2);
bool
IsBinaryMetricType(const std::string& metric_type);
bool
IsFlatIndexType(const std::string& index_type);
bool
IsVectorType(engine::DataType type);
bool
IsBinaryVectorType(engine::DataType type);
engine::date_t
GetDate(const std::time_t& t, int day_delta = 0);
engine::date_t
......
......@@ -307,7 +307,7 @@ ExecutionEngineImpl::Search(ExecutionEngineContext& context) {
std::unordered_map<std::string, engine::DataType> attr_type;
auto segment_visitor = segment_reader_->GetSegmentVisitor();
auto field_visitors = segment_visitor->GetFieldVisitors();
auto& field_visitors = segment_visitor->GetFieldVisitors();
for (const auto& name : context.query_ptr_->index_fields) {
auto field_visitor = segment_visitor->GetFieldVisitor(name);
if (!field_visitor) {
......
......@@ -10,12 +10,13 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/job/BuildIndexJob.h"
#include "db/SnapshotUtils.h"
#include "db/Utils.h"
#include "scheduler/task/BuildIndexTask.h"
#include "utils/Log.h"
#include <utility>
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
......@@ -25,30 +26,23 @@ namespace {
void
WhichFieldsToBuild(const engine::snapshot::ScopedSnapshotT& snapshot, engine::snapshot::ID_TYPE segment_id,
std::vector<engine::TargetFields>& field_groups) {
auto field_names = snapshot->GetFieldNames();
engine::TargetFields structured_fields;
for (auto& field_name : field_names) {
auto field = snapshot->GetField(field_name);
auto ftype = static_cast<engine::DataType>(field->GetFtype());
bool is_vector = (ftype == engine::DataType::VECTOR_FLOAT || ftype == engine::DataType::VECTOR_BINARY);
auto elements = snapshot->GetFieldElementsByField(field_name);
for (auto& element : elements) {
if (element->GetFEtype() != engine::FieldElementType::FET_INDEX) {
continue; // only check index element
}
auto element_file = snapshot->GetSegmentFile(segment_id, element->GetID());
if (element_file != nullptr) {
continue; // index file has been created, no need to build index for this field
}
auto segment_visitor = engine::SegmentVisitor::Build(snapshot, segment_id);
auto& field_visitors = segment_visitor->GetFieldVisitors();
for (auto& pair : field_visitors) {
auto& field_visitor = pair.second;
if (!FieldRequireBuildIndex(field_visitor)) {
continue;
}
// index has been defined, but index file not yet created, this field need to be build index
if (is_vector) {
engine::TargetFields fields = {field_name};
field_groups.emplace_back(fields);
} else {
structured_fields.insert(field_name);
}
// index has been defined, but index file not yet created, this field need to be build index
auto& field = field_visitor->GetField();
bool is_vector = engine::utils::IsVectorType(field->GetFtype());
if (is_vector) {
engine::TargetFields fields = {field->GetName()};
field_groups.emplace_back(fields);
} else {
structured_fields.insert(field->GetName());
}
}
......@@ -86,5 +80,18 @@ BuildIndexJob::Dump() const {
ret.insert(base.begin(), base.end());
return ret;
}
void
BuildIndexJob::MarkFailedSegment(engine::snapshot::ID_TYPE segment_id, const Status& status) {
std::lock_guard<std::mutex> lock(failed_segments_mutex_);
failed_segments_[segment_id] = status;
}
SegmentFailedMap
BuildIndexJob::GetFailedSegments() {
std::lock_guard<std::mutex> lock(failed_segments_mutex_);
return failed_segments_;
}
} // namespace scheduler
} // namespace milvus
......@@ -29,6 +29,8 @@
namespace milvus {
namespace scheduler {
using SegmentFailedMap = std::unordered_map<int64_t, Status>;
class BuildIndexJob : public Job {
public:
explicit BuildIndexJob(const engine::snapshot::ScopedSnapshotT&, engine::DBOptions options,
......@@ -45,10 +47,11 @@ class BuildIndexJob : public Job {
return options_;
}
engine::snapshot::IDS_TYPE&
FailedSegments() {
return failed_segment_ids_;
}
void
MarkFailedSegment(engine::snapshot::ID_TYPE segment_id, const Status& status);
SegmentFailedMap
GetFailedSegments();
protected:
void
......@@ -58,7 +61,9 @@ class BuildIndexJob : public Job {
engine::snapshot::ScopedSnapshotT snapshot_;
engine::DBOptions options_;
engine::snapshot::IDS_TYPE segment_ids_;
engine::snapshot::IDS_TYPE failed_segment_ids_;
SegmentFailedMap failed_segments_;
std::mutex failed_segments_mutex_;
};
using BuildIndexJobPtr = std::shared_ptr<BuildIndexJob>;
......
......@@ -83,7 +83,7 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) {
LOG_ENGINE_ERROR_ << s.message();
auto build_job = static_cast<scheduler::BuildIndexJob*>(job_);
build_job->FailedSegments().push_back(segment_id_);
build_job->MarkFailedSegment(segment_id_, stat);
return s;
}
......@@ -111,7 +111,7 @@ BuildIndexTask::OnExecute() {
execution_engine_ = nullptr;
auto build_job = static_cast<scheduler::BuildIndexJob*>(job_);
build_job->FailedSegments().push_back(segment_id_);
build_job->MarkFailedSegment(segment_id_, status);
return status;
}
......
......@@ -59,7 +59,7 @@ SegmentReader::Initialize() {
segment_ptr_ = std::make_shared<engine::Segment>();
const engine::SegmentVisitor::IdMapT& field_map = segment_visitor_->GetFieldVisitors();
auto& field_map = segment_visitor_->GetFieldVisitors();
for (auto& iter : field_map) {
const engine::snapshot::FieldPtr& field = iter.second->GetField();
std::string name = field->GetName();
......@@ -673,7 +673,7 @@ SegmentReader::ClearCache() {
return Status::OK();
}
const engine::SegmentVisitor::IdMapT& field_visitors = segment_visitor_->GetFieldVisitors();
auto& field_visitors = segment_visitor_->GetFieldVisitors();
auto segment = segment_visitor_->GetSegment();
if (segment == nullptr) {
return Status::OK();
......@@ -726,7 +726,7 @@ SegmentReader::ClearIndexCache(const std::string& field_name) {
}
if (field_name.empty()) {
const engine::SegmentVisitor::IdMapT& field_visitors = segment_visitor_->GetFieldVisitors();
auto& field_visitors = segment_visitor_->GetFieldVisitors();
for (auto& pair : field_visitors) {
auto& field_visitor = pair.second;
ClearFieldIndexCache(field_visitor);
......
......@@ -56,7 +56,7 @@ SegmentWriter::Initialize() {
segment_ptr_ = std::make_shared<engine::Segment>();
const engine::SegmentVisitor::IdMapT& field_map = segment_visitor_->GetFieldVisitors();
auto& field_map = segment_visitor_->GetFieldVisitors();
for (auto& iter : field_map) {
const engine::snapshot::FieldPtr& field = iter.second->GetField();
STATUS_CHECK(segment_ptr_->AddField(field));
......
......@@ -255,8 +255,7 @@ ValidateDimension(int64_t dim, bool is_binary) {
Status
ValidateIndexParams(const milvus::json& index_params, int64_t dimension, const std::string& index_type) {
if (index_type == knowhere::IndexEnum::INDEX_FAISS_IDMAP ||
index_type == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP) {
if (engine::utils::IsFlatIndexType(index_type)) {
return Status::OK();
} else if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT ||
index_type == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8 ||
......@@ -396,8 +395,7 @@ ValidateSegmentRowCount(int64_t segment_row_count) {
Status
ValidateIndexMetricType(const std::string& metric_type, const std::string& index_type) {
if (index_type == knowhere::IndexEnum::INDEX_FAISS_IDMAP ||
index_type == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP) {
if (engine::utils::IsFlatIndexType(index_type)) {
// pass
} else if (index_type == knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT) {
// binary
......
......@@ -27,6 +27,7 @@
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/InActiveResourcesGCEvent.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/SegmentTaskTracker.h"
#include "db/utils.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/Segment.h"
......@@ -449,7 +450,7 @@ TEST_F(DBTest, VisitorTest) {
ASSERT_FALSE(visitor->GetSegment()->IsActive());
int file_num = 0;
auto field_visitors = visitor->GetFieldVisitors();
auto& field_visitors = visitor->GetFieldVisitors();
for (auto& kv : field_visitors) {
auto& field_visitor = kv.second;
auto field_element_visitors = field_visitor->GetElementVistors();
......@@ -1576,3 +1577,60 @@ TEST_F(DBTest, LoadTest) {
entity_count * (COLLECTION_DIM * sizeof(float) + sizeof(int32_t) + sizeof(int64_t) + sizeof(double)) * 2;
ASSERT_GE(cache_mgr.CacheUsage(), total_size);
}
TEST(SegmentTaskTrackerTest, TrackerTest) {
std::string collection_name = "tracker";
milvus::engine::SegmentTaskTracker tracker_1(0);
milvus::engine::SegmentTaskTracker tracker_2(3);
auto mark_failed_segments = [&]() -> void {
milvus::Status status(100, "illegal");
tracker_1.MarkFailedSegment(collection_name, 1, status);
tracker_2.MarkFailedSegment(collection_name, 1, status);
milvus::engine::SegmentFailedMap failed_map = {
{2, milvus::Status(200, "200")},
{3, milvus::Status(300, "300")},
};
tracker_1.MarkFailedSegments(collection_name, failed_map);
tracker_2.MarkFailedSegments(collection_name, failed_map);
};
mark_failed_segments();
{
std::vector<int64_t> segment_ids = {1, 2, 3};
tracker_1.IgnoreFailedSegments(collection_name, segment_ids);
ASSERT_TRUE(segment_ids.empty());
segment_ids = {1, 2, 3, 4};
tracker_1.IgnoreFailedSegments(collection_name, segment_ids);
ASSERT_FALSE(segment_ids.empty());
ASSERT_EQ(segment_ids[0], 4);
}
{
std::vector<int64_t> segment_ids = {1, 2, 3};
tracker_2.IgnoreFailedSegments(collection_name, segment_ids);
ASSERT_EQ(segment_ids.size(), 3);
segment_ids = {1, 2, 3, 4};
tracker_2.IgnoreFailedSegments(collection_name, segment_ids);
ASSERT_EQ(segment_ids.size(), 4);
}
mark_failed_segments();
mark_failed_segments();
{
std::vector<int64_t> segment_ids = {1, 2, 3};
tracker_2.IgnoreFailedSegments(collection_name, segment_ids);
ASSERT_TRUE(segment_ids.empty());
segment_ids = {1, 2, 3, 4};
tracker_2.IgnoreFailedSegments(collection_name, segment_ids);
ASSERT_FALSE(segment_ids.empty());
ASSERT_EQ(segment_ids[0], 4);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册