From d7ef21c64425cf6c5a03851806d06db2c9fda3a9 Mon Sep 17 00:00:00 2001
From: FluorineDog
Date: Tue, 3 Nov 2020 12:42:13 +0800
Subject: [PATCH] Remove message.pb and reformat

Signed-off-by: FluorineDog
---
 internal/core/src/pb/CMakeLists.txt        |   2 +-
 internal/core/src/query/Parser.cpp         |  22 +-
 internal/core/src/query/Parser.h           |   5 +-
 internal/core/src/query/Predicate.h        |   8 +-
 internal/core/src/query/QueryNode.h        |  15 +-
 internal/core/src/segcore/Collection.cpp   |   4 +-
 internal/core/src/segcore/Collection.h     |   7 -
 internal/core/src/segcore/collection_c.cpp |   7 -
 internal/core/src/segcore/collection_c.h   |   3 -
 internal/core/unittest/test_query.cpp      |   1 -
 internal/reader/index.go                   |  17 +-
 internal/reader/index_test.go              | 224 ---------------------
 internal/reader/meta.go                    |  14 +-
 internal/reader/query_node.go              |   7 +-
 14 files changed, 35 insertions(+), 301 deletions(-)
 delete mode 100644 internal/reader/index_test.go

diff --git a/internal/core/src/pb/CMakeLists.txt b/internal/core/src/pb/CMakeLists.txt
index 010528d10..dfb48bfda 100644
--- a/internal/core/src/pb/CMakeLists.txt
+++ b/internal/core/src/pb/CMakeLists.txt
@@ -1,7 +1,7 @@
 get_property(PROTOC_EXCUTABLE GLOBAL PROPERTY PROTOC_EXCUTABLE )

-set(proto_file_names common.proto etcd_meta.proto schema.proto message.proto service_msg.proto)
+set(proto_file_names common.proto etcd_meta.proto schema.proto service_msg.proto)

 set( PROTO_PATH "${MILVUS_SOURCE_DIR}/../proto/" )

diff --git a/internal/core/src/query/Parser.cpp b/internal/core/src/query/Parser.cpp
index dda919ebd..08a374361 100644
--- a/internal/core/src/query/Parser.cpp
+++ b/internal/core/src/query/Parser.cpp
@@ -3,8 +3,8 @@
 #include "Parser.h"

 namespace milvus::wtf {
-using google::protobuf::RepeatedPtrField;
 using google::protobuf::RepeatedField;
+using google::protobuf::RepeatedPtrField;
 #if 0
 #if 0
 void
@@ -52,7 +52,7 @@ CopyRowRecords(const RepeatedPtrField& grpc_re
 Status
 ProcessLeafQueryJson(const milvus::json& query_json, query_old::BooleanQueryPtr& query, std::string& field_name) {
-    #if 0
+#if 0
     if (query_json.contains("term")) {
         auto leaf_query = std::make_shared();
         auto term_query = std::make_shared();
@@ -86,7 +86,7 @@ ProcessLeafQueryJson(const milvus::json& query_json, query_old::BooleanQueryPtr&
     } else {
         return Status{SERVER_INVALID_ARGUMENT, "Leaf query get wrong key"};
     }
-    #endif
+#endif
     return Status::OK();
 }

@@ -94,7 +94,7 @@
 Status
 ProcessBooleanQueryJson(const milvus::json& query_json,
                         query_old::BooleanQueryPtr& boolean_query,
                         query_old::QueryPtr& query_ptr) {
-    #if 0
+#if 0
     if (query_json.empty()) {
         return Status{SERVER_INVALID_ARGUMENT, "BoolQuery is null"};
     }
@@ -167,7 +167,7 @@ ProcessBooleanQueryJson(const milvus::json& query_json,
             return Status{SERVER_INVALID_DSL_PARAMETER, msg};
         }
     }
-    #endif
+#endif
     return Status::OK();
 }

@@ -176,7 +176,7 @@ DeserializeJsonToBoolQuery(const google::protobuf::RepeatedPtrField<::milvus::gr
                            const std::string& dsl_string,
                            query_old::BooleanQueryPtr& boolean_query,
                            query_old::QueryPtr& query_ptr) {
-    #if 0
+#if 0
     try {
         milvus::json dsl_json = json::parse(dsl_string);

@@ -236,24 +236,24 @@ DeserializeJsonToBoolQuery(const google::protobuf::RepeatedPtrField<::milvus::gr
         } catch (std::exception& e) {
             return Status(SERVER_INVALID_DSL_PARAMETER, e.what());
         }
-    #endif
+#endif
     return Status::OK();
 }
 #endif
-query_old::QueryPtr tester(proto::service::Query* request) {
+query_old::QueryPtr
+Transformer(proto::service::Query* request) {
     query_old::BooleanQueryPtr boolean_query = std::make_shared();
     query_old::QueryPtr query_ptr = std::make_shared();
-    #if 0
+#if 0
     query_ptr->collection_id = request->collection_name();
     auto status = DeserializeJsonToBoolQuery(request->placeholders(), request->dsl(), boolean_query, query_ptr);
     status = query_old::ValidateBooleanQuery(boolean_query);
     query_old::GeneralQueryPtr general_query = std::make_shared();
     query_old::GenBinaryQuery(boolean_query, general_query->bin);
     query_ptr->root = general_query;
-    #endif
+#endif
     return query_ptr;
 }
-
 }  // namespace milvus::wtf
\ No newline at end of file
diff --git a/internal/core/src/query/Parser.h b/internal/core/src/query/Parser.h
index 8aba37ff0..36342c294 100644
--- a/internal/core/src/query/Parser.h
+++ b/internal/core/src/query/Parser.h
@@ -1,5 +1,4 @@
 #pragma once
-//#include "pb/message.pb.h"
 #include "pb/service_msg.pb.h"
 #include "query/BooleanQuery.h"
 #include "query/BinaryQuery.h"
@@ -8,8 +7,6 @@
 namespace milvus::wtf {
 query_old::QueryPtr
-tester(proto::service::Query* query);
-
-
+Transformer(proto::service::Query* query);
 }  // namespace milvus::wtf
diff --git a/internal/core/src/query/Predicate.h b/internal/core/src/query/Predicate.h
index 0309de0a4..9ddb9ff57 100644
--- a/internal/core/src/query/Predicate.h
+++ b/internal/core/src/query/Predicate.h
@@ -20,6 +20,7 @@ using ExprPtr = std::unique_ptr;
 struct BinaryExpr : Expr {
     ExprPtr left_;
     ExprPtr right_;
+
  public:
     void
     accept(ExprVisitor&) = 0;
@@ -27,25 +28,27 @@

 struct UnaryExpr : Expr {
     ExprPtr child_;
+
  public:
     void
     accept(ExprVisitor&) = 0;
 };

 // TODO: not enabled in sprint 1
-struct BoolUnaryExpr: UnaryExpr {
+struct BoolUnaryExpr : UnaryExpr {
     enum class OpType { LogicalNot };
     OpType op_type_;
+
  public:
     void
     accept(ExprVisitor&) override;
 };

-
 // TODO: not enabled in sprint 1
 struct BoolBinaryExpr : BinaryExpr {
     enum class OpType { LogicalAnd, LogicalOr, LogicalXor };
     OpType op_type_;
+
  public:
     void
     accept(ExprVisitor&) override;
@@ -74,6 +77,7 @@ struct RangeExpr : Expr {
     FieldId field_id_;
     enum class OpType { LessThan, LessEqual, GreaterThan, GreaterEqual, Equal, NotEqual };
     std::vector> conditions_;
+
  public:
     void
     accept(ExprVisitor&) override;
diff --git a/internal/core/src/query/QueryNode.h b/internal/core/src/query/QueryNode.h
index 25b007f45..bb53bbfdb 100644
--- a/internal/core/src/query/QueryNode.h
+++ b/internal/core/src/query/QueryNode.h
@@ -9,14 +9,15 @@ namespace milvus::query {
 class QueryNodeVisitor;

 enum class QueryNodeType {
-  kInvalid = 0,
-  kScan,
+    kInvalid = 0,
+    kScan,
     kANNS,
 };

 // Base of all Nodes
 struct QueryNode {
     QueryNodeType node_type;
+
  public:
     virtual ~QueryNode() = default;
     virtual void
@@ -25,28 +26,28 @@ struct QueryNode {
 using QueryNodePtr = std::unique_ptr;

-
 struct VectorQueryNode : QueryNode {
     std::optional child_;
     int64_t num_queries_;
     int64_t dim_;
     FieldId field_id_;
+
  public:
     virtual void
     accept(QueryNodeVisitor&) = 0;
 };

-struct FloatVectorANNS: VectorQueryNode {
+struct FloatVectorANNS : VectorQueryNode {
     std::shared_ptr data;
-    std::string metric_type_; // TODO: use enum
+    std::string metric_type_;  // TODO: use enum
  public:
     void
     accept(QueryNodeVisitor&) override;
 };

-struct BinaryVectorANNS: VectorQueryNode {
+struct BinaryVectorANNS : VectorQueryNode {
     std::shared_ptr data;
-    std::string metric_type_; // TODO: use enum
+    std::string metric_type_;  // TODO: use enum
  public:
     void
     accept(QueryNodeVisitor&) override;
diff --git a/internal/core/src/segcore/Collection.cpp b/internal/core/src/segcore/Collection.cpp
index c1ff9da18..7625791d6 100644
--- a/internal/core/src/segcore/Collection.cpp
+++ b/internal/core/src/segcore/Collection.cpp
@@ -2,7 +2,6 @@
 #include "pb/common.pb.h"
 #include "pb/schema.pb.h"
 #include "pb/etcd_meta.pb.h"
-#include "pb/message.pb.h"
 #include
 #include
 #include
@@ -14,7 +13,7 @@ Collection::Collection(std::string& collection_name, std::string& schema)
     parse();
     index_ = nullptr;
 }
-
+#if 0
 void
 Collection::AddIndex(const grpc::IndexParam& index_param) {
     auto& index_name = index_param.index_name();
@@ -109,6 +108,7 @@ Collection::CreateIndex(std::string& index_config) {
     //        AddIndex(index);
     //    }
 }
+#endif

 void
 Collection::parse() {
diff --git a/internal/core/src/segcore/Collection.h b/internal/core/src/segcore/Collection.h
index ef6492ae3..b50564f55 100644
--- a/internal/core/src/segcore/Collection.h
+++ b/internal/core/src/segcore/Collection.h
@@ -1,6 +1,5 @@
 #pragma once

-#include
 #include "segcore/Partition.h"
 #include "SegmentDefs.h"

@@ -10,12 +9,6 @@
 class Collection {
  public:
     explicit Collection(std::string& collection_name, std::string& schema);

-    void
-    AddIndex(const grpc::IndexParam& index_param);
-
-    void
-    CreateIndex(std::string& index_config);
-
     void
     parse();
diff --git a/internal/core/src/segcore/collection_c.cpp b/internal/core/src/segcore/collection_c.cpp
index 93e9a2a42..120f11f59 100644
--- a/internal/core/src/segcore/collection_c.cpp
+++ b/internal/core/src/segcore/collection_c.cpp
@@ -21,10 +21,3 @@ DeleteCollection(CCollection collection) {
     std::cout << "delete collection " << col->get_collection_name() << std::endl;
     delete col;
 }
-
-void
-UpdateIndexes(CCollection c_collection, const char* index_string) {
-    auto c = (milvus::segcore::Collection*)c_collection;
-    std::string s(index_string);
-    c->CreateIndex(s);
-}
diff --git a/internal/core/src/segcore/collection_c.h b/internal/core/src/segcore/collection_c.h
index ff68975c0..5d3c8aea2 100644
--- a/internal/core/src/segcore/collection_c.h
+++ b/internal/core/src/segcore/collection_c.h
@@ -10,9 +10,6 @@ NewCollection(const char* collection_name, const char* schema_conf);
 void
 DeleteCollection(CCollection collection);

-void
-UpdateIndexes(CCollection c_collection, const char* index_string);
-
 #ifdef __cplusplus
 }
 #endif
\ No newline at end of file
diff --git a/internal/core/unittest/test_query.cpp b/internal/core/unittest/test_query.cpp
index aa440cf76..0b7b0e48a 100644
--- a/internal/core/unittest/test_query.cpp
+++ b/internal/core/unittest/test_query.cpp
@@ -42,5 +42,4 @@ TEST(Query, Naive) {
         ]
     }
 })";
-
 }
\ No newline at end of file
diff --git a/internal/reader/index.go b/internal/reader/index.go
index 9902c04b3..dd139984e 100644
--- a/internal/reader/index.go
+++ b/internal/reader/index.go
@@ -18,10 +18,10 @@ import (

 type IndexConfig struct{}

-func (s *Segment) buildIndex(collection* Collection) commonpb.Status {
+func (s *Segment) buildIndex(collection *Collection) commonpb.Status {
     /*
-    int
-    BuildIndex(CCollection c_collection, CSegmentBase c_segment);
+        int
+        BuildIndex(CCollection c_collection, CSegmentBase c_segment);
     */
     var status = C.BuildIndex(collection.CollectionPtr, s.SegmentPtr)
     if status != 0 {
@@ -35,14 +35,3 @@ func (s *Segment) dropIndex(fieldName string) commonpb.Status {

     return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}
 }
-
-
-func (node *QueryNode) UpdateIndexes(collection *Collection, indexConfig *string) {
-    /*
-        void
-        UpdateIndexes(CCollection c_collection, const char *index_string);
-    */
-    cCollectionPtr := collection.CollectionPtr
-    cIndexConfig := C.CString(*indexConfig)
-    C.UpdateIndexes(cCollectionPtr, cIndexConfig)
-}
diff --git a/internal/reader/index_test.go b/internal/reader/index_test.go
deleted file mode 100644
index dcb798b93..000000000
--- a/internal/reader/index_test.go
+++ /dev/null
@@ -1,224 +0,0 @@
-package reader
-
-import (
-    "context"
-    "encoding/binary"
-    "fmt"
-    msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message"
-    "math"
-    "testing"
-
-    "github.com/stretchr/testify/assert"
-)
-
-func TestIndex_BuildIndex(t *testing.T) {
-    // 1. Construct node, collection, partition and segment
-    ctx := context.Background()
-    node := NewQueryNode(ctx, 0, 0)
-    var collection = node.NewCollection(0, "collection0", "")
-    var partition = collection.NewPartition("partition0")
-    var segment = partition.NewSegment(0)
-
-    // 2. Create ids and timestamps
-    ids := make([]int64, 0)
-    timestamps := make([]uint64, 0)
-
-    // 3. Create records, use schema below:
-    // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
-    // schema_tmp->AddField("age", DataType::INT32);
-    const DIM = 16
-    const N = 100
-    var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
-    var rawData []byte
-    for _, ele := range vec {
-        buf := make([]byte, 4)
-        binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
-        rawData = append(rawData, buf...)
-    }
-    bs := make([]byte, 4)
-    binary.LittleEndian.PutUint32(bs, 1)
-    rawData = append(rawData, bs...)
-    var records [][]byte
-    for i := 0; i < N; i++ {
-        ids = append(ids, int64(i))
-        timestamps = append(timestamps, uint64(i))
-        records = append(records, rawData)
-    }
-
-    // 4. Do PreInsert
-    var offset = segment.SegmentPreInsert(N)
-    assert.GreaterOrEqual(t, offset, int64(0))
-
-    // 5. Do Insert
-    var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
-    assert.NoError(t, err)
-
-    // 6. Build index
-    //segment.BuildIndex(collection)
-    //assert.Equal(t, status.ErrorCode, msgPb.ErrorCode_SUCCESS)
-
-    // 7. Do search
-    var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
-    var queryRawData = make([]float32, 0)
-    for i := 0; i < DIM; i++ {
-        queryRawData = append(queryRawData, float32(i))
-    }
-    var vectorRecord = msgPb.VectorRowRecord{
-        FloatData: queryRawData,
-    }
-
-    query := node.QueryJson2Info(&queryJson)
-    var searchRes, searchErr = segment.SegmentSearch(query, timestamps[N/2], &vectorRecord)
-    assert.NoError(t, searchErr)
-    fmt.Println(searchRes)
-
-    // 8. Destruct node, collection, and segment
-    partition.DeleteSegment(node, segment)
-    collection.DeletePartition(node, partition)
-    node.DeleteCollection(collection)
-    node.Close()
-}
-
-func TestIndex_DropIndex(t *testing.T) {
-    // 1. Construct node, collection, partition and segment
-    ctx := context.Background()
-    node := NewQueryNode(ctx, 0, 0)
-    var collection = node.NewCollection(0, "collection0", "")
-    var partition = collection.NewPartition("partition0")
-    var segment = partition.NewSegment(0)
-
-    // 2. Create ids and timestamps
-    ids := make([]int64, 0)
-    timestamps := make([]uint64, 0)
-
-    // 3. Create records, use schema below:
-    // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
-    // schema_tmp->AddField("age", DataType::INT32);
-    const DIM = 16
-    const N = 100
-    var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
-    var rawData []byte
-    for _, ele := range vec {
-        buf := make([]byte, 4)
-        binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
-        rawData = append(rawData, buf...)
-    }
-    bs := make([]byte, 4)
-    binary.LittleEndian.PutUint32(bs, 1)
-    rawData = append(rawData, bs...)
-    var records [][]byte
-    for i := 0; i < N; i++ {
-        ids = append(ids, int64(i))
-        timestamps = append(timestamps, uint64(i))
-        records = append(records, rawData)
-    }
-
-    // 4. Do PreInsert
-    var offset = segment.SegmentPreInsert(N)
-    assert.GreaterOrEqual(t, offset, int64(0))
-
-    // 5. Do Insert
-    var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
-    assert.NoError(t, err)
-
-    // 6. Build index
-    //var status = segment.BuildIndex(collection)
-    //assert.Equal(t, status.ErrorCode, msgPb.ErrorCode_SUCCESS)
-
-    // 7. Do search
-    var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
-    var queryRawData = make([]float32, 0)
-    for i := 0; i < DIM; i++ {
-        queryRawData = append(queryRawData, float32(i))
-    }
-    var vectorRecord = msgPb.VectorRowRecord{
-        FloatData: queryRawData,
-    }
-
-    query := node.QueryJson2Info(&queryJson)
-    var searchRes, searchErr = segment.SegmentSearch(query, timestamps[N/2], &vectorRecord)
-    assert.NoError(t, searchErr)
-    fmt.Println(searchRes)
-
-    // 8. Drop index
-    //status = segment.DropIndex("fakevec")
-    //assert.Equal(t, status.ErrorCode, msgPb.ErrorCode_SUCCESS)
-
-    // 9. Destruct node, collection, and segment
-    partition.DeleteSegment(node, segment)
-    collection.DeletePartition(node, partition)
-    node.DeleteCollection(collection)
-    node.Close()
-}
-
-func TestIndex_UpdateIndex(t *testing.T) {
-    // 1. Construct node, collection, partition and segment
-    ctx := context.Background()
-    node := NewQueryNode(ctx, 0, 0)
-    var collection = node.NewCollection(0, "collection0", "")
-    var partition = collection.NewPartition("partition0")
-    var segment = partition.NewSegment(0)
-
-    // 2. Create ids and timestamps
-    ids := make([]int64, 0)
-    timestamps := make([]uint64, 0)
-
-    // 3. Create records, use schema below:
-    // schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
-    // schema_tmp->AddField("age", DataType::INT32);
-    const DIM = 16
-    const N = 100
-    var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
-    var rawData []byte
-    for _, ele := range vec {
-        buf := make([]byte, 4)
-        binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
-        rawData = append(rawData, buf...)
-    }
-    bs := make([]byte, 4)
-    binary.LittleEndian.PutUint32(bs, 1)
-    rawData = append(rawData, bs...)
-    var records [][]byte
-    for i := 0; i < N; i++ {
-        ids = append(ids, int64(i))
-        timestamps = append(timestamps, uint64(i))
-        records = append(records, rawData)
-    }
-
-    // 4. Do PreInsert
-    var offset = segment.SegmentPreInsert(N)
-    assert.GreaterOrEqual(t, offset, int64(0))
-
-    // 5. Do Insert
-    var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
-    assert.NoError(t, err)
-
-    //// 6. Build index
-    //segment.BuildIndex(collection)
-    //assert.NoError(t, err)
-
-    // 7. Do search
-    var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
-    var queryRawData = make([]float32, 0)
-    for i := 0; i < 16; i++ {
-        queryRawData = append(queryRawData, float32(i))
-    }
-    var vectorRecord = msgPb.VectorRowRecord{
-        FloatData: queryRawData,
-    }
-
-    query := node.QueryJson2Info(&queryJson)
-    var searchRes, searchErr = segment.SegmentSearch(query, timestamps[N/2], &vectorRecord)
-    assert.NoError(t, searchErr)
-    fmt.Println(searchRes)
-
-    // 8. UpdateIndex
-    newIndex := ""
-    node.UpdateIndexes(collection, &newIndex)
-
-    // 9. Destruct node, collection, and segment
-    partition.DeleteSegment(node, segment)
-    collection.DeletePartition(node, partition)
-    node.DeleteCollection(collection)
-    node.Close()
-}
diff --git a/internal/reader/meta.go b/internal/reader/meta.go
index 82f231b7e..dbfb9e8e4 100644
--- a/internal/reader/meta.go
+++ b/internal/reader/meta.go
@@ -164,19 +164,7 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
 }

 func (node *QueryNode) processCollectionModify(id string, value string) {
-    // println("Modify Collection: ", id)
-    collection, err := collection.JSON2Collection(value)
-    if err != nil {
-        println("error of json 2 collection")
-        println(err.Error())
-    }
-    // printCollectionStruct(collection)
-
-    goCollection := node.GetCollectionByID(collection.ID)
-    if goCollection != nil {
-        node.UpdateIndexes(goCollection, &collection.GrpcMarshalString)
-    }
-
+    println("Modify Collection: ", id)
 }

 func (node *QueryNode) processModify(key string, msg string) {
diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go
index a2dad247f..5d52d1bee 100644
--- a/internal/reader/query_node.go
+++ b/internal/reader/query_node.go
@@ -15,9 +15,10 @@ import "C"

 import (
     "context"
-    "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
     "time"

+    "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
+
     "github.com/zilliztech/milvus-distributed/internal/kv"
     "github.com/zilliztech/milvus-distributed/internal/msgclient"
     msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message"
@@ -211,10 +212,6 @@ func (node *QueryNode) QueryNodeDataInit() {
 }

 func (node *QueryNode) NewCollection(collectionID int64, collectionName string, schemaConfig string) *Collection {
-    /*
-        void
-        UpdateIndexes(CCollection c_collection, const char *index_string);
-    */
     cName := C.CString(collectionName)
     cSchema := C.CString(schemaConfig)
     collection := C.NewCollection(cName, cSchema)
-- 
GitLab