From cdcf006690c4a20a768a45dc5c04b617e053091d Mon Sep 17 00:00:00 2001 From: xige-16 Date: Sun, 7 Feb 2021 09:30:48 +0800 Subject: [PATCH] Fix loadCollection error when query service assign insert channels and query nodes Signed-off-by: xige-16 --- docs/developer_guides/chap07_query_service.md | 2 + internal/proto/query_service.proto | 1 + internal/proto/querypb/query_service.pb.go | 182 ++++++------- internal/querynode/flow_graph_dd_node.go | 41 ++- .../querynode/flow_graph_filter_dm_node.go | 8 +- internal/querynode/query_node.go | 27 +- internal/queryservice/meta_replica.go | 117 ++++----- internal/queryservice/queryservice.go | 240 ++++++++---------- 8 files changed, 302 insertions(+), 316 deletions(-) diff --git a/docs/developer_guides/chap07_query_service.md b/docs/developer_guides/chap07_query_service.md index 5103bb7ad..744aee285 100644 --- a/docs/developer_guides/chap07_query_service.md +++ b/docs/developer_guides/chap07_query_service.md @@ -77,6 +77,7 @@ type LoadCollectionRequest struct { MsgBase DbID UniqueID CollectionID UniqueID + schema schemapb.CollectionSchema } ``` @@ -144,6 +145,7 @@ type LoadPartitonRequest struct { DbID UniqueID CollectionID UniqueID PartitionIDs []UniqueID + schema schemapb.CollectionSchema } ``` diff --git a/internal/proto/query_service.proto b/internal/proto/query_service.proto index a89235a30..49eff0251 100644 --- a/internal/proto/query_service.proto +++ b/internal/proto/query_service.proto @@ -128,6 +128,7 @@ message LoadSegmentRequest { repeated int64 segmentIDs = 5; repeated int64 fieldIDs = 6; repeated data.SegmentStateInfo segment_states = 7; + schema.CollectionSchema schema = 8; } message ReleaseSegmentRequest { diff --git a/internal/proto/querypb/query_service.pb.go b/internal/proto/querypb/query_service.pb.go index c9dd91866..f72f13891 100644 --- a/internal/proto/querypb/query_service.pb.go +++ b/internal/proto/querypb/query_service.pb.go @@ -996,6 +996,7 @@ type LoadSegmentRequest struct { SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"` FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"` SegmentStates []*datapb.SegmentStateInfo `protobuf:"bytes,7,rep,name=segment_states,json=segmentStates,proto3" json:"segment_states,omitempty"` + Schema *schemapb.CollectionSchema `protobuf:"bytes,8,opt,name=schema,proto3" json:"schema,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -1075,6 +1076,13 @@ func (m *LoadSegmentRequest) GetSegmentStates() []*datapb.SegmentStateInfo { return nil } +func (m *LoadSegmentRequest) GetSchema() *schemapb.CollectionSchema { + if m != nil { + return m.Schema + } + return nil +} + type ReleaseSegmentRequest struct { Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"` @@ -1412,93 +1420,93 @@ func init() { func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) } var fileDescriptor_5fcb6756dc1afb8d = []byte{ - // 1368 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xdd, 0x8e, 0xdb, 0xc4, - 0x17, 0x5f, 0x6f, 0xb2, 0xd9, 0xe6, 0x6c, 0x9a, 0xa4, 0xb3, 0x5f, 0xf9, 0xfb, 0x5f, 0x4a, 0x19, - 0x68, 0xbb, 0x6d, 0x21, 0x8b, 0xb6, 0x02, 0x71, 0x03, 0xa8, 0xdd, 0x54, 0x55, 0x10, 0x5d, 0x16, - 0xa7, 0x55, 0xc5, 0x42, 0x15, 0x1c, 0x7b, 0x36, 0x99, 0x36, 0xb6, 0x53, 0xcf, 0xa4, 0x1f, 0x7b, - 0x03, 0x48, 0x5c, 0x22, 0x78, 0x06, 0x84, 0x04, 0x88, 0xa7, 0xe0, 0x29, 0xb8, 0xe1, 0x06, 0xc4, - 0x8b, 0x20, 0x8f, 0x27, 0x5e, 0xdb, 0x99, 0xac, 0xd3, 0xa6, 0xed, 0x72, 0xe7, 0x39, 0x3e, 0xe7, - 0xfc, 0xce, 0xd7, 0x9c, 0x39, 0x07, 0x96, 0x1f, 0x0c, 0x89, 0xff, 0xa4, 0xcd, 0x88, 0xff, 0x90, - 0x5a, 0xa4, 0x3e, 0xf0, 0x3d, 0xee, 0x21, 0xe4, 0xd0, 0xfe, 0xc3, 0x21, 0x0b, 0x4f, 0x75, 0xc1, - 0xa1, 0x97, 0x2c, 0xcf, 0x71, 0x3c, 0x37, 0xa4, 0xe9, 0xa5, 0x38, 0x87, 0x5e, 0xa6, 0x2e, 0x27, - 0xbe, 0x6b, 0xf6, 0xe5, 0x19, 0xd9, 0x26, 0x37, 0x93, 0x3a, 0xf5, 0x12, 0xb3, 0x7a, 0xc4, 0x31, - 0xc3, 0x13, 0xfe, 0x0a, 0x96, 0x0d, 0xd2, 0xa5, 0x8c, 0x13, 0x7f, 0xc7, 0xb3, 0x89, 0x41, 0x1e, - 0x0c, 0x09, 0xe3, 0xe8, 0x6d, 0xc8, 0x77, 0x4c, 0x46, 0x6a, 0xda, 0x59, 0x6d, 0x63, 0x69, 0xeb, - 0x74, 0x3d, 0x61, 0x87, 0x34, 0xe0, 0x26, 0xeb, 0x5e, 0x33, 0x19, 0x31, 0x04, 0x27, 0x7a, 0x17, - 0x16, 0x4d, 0xdb, 0xf6, 0x09, 0x63, 0xb5, 0xf9, 0x23, 0x84, 0xae, 0x86, 0x3c, 0xc6, 0x88, 0x19, - 0xff, 0xa0, 0xc1, 0x4a, 0xd2, 0x02, 0x36, 0xf0, 0x5c, 0x46, 0xd0, 0x15, 0x28, 0x30, 0x6e, 0xf2, - 0x21, 0x93, 0x46, 0xfc, 0x5f, 0xa9, 0xaf, 0x25, 0x58, 0x0c, 0xc9, 0x8a, 0xae, 0xc1, 0x12, 0x75, - 0x29, 0x6f, 0x0f, 0x4c, 0xdf, 0x74, 0x46, 0x96, 0xbc, 0x96, 0x94, 0x8c, 0x62, 0xd4, 0x74, 0x29, - 0xdf, 0x15, 0x8c, 0x06, 0xd0, 0xe8, 0x1b, 0xdf, 0x85, 0xd5, 0x56, 0xcf, 0x7b, 0xb4, 0xed, 0xf5, - 0xfb, 0xc4, 0xe2, 0xd4, 0x73, 0x9f, 0x3d, 0x28, 0x08, 0xf2, 0x76, 0xa7, 0xd9, 0x10, 0x76, 0xe4, - 0x0c, 0xf1, 0x8d, 0x19, 0xac, 0xa5, 0xd5, 0xcf, 0xe2, 0xf1, 0x1b, 0x70, 0xd2, 0x8a, 0x54, 0x35, - 0x1b, 0x81, 0xcf, 0xb9, 0x8d, 0x9c, 0x91, 0x24, 0xe2, 0xdf, 0x35, 0x58, 0xfd, 0xd8, 0x33, 0xed, - 0x17, 0xe4, 0x14, 0xc2, 0x50, 0x8a, 0x03, 0xd6, 0x72, 0xe2, 0x5f, 0x82, 0x86, 0xde, 0x87, 0x42, - 0x58, 0x7a, 0xb5, 0xbc, 0xc0, 0x3a, 0x97, 0xc4, 0x92, 0x65, 0x79, 0x68, 0x61, 0x4b, 0x10, 0x0c, - 0x29, 0x84, 0xbf, 0xd5, 0xa0, 0x66, 0x90, 0x3e, 0x31, 0x19, 0x39, 0x46, 0x2f, 0xf0, 0xd7, 0x1a, - 0xac, 0x04, 0xf9, 0xdb, 0x35, 0x7d, 0x4e, 0x8f, 0xc7, 0x84, 0x41, 0x58, 0xa0, 0x31, 0x0b, 0x66, - 0x29, 0x20, 0x0c, 0xa5, 0xc1, 0x48, 0xd3, 0x61, 0xfd, 0x24, 0x68, 0xd8, 0x81, 0x4a, 0x84, 0x16, - 0x88, 0x13, 0x86, 0xce, 0xc2, 0x52, 0x8c, 0x45, 0x00, 0xe6, 0x8c, 0x38, 0x09, 0xbd, 0x07, 0x0b, - 0x01, 0x04, 0x11, 0xfe, 0x95, 0xb7, 0x70, 0x7d, 0xbc, 0x99, 0xd5, 0x93, 0x5a, 0x8d, 0x50, 0x00, - 0xff, 0xac, 0xc1, 0x5a, 0x0a, 0xef, 0xe5, 0x97, 0x6b, 0x3a, 0x2e, 0x79, 0x45, 0x5c, 0x7e, 0xd3, - 0x60, 0x7d, 0xcc, 0xd0, 0x59, 0x92, 0xb1, 0x07, 0x6b, 0x11, 0x40, 0xdb, 0x26, 0xcc, 0xf2, 0xe9, - 0x20, 0xf8, 0x0e, 0xd3, 0xb2, 0xb4, 0xf5, 0x7a, 0x76, 0x10, 0x99, 0xb1, 0x1a, 0xa9, 0x68, 0xc4, - 0x34, 0xe0, 0x7f, 0x34, 0x58, 0x09, 0x7a, 0xc0, 0xf1, 0x55, 0xee, 0x34, 0x31, 0x8d, 0xb5, 0x89, - 0x85, 0x67, 0x69, 0x13, 0xbf, 0x68, 0xb0, 0x2e, 0xdb, 0xc4, 0x7f, 0xdb, 0x51, 0xfc, 0xa3, 0x06, - 0xfa, 0xb6, 0x4f, 0x4c, 0x4e, 0x3e, 0x0d, 0xd2, 0xb8, 0xdd, 0x33, 0x5d, 0x97, 0xf4, 0x67, 0xab, - 0x9f, 0x0b, 0x50, 0xf1, 0x43, 0x67, 0xdb, 0x56, 0xa8, 0x4f, 0x98, 0x5e, 0x34, 0xca, 0x92, 0x2c, - 0x51, 0xd0, 0x39, 0x28, 0xfb, 0x84, 0x0d, 0xfb, 0x87, 0x7c, 0x39, 0xc1, 0x77, 0x32, 0xa4, 0x4a, - 0x36, 0xfc, 0x93, 0x06, 0xeb, 0x57, 0x6d, 0x3b, 0x6e, 0xe0, 0x0c, 0x57, 0xf1, 0x32, 0x9c, 0x4a, - 0x59, 0x27, 0x43, 0x5b, 0x34, 0xaa, 0x49, 0xfb, 0x9a, 0x0d, 0x74, 0x11, 0xaa, 0x49, 0x0b, 0x65, - 0xa8, 0x8b, 0x46, 0x25, 0x61, 0x63, 0xb3, 0x81, 0xff, 0xd4, 0x40, 0x37, 0x88, 0xe3, 0x3d, 0x24, - 0x4a, 0x43, 0x9f, 0x29, 0x92, 0x23, 0xef, 0xe6, 0x67, 0xf3, 0x2e, 0xf7, 0x14, 0xde, 0xe5, 0xd5, - 0xde, 0xdd, 0x83, 0xb5, 0x3b, 0x26, 0xb7, 0x7a, 0x0d, 0x67, 0xf6, 0x0c, 0x9c, 0x01, 0x88, 0xf0, - 0xc2, 0x9e, 0x52, 0x34, 0x62, 0x14, 0xfc, 0xeb, 0x3c, 0xa0, 0xa0, 0x47, 0xb4, 0x48, 0xd7, 0x21, - 0x2e, 0x7f, 0xf9, 0x17, 0x27, 0xf5, 0xac, 0xe4, 0xc7, 0x9f, 0x95, 0x33, 0x00, 0x2c, 0xb4, 0x2e, - 0x70, 0x61, 0x41, 0x5c, 0xac, 0x18, 0x05, 0xe9, 0x70, 0x62, 0x9f, 0x92, 0xbe, 0x1d, 0xfc, 0x2d, - 0x88, 0xbf, 0xd1, 0x19, 0x7d, 0x04, 0x65, 0xc9, 0xd9, 0x16, 0x2f, 0x0d, 0xab, 0x2d, 0xaa, 0xda, - 0x6a, 0x30, 0x35, 0xd7, 0x65, 0x08, 0x44, 0x4f, 0x6d, 0xba, 0xfb, 0x9e, 0x71, 0x92, 0xc5, 0x28, - 0x0c, 0xff, 0xad, 0xc1, 0xaa, 0x6c, 0x34, 0xc7, 0x16, 0xad, 0x69, 0xfa, 0xe9, 0x0c, 0xf1, 0xc2, - 0xdf, 0x6b, 0xb0, 0xbe, 0xed, 0x39, 0x03, 0xcf, 0x8d, 0xfc, 0x9e, 0xad, 0x3f, 0x7d, 0x10, 0x0a, - 0x91, 0xd1, 0x68, 0x7e, 0x7e, 0xc2, 0x68, 0x9e, 0x06, 0x95, 0x52, 0xf8, 0x2f, 0x0d, 0x96, 0x64, - 0xb4, 0x83, 0x9c, 0xa0, 0xd3, 0x50, 0x8c, 0x5c, 0x91, 0x33, 0xc8, 0x21, 0x61, 0x2c, 0x84, 0xf3, - 0xd9, 0x05, 0x97, 0x1b, 0x2f, 0xb8, 0xff, 0xc1, 0x09, 0x87, 0x38, 0x6d, 0x46, 0x0f, 0x88, 0xac, - 0xc7, 0x45, 0x87, 0x38, 0x2d, 0x7a, 0x40, 0x82, 0x5f, 0xee, 0xd0, 0x69, 0xfb, 0xde, 0x23, 0x26, - 0x5e, 0xab, 0x9c, 0xb1, 0xe8, 0x0e, 0x1d, 0xc3, 0x7b, 0xc4, 0xd0, 0x2b, 0x00, 0xd4, 0xb5, 0xc9, - 0xe3, 0xb6, 0x6b, 0x3a, 0xa4, 0x56, 0x10, 0x57, 0xbb, 0x28, 0x28, 0x3b, 0xa6, 0x43, 0x50, 0x0d, - 0x16, 0xc5, 0xa1, 0xd9, 0xa8, 0x2d, 0x86, 0x82, 0xf2, 0x88, 0xf7, 0x01, 0xc5, 0x3c, 0x9c, 0xe9, - 0xaa, 0xc7, 0xf2, 0x3e, 0x9f, 0xce, 0x3b, 0xfe, 0x46, 0x83, 0xe5, 0x04, 0xd0, 0x2c, 0x79, 0x7d, - 0x07, 0x16, 0xa8, 0xbb, 0xef, 0x8d, 0xc6, 0x94, 0x57, 0x55, 0x63, 0x4a, 0x1c, 0x2c, 0xe4, 0xbe, - 0x74, 0x00, 0xe5, 0xe4, 0xf0, 0x82, 0x4a, 0x70, 0x62, 0xc7, 0xe3, 0xd7, 0x1f, 0x53, 0xc6, 0xab, - 0x73, 0xa8, 0x0c, 0xb0, 0xe3, 0xf1, 0x5d, 0x9f, 0x30, 0xe2, 0xf2, 0xaa, 0x86, 0x00, 0x0a, 0x9f, - 0xb8, 0x0d, 0xca, 0xee, 0x57, 0xe7, 0xd1, 0xb2, 0x9c, 0x49, 0xcd, 0x7e, 0xd3, 0xbd, 0x49, 0x1c, - 0xcf, 0x7f, 0x52, 0xcd, 0x05, 0xe2, 0xd1, 0x29, 0x8f, 0xaa, 0x50, 0x8a, 0x58, 0x6e, 0xec, 0xde, - 0xae, 0x2e, 0xa0, 0x22, 0x2c, 0x84, 0x9f, 0x85, 0xad, 0xef, 0x00, 0x4a, 0xe2, 0xb9, 0x68, 0x85, - 0xeb, 0x31, 0xb2, 0xa0, 0x14, 0x5f, 0x44, 0xd1, 0x05, 0x95, 0x13, 0x8a, 0x65, 0x59, 0xdf, 0xc8, - 0x66, 0x0c, 0x63, 0x8b, 0xe7, 0xd0, 0x3d, 0xa8, 0x24, 0xb7, 0x3f, 0x86, 0x2e, 0x2a, 0x83, 0xa5, - 0xda, 0x40, 0xf5, 0x4b, 0xd3, 0xb0, 0x46, 0x58, 0x5d, 0x28, 0x27, 0xf6, 0x04, 0x86, 0x36, 0x26, - 0xc9, 0xa7, 0x47, 0x25, 0xfd, 0xe2, 0x14, 0x9c, 0x11, 0xd0, 0x67, 0x50, 0x4e, 0x0c, 0x96, 0x13, - 0x80, 0x54, 0xc3, 0xa7, 0x7e, 0x54, 0x79, 0xe1, 0x39, 0xd4, 0x86, 0x53, 0xe9, 0x69, 0x8e, 0xa1, - 0xcb, 0xea, 0x80, 0x2b, 0x87, 0xbe, 0x2c, 0x80, 0xbd, 0xd0, 0xf6, 0xc3, 0x00, 0xaa, 0xf3, 0xa1, - 0x5c, 0x9e, 0xb3, 0x74, 0x7f, 0x19, 0x19, 0x1f, 0x53, 0xff, 0xe6, 0x11, 0xc6, 0x3f, 0x35, 0x42, - 0x07, 0xd0, 0xf8, 0x08, 0x89, 0x74, 0xa5, 0xd0, 0x75, 0x67, 0xc0, 0x9f, 0xe8, 0x75, 0x15, 0xfc, - 0xe4, 0x31, 0x14, 0xcf, 0xa1, 0x3b, 0x80, 0x6e, 0x10, 0x7e, 0x8b, 0x3a, 0xe4, 0x16, 0xb5, 0xee, - 0x4f, 0x83, 0x91, 0x7a, 0x4e, 0xe5, 0xa1, 0xc5, 0x7d, 0xea, 0x76, 0x13, 0x65, 0xb3, 0x72, 0x83, - 0x88, 0x0e, 0x4f, 0x19, 0xa7, 0x16, 0x7b, 0x8e, 0xaa, 0x3d, 0x61, 0x73, 0x7a, 0x67, 0xbd, 0x34, - 0xcd, 0xf6, 0x24, 0x03, 0x7f, 0x79, 0x2a, 0xde, 0x08, 0x70, 0x4f, 0x00, 0xa6, 0x9e, 0xad, 0x23, - 0x3d, 0x99, 0xf2, 0xe9, 0xc3, 0x73, 0xc8, 0x82, 0x72, 0x10, 0xa7, 0xd8, 0xb3, 0x77, 0x3e, 0xab, - 0xbf, 0x4a, 0x27, 0x2e, 0x64, 0xf2, 0x8d, 0x1c, 0xd8, 0xfa, 0xa3, 0x00, 0x45, 0x51, 0x00, 0xa2, - 0xf7, 0xbd, 0xb0, 0x9c, 0xdf, 0x82, 0x8a, 0xcc, 0xf9, 0xf3, 0x4c, 0x77, 0xfb, 0xa9, 0xa3, 0xaf, - 0x4c, 0xef, 0x84, 0x51, 0x07, 0xcf, 0xa1, 0xbb, 0x50, 0x49, 0xad, 0x41, 0xea, 0x26, 0x34, 0x61, - 0x57, 0xca, 0xba, 0xc6, 0x16, 0xa0, 0xf1, 0xfd, 0x05, 0xd5, 0xd5, 0x9d, 0x62, 0xd2, 0x9e, 0x93, - 0x05, 0xf2, 0x05, 0x54, 0x52, 0x7b, 0x84, 0xfa, 0x42, 0xa8, 0x97, 0x8d, 0x2c, 0xed, 0xb7, 0xa1, - 0x14, 0x5b, 0x1c, 0x98, 0xba, 0x44, 0xc7, 0x57, 0x8b, 0x2c, 0xb5, 0x9f, 0x43, 0x25, 0x39, 0x64, - 0x4f, 0x78, 0x2f, 0x95, 0x93, 0x78, 0x76, 0xd8, 0x5f, 0xfc, 0xc5, 0xba, 0x76, 0x75, 0xef, 0xc3, - 0x2e, 0xe5, 0xbd, 0x61, 0x27, 0x80, 0xdf, 0x3c, 0xa0, 0xfd, 0x3e, 0x3d, 0xe0, 0xc4, 0xea, 0x6d, - 0x86, 0x1a, 0xde, 0xb2, 0x29, 0xe3, 0x3e, 0xed, 0x0c, 0x39, 0xb1, 0x37, 0x47, 0x4d, 0x60, 0x53, - 0xa8, 0xdd, 0x14, 0x6a, 0x07, 0x9d, 0x4e, 0x41, 0x1c, 0xaf, 0xfc, 0x1b, 0x00, 0x00, 0xff, 0xff, - 0xe0, 0xb4, 0xcb, 0x07, 0x24, 0x18, 0x00, 0x00, + // 1375 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xdd, 0x6e, 0x1b, 0xc5, + 0x17, 0xcf, 0xc6, 0x8e, 0x13, 0x9f, 0xb8, 0xb6, 0x3b, 0xf9, 0xf2, 0x7f, 0xff, 0xa5, 0x94, 0x81, + 0xb6, 0x69, 0x0b, 0x0e, 0x4a, 0x05, 0xe2, 0x06, 0x50, 0x1b, 0x57, 0x95, 0x11, 0x0d, 0x61, 0xdd, + 0xaa, 0x22, 0x50, 0x99, 0xf5, 0xee, 0x24, 0x9e, 0xd6, 0xbb, 0xeb, 0xee, 0x8c, 0xfb, 0x91, 0x1b, + 0x40, 0xe2, 0x12, 0xc1, 0x33, 0x20, 0x24, 0x90, 0x78, 0x0a, 0x9e, 0x82, 0x1b, 0x6e, 0x40, 0x5c, + 0xf0, 0x1a, 0x68, 0x67, 0xc7, 0x9b, 0xdd, 0xf5, 0x38, 0xeb, 0xc6, 0x6d, 0xc3, 0xdd, 0xce, 0xd9, + 0x73, 0xce, 0xef, 0x7c, 0xcd, 0x99, 0x73, 0x60, 0xe9, 0xe1, 0x80, 0xf8, 0x4f, 0xdb, 0x8c, 0xf8, + 0x8f, 0xa8, 0x45, 0xea, 0x7d, 0xdf, 0xe3, 0x1e, 0x42, 0x0e, 0xed, 0x3d, 0x1a, 0xb0, 0xf0, 0x54, + 0x17, 0x1c, 0x7a, 0xc9, 0xf2, 0x1c, 0xc7, 0x73, 0x43, 0x9a, 0x5e, 0x8a, 0x73, 0xe8, 0x65, 0xea, + 0x72, 0xe2, 0xbb, 0x66, 0x4f, 0x9e, 0x91, 0x6d, 0x72, 0x33, 0xa9, 0x53, 0x2f, 0x31, 0xab, 0x4b, + 0x1c, 0x33, 0x3c, 0xe1, 0xaf, 0x60, 0xc9, 0x20, 0xfb, 0x94, 0x71, 0xe2, 0x6f, 0x7b, 0x36, 0x31, + 0xc8, 0xc3, 0x01, 0x61, 0x1c, 0xbd, 0x0d, 0xf9, 0x8e, 0xc9, 0x48, 0x4d, 0x3b, 0xa7, 0xad, 0x2f, + 0x6e, 0x9e, 0xa9, 0x27, 0xec, 0x90, 0x06, 0xdc, 0x62, 0xfb, 0xd7, 0x4d, 0x46, 0x0c, 0xc1, 0x89, + 0xde, 0x85, 0x79, 0xd3, 0xb6, 0x7d, 0xc2, 0x58, 0x6d, 0xf6, 0x08, 0xa1, 0x6b, 0x21, 0x8f, 0x31, + 0x64, 0xc6, 0x3f, 0x68, 0xb0, 0x9c, 0xb4, 0x80, 0xf5, 0x3d, 0x97, 0x11, 0x74, 0x15, 0x0a, 0x8c, + 0x9b, 0x7c, 0xc0, 0xa4, 0x11, 0xff, 0x57, 0xea, 0x6b, 0x09, 0x16, 0x43, 0xb2, 0xa2, 0xeb, 0xb0, + 0x48, 0x5d, 0xca, 0xdb, 0x7d, 0xd3, 0x37, 0x9d, 0xa1, 0x25, 0xaf, 0x25, 0x25, 0xa3, 0x18, 0x35, + 0x5d, 0xca, 0x77, 0x04, 0xa3, 0x01, 0x34, 0xfa, 0xc6, 0xf7, 0x60, 0xa5, 0xd5, 0xf5, 0x1e, 0x6f, + 0x79, 0xbd, 0x1e, 0xb1, 0x38, 0xf5, 0xdc, 0xe3, 0x07, 0x05, 0x41, 0xde, 0xee, 0x34, 0x1b, 0xc2, + 0x8e, 0x9c, 0x21, 0xbe, 0x31, 0x83, 0xd5, 0xb4, 0xfa, 0x69, 0x3c, 0x7e, 0x03, 0x4e, 0x59, 0x91, + 0xaa, 0x66, 0x23, 0xf0, 0x39, 0xb7, 0x9e, 0x33, 0x92, 0x44, 0xfc, 0x9b, 0x06, 0x2b, 0x1f, 0x7b, + 0xa6, 0xfd, 0x82, 0x9c, 0x42, 0x18, 0x4a, 0x71, 0xc0, 0x5a, 0x4e, 0xfc, 0x4b, 0xd0, 0xd0, 0xfb, + 0x50, 0x08, 0x4b, 0xaf, 0x96, 0x17, 0x58, 0xe7, 0x93, 0x58, 0xb2, 0x2c, 0x0f, 0x2d, 0x6c, 0x09, + 0x82, 0x21, 0x85, 0xf0, 0xb7, 0x1a, 0xd4, 0x0c, 0xd2, 0x23, 0x26, 0x23, 0x27, 0xe8, 0x05, 0xfe, + 0x5a, 0x83, 0xe5, 0x20, 0x7f, 0x3b, 0xa6, 0xcf, 0xe9, 0xc9, 0x98, 0xd0, 0x0f, 0x0b, 0x34, 0x66, + 0xc1, 0x34, 0x05, 0x84, 0xa1, 0xd4, 0x1f, 0x6a, 0x3a, 0xac, 0x9f, 0x04, 0x0d, 0x3b, 0x50, 0x89, + 0xd0, 0x02, 0x71, 0xc2, 0xd0, 0x39, 0x58, 0x8c, 0xb1, 0x08, 0xc0, 0x9c, 0x11, 0x27, 0xa1, 0xf7, + 0x60, 0x2e, 0x80, 0x20, 0xc2, 0xbf, 0xf2, 0x26, 0xae, 0x8f, 0x36, 0xb3, 0x7a, 0x52, 0xab, 0x11, + 0x0a, 0xe0, 0x9f, 0x35, 0x58, 0x4d, 0xe1, 0xbd, 0xfc, 0x72, 0x4d, 0xc7, 0x25, 0xaf, 0x88, 0xcb, + 0xaf, 0x1a, 0xac, 0x8d, 0x18, 0x3a, 0x4d, 0x32, 0x76, 0x61, 0x35, 0x02, 0x68, 0xdb, 0x84, 0x59, + 0x3e, 0xed, 0x07, 0xdf, 0x61, 0x5a, 0x16, 0x37, 0x5f, 0xcf, 0x0e, 0x22, 0x33, 0x56, 0x22, 0x15, + 0x8d, 0x98, 0x06, 0xfc, 0xb7, 0x06, 0xcb, 0x41, 0x0f, 0x38, 0xb9, 0xca, 0x9d, 0x24, 0xa6, 0xb1, + 0x36, 0x31, 0x77, 0x9c, 0x36, 0xf1, 0x8b, 0x06, 0x6b, 0xb2, 0x4d, 0xfc, 0xb7, 0x1d, 0xc5, 0x3f, + 0x6a, 0xa0, 0x6f, 0xf9, 0xc4, 0xe4, 0xe4, 0xd3, 0x20, 0x8d, 0x5b, 0x5d, 0xd3, 0x75, 0x49, 0x6f, + 0xba, 0xfa, 0xb9, 0x08, 0x15, 0x3f, 0x74, 0xb6, 0x6d, 0x85, 0xfa, 0x84, 0xe9, 0x45, 0xa3, 0x2c, + 0xc9, 0x12, 0x05, 0x9d, 0x87, 0xb2, 0x4f, 0xd8, 0xa0, 0x77, 0xc8, 0x97, 0x13, 0x7c, 0xa7, 0x42, + 0xaa, 0x64, 0xc3, 0x3f, 0x69, 0xb0, 0x76, 0xcd, 0xb6, 0xe3, 0x06, 0x4e, 0x71, 0x15, 0xaf, 0xc0, + 0xe9, 0x94, 0x75, 0x32, 0xb4, 0x45, 0xa3, 0x9a, 0xb4, 0xaf, 0xd9, 0x40, 0x97, 0xa0, 0x9a, 0xb4, + 0x50, 0x86, 0xba, 0x68, 0x54, 0x12, 0x36, 0x36, 0x1b, 0xf8, 0x0f, 0x0d, 0x74, 0x83, 0x38, 0xde, + 0x23, 0xa2, 0x34, 0xf4, 0x58, 0x91, 0x1c, 0x7a, 0x37, 0x3b, 0x9d, 0x77, 0xb9, 0x67, 0xf0, 0x2e, + 0xaf, 0xf6, 0xee, 0x3e, 0xac, 0xde, 0x35, 0xb9, 0xd5, 0x6d, 0x38, 0xd3, 0x67, 0xe0, 0x2c, 0x40, + 0x84, 0x17, 0xf6, 0x94, 0xa2, 0x11, 0xa3, 0xe0, 0x7f, 0x66, 0x01, 0x05, 0x3d, 0xa2, 0x45, 0xf6, + 0x1d, 0xe2, 0xf2, 0x97, 0x7f, 0x71, 0x52, 0xcf, 0x4a, 0x7e, 0xf4, 0x59, 0x39, 0x0b, 0xc0, 0x42, + 0xeb, 0x02, 0x17, 0xe6, 0xc4, 0xc5, 0x8a, 0x51, 0x90, 0x0e, 0x0b, 0x7b, 0x94, 0xf4, 0xec, 0xe0, + 0x6f, 0x41, 0xfc, 0x8d, 0xce, 0xe8, 0x23, 0x28, 0x4b, 0xce, 0xb6, 0x78, 0x69, 0x58, 0x6d, 0x5e, + 0xd5, 0x56, 0x83, 0xa9, 0xb9, 0x2e, 0x43, 0x20, 0x7a, 0x6a, 0xd3, 0xdd, 0xf3, 0x8c, 0x53, 0x2c, + 0x46, 0x89, 0xf7, 0xa9, 0x85, 0xe3, 0xf4, 0xa9, 0xbf, 0x34, 0x58, 0x91, 0x7d, 0xea, 0xc4, 0x82, + 0x3d, 0x49, 0x3b, 0x9e, 0x22, 0xdc, 0xf8, 0x7b, 0x0d, 0xd6, 0xb6, 0x3c, 0xa7, 0xef, 0xb9, 0x51, + 0xd8, 0xa6, 0x6b, 0x6f, 0x1f, 0x84, 0x42, 0x64, 0x38, 0xd9, 0x5f, 0x18, 0x33, 0xd9, 0xa7, 0x41, + 0xa5, 0x14, 0xfe, 0x53, 0x83, 0x45, 0x19, 0xed, 0x20, 0xa5, 0xe8, 0x0c, 0x14, 0x23, 0x57, 0xe4, + 0x08, 0x73, 0x48, 0x18, 0x09, 0xe1, 0x6c, 0x76, 0xbd, 0xe6, 0x46, 0xeb, 0xf5, 0x7f, 0xb0, 0xe0, + 0x10, 0xa7, 0xcd, 0xe8, 0x01, 0x91, 0xe5, 0x3c, 0xef, 0x10, 0xa7, 0x45, 0x0f, 0x48, 0xf0, 0xcb, + 0x1d, 0x38, 0x6d, 0xdf, 0x7b, 0xcc, 0xc4, 0x63, 0x97, 0x33, 0xe6, 0xdd, 0x81, 0x63, 0x78, 0x8f, + 0x19, 0x7a, 0x05, 0x80, 0xba, 0x36, 0x79, 0xd2, 0x76, 0x4d, 0x87, 0xd4, 0x0a, 0xa2, 0x33, 0x14, + 0x05, 0x65, 0xdb, 0x74, 0x08, 0xaa, 0xc1, 0xbc, 0x38, 0x34, 0x1b, 0xb5, 0xf9, 0x50, 0x50, 0x1e, + 0xf1, 0x1e, 0xa0, 0x98, 0x87, 0x53, 0x75, 0x8a, 0x58, 0xde, 0x67, 0xd3, 0x79, 0xc7, 0xdf, 0x68, + 0xb0, 0x94, 0x00, 0x9a, 0x26, 0xaf, 0xef, 0xc0, 0x1c, 0x75, 0xf7, 0xbc, 0xe1, 0x94, 0xf3, 0xaa, + 0x6a, 0xca, 0x89, 0x83, 0x85, 0xdc, 0x97, 0x0f, 0xa0, 0x9c, 0x9c, 0x7d, 0x50, 0x09, 0x16, 0xb6, + 0x3d, 0x7e, 0xe3, 0x09, 0x65, 0xbc, 0x3a, 0x83, 0xca, 0x00, 0xdb, 0x1e, 0xdf, 0xf1, 0x09, 0x23, + 0x2e, 0xaf, 0x6a, 0x08, 0xa0, 0xf0, 0x89, 0xdb, 0xa0, 0xec, 0x41, 0x75, 0x16, 0x2d, 0xc9, 0x91, + 0xd6, 0xec, 0x35, 0xdd, 0x5b, 0xc4, 0xf1, 0xfc, 0xa7, 0xd5, 0x5c, 0x20, 0x1e, 0x9d, 0xf2, 0xa8, + 0x0a, 0xa5, 0x88, 0xe5, 0xe6, 0xce, 0x9d, 0xea, 0x1c, 0x2a, 0xc2, 0x5c, 0xf8, 0x59, 0xd8, 0xfc, + 0x0e, 0xa0, 0x24, 0x5e, 0x9b, 0x56, 0xb8, 0x5d, 0x23, 0x0b, 0x4a, 0xf1, 0x3d, 0x16, 0x5d, 0x54, + 0x39, 0xa1, 0xd8, 0xb5, 0xf5, 0xf5, 0x6c, 0xc6, 0x30, 0xb6, 0x78, 0x06, 0xdd, 0x87, 0x4a, 0x72, + 0x79, 0x64, 0xe8, 0x92, 0x32, 0x58, 0xaa, 0x05, 0x56, 0xbf, 0x3c, 0x09, 0x6b, 0x84, 0xb5, 0x0f, + 0xe5, 0xc4, 0x9a, 0xc1, 0xd0, 0xfa, 0x38, 0xf9, 0xf4, 0xa4, 0xa5, 0x5f, 0x9a, 0x80, 0x33, 0x02, + 0xfa, 0x0c, 0xca, 0x89, 0xb9, 0x74, 0x0c, 0x90, 0x6a, 0x76, 0xd5, 0x8f, 0x2a, 0x2f, 0x3c, 0x83, + 0xda, 0x70, 0x3a, 0x3d, 0x0c, 0x32, 0x74, 0x45, 0x1d, 0x70, 0xe5, 0xcc, 0x98, 0x05, 0xb0, 0x1b, + 0xda, 0x7e, 0x18, 0x40, 0x75, 0x3e, 0x94, 0xbb, 0x77, 0x96, 0xee, 0x2f, 0x23, 0xe3, 0x63, 0xea, + 0xdf, 0x3c, 0xc2, 0xf8, 0x67, 0x46, 0xe8, 0x00, 0x1a, 0x9d, 0x40, 0x91, 0xae, 0x14, 0xba, 0xe1, + 0xf4, 0xf9, 0x53, 0xbd, 0xae, 0x82, 0x1f, 0x3f, 0xc5, 0xe2, 0x19, 0x74, 0x17, 0xd0, 0x4d, 0xc2, + 0x6f, 0x53, 0x87, 0xdc, 0xa6, 0xd6, 0x83, 0x49, 0x30, 0x52, 0xaf, 0xb1, 0x3c, 0xb4, 0xb8, 0x4f, + 0xdd, 0xfd, 0x44, 0xd9, 0x2c, 0xdf, 0x24, 0xa2, 0xc3, 0x53, 0xc6, 0xa9, 0xc5, 0x9e, 0xa3, 0x6a, + 0x4f, 0xd8, 0x9c, 0x5e, 0x79, 0x2f, 0x4f, 0xb2, 0x7c, 0xc9, 0xc0, 0x5f, 0x99, 0x88, 0x37, 0x02, + 0xdc, 0x15, 0x80, 0xa9, 0x67, 0xeb, 0x48, 0x4f, 0x26, 0x7c, 0xfa, 0xf0, 0x0c, 0xb2, 0xa0, 0x1c, + 0xc4, 0x29, 0xf6, 0xec, 0x5d, 0xc8, 0xea, 0xaf, 0xd2, 0x89, 0x8b, 0x99, 0x7c, 0x43, 0x07, 0x36, + 0x7f, 0x2f, 0x40, 0x51, 0x14, 0x80, 0xe8, 0x7d, 0x2f, 0x2c, 0xe7, 0xb7, 0xa1, 0x22, 0x73, 0xfe, + 0x3c, 0xd3, 0xdd, 0x7e, 0xe6, 0xe8, 0x2b, 0xd3, 0x3b, 0x66, 0xd4, 0xc1, 0x33, 0xe8, 0x1e, 0x54, + 0x52, 0x5b, 0x94, 0xba, 0x09, 0x8d, 0x59, 0xb5, 0xb2, 0xae, 0xb1, 0x05, 0x68, 0x74, 0xfd, 0x41, + 0x75, 0x75, 0xa7, 0x18, 0xb7, 0x26, 0x65, 0x81, 0x7c, 0x01, 0x95, 0xd4, 0x1a, 0xa2, 0xbe, 0x10, + 0xea, 0x5d, 0x25, 0x4b, 0xfb, 0x1d, 0x28, 0xc5, 0xf6, 0x0e, 0xa6, 0x2e, 0xd1, 0xd1, 0xcd, 0x24, + 0x4b, 0xed, 0xe7, 0x50, 0x49, 0x0e, 0xd9, 0x63, 0xde, 0x4b, 0xe5, 0x24, 0x9e, 0x1d, 0xf6, 0x17, + 0x7f, 0xb1, 0xae, 0x5f, 0xdb, 0xfd, 0x70, 0x9f, 0xf2, 0xee, 0xa0, 0x13, 0xc0, 0x6f, 0x1c, 0xd0, + 0x5e, 0x8f, 0x1e, 0x70, 0x62, 0x75, 0x37, 0x42, 0x0d, 0x6f, 0xd9, 0x94, 0x71, 0x9f, 0x76, 0x06, + 0x9c, 0xd8, 0x1b, 0xc3, 0x26, 0xb0, 0x21, 0xd4, 0x6e, 0x08, 0xb5, 0xfd, 0x4e, 0xa7, 0x20, 0x8e, + 0x57, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x72, 0x8c, 0x04, 0x63, 0x18, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index 3b92e206c..d8a519a4d 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -2,9 +2,6 @@ package querynode import ( "log" - "sort" - - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/golang/protobuf/proto" @@ -52,27 +49,27 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { ddNode.ddMsg.gcRecord = &gcRecord // sort tsMessages - tsMessages := msMsg.TsMessages() - sort.Slice(tsMessages, - func(i, j int) bool { - return tsMessages[i].BeginTs() < tsMessages[j].BeginTs() - }) + //tsMessages := msMsg.TsMessages() + //sort.Slice(tsMessages, + // func(i, j int) bool { + // return tsMessages[i].BeginTs() < tsMessages[j].BeginTs() + // }) // do dd tasks - for _, msg := range tsMessages { - switch msg.Type() { - case commonpb.MsgType_kCreateCollection: - ddNode.createCollection(msg.(*msgstream.CreateCollectionMsg)) - case commonpb.MsgType_kDropCollection: - ddNode.dropCollection(msg.(*msgstream.DropCollectionMsg)) - case commonpb.MsgType_kCreatePartition: - ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg)) - case commonpb.MsgType_kDropPartition: - ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg)) - default: - log.Println("Non supporting message type:", msg.Type()) - } - } + //for _, msg := range tsMessages { + // switch msg.Type() { + // case commonpb.MsgType_kCreateCollection: + // ddNode.createCollection(msg.(*msgstream.CreateCollectionMsg)) + // case commonpb.MsgType_kDropCollection: + // ddNode.dropCollection(msg.(*msgstream.DropCollectionMsg)) + // case commonpb.MsgType_kCreatePartition: + // ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg)) + // case commonpb.MsgType_kDropPartition: + // ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg)) + // default: + // log.Println("Non supporting message type:", msg.Type()) + // } + //} var res Msg = ddNode.ddMsg return []*Msg{&res} diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 171875b5d..f50a4dab3 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -69,9 +69,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg { // TODO: open this check // check if partition dm enable - //if enable, _ := fdmNode.replica.getEnablePartitionDM(msg.CollectionID, msg.PartitionID); !enable { - // return nil - //} + enableCollection := fdmNode.replica.hasCollection(msg.CollectionID) + enablePartition := fdmNode.replica.hasPartition(msg.PartitionID) + if !enableCollection || !enablePartition { + return nil + } // No dd record, do all insert requests. records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionID] diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index dedf615a7..af2a9bed8 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -353,7 +353,7 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com return status, errors.New(errMsg) } - fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarMsgStream) + fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarTtMsgStream) if !ok { errMsg := "type assertion failed for dm message stream" status := &commonpb.Status{ @@ -381,7 +381,30 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S partitionID := in.PartitionID segmentIDs := in.SegmentIDs fieldIDs := in.FieldIDs + schema := in.Schema + hasCollection := node.replica.hasCollection(collectionID) + hasPartition := node.replica.hasPartition(partitionID) + if !hasCollection { + err := node.replica.addCollection(collectionID, schema) + if err != nil { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + return status, err + } + } + if !hasPartition { + err := node.replica.addPartition(collectionID, partitionID) + if err != nil { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + return status, err + } + } err := node.replica.enablePartition(partitionID) if err != nil { status := &commonpb.Status{ @@ -395,7 +418,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S for i, state := range in.SegmentStates { if state.State == commonpb.SegmentState_SegmentGrowing { position := state.StartPosition - err = node.loadService.segLoader.seekSegment(position) + err := node.loadService.segLoader.seekSegment(position) if err != nil { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, diff --git a/internal/queryservice/meta_replica.go b/internal/queryservice/meta_replica.go index c7c97085a..62812f909 100644 --- a/internal/queryservice/meta_replica.go +++ b/internal/queryservice/meta_replica.go @@ -3,19 +3,23 @@ package queryservice import ( "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) type metaReplica interface { getCollections(dbID UniqueID) ([]*collection, error) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) - loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) - loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) + getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) + getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) + addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error + addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) releaseCollection(dbID UniqueID, collectionID UniqueID) error releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error - addDmChannels(collectionID UniqueID, channels []string) error + addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]UniqueID) error + getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (UniqueID, error) } type segment struct { @@ -29,9 +33,10 @@ type partition struct { } type collection struct { - id UniqueID - partitions map[UniqueID]*partition - dmChannelNames []string + id UniqueID + partitions map[UniqueID]*partition + dmChannels2Node map[string]UniqueID + schema *schemapb.CollectionSchema } type metaReplicaImpl struct { @@ -50,22 +55,25 @@ func newMetaReplica() metaReplica { } } -func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) { +func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error { //TODO:: assert dbID = 0 exist if _, ok := mp.db2collections[dbID]; ok { partitions := make(map[UniqueID]*partition) + channels := make(map[string]UniqueID) newCollection := &collection{ - id: collectionID, - partitions: partitions, + id: collectionID, + partitions: partitions, + schema: schema, + dmChannels2Node: channels, } mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection) - return newCollection, nil + return nil } - return nil, errors.New("addCollection: can't find dbID when add collection") + return errors.New("addCollection: can't find dbID when add collection") } -func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) { +func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error { if collections, ok := mp.db2collections[dbID]; ok { for _, collection := range collections { if collection.id == collectionID { @@ -77,11 +85,11 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa segments: segments, } partitions[partitionID] = partition - return partition, nil + return nil } } } - return nil, errors.New("addPartition: can't find collection when add partition") + return errors.New("addPartition: can't find collection when add partition") } func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) { @@ -125,49 +133,31 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par return nil, errors.New("getSegments: can't find segmentID") } -func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) { - var res *collection = nil - if collections, err := mp.getCollections(dbID); err == nil { +func (mp *metaReplicaImpl) getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) { + if collections, ok := mp.db2collections[dbID]; ok { for _, collection := range collections { if collectionID == collection.id { - res = collection + return collection, nil } } } - if res == nil { - collection, err := mp.addCollection(dbID, collectionID) - if err != nil { - return nil, err - } - res = collection - } - return res, nil + + return nil, errors.New("getCollectionByID: can't find collectionID") } -func (mp *metaReplicaImpl) loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) { - var collection *collection = nil - var partition *partition = nil - var err error - for _, col := range mp.db2collections[dbID] { - if col.id == collectionID { - collection = col - } - } - if collection == nil { - collection, err = mp.addCollection(dbID, collectionID) - if err != nil { - return partition, err - } - } - if _, ok := collection.partitions[partitionID]; !ok { - partition, err = mp.addPartition(dbID, collectionID, partitionID) - if err != nil { - return partition, err +func (mp *metaReplicaImpl) getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) { + if collections, ok := mp.db2collections[dbID]; ok { + for _, collection := range collections { + if collectionID == collection.id { + partitions := collection.partitions + if partition, ok := partitions[partitionID]; ok { + return partition, nil + } + } } - return partition, nil } - return nil, nil + return nil, errors.New("getPartitionByID: can't find partitionID") } func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID, @@ -235,23 +225,12 @@ func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID return errors.New("releasePartition: can't find dbID or collectionID or partitionID") } -func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []string) error { - //TODO :: use dbID - if collections, ok := mp.db2collections[0]; ok { +func (mp *metaReplicaImpl) addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]UniqueID) error { + if collections, ok := mp.db2collections[dbID]; ok { for _, collection := range collections { if collectionID == collection.id { - dmChannels := collection.dmChannelNames - for _, channel := range channels { - match := false - for _, existedChannel := range dmChannels { - if channel == existedChannel { - match = true - break - } - } - if !match { - dmChannels = append(dmChannels, channel) - } + for channel, id := range channels2NodeID { + collection.dmChannels2Node[channel] = id } return nil } @@ -259,3 +238,17 @@ func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []strin } return errors.New("addDmChannels: can't find dbID or collectionID") } + +func (mp *metaReplicaImpl) getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (UniqueID, error) { + if collections, ok := mp.db2collections[dbID]; ok { + for _, collection := range collections { + if collectionID == collection.id { + if id, ok := collection.dmChannels2Node[channel]; ok { + return id, nil + } + } + } + } + + return 0, errors.New("getAssignedNodeIDByChannelName: can't find dbID or collectionID") +} diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 5dccca99d..767a9252e 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -182,6 +182,7 @@ func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*qu func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*commonpb.Status, error) { dbID := req.DbID collectionID := req.CollectionID + schema := req.Schema fn := func(err error) *commonpb.Status { if err != nil { return &commonpb.Status{ @@ -193,30 +194,23 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com ErrorCode: commonpb.ErrorCode_SUCCESS, } } - collection, err := qs.replica.loadCollection(dbID, collectionID) - if err != nil { - return fn(err), err - } - if len(collection.dmChannelNames) != 0 { + fmt.Println("load collection start, collectionID = ", collectionID) + _, err := qs.replica.getCollectionByID(dbID, collectionID) + if err == nil { + fmt.Println("load collection end, collection already exist, collectionID = ", collectionID) return fn(nil), nil } - channelRequest := datapb.InsertChannelRequest{ - DbID: req.DbID, - CollectionID: req.CollectionID, - } - resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) - if resp == nil { - err = errors.New("get insert channels resp is nil") - } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - err = errors.New(resp.Status.Reason) + err = qs.replica.addCollection(dbID, collectionID, schema) + if err != nil { + return fn(err), err } + + err = qs.watchDmChannels(dbID, collectionID) if err != nil { return fn(err), err } - dmChannels := resp.Values // get partitionIDs showPartitionRequest := &milvuspb.ShowPartitionRequest{ @@ -235,30 +229,17 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com } partitionIDs := showPartitionResponse.PartitionIDs - if len(partitionIDs) == 0 { - loadSegmentRequest := &querypb.LoadSegmentRequest{ - CollectionID: collectionID, - } - for _, node := range qs.queryNodes { - _, err := node.LoadSegments(loadSegmentRequest) - if err != nil { - return fn(err), err - } - } - nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) - err = qs.watchDmChannels(dmChannels, nodeIDs, collection) - return fn(err), err - } - loadPartitionsRequest := &querypb.LoadPartitionRequest{ Base: req.Base, DbID: dbID, CollectionID: collectionID, PartitionIDs: partitionIDs, + Schema: schema, } status, err := qs.LoadPartitions(loadPartitionsRequest) + fmt.Println("load collection end, collectionID = ", collectionID) return status, err } @@ -327,6 +308,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm dbID := req.DbID collectionID := req.CollectionID partitionIDs := req.PartitionIDs + schema := req.Schema fn := func(err error) *commonpb.Status { if err != nil { @@ -339,28 +321,35 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm ErrorCode: commonpb.ErrorCode_SUCCESS, } } + fmt.Println("load partitions start, partitionIDs = ", partitionIDs) if len(partitionIDs) == 0 { err := errors.New("partitionIDs are empty") return fn(err), err } - var collection *collection = nil - var err error - if collection, err = qs.replica.loadCollection(dbID, collectionID); err != nil { - return fn(err), err + _, err := qs.replica.getCollectionByID(dbID, collectionID) + if err != nil { + err = qs.replica.addCollection(dbID, collectionID, schema) + if err != nil { + return fn(err), err + } + err = qs.watchDmChannels(dbID, collectionID) + if err != nil { + return fn(err), err + } } for _, partitionID := range partitionIDs { - partition, err := qs.replica.loadPartition(dbID, collectionID, partitionID) + _, err = qs.replica.getPartitionByID(dbID, collectionID, partitionID) + if err == nil { + continue + } + err = qs.replica.addPartition(dbID, collectionID, partitionID) if err != nil { return fn(err), err } - if partition == nil { - return fn(err), nil - } - showSegmentRequest := &milvuspb.ShowSegmentRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowSegment, @@ -372,25 +361,34 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm if err != nil { return fn(err), err } - segmentIDs := showSegmentResponse.SegmentIDs + if len(segmentIDs) == 0 { + loadSegmentRequest := &querypb.LoadSegmentRequest{ + CollectionID: collectionID, + PartitionID: partitionID, + Schema: schema, + } + for _, node := range qs.queryNodes { + _, err := node.LoadSegments(loadSegmentRequest) + if err != nil { + return fn(err), nil + } + } + } + qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory) + segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo) channel2segs := make(map[string][]UniqueID) - resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{ SegmentIDs: segmentIDs, }) - if err != nil { return fn(err), err } - for _, state := range resp.States { segmentID := state.SegmentID segmentStates[segmentID] = state - channelName := state.StartPosition.ChannelName - if _, ok := channel2segs[channelName]; !ok { segments := make([]UniqueID, 0) segments = append(segments, segmentID) @@ -400,88 +398,39 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm } } - qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory) for channel, segmentIDs := range channel2segs { sort.Slice(segmentIDs, func(i, j int) bool { return segmentStates[segmentIDs[i]].StartPosition.Timestamp < segmentStates[segmentIDs[j]].StartPosition.Timestamp }) - var channelLoadDone = false - for _, node := range qs.queryNodes { - channels2node := node.dmChannelNames - for _, ch := range channels2node { - if channel == ch { - channelLoadDone = true - break - } - } - if channelLoadDone { - break - } - } - if !channelLoadDone { - states := make([]*datapb.SegmentStateInfo, 0) - for _, id := range segmentIDs { - states = append(states, segmentStates[id]) - } - loadSegmentRequest := &querypb.LoadSegmentRequest{ - CollectionID: collectionID, - PartitionID: partitionID, - SegmentIDs: segmentIDs, - SegmentStates: states, - } - dmChannels := []string{channel} - nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) - err = qs.watchDmChannels(dmChannels, nodeIDs, collection) - if err != nil { - return fn(err), err - } - queryNode := qs.queryNodes[nodeIDs[0]] - //TODO:: seek when loadSegment may cause more msgs consumed - status, err := queryNode.LoadSegments(loadSegmentRequest) - if err != nil { - return status, err - } - } - } - - qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory) - } - - if len(collection.dmChannelNames) == 0 { - channelRequest := datapb.InsertChannelRequest{ - DbID: dbID, - CollectionID: collectionID, - } - resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) - if resp == nil { - err = errors.New("get insert channels resp is nil") - return fn(err), err - } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - err = errors.New(resp.Status.Reason) - return fn(err), err - } - dmChannels := resp.Values - for _, partitionID := range partitionIDs { + states := make([]*datapb.SegmentStateInfo, 0) + for _, id := range segmentIDs { + states = append(states, segmentStates[id]) + } loadSegmentRequest := &querypb.LoadSegmentRequest{ - CollectionID: collectionID, - PartitionID: partitionID, + CollectionID: collectionID, + PartitionID: partitionID, + SegmentIDs: segmentIDs, + SegmentStates: states, + Schema: schema, } - for _, node := range qs.queryNodes { - _, err := node.LoadSegments(loadSegmentRequest) - if err != nil { - return fn(err), nil - } + nodeID, err := qs.replica.getAssignedNodeIDByChannelName(dbID, collectionID, channel) + if err != nil { + return fn(err), err + } + queryNode := qs.queryNodes[nodeID] + //TODO:: seek when loadSegment may cause more msgs consumed + //TODO:: all query node should load partition's msg + status, err := queryNode.LoadSegments(loadSegmentRequest) + if err != nil { + return status, err } - nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) - err = qs.watchDmChannels(dmChannels, nodeIDs, collection) - } - if err != nil { - return fn(err), err } + + qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory) } + fmt.Println("load partitions end, partitionIDs = ", partitionIDs) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, nil @@ -622,28 +571,38 @@ func (qs *QueryService) SetEnableGrpc(en bool) { qs.enableGrpc = en } -func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []UniqueID, collection *collection) error { - err := qs.replica.addDmChannels(collection.id, dmChannels) +func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) error { + collection, err := qs.replica.getCollectionByID(0, collectionID) + if err != nil { + return err + } + channelRequest := datapb.InsertChannelRequest{ + DbID: dbID, + CollectionID: collectionID, + } + resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) + if err != nil { + return err + } + if len(resp.Values) == 0 { + err = errors.New("haven't assign dm channel to collection") + return err + } + + dmChannels := resp.Values + channels2NodeID := qs.shuffleChannelsToQueryNode(dmChannels) + err = qs.replica.addDmChannels(dbID, collection.id, channels2NodeID) if err != nil { return err } node2channels := make(map[UniqueID][]string) - for i, channel := range dmChannels { - nodeID := assignedNodeIDs[i] - findChannel := false - for _, ch := range collection.dmChannelNames { - if channel == ch { - findChannel = true - } - } - if !findChannel { - if _, ok := node2channels[nodeID]; ok { - node2channels[nodeID] = append(node2channels[nodeID], channel) - } else { - channels := make([]string, 0) - channels = append(channels, channel) - node2channels[nodeID] = channels - } + for channel, nodeID := range channels2NodeID { + if _, ok := node2channels[nodeID]; ok { + node2channels[nodeID] = append(node2channels[nodeID], channel) + } else { + channels := make([]string, 0) + channels = append(channels, channel) + node2channels[nodeID] = channels } } @@ -653,18 +612,19 @@ func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []U ChannelIDs: channels, } _, err := node.WatchDmChannels(request) - node.AddDmChannels(channels) if err != nil { return err } + fmt.Println("query node ", nodeID, "watch channels = ", channels) + node.AddDmChannels(channels) } return nil } -func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []UniqueID { +func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[string]UniqueID { maxNumDMChannel := 0 - res := make([]UniqueID, 0) + res := make(map[string]UniqueID) node2lens := make(map[UniqueID]int) for id, node := range qs.queryNodes { node2lens[id] = len(node.dmChannelNames) @@ -676,14 +636,14 @@ func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []Unique if len >= maxNumDMChannel { maxNumDMChannel = len } else { - res = append(res, id) + res[dmChannels[offset]] = id node2lens[id]++ offset++ } } if lastOffset == offset { for id := range node2lens { - res = append(res, id) + res[dmChannels[offset]] = id node2lens[id]++ offset++ break -- GitLab