From 7f044fff822591086ef4a31e87cdba981b7fc3fe Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Thu, 21 Jan 2021 15:29:52 +0800 Subject: [PATCH] Enable search for sealed Signed-off-by: FluorineDog --- .../chap01_system_overview.md | 6 +-- internal/core/src/query/Plan.cpp | 2 +- internal/core/src/query/PlanNode.h | 3 +- internal/core/src/query/SearchOnGrowing.cpp | 35 +++++---------- internal/core/src/query/SearchOnGrowing.h | 3 +- internal/core/src/query/SearchOnSealed.cpp | 4 +- .../src/query/generated/ExecExprVisitor.h | 2 +- .../src/query/generated/ExecPlanNodeVisitor.h | 4 +- .../src/query/visitors/ExecExprVisitor.cpp | 2 +- .../query/visitors/ExecPlanNodeVisitor.cpp | 15 +++---- .../query/visitors/ShowPlanNodeVisitor.cpp | 24 +++++----- .../core/src/segcore/SegmentGrowingImpl.cpp | 26 ++++++----- .../core/src/segcore/SegmentGrowingImpl.h | 14 +++--- .../core/src/segcore/SegmentInterface.cpp | 13 ++++++ internal/core/src/segcore/SegmentInterface.h | 13 +++++- .../core/src/segcore/SegmentSealedImpl.cpp | 23 ++++++---- internal/core/src/segcore/SegmentSealedImpl.h | 14 +++--- internal/core/src/segcore/plan_c.cpp | 3 +- internal/core/src/segcore/segment_c.cpp | 3 +- internal/core/unittest/test_expr.cpp | 3 +- internal/core/unittest/test_query.cpp | 44 ++----------------- internal/core/unittest/test_sealed.cpp | 41 ++++++++++++++++- internal/core/unittest/test_utils/DataGen.cpp | 14 ++++++ internal/distributed/datanode/client.go | 6 +-- internal/distributed/indexservice/service.go | 6 +-- internal/distributed/querynode/client.go | 6 +-- internal/distributed/queryservice/client.go | 6 +-- internal/distributed/queryservice/service.go | 6 +-- internal/indexnode/indexnode.go | 6 +-- internal/indexservice/indexservice.go | 6 +-- internal/proxyservice/proxyservice.go | 6 +-- internal/querynode/query_node.go | 18 +++----- internal/queryservice/queryservice.go | 6 +-- internal/util/typeutil/service.go | 6 +-- 34 files changed, 208 insertions(+), 181 deletions(-) create mode 100644 internal/core/unittest/test_utils/DataGen.cpp diff --git a/docs/developer_guides/chap01_system_overview.md b/docs/developer_guides/chap01_system_overview.md index e5d648253..e0e2923a1 100644 --- a/docs/developer_guides/chap01_system_overview.md +++ b/docs/developer_guides/chap01_system_overview.md @@ -78,9 +78,9 @@ In order to boost throughput, we model Milvus as a stream-driven system. ```go type Component interface { - Init() error - Start() error - Stop() error + Init() + Start() + Stop() GetComponentStates() (ComponentStates, error) GetTimeTickChannel() (string, error) GetStatisticsChannel() (string, error) diff --git a/internal/core/src/query/Plan.cpp b/internal/core/src/query/Plan.cpp index 9d0434d01..d999eb717 100644 --- a/internal/core/src/query/Plan.cpp +++ b/internal/core/src/query/Plan.cpp @@ -209,7 +209,7 @@ Parser::ParseVecNode(const Json& out_body) { } }(); vec_node->query_info_.topK_ = topK; - vec_node->query_info_.metric_type_ = vec_info.at("metric_type"); + vec_node->query_info_.metric_type_ = GetMetricType(vec_info.at("metric_type")); vec_node->query_info_.search_params_ = vec_info.at("params"); vec_node->query_info_.field_offset_ = field_offset; vec_node->placeholder_tag_ = vec_info.at("query"); diff --git a/internal/core/src/query/PlanNode.h b/internal/core/src/query/PlanNode.h index bbe5e9d35..3b1e068b5 100644 --- a/internal/core/src/query/PlanNode.h +++ b/internal/core/src/query/PlanNode.h @@ -41,7 +41,8 @@ using PlanNodePtr = std::unique_ptr; struct QueryInfo { int64_t topK_; FieldOffset field_offset_; - std::string metric_type_; // TODO: use enum + MetricType metric_type_; + std::string deprecated_metric_type_; // TODO: use enum nlohmann::json search_params_; }; diff --git a/internal/core/src/query/SearchOnGrowing.cpp b/internal/core/src/query/SearchOnGrowing.cpp index 1389b71fd..54edd7e2f 100644 --- a/internal/core/src/query/SearchOnGrowing.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -63,7 +63,7 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment, auto dim = field.get_dim(); auto topK = info.topK_; auto total_count = topK * num_queries; - auto metric_type = GetMetricType(info.metric_type_); + auto metric_type = info.metric_type_; // step 3: small indexing search // std::vector final_uids(total_count, -1); @@ -138,7 +138,7 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, auto& record = segment.get_insert_record(); // step 1: binary search to find the barrier of the snapshot // auto ins_barrier = get_barrier(record, timestamp); - auto metric_type = GetMetricType(info.metric_type_); + auto metric_type = info.metric_type_; // auto del_barrier = get_barrier(deleted_record_, timestamp); #if 0 @@ -195,37 +195,24 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment, } // TODO: refactor and merge this into one -template void SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, int64_t ins_barrier, const query::QueryInfo& info, - const EmbeddedType* query_data, + const void* query_data, int64_t num_queries, const faiss::BitsetView& bitset, QueryResult& results) { - static_assert(IsVector); - if constexpr (std::is_same_v) { - FloatSearch(segment, info, query_data, num_queries, ins_barrier, bitset, results); + // TODO: add data_type to info + auto data_type = segment.get_schema()[info.field_offset_].get_data_type(); + Assert(datatype_is_vector(data_type)); + if (data_type == DataType::VECTOR_FLOAT) { + auto typed_data = reinterpret_cast(query_data); + FloatSearch(segment, info, typed_data, num_queries, ins_barrier, bitset, results); } else { - BinarySearch(segment, info, query_data, num_queries, ins_barrier, bitset, results); + auto typed_data = reinterpret_cast(query_data); + BinarySearch(segment, info, typed_data, num_queries, ins_barrier, bitset, results); } } -template void -SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, - int64_t ins_barrier, - const query::QueryInfo& info, - const EmbeddedType* query_data, - int64_t num_queries, - const faiss::BitsetView& bitset, - QueryResult& results); -template void -SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, - int64_t ins_barrier, - const query::QueryInfo& info, - const EmbeddedType* query_data, - int64_t num_queries, - const faiss::BitsetView& bitset, - QueryResult& results); } // namespace milvus::query diff --git a/internal/core/src/query/SearchOnGrowing.h b/internal/core/src/query/SearchOnGrowing.h index f1251d512..2f7869d8c 100644 --- a/internal/core/src/query/SearchOnGrowing.h +++ b/internal/core/src/query/SearchOnGrowing.h @@ -20,12 +20,11 @@ namespace milvus::query { using BitmapChunk = boost::dynamic_bitset<>; using BitmapSimple = std::deque; -template void SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, int64_t ins_barrier, const query::QueryInfo& info, - const EmbeddedType* query_data, + const void* query_data, int64_t num_queries, const faiss::BitsetView& bitset, QueryResult& results); diff --git a/internal/core/src/query/SearchOnSealed.cpp b/internal/core/src/query/SearchOnSealed.cpp index 2aec8b41e..e7f57435d 100644 --- a/internal/core/src/query/SearchOnSealed.cpp +++ b/internal/core/src/query/SearchOnSealed.cpp @@ -63,9 +63,7 @@ SearchOnSealed(const Schema& schema, Assert(record.is_ready(field_offset)); auto indexing_entry = record.get_entry(field_offset); - std::cout << " SearchOnSealed, indexing_entry->metric:" << indexing_entry->metric_type_ << std::endl; - std::cout << " SearchOnSealed, query_info.metric_type_:" << query_info.metric_type_ << std::endl; - Assert(indexing_entry->metric_type_ == GetMetricType(query_info.metric_type_)); + Assert(indexing_entry->metric_type_ == query_info.metric_type_); auto final = [&] { auto ds = knowhere::GenDataset(num_queries, dim, query_data); diff --git a/internal/core/src/query/generated/ExecExprVisitor.h b/internal/core/src/query/generated/ExecExprVisitor.h index 96e9439ed..bf432ded4 100644 --- a/internal/core/src/query/generated/ExecExprVisitor.h +++ b/internal/core/src/query/generated/ExecExprVisitor.h @@ -37,7 +37,7 @@ class ExecExprVisitor : public ExprVisitor { public: using RetType = std::deque>; - ExecExprVisitor(const segcore::SegmentGrowingImpl& segment, int64_t row_count) + ExecExprVisitor(const segcore::SegmentInternalInterface& segment, int64_t row_count) : segment_(segment), row_count_(row_count) { } RetType diff --git a/internal/core/src/query/generated/ExecPlanNodeVisitor.h b/internal/core/src/query/generated/ExecPlanNodeVisitor.h index 08ac2f694..882c66fe1 100644 --- a/internal/core/src/query/generated/ExecPlanNodeVisitor.h +++ b/internal/core/src/query/generated/ExecPlanNodeVisitor.h @@ -29,7 +29,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor { public: using RetType = QueryResult; - ExecPlanNodeVisitor(const segcore::SegmentGrowing& segment, + ExecPlanNodeVisitor(const segcore::SegmentInterface& segment, Timestamp timestamp, const PlaceholderGroup& placeholder_group) : segment_(segment), timestamp_(timestamp), placeholder_group_(placeholder_group) { @@ -53,7 +53,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor { private: // std::optional ret_; - const segcore::SegmentGrowing& segment_; + const segcore::SegmentInterface& segment_; Timestamp timestamp_; const PlaceholderGroup& placeholder_group_; diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 0c3e85e4a..c8652da4b 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -25,7 +25,7 @@ namespace impl { class ExecExprVisitor : ExprVisitor { public: using RetType = std::deque>; - ExecExprVisitor(const segcore::SegmentGrowingImpl& segment, int64_t row_count) + ExecExprVisitor(const segcore::SegmentInternalInterface& segment, int64_t row_count) : segment_(segment), row_count_(row_count) { } RetType diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp index 62cfeb56e..d558978ee 100644 --- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp @@ -28,7 +28,7 @@ namespace impl { class ExecPlanNodeVisitor : PlanNodeVisitor { public: using RetType = QueryResult; - ExecPlanNodeVisitor(const segcore::SegmentGrowing& segment, + ExecPlanNodeVisitor(const segcore::SegmentInterface& segment, Timestamp timestamp, const PlaceholderGroup& placeholder_group) : segment_(segment), timestamp_(timestamp), placeholder_group_(placeholder_group) { @@ -52,7 +52,7 @@ class ExecPlanNodeVisitor : PlanNodeVisitor { private: // std::optional ret_; - const segcore::SegmentGrowing& segment_; + const segcore::SegmentInterface& segment_; Timestamp timestamp_; const PlaceholderGroup& placeholder_group_; @@ -66,7 +66,7 @@ void ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { // TODO: optimize here, remove the dynamic cast assert(!ret_.has_value()); - auto segment = dynamic_cast(&segment_); + auto segment = dynamic_cast(&segment_); AssertInfo(segment, "support SegmentSmallIndex Only"); RetType ret; auto& ph = placeholder_group_.at(0); @@ -76,7 +76,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { aligned_vector bitset_holder; BitsetView view; // TODO: add API to unify row_count - auto row_count = segcore::get_barrier(segment->get_insert_record(), timestamp_); + auto row_count = segment->get_row_count(); if (node.predicate_.has_value()) { ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment, row_count).call_child(*node.predicate_.value()); @@ -84,12 +84,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { view = BitsetView(bitset_holder.data(), bitset_holder.size() * 8); } - auto& sealed_indexing = segment->get_sealed_indexing_record(); - if (sealed_indexing.is_ready(node.query_info_.field_offset_)) { - SearchOnSealed(segment->get_schema(), sealed_indexing, node.query_info_, src_data, num_queries, view, ret); - } else { - SearchOnGrowing(*segment, row_count, node.query_info_, src_data, num_queries, view, ret); - } + segment->vector_search(row_count, node.query_info_, src_data, num_queries, view, ret); ret_ = ret; } diff --git a/internal/core/src/query/visitors/ShowPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ShowPlanNodeVisitor.cpp index 6154dee4c..bbbd7df45 100644 --- a/internal/core/src/query/visitors/ShowPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ShowPlanNodeVisitor.cpp @@ -54,12 +54,12 @@ ShowPlanNodeVisitor::visit(FloatVectorANNS& node) { assert(!ret_); auto& info = node.query_info_; Json json_body{ - {"node_type", "FloatVectorANNS"}, // - {"metric_type", info.metric_type_}, // - {"field_offset_", info.field_offset_.get()}, // - {"topK", info.topK_}, // - {"search_params", info.search_params_}, // - {"placeholder_tag", node.placeholder_tag_}, // + {"node_type", "FloatVectorANNS"}, // + {"metric_type", MetricTypeToName(info.metric_type_)}, // + {"field_offset_", info.field_offset_.get()}, // + {"topK", info.topK_}, // + {"search_params", info.search_params_}, // + {"placeholder_tag", node.placeholder_tag_}, // }; if (node.predicate_.has_value()) { ShowExprVisitor expr_show; @@ -76,12 +76,12 @@ ShowPlanNodeVisitor::visit(BinaryVectorANNS& node) { assert(!ret_); auto& info = node.query_info_; Json json_body{ - {"node_type", "BinaryVectorANNS"}, // - {"metric_type", info.metric_type_}, // - {"field_offset_", info.field_offset_.get()}, // - {"topK", info.topK_}, // - {"search_params", info.search_params_}, // - {"placeholder_tag", node.placeholder_tag_}, // + {"node_type", "BinaryVectorANNS"}, // + {"metric_type", MetricTypeToName(info.metric_type_)}, // + {"field_offset_", info.field_offset_.get()}, // + {"topK", info.topK_}, // + {"search_params", info.search_params_}, // + {"placeholder_tag", node.placeholder_tag_}, // }; if (node.predicate_.has_value()) { ShowExprVisitor expr_show; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 3044de214..b356dc208 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "query/generated/ExecPlanNodeVisitor.h" #include "segcore/SegmentGrowingImpl.h" #include "query/PlanNode.h" @@ -237,17 +238,6 @@ SegmentGrowingImpl::GetMemoryUsageInBytes() const { return total_bytes; } -QueryResult -SegmentGrowingImpl::Search(const query::Plan* plan, - const query::PlaceholderGroup** placeholder_groups, - const Timestamp* timestamps, - int64_t num_groups) const { - Assert(num_groups == 1); - query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]); - auto results = visitor.get_moved_result(*plan->plan_node_); - return results; -} - Status SegmentGrowingImpl::LoadIndexing(const LoadIndexInfo& info) { auto field_offset = schema_->get_offset(FieldName(info.field_name)); @@ -270,5 +260,19 @@ SegmentGrowingImpl::num_chunk_data() const { auto size = get_insert_record().ack_responder_.GetAck(); return upper_div(size, chunk_size_); } +void +SegmentGrowingImpl::vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const { + auto& sealed_indexing = this->get_sealed_indexing_record(); + if (sealed_indexing.is_ready(query_info.field_offset_)) { + query::SearchOnSealed(this->get_schema(), sealed_indexing, query_info, query_data, query_count, bitset, output); + } else { + SearchOnGrowing(*this, vec_count, query_info, query_data, query_count, bitset, output); + } +} } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index e779cac17..6a54e05be 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -53,12 +53,6 @@ class SegmentGrowingImpl : public SegmentGrowing { Status Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override; - QueryResult - Search(const query::Plan* Plan, - const query::PlaceholderGroup* placeholder_groups[], - const Timestamp timestamps[], - int64_t num_groups) const override; - // stop receive insert requests // will move data to immutable vector or something Status @@ -181,6 +175,14 @@ class SegmentGrowingImpl : public SegmentGrowing { indexing_record_(*schema_, chunk_size) { } + void + vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const override; + public: std::shared_ptr get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false); diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index e6d4303b5..0066ee360 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "segcore/SegmentInterface.h" +#include "query/generated/ExecPlanNodeVisitor.h" namespace milvus::segcore { class Naive; @@ -39,4 +40,16 @@ SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) results.row_data_.emplace_back(std::move(blob)); } } + +QueryResult +SegmentInterface::Search(const query::Plan* plan, + const query::PlaceholderGroup** placeholder_groups, + const Timestamp* timestamps, + int64_t num_groups) const { + Assert(num_groups == 1); + query::ExecPlanNodeVisitor visitor(*this, timestamps[0], *placeholder_groups[0]); + auto results = visitor.get_moved_result(*plan->plan_node_); + return results; +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 08f18b4b4..a6f0668e3 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -17,6 +17,7 @@ #include "IndexingEntry.h" #include #include "common/SystemProperty.h" +#include "query/PlanNode.h" namespace milvus::segcore { @@ -25,11 +26,11 @@ class SegmentInterface { void FillTargetEntry(const query::Plan* plan, QueryResult& results) const; - virtual QueryResult + QueryResult Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], const Timestamp timestamps[], - int64_t num_groups) const = 0; + int64_t num_groups) const; virtual int64_t GetMemoryUsageInBytes() const = 0; @@ -71,6 +72,14 @@ class SegmentInternalInterface : public SegmentInterface { } public: + virtual void + vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const = 0; + virtual int64_t num_chunk_index_safe(FieldOffset field_offset) const = 0; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index caa167076..a0ef3a772 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "segcore/SegmentSealedImpl.h" +#include "query/SearchOnSealed.h" namespace milvus::segcore { void SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { @@ -107,14 +108,6 @@ SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) return nullptr; } -QueryResult -SegmentSealedImpl::Search(const query::Plan* Plan, - const query::PlaceholderGroup** placeholder_groups, - const Timestamp* timestamps, - int64_t num_groups) const { - PanicInfo("unimplemented"); -} - int64_t SegmentSealedImpl::GetMemoryUsageInBytes() const { // TODO: add estimate for index @@ -134,6 +127,20 @@ SegmentSealedImpl::get_schema() const { return *schema_; } +void +SegmentSealedImpl::vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const { + auto field_offset = query_info.field_offset_; + auto& field_meta = schema_->operator[](field_offset); + Assert(field_meta.is_vector()); + Assert(vec_indexings_.is_ready(field_offset)); + query::SearchOnSealed(*schema_, vec_indexings_, query_info, query_data, query_count, bitset, output); +} + SegmentSealedPtr CreateSealedSegment(SchemaPtr schema, int64_t chunk_size) { return std::make_unique(schema); diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index ce7989f18..f901e3667 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -26,12 +26,6 @@ class SegmentSealedImpl : public SegmentSealed { LoadFieldData(const LoadFieldDataInfo& info) override; public: - QueryResult - Search(const query::Plan* Plan, - const query::PlaceholderGroup* placeholder_groups[], - const Timestamp timestamps[], - int64_t num_groups) const override; - int64_t GetMemoryUsageInBytes() const override; @@ -104,6 +98,14 @@ class SegmentSealedImpl : public SegmentSealed { } } + void + vector_search(int64_t vec_count, + query::QueryInfo query_info, + const void* query_data, + int64_t query_count, + const BitsetView& bitset, + QueryResult& output) const override; + bool is_all_ready() const { // TODO: optimize here diff --git a/internal/core/src/segcore/plan_c.cpp b/internal/core/src/segcore/plan_c.cpp index b3cb53543..16455ea4b 100644 --- a/internal/core/src/segcore/plan_c.cpp +++ b/internal/core/src/segcore/plan_c.cpp @@ -78,7 +78,8 @@ GetTopK(CPlan plan) { const char* GetMetricType(CPlan plan) { auto query_plan = static_cast(plan); - return strdup(query_plan->plan_node_->query_info_.metric_type_.c_str()); + auto metric_str = milvus::MetricTypeToName(query_plan->plan_node_->query_info_.metric_type_); + return strdup(metric_str.c_str()); } void diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 0f00d324f..84d10709c 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -20,6 +20,7 @@ #include "common/type_c.h" #include #include +#include "common/Types.h" ////////////////////////////// common interfaces ////////////////////////////// CSegmentInterface @@ -79,7 +80,7 @@ Search(CSegmentInterface c_segment, auto status = CStatus(); try { *query_result = segment->Search(plan, placeholder_groups.data(), timestamps, num_groups); - if (plan->plan_node_->query_info_.metric_type_ != "IP") { + if (plan->plan_node_->query_info_.metric_type_ != milvus::MetricType::METRIC_INNER_PRODUCT) { for (auto& dis : query_result->result_distances_) { dis *= -1; } diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index 7d104400f..8a85e2ead 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -235,7 +235,8 @@ TEST(Expr, ShowExecutor) { int64_t num_queries = 100L; auto raw_data = DataGen(schema, num_queries); auto& info = node->query_info_; - info.metric_type_ = "L2"; + + info.metric_type_ = MetricType::METRIC_L2; info.topK_ = 20; info.field_offset_ = FieldOffset(0); node->predicate_ = std::nullopt; diff --git a/internal/core/unittest/test_query.cpp b/internal/core/unittest/test_query.cpp index e8124ac74..3470e274a 100644 --- a/internal/core/unittest/test_query.cpp +++ b/internal/core/unittest/test_query.cpp @@ -20,51 +20,12 @@ #include "query/generated/ExecPlanNodeVisitor.h" #include "query/PlanImpl.h" #include "segcore/SegmentGrowingImpl.h" +#include "segcore/SegmentSealed.h" #include "pb/schema.pb.h" using namespace milvus; using namespace milvus::query; using namespace milvus::segcore; -TEST(Query, Naive) { - SUCCEED(); - using namespace milvus::wtf; - std::string dsl_string = R"( -{ - "bool": { - "must": [ - { - "term": { - "A": [ - 1, - 2, - 5 - ] - } - }, - { - "range": { - "B": { - "GT": 1, - "LT": 100 - } - } - }, - { - "vector": { - "Vec": { - "metric_type": "L2", - "params": { - "nprobe": 10 - }, - "query": "$0", - "topk": 10 - } - } - } - ] - } -})"; -} TEST(Query, ShowExecutor) { using namespace milvus::query; @@ -76,7 +37,7 @@ TEST(Query, ShowExecutor) { int64_t num_queries = 100L; auto raw_data = DataGen(schema, num_queries); auto& info = node->query_info_; - info.metric_type_ = "L2"; + info.metric_type_ = MetricType::METRIC_L2; info.topK_ = 20; info.field_offset_ = FieldOffset(1000); node->predicate_ = std::nullopt; @@ -258,6 +219,7 @@ TEST(Query, ExecWithPredicate) { ])"); ASSERT_EQ(json.dump(2), ref.dump(2)); } + TEST(Query, ExecTerm) { using namespace milvus::query; using namespace milvus::segcore; diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 671deb908..022916395 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -22,7 +22,7 @@ using namespace milvus; using namespace milvus::segcore; -using namespace milvus; +using namespace milvus::query; TEST(Sealed, without_predicate) { using namespace milvus::query; @@ -286,4 +286,41 @@ TEST(Sealed, LoadFieldData) { ASSERT_EQ(chunk_span1[i], ref1[i]); ASSERT_EQ(chunk_span2[i], ref2[i]); } -} \ No newline at end of file + std::string dsl = R"({ + "bool": { + "must": [ + { + "range": { + "double": { + "GE": -1, + "LT": 1 + } + } + }, + { + "vector": { + "fakevec": { + "metric_type": "L2", + "params": { + "nprobe": 10 + }, + "query": "$0", + "topk": 5 + } + } + } + ] + } + })"; + + auto plan = CreatePlan(*schema, dsl); + auto num_queries = 5; + auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024); + auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); + Timestamp time = 1000000; + std::vector ph_group_arr = {ph_group.get()}; + + auto qr = segment->Search(plan.get(), ph_group_arr.data(), &time, 1); + auto json = QueryResultToJson(qr); + std::cout << json.dump(1); +} diff --git a/internal/core/unittest/test_utils/DataGen.cpp b/internal/core/unittest/test_utils/DataGen.cpp new file mode 100644 index 000000000..f9f29239c --- /dev/null +++ b/internal/core/unittest/test_utils/DataGen.cpp @@ -0,0 +1,14 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +// +// Created by mike on 1/21/21. +// diff --git a/internal/distributed/datanode/client.go b/internal/distributed/datanode/client.go index 1bdfcc492..a8480b6cb 100644 --- a/internal/distributed/datanode/client.go +++ b/internal/distributed/datanode/client.go @@ -11,15 +11,15 @@ type Client struct { // GOOSE TODO: add DataNodeClient } -func (c *Client) Init() error { +func (c *Client) Init() { panic("implement me") } -func (c *Client) Start() error { +func (c *Client) Start() { panic("implement me") } -func (c *Client) Stop() error { +func (c *Client) Stop() { panic("implement me") } diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 5451595a3..7bf384d86 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -28,9 +28,8 @@ type Server struct { loopWg sync.WaitGroup } -func (s *Server) Init() error { +func (s *Server) Init() { indexservice.Params.Init() - return nil } func (s *Server) Start() error { @@ -38,9 +37,8 @@ func (s *Server) Start() error { return s.startIndexServer() } -func (s *Server) Stop() error { +func (s *Server) Stop() { s.loopWg.Wait() - return nil } func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) { diff --git a/internal/distributed/querynode/client.go b/internal/distributed/querynode/client.go index 0b0e0bb13..6a0672f8a 100644 --- a/internal/distributed/querynode/client.go +++ b/internal/distributed/querynode/client.go @@ -13,15 +13,15 @@ type Client struct { grpcClient querypb.QueryNodeClient } -func (c *Client) Init() error { +func (c *Client) Init() { panic("implement me") } -func (c *Client) Start() error { +func (c *Client) Start() { panic("implement me") } -func (c *Client) Stop() error { +func (c *Client) Stop() { panic("implement me") } diff --git a/internal/distributed/queryservice/client.go b/internal/distributed/queryservice/client.go index 64238f4bf..3609e5670 100644 --- a/internal/distributed/queryservice/client.go +++ b/internal/distributed/queryservice/client.go @@ -9,15 +9,15 @@ type Client struct { grpcClient querypb.QueryServiceClient } -func (c *Client) Init() error { +func (c *Client) Init() { panic("implement me") } -func (c *Client) Start() error { +func (c *Client) Start() { panic("implement me") } -func (c *Client) Stop() error { +func (c *Client) Stop() { panic("implement me") } diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 0c3c25806..9551886bf 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -13,15 +13,15 @@ type Server struct { queryService queryServiceImpl.Interface } -func (s *Server) Init() error { +func (s *Server) Init() { panic("implement me") } -func (s *Server) Start() error { +func (s *Server) Start() { panic("implement me") } -func (s *Server) Stop() error { +func (s *Server) Stop() { panic("implement me") } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index b3e9a1899..01644c8ca 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -45,15 +45,15 @@ type IndexNode struct { //serviceClient indexservice.Interface // method factory } -func (i *IndexNode) Init() error { +func (i *IndexNode) Init() { panic("implement me") } -func (i *IndexNode) Start() error { +func (i *IndexNode) Start() { panic("implement me") } -func (i *IndexNode) Stop() error { +func (i *IndexNode) Stop() { panic("implement me") } diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 7c0f83636..862de4f67 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -46,15 +46,15 @@ type IndexService struct { type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp -func (i *IndexService) Init() error { +func (i *IndexService) Init() { panic("implement me") } -func (i *IndexService) Start() error { +func (i *IndexService) Start() { panic("implement me") } -func (i *IndexService) Stop() error { +func (i *IndexService) Stop() { panic("implement me") } diff --git a/internal/proxyservice/proxyservice.go b/internal/proxyservice/proxyservice.go index 50890e47e..3dfb34996 100644 --- a/internal/proxyservice/proxyservice.go +++ b/internal/proxyservice/proxyservice.go @@ -13,15 +13,15 @@ type ProxyService struct { } -func (s ProxyService) Init() error { +func (s ProxyService) Init() { panic("implement me") } -func (s ProxyService) Start() error { +func (s ProxyService) Start() { panic("implement me") } -func (s ProxyService) Stop() error { +func (s ProxyService) Stop() { panic("implement me") } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index d30699359..6b0fbfb9d 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -16,13 +16,10 @@ import ( "context" "errors" "fmt" + queryserviceimpl "github.com/zilliztech/milvus-distributed/internal/queryservice" "io" "sync/atomic" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - - queryserviceimpl "github.com/zilliztech/milvus-distributed/internal/queryservice" - "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go/config" @@ -34,7 +31,9 @@ import ( ) type Node interface { - typeutil.Service + Init() + Start() + Stop() GetComponentStates() (*internalpb2.ComponentStates, error) GetTimeTickChannel() (string, error) @@ -124,7 +123,7 @@ func Init() { Params.Init() } -func (node *QueryNode) Init() error { +func (node *QueryNode) Init() { registerReq := queryPb.RegisterNodeRequest{ Address: &commonpb.Address{ Ip: Params.QueryNodeIP, @@ -142,10 +141,9 @@ func (node *QueryNode) Init() error { // TODO: use response.initParams Params.Init() - return nil } -func (node *QueryNode) Start() error { +func (node *QueryNode) Start() { // todo add connectMaster logic // init services and manager node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) @@ -164,10 +162,9 @@ func (node *QueryNode) Start() error { node.stateCode.Store(internalpb2.StateCode_HEALTHY) <-node.queryNodeLoopCtx.Done() - return nil } -func (node *QueryNode) Stop() error { +func (node *QueryNode) Stop() { node.stateCode.Store(internalpb2.StateCode_ABNORMAL) node.queryNodeLoopCancel() @@ -190,7 +187,6 @@ func (node *QueryNode) Stop() error { if node.closer != nil { node.closer.Close() } - return nil } func (node *QueryNode) GetComponentStates() (*internalpb2.ComponentStates, error) { diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 5edf64c37..fe904da26 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -9,15 +9,15 @@ type QueryService struct { } //serverBase interface -func (qs *QueryService) Init() error { +func (qs *QueryService) Init() { panic("implement me") } -func (qs *QueryService) Start() error { +func (qs *QueryService) Start() { panic("implement me") } -func (qs *QueryService) Stop() error { +func (qs *QueryService) Stop() { panic("implement me") } diff --git a/internal/util/typeutil/service.go b/internal/util/typeutil/service.go index e04c7e35a..9fe320b54 100644 --- a/internal/util/typeutil/service.go +++ b/internal/util/typeutil/service.go @@ -5,9 +5,9 @@ import ( ) type Service interface { - Init() error - Start() error - Stop() error + Init() + Start() + Stop() } type Component interface { -- GitLab