未验证 提交 523d97af 编写于 作者: C Cai Yudong 提交者: GitHub

snapshot integrate (#2782)

* update HybridQuery
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add IterateHandler
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* IterateHandler opt
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* opt
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* opt
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 71d36534
......@@ -564,36 +564,7 @@ SSDBImpl::DeleteEntities(const std::string& collection_id, engine::IDNumbers ent
}
Status
SSDBImpl::GetPartitionsByTags(const std::string& collection_name, const std::vector<std::string>& partition_patterns,
std::set<std::string>& partition_names) {
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
std::vector<std::string> partition_array;
STATUS_CHECK(ShowPartitions(collection_name, partition_array));
for (auto& pattern : partition_patterns) {
if (pattern == milvus::engine::DEFAULT_PARTITON_TAG) {
partition_names.insert(collection_name);
return Status::OK();
}
for (auto& p_name : partition_array) {
if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
partition_names.insert(p_name);
}
}
}
if (partition_names.empty()) {
return Status(DB_PARTITION_NOT_FOUND, "Specified partition does not exist");
}
return Status::OK();
}
Status
SSDBImpl::HybridQuery(const server::ContextPtr& context, const std::string& collection_id,
SSDBImpl::HybridQuery(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& partition_patterns, query::GeneralQueryPtr general_query,
query::QueryPtr query_ptr, std::vector<std::string>& field_names,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
......@@ -602,87 +573,28 @@ SSDBImpl::HybridQuery(const server::ContextPtr& context, const std::string& coll
auto query_ctx = context->Child("Query");
Status status;
meta::FilesHolder files_holder;
#if 0
if (partition_patterns.empty()) {
// no partition tag specified, means search in whole table
// get all table files from parent table
STATUS_CHECK(meta_ptr_->FilesToSearch(collection_id, files_holder));
std::vector<meta::CollectionSchema> partition_array;
STATUS_CHECK(meta_ptr_->ShowPartitions(collection_id, partition_array));
for (auto& schema : partition_array) {
status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
if (!status.ok()) {
return Status(DB_ERROR, "get files to search failed in HybridQuery");
}
}
if (files_holder.HoldFiles().empty()) {
return Status::OK(); // no files to search
}
} else {
// get files from specified partitions
std::set<std::string> partition_name_array;
GetPartitionsByTags(collection_id, partition_patterns, partition_name_array);
for (auto& partition_name : partition_name_array) {
status = meta_ptr_->FilesToSearch(partition_name, files_holder);
if (!status.ok()) {
return Status(DB_ERROR, "get files to search failed in HybridQuery");
}
}
if (files_holder.HoldFiles().empty()) {
return Status::OK();
}
}
#endif
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = HybridQueryAsync(query_ctx, collection_id, files_holder, general_query, query_ptr, field_names, attr_type,
result);
if (!status.ok()) {
return status;
}
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
query_ctx->GetTraceContext()->GetSpan()->Finish();
TimeRecorder rc("HybridQuery");
return status;
}
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
Status
SSDBImpl::HybridQueryAsync(const server::ContextPtr& context, const std::string& collection_name,
meta::FilesHolder& files_holder, query::GeneralQueryPtr general_query,
query::QueryPtr query_ptr, std::vector<std::string>& field_names,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
engine::QueryResult& result) {
auto query_async_ctx = context->Child("Query Async");
auto handler = std::make_shared<HybridQueryHelperSegmentHandler>(nullptr, ss, partition_patterns);
handler->Iterate();
STATUS_CHECK(handler->GetStatus());
TimeRecorder rc("");
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", handler->segments_.size());
// step 1: construct search job
VectorsData vectors;
milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files_holder.HoldFiles().size());
scheduler::SearchJobPtr job =
std::make_shared<scheduler::SearchJob>(query_async_ctx, general_query, query_ptr, attr_type, vectors);
for (auto& file : files) {
scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
job->AddIndexFile(file_ptr);
std::make_shared<scheduler::SearchJob>(query_ctx, general_query, query_ptr, attr_type, vectors);
for (auto& segment : handler->segments_) {
// job->AddSegment(segment);
}
// step 2: put search job to scheduler and wait result
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitResult();
files_holder.ReleaseFiles();
if (!job->GetStatus().ok()) {
return job->GetStatus();
}
......@@ -693,11 +605,7 @@ SSDBImpl::HybridQueryAsync(const server::ContextPtr& context, const std::string&
result.result_distances_ = job->GetResultDistances();
// step 4: get entities by result ids
auto status = GetEntityByID(collection_name, result.result_ids_, field_names, result.vectors_, result.attrs_);
if (!status.ok()) {
query_async_ctx->GetTraceContext()->GetSpan()->Finish();
return status;
}
STATUS_CHECK(GetEntityByID(collection_name, result.result_ids_, field_names, result.vectors_, result.attrs_));
// step 5: filter entities by field names
// std::vector<engine::AttrsData> filter_attrs;
......@@ -718,11 +626,14 @@ SSDBImpl::HybridQueryAsync(const server::ContextPtr& context, const std::string&
rc.ElapseFromBegin("Engine query totally cost");
query_async_ctx->GetTraceContext()->GetSpan()->Finish();
query_ctx->GetTraceContext()->GetSpan()->Finish();
return Status::OK();
}
////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
void
SSDBImpl::InternalFlush(const std::string& collection_id) {
wal::MXLogRecord record;
......
......@@ -103,17 +103,6 @@ class SSDBImpl {
engine::QueryResult& result);
private:
Status
GetPartitionsByTags(const std::string& collection_name, const std::vector<std::string>& partition_patterns,
std::set<std::string>& partition_names);
Status
HybridQueryAsync(const server::ContextPtr& context, const std::string& collection_name,
meta::FilesHolder& files_holder, query::GeneralQueryPtr general_query, query::QueryPtr query_ptr,
std::vector<std::string>& field_names,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
engine::QueryResult& result);
void
InternalFlush(const std::string& collection_id = "");
......
......@@ -15,6 +15,7 @@
#include "db/snapshot/Snapshot.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/SegmentReader.h"
#include "utils/StringHelpFunctions.h"
#include <unordered_map>
#include <utility>
......@@ -219,5 +220,30 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
return Status::OK();
}
///////////////////////////////////////////////////////////////////////////////
HybridQueryHelperSegmentHandler::HybridQueryHelperSegmentHandler(const server::ContextPtr& context,
engine::snapshot::ScopedSnapshotT ss,
const std::vector<std::string>& partition_patterns)
: BaseT(ss), context_(context), partition_patterns_(partition_patterns), segments_() {
}
Status
HybridQueryHelperSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
if (partition_patterns_.empty()) {
segments_.push_back(segment);
} else {
auto p_id = segment->GetPartitionId();
auto p_ptr = ss_->GetResource<snapshot::Partition>(p_id);
auto& p_name = p_ptr->GetName();
for (auto& pattern : partition_patterns_) {
if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
segments_.push_back(segment);
break;
}
}
}
return Status::OK();
}
} // namespace engine
} // namespace milvus
......@@ -13,6 +13,7 @@
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/Snapshot.h"
#include "server/context/Context.h"
#include "utils/Log.h"
......@@ -77,5 +78,20 @@ struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler<snapshot::S
std::vector<engine::AttrsData> attr_data_;
};
///////////////////////////////////////////////////////////////////////////////
struct HybridQueryHelperSegmentHandler : public snapshot::IterateHandler<snapshot::Segment> {
using ResourceT = snapshot::Segment;
using BaseT = snapshot::IterateHandler<ResourceT>;
HybridQueryHelperSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
const std::vector<std::string>& partition_patterns);
Status
Handle(const typename ResourceT::Ptr&) override;
const server::ContextPtr context_;
const std::vector<std::string> partition_patterns_;
std::vector<snapshot::SegmentPtr> segments_;
};
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <mutex>
#include "db/snapshot/Snapshot.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
namespace snapshot {
template <typename T>
struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
using ResourceT = T;
using ThisT = IterateHandler<ResourceT>;
using Ptr = std::shared_ptr<ThisT>;
explicit IterateHandler(ScopedSnapshotT ss) : ss_(ss) {
}
virtual Status
PreIterate() {
return Status::OK();
}
virtual Status
Handle(const typename ResourceT::Ptr& resource) = 0;
virtual Status
PostIterate() {
return Status::OK();
}
void
SetStatus(Status status) {
std::unique_lock<std::mutex> lock(mtx_);
status_ = status;
}
Status
GetStatus() const {
std::unique_lock<std::mutex> lock(mtx_);
return status_;
}
virtual void
Iterate() {
ss_->IterateResources<ThisT>(this->shared_from_this());
}
ScopedSnapshotT ss_;
Status status_;
mutable std::mutex mtx_;
};
using IterateSegmentHandler = IterateHandler<Segment>;
using SegmentExecutorT = std::function<Status(const Segment::Ptr&, IterateSegmentHandler*)>;
struct SegmentCollector : public IterateSegmentHandler {
using ResourceT = Segment;
using BaseT = IterateSegmentHandler;
explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor) : BaseT(ss), executor_(executor) {
}
Status
Handle(const typename ResourceT::Ptr& segment) override {
return executor_(segment, this);
}
SegmentExecutorT executor_;
};
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -347,47 +347,6 @@ class Snapshot : public ReferenceProxy {
using GCHandler = std::function<void(Snapshot::Ptr)>;
using ScopedSnapshotT = ScopedResource<Snapshot>;
template <typename T>
struct IterateHandler : public std::enable_shared_from_this<IterateHandler<T>> {
using ResourceT = T;
using ThisT = IterateHandler<ResourceT>;
using Ptr = std::shared_ptr<ThisT>;
explicit IterateHandler(ScopedSnapshotT ss) : ss_(ss) {
}
virtual Status
PreIterate() {
return Status::OK();
}
virtual Status
Handle(const typename ResourceT::Ptr& resource) = 0;
virtual Status
PostIterate() {
return Status::OK();
}
void
SetStatus(Status status) {
std::unique_lock<std::mutex> lock(mtx_);
status_ = status;
}
Status
GetStatus() const {
std::unique_lock<std::mutex> lock(mtx_);
return status_;
}
virtual void
Iterate() {
ss_->IterateResources<ThisT>(this->shared_from_this());
}
ScopedSnapshotT ss_;
Status status_;
mutable std::mutex mtx_;
};
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -19,6 +19,7 @@
#include "ssdb/utils.h"
#include "db/SnapshotVisitor.h"
#include "db/snapshot/IterateHandler.h"
using SegmentVisitor = milvus::engine::SegmentVisitor;
......@@ -256,7 +257,7 @@ TEST_F(SSDBTest, VisitorTest) {
return Status::OK();
};
auto segment_handler = std::make_shared<SegmentCollector>(ss, executor);
auto segment_handler = std::make_shared<milvus::engine::snapshot::SegmentCollector>(ss, executor);
segment_handler->Iterate();
std::cout << segment_handler->GetStatus().ToString() << std::endl;
ASSERT_TRUE(segment_handler->GetStatus().ok());
......
......@@ -120,22 +120,6 @@ struct PartitionCollector : public IteratePartitionHandler {
std::vector<std::string> partition_names_;
};
using SegmentExecutorT = std::function<Status(const Segment::Ptr&, IterateSegmentHandler*)>;
struct SegmentCollector : public IterateSegmentHandler {
using ResourceT = Segment;
using BaseT = IterateSegmentHandler;
explicit SegmentCollector(ScopedSnapshotT ss, const SegmentExecutorT& executor)
: BaseT(ss), executor_(executor) {}
Status
Handle(const typename ResourceT::Ptr& segment) override {
return executor_(segment, this);
}
SegmentExecutorT executor_;
};
using FilterT = std::function<bool(SegmentFile::Ptr)>;
struct SegmentFileCollector : public IterateSegmentFileHandler {
using ResourceT = SegmentFile;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册