提交 cdcf0066 编写于 作者: X xige-16 提交者: yefu.chen

Fix loadCollection error when query service assign insert channels and query nodes

Signed-off-by: Nxige-16 <xi.ge@zilliz.com>
上级 39161cac
......@@ -77,6 +77,7 @@ type LoadCollectionRequest struct {
MsgBase
DbID UniqueID
CollectionID UniqueID
schema schemapb.CollectionSchema
}
```
......@@ -144,6 +145,7 @@ type LoadPartitonRequest struct {
DbID UniqueID
CollectionID UniqueID
PartitionIDs []UniqueID
schema schemapb.CollectionSchema
}
```
......
......@@ -128,6 +128,7 @@ message LoadSegmentRequest {
repeated int64 segmentIDs = 5;
repeated int64 fieldIDs = 6;
repeated data.SegmentStateInfo segment_states = 7;
schema.CollectionSchema schema = 8;
}
message ReleaseSegmentRequest {
......
......@@ -996,6 +996,7 @@ type LoadSegmentRequest struct {
SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"`
FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"`
SegmentStates []*datapb.SegmentStateInfo `protobuf:"bytes,7,rep,name=segment_states,json=segmentStates,proto3" json:"segment_states,omitempty"`
Schema *schemapb.CollectionSchema `protobuf:"bytes,8,opt,name=schema,proto3" json:"schema,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
......@@ -1075,6 +1076,13 @@ func (m *LoadSegmentRequest) GetSegmentStates() []*datapb.SegmentStateInfo {
return nil
}
func (m *LoadSegmentRequest) GetSchema() *schemapb.CollectionSchema {
if m != nil {
return m.Schema
}
return nil
}
type ReleaseSegmentRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
......@@ -1412,93 +1420,93 @@ func init() {
func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) }
var fileDescriptor_5fcb6756dc1afb8d = []byte{
// 1368 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xdd, 0x8e, 0xdb, 0xc4,
0x17, 0x5f, 0x6f, 0xb2, 0xd9, 0xe6, 0x6c, 0x9a, 0xa4, 0xb3, 0x5f, 0xf9, 0xfb, 0x5f, 0x4a, 0x19,
0x68, 0xbb, 0x6d, 0x21, 0x8b, 0xb6, 0x02, 0x71, 0x03, 0xa8, 0xdd, 0x54, 0x55, 0x10, 0x5d, 0x16,
0xa7, 0x55, 0xc5, 0x42, 0x15, 0x1c, 0x7b, 0x36, 0x99, 0x36, 0xb6, 0x53, 0xcf, 0xa4, 0x1f, 0x7b,
0x03, 0x48, 0x5c, 0x22, 0x78, 0x06, 0x84, 0x04, 0x88, 0xa7, 0xe0, 0x29, 0xb8, 0xe1, 0x06, 0xc4,
0x8b, 0x20, 0x8f, 0x27, 0x5e, 0xdb, 0x99, 0xac, 0xd3, 0xa6, 0xed, 0x72, 0xe7, 0x39, 0x3e, 0xe7,
0xfc, 0xce, 0xd7, 0x9c, 0x39, 0x07, 0x96, 0x1f, 0x0c, 0x89, 0xff, 0xa4, 0xcd, 0x88, 0xff, 0x90,
0x5a, 0xa4, 0x3e, 0xf0, 0x3d, 0xee, 0x21, 0xe4, 0xd0, 0xfe, 0xc3, 0x21, 0x0b, 0x4f, 0x75, 0xc1,
0xa1, 0x97, 0x2c, 0xcf, 0x71, 0x3c, 0x37, 0xa4, 0xe9, 0xa5, 0x38, 0x87, 0x5e, 0xa6, 0x2e, 0x27,
0xbe, 0x6b, 0xf6, 0xe5, 0x19, 0xd9, 0x26, 0x37, 0x93, 0x3a, 0xf5, 0x12, 0xb3, 0x7a, 0xc4, 0x31,
0xc3, 0x13, 0xfe, 0x0a, 0x96, 0x0d, 0xd2, 0xa5, 0x8c, 0x13, 0x7f, 0xc7, 0xb3, 0x89, 0x41, 0x1e,
0x0c, 0x09, 0xe3, 0xe8, 0x6d, 0xc8, 0x77, 0x4c, 0x46, 0x6a, 0xda, 0x59, 0x6d, 0x63, 0x69, 0xeb,
0x74, 0x3d, 0x61, 0x87, 0x34, 0xe0, 0x26, 0xeb, 0x5e, 0x33, 0x19, 0x31, 0x04, 0x27, 0x7a, 0x17,
0x16, 0x4d, 0xdb, 0xf6, 0x09, 0x63, 0xb5, 0xf9, 0x23, 0x84, 0xae, 0x86, 0x3c, 0xc6, 0x88, 0x19,
0xff, 0xa0, 0xc1, 0x4a, 0xd2, 0x02, 0x36, 0xf0, 0x5c, 0x46, 0xd0, 0x15, 0x28, 0x30, 0x6e, 0xf2,
0x21, 0x93, 0x46, 0xfc, 0x5f, 0xa9, 0xaf, 0x25, 0x58, 0x0c, 0xc9, 0x8a, 0xae, 0xc1, 0x12, 0x75,
0x29, 0x6f, 0x0f, 0x4c, 0xdf, 0x74, 0x46, 0x96, 0xbc, 0x96, 0x94, 0x8c, 0x62, 0xd4, 0x74, 0x29,
0xdf, 0x15, 0x8c, 0x06, 0xd0, 0xe8, 0x1b, 0xdf, 0x85, 0xd5, 0x56, 0xcf, 0x7b, 0xb4, 0xed, 0xf5,
0xfb, 0xc4, 0xe2, 0xd4, 0x73, 0x9f, 0x3d, 0x28, 0x08, 0xf2, 0x76, 0xa7, 0xd9, 0x10, 0x76, 0xe4,
0x0c, 0xf1, 0x8d, 0x19, 0xac, 0xa5, 0xd5, 0xcf, 0xe2, 0xf1, 0x1b, 0x70, 0xd2, 0x8a, 0x54, 0x35,
0x1b, 0x81, 0xcf, 0xb9, 0x8d, 0x9c, 0x91, 0x24, 0xe2, 0xdf, 0x35, 0x58, 0xfd, 0xd8, 0x33, 0xed,
0x17, 0xe4, 0x14, 0xc2, 0x50, 0x8a, 0x03, 0xd6, 0x72, 0xe2, 0x5f, 0x82, 0x86, 0xde, 0x87, 0x42,
0x58, 0x7a, 0xb5, 0xbc, 0xc0, 0x3a, 0x97, 0xc4, 0x92, 0x65, 0x79, 0x68, 0x61, 0x4b, 0x10, 0x0c,
0x29, 0x84, 0xbf, 0xd5, 0xa0, 0x66, 0x90, 0x3e, 0x31, 0x19, 0x39, 0x46, 0x2f, 0xf0, 0xd7, 0x1a,
0xac, 0x04, 0xf9, 0xdb, 0x35, 0x7d, 0x4e, 0x8f, 0xc7, 0x84, 0x41, 0x58, 0xa0, 0x31, 0x0b, 0x66,
0x29, 0x20, 0x0c, 0xa5, 0xc1, 0x48, 0xd3, 0x61, 0xfd, 0x24, 0x68, 0xd8, 0x81, 0x4a, 0x84, 0x16,
0x88, 0x13, 0x86, 0xce, 0xc2, 0x52, 0x8c, 0x45, 0x00, 0xe6, 0x8c, 0x38, 0x09, 0xbd, 0x07, 0x0b,
0x01, 0x04, 0x11, 0xfe, 0x95, 0xb7, 0x70, 0x7d, 0xbc, 0x99, 0xd5, 0x93, 0x5a, 0x8d, 0x50, 0x00,
0xff, 0xac, 0xc1, 0x5a, 0x0a, 0xef, 0xe5, 0x97, 0x6b, 0x3a, 0x2e, 0x79, 0x45, 0x5c, 0x7e, 0xd3,
0x60, 0x7d, 0xcc, 0xd0, 0x59, 0x92, 0xb1, 0x07, 0x6b, 0x11, 0x40, 0xdb, 0x26, 0xcc, 0xf2, 0xe9,
0x20, 0xf8, 0x0e, 0xd3, 0xb2, 0xb4, 0xf5, 0x7a, 0x76, 0x10, 0x99, 0xb1, 0x1a, 0xa9, 0x68, 0xc4,
0x34, 0xe0, 0x7f, 0x34, 0x58, 0x09, 0x7a, 0xc0, 0xf1, 0x55, 0xee, 0x34, 0x31, 0x8d, 0xb5, 0x89,
0x85, 0x67, 0x69, 0x13, 0xbf, 0x68, 0xb0, 0x2e, 0xdb, 0xc4, 0x7f, 0xdb, 0x51, 0xfc, 0xa3, 0x06,
0xfa, 0xb6, 0x4f, 0x4c, 0x4e, 0x3e, 0x0d, 0xd2, 0xb8, 0xdd, 0x33, 0x5d, 0x97, 0xf4, 0x67, 0xab,
0x9f, 0x0b, 0x50, 0xf1, 0x43, 0x67, 0xdb, 0x56, 0xa8, 0x4f, 0x98, 0x5e, 0x34, 0xca, 0x92, 0x2c,
0x51, 0xd0, 0x39, 0x28, 0xfb, 0x84, 0x0d, 0xfb, 0x87, 0x7c, 0x39, 0xc1, 0x77, 0x32, 0xa4, 0x4a,
0x36, 0xfc, 0x93, 0x06, 0xeb, 0x57, 0x6d, 0x3b, 0x6e, 0xe0, 0x0c, 0x57, 0xf1, 0x32, 0x9c, 0x4a,
0x59, 0x27, 0x43, 0x5b, 0x34, 0xaa, 0x49, 0xfb, 0x9a, 0x0d, 0x74, 0x11, 0xaa, 0x49, 0x0b, 0x65,
0xa8, 0x8b, 0x46, 0x25, 0x61, 0x63, 0xb3, 0x81, 0xff, 0xd4, 0x40, 0x37, 0x88, 0xe3, 0x3d, 0x24,
0x4a, 0x43, 0x9f, 0x29, 0x92, 0x23, 0xef, 0xe6, 0x67, 0xf3, 0x2e, 0xf7, 0x14, 0xde, 0xe5, 0xd5,
0xde, 0xdd, 0x83, 0xb5, 0x3b, 0x26, 0xb7, 0x7a, 0x0d, 0x67, 0xf6, 0x0c, 0x9c, 0x01, 0x88, 0xf0,
0xc2, 0x9e, 0x52, 0x34, 0x62, 0x14, 0xfc, 0xeb, 0x3c, 0xa0, 0xa0, 0x47, 0xb4, 0x48, 0xd7, 0x21,
0x2e, 0x7f, 0xf9, 0x17, 0x27, 0xf5, 0xac, 0xe4, 0xc7, 0x9f, 0x95, 0x33, 0x00, 0x2c, 0xb4, 0x2e,
0x70, 0x61, 0x41, 0x5c, 0xac, 0x18, 0x05, 0xe9, 0x70, 0x62, 0x9f, 0x92, 0xbe, 0x1d, 0xfc, 0x2d,
0x88, 0xbf, 0xd1, 0x19, 0x7d, 0x04, 0x65, 0xc9, 0xd9, 0x16, 0x2f, 0x0d, 0xab, 0x2d, 0xaa, 0xda,
0x6a, 0x30, 0x35, 0xd7, 0x65, 0x08, 0x44, 0x4f, 0x6d, 0xba, 0xfb, 0x9e, 0x71, 0x92, 0xc5, 0x28,
0x0c, 0xff, 0xad, 0xc1, 0xaa, 0x6c, 0x34, 0xc7, 0x16, 0xad, 0x69, 0xfa, 0xe9, 0x0c, 0xf1, 0xc2,
0xdf, 0x6b, 0xb0, 0xbe, 0xed, 0x39, 0x03, 0xcf, 0x8d, 0xfc, 0x9e, 0xad, 0x3f, 0x7d, 0x10, 0x0a,
0x91, 0xd1, 0x68, 0x7e, 0x7e, 0xc2, 0x68, 0x9e, 0x06, 0x95, 0x52, 0xf8, 0x2f, 0x0d, 0x96, 0x64,
0xb4, 0x83, 0x9c, 0xa0, 0xd3, 0x50, 0x8c, 0x5c, 0x91, 0x33, 0xc8, 0x21, 0x61, 0x2c, 0x84, 0xf3,
0xd9, 0x05, 0x97, 0x1b, 0x2f, 0xb8, 0xff, 0xc1, 0x09, 0x87, 0x38, 0x6d, 0x46, 0x0f, 0x88, 0xac,
0xc7, 0x45, 0x87, 0x38, 0x2d, 0x7a, 0x40, 0x82, 0x5f, 0xee, 0xd0, 0x69, 0xfb, 0xde, 0x23, 0x26,
0x5e, 0xab, 0x9c, 0xb1, 0xe8, 0x0e, 0x1d, 0xc3, 0x7b, 0xc4, 0xd0, 0x2b, 0x00, 0xd4, 0xb5, 0xc9,
0xe3, 0xb6, 0x6b, 0x3a, 0xa4, 0x56, 0x10, 0x57, 0xbb, 0x28, 0x28, 0x3b, 0xa6, 0x43, 0x50, 0x0d,
0x16, 0xc5, 0xa1, 0xd9, 0xa8, 0x2d, 0x86, 0x82, 0xf2, 0x88, 0xf7, 0x01, 0xc5, 0x3c, 0x9c, 0xe9,
0xaa, 0xc7, 0xf2, 0x3e, 0x9f, 0xce, 0x3b, 0xfe, 0x46, 0x83, 0xe5, 0x04, 0xd0, 0x2c, 0x79, 0x7d,
0x07, 0x16, 0xa8, 0xbb, 0xef, 0x8d, 0xc6, 0x94, 0x57, 0x55, 0x63, 0x4a, 0x1c, 0x2c, 0xe4, 0xbe,
0x74, 0x00, 0xe5, 0xe4, 0xf0, 0x82, 0x4a, 0x70, 0x62, 0xc7, 0xe3, 0xd7, 0x1f, 0x53, 0xc6, 0xab,
0x73, 0xa8, 0x0c, 0xb0, 0xe3, 0xf1, 0x5d, 0x9f, 0x30, 0xe2, 0xf2, 0xaa, 0x86, 0x00, 0x0a, 0x9f,
0xb8, 0x0d, 0xca, 0xee, 0x57, 0xe7, 0xd1, 0xb2, 0x9c, 0x49, 0xcd, 0x7e, 0xd3, 0xbd, 0x49, 0x1c,
0xcf, 0x7f, 0x52, 0xcd, 0x05, 0xe2, 0xd1, 0x29, 0x8f, 0xaa, 0x50, 0x8a, 0x58, 0x6e, 0xec, 0xde,
0xae, 0x2e, 0xa0, 0x22, 0x2c, 0x84, 0x9f, 0x85, 0xad, 0xef, 0x00, 0x4a, 0xe2, 0xb9, 0x68, 0x85,
0xeb, 0x31, 0xb2, 0xa0, 0x14, 0x5f, 0x44, 0xd1, 0x05, 0x95, 0x13, 0x8a, 0x65, 0x59, 0xdf, 0xc8,
0x66, 0x0c, 0x63, 0x8b, 0xe7, 0xd0, 0x3d, 0xa8, 0x24, 0xb7, 0x3f, 0x86, 0x2e, 0x2a, 0x83, 0xa5,
0xda, 0x40, 0xf5, 0x4b, 0xd3, 0xb0, 0x46, 0x58, 0x5d, 0x28, 0x27, 0xf6, 0x04, 0x86, 0x36, 0x26,
0xc9, 0xa7, 0x47, 0x25, 0xfd, 0xe2, 0x14, 0x9c, 0x11, 0xd0, 0x67, 0x50, 0x4e, 0x0c, 0x96, 0x13,
0x80, 0x54, 0xc3, 0xa7, 0x7e, 0x54, 0x79, 0xe1, 0x39, 0xd4, 0x86, 0x53, 0xe9, 0x69, 0x8e, 0xa1,
0xcb, 0xea, 0x80, 0x2b, 0x87, 0xbe, 0x2c, 0x80, 0xbd, 0xd0, 0xf6, 0xc3, 0x00, 0xaa, 0xf3, 0xa1,
0x5c, 0x9e, 0xb3, 0x74, 0x7f, 0x19, 0x19, 0x1f, 0x53, 0xff, 0xe6, 0x11, 0xc6, 0x3f, 0x35, 0x42,
0x07, 0xd0, 0xf8, 0x08, 0x89, 0x74, 0xa5, 0xd0, 0x75, 0x67, 0xc0, 0x9f, 0xe8, 0x75, 0x15, 0xfc,
0xe4, 0x31, 0x14, 0xcf, 0xa1, 0x3b, 0x80, 0x6e, 0x10, 0x7e, 0x8b, 0x3a, 0xe4, 0x16, 0xb5, 0xee,
0x4f, 0x83, 0x91, 0x7a, 0x4e, 0xe5, 0xa1, 0xc5, 0x7d, 0xea, 0x76, 0x13, 0x65, 0xb3, 0x72, 0x83,
0x88, 0x0e, 0x4f, 0x19, 0xa7, 0x16, 0x7b, 0x8e, 0xaa, 0x3d, 0x61, 0x73, 0x7a, 0x67, 0xbd, 0x34,
0xcd, 0xf6, 0x24, 0x03, 0x7f, 0x79, 0x2a, 0xde, 0x08, 0x70, 0x4f, 0x00, 0xa6, 0x9e, 0xad, 0x23,
0x3d, 0x99, 0xf2, 0xe9, 0xc3, 0x73, 0xc8, 0x82, 0x72, 0x10, 0xa7, 0xd8, 0xb3, 0x77, 0x3e, 0xab,
0xbf, 0x4a, 0x27, 0x2e, 0x64, 0xf2, 0x8d, 0x1c, 0xd8, 0xfa, 0xa3, 0x00, 0x45, 0x51, 0x00, 0xa2,
0xf7, 0xbd, 0xb0, 0x9c, 0xdf, 0x82, 0x8a, 0xcc, 0xf9, 0xf3, 0x4c, 0x77, 0xfb, 0xa9, 0xa3, 0xaf,
0x4c, 0xef, 0x84, 0x51, 0x07, 0xcf, 0xa1, 0xbb, 0x50, 0x49, 0xad, 0x41, 0xea, 0x26, 0x34, 0x61,
0x57, 0xca, 0xba, 0xc6, 0x16, 0xa0, 0xf1, 0xfd, 0x05, 0xd5, 0xd5, 0x9d, 0x62, 0xd2, 0x9e, 0x93,
0x05, 0xf2, 0x05, 0x54, 0x52, 0x7b, 0x84, 0xfa, 0x42, 0xa8, 0x97, 0x8d, 0x2c, 0xed, 0xb7, 0xa1,
0x14, 0x5b, 0x1c, 0x98, 0xba, 0x44, 0xc7, 0x57, 0x8b, 0x2c, 0xb5, 0x9f, 0x43, 0x25, 0x39, 0x64,
0x4f, 0x78, 0x2f, 0x95, 0x93, 0x78, 0x76, 0xd8, 0x5f, 0xfc, 0xc5, 0xba, 0x76, 0x75, 0xef, 0xc3,
0x2e, 0xe5, 0xbd, 0x61, 0x27, 0x80, 0xdf, 0x3c, 0xa0, 0xfd, 0x3e, 0x3d, 0xe0, 0xc4, 0xea, 0x6d,
0x86, 0x1a, 0xde, 0xb2, 0x29, 0xe3, 0x3e, 0xed, 0x0c, 0x39, 0xb1, 0x37, 0x47, 0x4d, 0x60, 0x53,
0xa8, 0xdd, 0x14, 0x6a, 0x07, 0x9d, 0x4e, 0x41, 0x1c, 0xaf, 0xfc, 0x1b, 0x00, 0x00, 0xff, 0xff,
0xe0, 0xb4, 0xcb, 0x07, 0x24, 0x18, 0x00, 0x00,
// 1375 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xdd, 0x6e, 0x1b, 0xc5,
0x17, 0xcf, 0xc6, 0x8e, 0x13, 0x9f, 0xb8, 0xb6, 0x3b, 0xf9, 0xf2, 0x7f, 0xff, 0xa5, 0x94, 0x81,
0xb6, 0x69, 0x0b, 0x0e, 0x4a, 0x05, 0xe2, 0x06, 0x50, 0x1b, 0x57, 0x95, 0x11, 0x0d, 0x61, 0xdd,
0xaa, 0x22, 0x50, 0x99, 0xf5, 0xee, 0x24, 0x9e, 0xd6, 0xbb, 0xeb, 0xee, 0x8c, 0xfb, 0x91, 0x1b,
0x40, 0xe2, 0x12, 0xc1, 0x33, 0x20, 0x24, 0x90, 0x78, 0x0a, 0x9e, 0x82, 0x1b, 0x6e, 0x40, 0x5c,
0xf0, 0x1a, 0x68, 0x67, 0xc7, 0x9b, 0xdd, 0xf5, 0x38, 0xeb, 0xc6, 0x6d, 0xc3, 0xdd, 0xce, 0xd9,
0x73, 0xce, 0xef, 0x7c, 0xcd, 0x99, 0x73, 0x60, 0xe9, 0xe1, 0x80, 0xf8, 0x4f, 0xdb, 0x8c, 0xf8,
0x8f, 0xa8, 0x45, 0xea, 0x7d, 0xdf, 0xe3, 0x1e, 0x42, 0x0e, 0xed, 0x3d, 0x1a, 0xb0, 0xf0, 0x54,
0x17, 0x1c, 0x7a, 0xc9, 0xf2, 0x1c, 0xc7, 0x73, 0x43, 0x9a, 0x5e, 0x8a, 0x73, 0xe8, 0x65, 0xea,
0x72, 0xe2, 0xbb, 0x66, 0x4f, 0x9e, 0x91, 0x6d, 0x72, 0x33, 0xa9, 0x53, 0x2f, 0x31, 0xab, 0x4b,
0x1c, 0x33, 0x3c, 0xe1, 0xaf, 0x60, 0xc9, 0x20, 0xfb, 0x94, 0x71, 0xe2, 0x6f, 0x7b, 0x36, 0x31,
0xc8, 0xc3, 0x01, 0x61, 0x1c, 0xbd, 0x0d, 0xf9, 0x8e, 0xc9, 0x48, 0x4d, 0x3b, 0xa7, 0xad, 0x2f,
0x6e, 0x9e, 0xa9, 0x27, 0xec, 0x90, 0x06, 0xdc, 0x62, 0xfb, 0xd7, 0x4d, 0x46, 0x0c, 0xc1, 0x89,
0xde, 0x85, 0x79, 0xd3, 0xb6, 0x7d, 0xc2, 0x58, 0x6d, 0xf6, 0x08, 0xa1, 0x6b, 0x21, 0x8f, 0x31,
0x64, 0xc6, 0x3f, 0x68, 0xb0, 0x9c, 0xb4, 0x80, 0xf5, 0x3d, 0x97, 0x11, 0x74, 0x15, 0x0a, 0x8c,
0x9b, 0x7c, 0xc0, 0xa4, 0x11, 0xff, 0x57, 0xea, 0x6b, 0x09, 0x16, 0x43, 0xb2, 0xa2, 0xeb, 0xb0,
0x48, 0x5d, 0xca, 0xdb, 0x7d, 0xd3, 0x37, 0x9d, 0xa1, 0x25, 0xaf, 0x25, 0x25, 0xa3, 0x18, 0x35,
0x5d, 0xca, 0x77, 0x04, 0xa3, 0x01, 0x34, 0xfa, 0xc6, 0xf7, 0x60, 0xa5, 0xd5, 0xf5, 0x1e, 0x6f,
0x79, 0xbd, 0x1e, 0xb1, 0x38, 0xf5, 0xdc, 0xe3, 0x07, 0x05, 0x41, 0xde, 0xee, 0x34, 0x1b, 0xc2,
0x8e, 0x9c, 0x21, 0xbe, 0x31, 0x83, 0xd5, 0xb4, 0xfa, 0x69, 0x3c, 0x7e, 0x03, 0x4e, 0x59, 0x91,
0xaa, 0x66, 0x23, 0xf0, 0x39, 0xb7, 0x9e, 0x33, 0x92, 0x44, 0xfc, 0x9b, 0x06, 0x2b, 0x1f, 0x7b,
0xa6, 0xfd, 0x82, 0x9c, 0x42, 0x18, 0x4a, 0x71, 0xc0, 0x5a, 0x4e, 0xfc, 0x4b, 0xd0, 0xd0, 0xfb,
0x50, 0x08, 0x4b, 0xaf, 0x96, 0x17, 0x58, 0xe7, 0x93, 0x58, 0xb2, 0x2c, 0x0f, 0x2d, 0x6c, 0x09,
0x82, 0x21, 0x85, 0xf0, 0xb7, 0x1a, 0xd4, 0x0c, 0xd2, 0x23, 0x26, 0x23, 0x27, 0xe8, 0x05, 0xfe,
0x5a, 0x83, 0xe5, 0x20, 0x7f, 0x3b, 0xa6, 0xcf, 0xe9, 0xc9, 0x98, 0xd0, 0x0f, 0x0b, 0x34, 0x66,
0xc1, 0x34, 0x05, 0x84, 0xa1, 0xd4, 0x1f, 0x6a, 0x3a, 0xac, 0x9f, 0x04, 0x0d, 0x3b, 0x50, 0x89,
0xd0, 0x02, 0x71, 0xc2, 0xd0, 0x39, 0x58, 0x8c, 0xb1, 0x08, 0xc0, 0x9c, 0x11, 0x27, 0xa1, 0xf7,
0x60, 0x2e, 0x80, 0x20, 0xc2, 0xbf, 0xf2, 0x26, 0xae, 0x8f, 0x36, 0xb3, 0x7a, 0x52, 0xab, 0x11,
0x0a, 0xe0, 0x9f, 0x35, 0x58, 0x4d, 0xe1, 0xbd, 0xfc, 0x72, 0x4d, 0xc7, 0x25, 0xaf, 0x88, 0xcb,
0xaf, 0x1a, 0xac, 0x8d, 0x18, 0x3a, 0x4d, 0x32, 0x76, 0x61, 0x35, 0x02, 0x68, 0xdb, 0x84, 0x59,
0x3e, 0xed, 0x07, 0xdf, 0x61, 0x5a, 0x16, 0x37, 0x5f, 0xcf, 0x0e, 0x22, 0x33, 0x56, 0x22, 0x15,
0x8d, 0x98, 0x06, 0xfc, 0xb7, 0x06, 0xcb, 0x41, 0x0f, 0x38, 0xb9, 0xca, 0x9d, 0x24, 0xa6, 0xb1,
0x36, 0x31, 0x77, 0x9c, 0x36, 0xf1, 0x8b, 0x06, 0x6b, 0xb2, 0x4d, 0xfc, 0xb7, 0x1d, 0xc5, 0x3f,
0x6a, 0xa0, 0x6f, 0xf9, 0xc4, 0xe4, 0xe4, 0xd3, 0x20, 0x8d, 0x5b, 0x5d, 0xd3, 0x75, 0x49, 0x6f,
0xba, 0xfa, 0xb9, 0x08, 0x15, 0x3f, 0x74, 0xb6, 0x6d, 0x85, 0xfa, 0x84, 0xe9, 0x45, 0xa3, 0x2c,
0xc9, 0x12, 0x05, 0x9d, 0x87, 0xb2, 0x4f, 0xd8, 0xa0, 0x77, 0xc8, 0x97, 0x13, 0x7c, 0xa7, 0x42,
0xaa, 0x64, 0xc3, 0x3f, 0x69, 0xb0, 0x76, 0xcd, 0xb6, 0xe3, 0x06, 0x4e, 0x71, 0x15, 0xaf, 0xc0,
0xe9, 0x94, 0x75, 0x32, 0xb4, 0x45, 0xa3, 0x9a, 0xb4, 0xaf, 0xd9, 0x40, 0x97, 0xa0, 0x9a, 0xb4,
0x50, 0x86, 0xba, 0x68, 0x54, 0x12, 0x36, 0x36, 0x1b, 0xf8, 0x0f, 0x0d, 0x74, 0x83, 0x38, 0xde,
0x23, 0xa2, 0x34, 0xf4, 0x58, 0x91, 0x1c, 0x7a, 0x37, 0x3b, 0x9d, 0x77, 0xb9, 0x67, 0xf0, 0x2e,
0xaf, 0xf6, 0xee, 0x3e, 0xac, 0xde, 0x35, 0xb9, 0xd5, 0x6d, 0x38, 0xd3, 0x67, 0xe0, 0x2c, 0x40,
0x84, 0x17, 0xf6, 0x94, 0xa2, 0x11, 0xa3, 0xe0, 0x7f, 0x66, 0x01, 0x05, 0x3d, 0xa2, 0x45, 0xf6,
0x1d, 0xe2, 0xf2, 0x97, 0x7f, 0x71, 0x52, 0xcf, 0x4a, 0x7e, 0xf4, 0x59, 0x39, 0x0b, 0xc0, 0x42,
0xeb, 0x02, 0x17, 0xe6, 0xc4, 0xc5, 0x8a, 0x51, 0x90, 0x0e, 0x0b, 0x7b, 0x94, 0xf4, 0xec, 0xe0,
0x6f, 0x41, 0xfc, 0x8d, 0xce, 0xe8, 0x23, 0x28, 0x4b, 0xce, 0xb6, 0x78, 0x69, 0x58, 0x6d, 0x5e,
0xd5, 0x56, 0x83, 0xa9, 0xb9, 0x2e, 0x43, 0x20, 0x7a, 0x6a, 0xd3, 0xdd, 0xf3, 0x8c, 0x53, 0x2c,
0x46, 0x89, 0xf7, 0xa9, 0x85, 0xe3, 0xf4, 0xa9, 0xbf, 0x34, 0x58, 0x91, 0x7d, 0xea, 0xc4, 0x82,
0x3d, 0x49, 0x3b, 0x9e, 0x22, 0xdc, 0xf8, 0x7b, 0x0d, 0xd6, 0xb6, 0x3c, 0xa7, 0xef, 0xb9, 0x51,
0xd8, 0xa6, 0x6b, 0x6f, 0x1f, 0x84, 0x42, 0x64, 0x38, 0xd9, 0x5f, 0x18, 0x33, 0xd9, 0xa7, 0x41,
0xa5, 0x14, 0xfe, 0x53, 0x83, 0x45, 0x19, 0xed, 0x20, 0xa5, 0xe8, 0x0c, 0x14, 0x23, 0x57, 0xe4,
0x08, 0x73, 0x48, 0x18, 0x09, 0xe1, 0x6c, 0x76, 0xbd, 0xe6, 0x46, 0xeb, 0xf5, 0x7f, 0xb0, 0xe0,
0x10, 0xa7, 0xcd, 0xe8, 0x01, 0x91, 0xe5, 0x3c, 0xef, 0x10, 0xa7, 0x45, 0x0f, 0x48, 0xf0, 0xcb,
0x1d, 0x38, 0x6d, 0xdf, 0x7b, 0xcc, 0xc4, 0x63, 0x97, 0x33, 0xe6, 0xdd, 0x81, 0x63, 0x78, 0x8f,
0x19, 0x7a, 0x05, 0x80, 0xba, 0x36, 0x79, 0xd2, 0x76, 0x4d, 0x87, 0xd4, 0x0a, 0xa2, 0x33, 0x14,
0x05, 0x65, 0xdb, 0x74, 0x08, 0xaa, 0xc1, 0xbc, 0x38, 0x34, 0x1b, 0xb5, 0xf9, 0x50, 0x50, 0x1e,
0xf1, 0x1e, 0xa0, 0x98, 0x87, 0x53, 0x75, 0x8a, 0x58, 0xde, 0x67, 0xd3, 0x79, 0xc7, 0xdf, 0x68,
0xb0, 0x94, 0x00, 0x9a, 0x26, 0xaf, 0xef, 0xc0, 0x1c, 0x75, 0xf7, 0xbc, 0xe1, 0x94, 0xf3, 0xaa,
0x6a, 0xca, 0x89, 0x83, 0x85, 0xdc, 0x97, 0x0f, 0xa0, 0x9c, 0x9c, 0x7d, 0x50, 0x09, 0x16, 0xb6,
0x3d, 0x7e, 0xe3, 0x09, 0x65, 0xbc, 0x3a, 0x83, 0xca, 0x00, 0xdb, 0x1e, 0xdf, 0xf1, 0x09, 0x23,
0x2e, 0xaf, 0x6a, 0x08, 0xa0, 0xf0, 0x89, 0xdb, 0xa0, 0xec, 0x41, 0x75, 0x16, 0x2d, 0xc9, 0x91,
0xd6, 0xec, 0x35, 0xdd, 0x5b, 0xc4, 0xf1, 0xfc, 0xa7, 0xd5, 0x5c, 0x20, 0x1e, 0x9d, 0xf2, 0xa8,
0x0a, 0xa5, 0x88, 0xe5, 0xe6, 0xce, 0x9d, 0xea, 0x1c, 0x2a, 0xc2, 0x5c, 0xf8, 0x59, 0xd8, 0xfc,
0x0e, 0xa0, 0x24, 0x5e, 0x9b, 0x56, 0xb8, 0x5d, 0x23, 0x0b, 0x4a, 0xf1, 0x3d, 0x16, 0x5d, 0x54,
0x39, 0xa1, 0xd8, 0xb5, 0xf5, 0xf5, 0x6c, 0xc6, 0x30, 0xb6, 0x78, 0x06, 0xdd, 0x87, 0x4a, 0x72,
0x79, 0x64, 0xe8, 0x92, 0x32, 0x58, 0xaa, 0x05, 0x56, 0xbf, 0x3c, 0x09, 0x6b, 0x84, 0xb5, 0x0f,
0xe5, 0xc4, 0x9a, 0xc1, 0xd0, 0xfa, 0x38, 0xf9, 0xf4, 0xa4, 0xa5, 0x5f, 0x9a, 0x80, 0x33, 0x02,
0xfa, 0x0c, 0xca, 0x89, 0xb9, 0x74, 0x0c, 0x90, 0x6a, 0x76, 0xd5, 0x8f, 0x2a, 0x2f, 0x3c, 0x83,
0xda, 0x70, 0x3a, 0x3d, 0x0c, 0x32, 0x74, 0x45, 0x1d, 0x70, 0xe5, 0xcc, 0x98, 0x05, 0xb0, 0x1b,
0xda, 0x7e, 0x18, 0x40, 0x75, 0x3e, 0x94, 0xbb, 0x77, 0x96, 0xee, 0x2f, 0x23, 0xe3, 0x63, 0xea,
0xdf, 0x3c, 0xc2, 0xf8, 0x67, 0x46, 0xe8, 0x00, 0x1a, 0x9d, 0x40, 0x91, 0xae, 0x14, 0xba, 0xe1,
0xf4, 0xf9, 0x53, 0xbd, 0xae, 0x82, 0x1f, 0x3f, 0xc5, 0xe2, 0x19, 0x74, 0x17, 0xd0, 0x4d, 0xc2,
0x6f, 0x53, 0x87, 0xdc, 0xa6, 0xd6, 0x83, 0x49, 0x30, 0x52, 0xaf, 0xb1, 0x3c, 0xb4, 0xb8, 0x4f,
0xdd, 0xfd, 0x44, 0xd9, 0x2c, 0xdf, 0x24, 0xa2, 0xc3, 0x53, 0xc6, 0xa9, 0xc5, 0x9e, 0xa3, 0x6a,
0x4f, 0xd8, 0x9c, 0x5e, 0x79, 0x2f, 0x4f, 0xb2, 0x7c, 0xc9, 0xc0, 0x5f, 0x99, 0x88, 0x37, 0x02,
0xdc, 0x15, 0x80, 0xa9, 0x67, 0xeb, 0x48, 0x4f, 0x26, 0x7c, 0xfa, 0xf0, 0x0c, 0xb2, 0xa0, 0x1c,
0xc4, 0x29, 0xf6, 0xec, 0x5d, 0xc8, 0xea, 0xaf, 0xd2, 0x89, 0x8b, 0x99, 0x7c, 0x43, 0x07, 0x36,
0x7f, 0x2f, 0x40, 0x51, 0x14, 0x80, 0xe8, 0x7d, 0x2f, 0x2c, 0xe7, 0xb7, 0xa1, 0x22, 0x73, 0xfe,
0x3c, 0xd3, 0xdd, 0x7e, 0xe6, 0xe8, 0x2b, 0xd3, 0x3b, 0x66, 0xd4, 0xc1, 0x33, 0xe8, 0x1e, 0x54,
0x52, 0x5b, 0x94, 0xba, 0x09, 0x8d, 0x59, 0xb5, 0xb2, 0xae, 0xb1, 0x05, 0x68, 0x74, 0xfd, 0x41,
0x75, 0x75, 0xa7, 0x18, 0xb7, 0x26, 0x65, 0x81, 0x7c, 0x01, 0x95, 0xd4, 0x1a, 0xa2, 0xbe, 0x10,
0xea, 0x5d, 0x25, 0x4b, 0xfb, 0x1d, 0x28, 0xc5, 0xf6, 0x0e, 0xa6, 0x2e, 0xd1, 0xd1, 0xcd, 0x24,
0x4b, 0xed, 0xe7, 0x50, 0x49, 0x0e, 0xd9, 0x63, 0xde, 0x4b, 0xe5, 0x24, 0x9e, 0x1d, 0xf6, 0x17,
0x7f, 0xb1, 0xae, 0x5f, 0xdb, 0xfd, 0x70, 0x9f, 0xf2, 0xee, 0xa0, 0x13, 0xc0, 0x6f, 0x1c, 0xd0,
0x5e, 0x8f, 0x1e, 0x70, 0x62, 0x75, 0x37, 0x42, 0x0d, 0x6f, 0xd9, 0x94, 0x71, 0x9f, 0x76, 0x06,
0x9c, 0xd8, 0x1b, 0xc3, 0x26, 0xb0, 0x21, 0xd4, 0x6e, 0x08, 0xb5, 0xfd, 0x4e, 0xa7, 0x20, 0x8e,
0x57, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x72, 0x8c, 0x04, 0x63, 0x18, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
......
......@@ -2,9 +2,6 @@ package querynode
import (
"log"
"sort"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/golang/protobuf/proto"
......@@ -52,27 +49,27 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
ddNode.ddMsg.gcRecord = &gcRecord
// sort tsMessages
tsMessages := msMsg.TsMessages()
sort.Slice(tsMessages,
func(i, j int) bool {
return tsMessages[i].BeginTs() < tsMessages[j].BeginTs()
})
//tsMessages := msMsg.TsMessages()
//sort.Slice(tsMessages,
// func(i, j int) bool {
// return tsMessages[i].BeginTs() < tsMessages[j].BeginTs()
// })
// do dd tasks
for _, msg := range tsMessages {
switch msg.Type() {
case commonpb.MsgType_kCreateCollection:
ddNode.createCollection(msg.(*msgstream.CreateCollectionMsg))
case commonpb.MsgType_kDropCollection:
ddNode.dropCollection(msg.(*msgstream.DropCollectionMsg))
case commonpb.MsgType_kCreatePartition:
ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg))
case commonpb.MsgType_kDropPartition:
ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
default:
log.Println("Non supporting message type:", msg.Type())
}
}
//for _, msg := range tsMessages {
// switch msg.Type() {
// case commonpb.MsgType_kCreateCollection:
// ddNode.createCollection(msg.(*msgstream.CreateCollectionMsg))
// case commonpb.MsgType_kDropCollection:
// ddNode.dropCollection(msg.(*msgstream.DropCollectionMsg))
// case commonpb.MsgType_kCreatePartition:
// ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg))
// case commonpb.MsgType_kDropPartition:
// ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
// default:
// log.Println("Non supporting message type:", msg.Type())
// }
//}
var res Msg = ddNode.ddMsg
return []*Msg{&res}
......
......@@ -69,9 +69,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
// TODO: open this check
// check if partition dm enable
//if enable, _ := fdmNode.replica.getEnablePartitionDM(msg.CollectionID, msg.PartitionID); !enable {
// return nil
//}
enableCollection := fdmNode.replica.hasCollection(msg.CollectionID)
enablePartition := fdmNode.replica.hasPartition(msg.PartitionID)
if !enableCollection || !enablePartition {
return nil
}
// No dd record, do all insert requests.
records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionID]
......
......@@ -353,7 +353,7 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
return status, errors.New(errMsg)
}
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarMsgStream)
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarTtMsgStream)
if !ok {
errMsg := "type assertion failed for dm message stream"
status := &commonpb.Status{
......@@ -381,7 +381,30 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
partitionID := in.PartitionID
segmentIDs := in.SegmentIDs
fieldIDs := in.FieldIDs
schema := in.Schema
hasCollection := node.replica.hasCollection(collectionID)
hasPartition := node.replica.hasPartition(partitionID)
if !hasCollection {
err := node.replica.addCollection(collectionID, schema)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
return status, err
}
}
if !hasPartition {
err := node.replica.addPartition(collectionID, partitionID)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
return status, err
}
}
err := node.replica.enablePartition(partitionID)
if err != nil {
status := &commonpb.Status{
......@@ -395,7 +418,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
for i, state := range in.SegmentStates {
if state.State == commonpb.SegmentState_SegmentGrowing {
position := state.StartPosition
err = node.loadService.segLoader.seekSegment(position)
err := node.loadService.segLoader.seekSegment(position)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
......
......@@ -3,19 +3,23 @@ package queryservice
import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
type metaReplica interface {
getCollections(dbID UniqueID) ([]*collection, error)
getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error)
getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error)
loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error)
loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error)
getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error)
getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error)
addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error
addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error
updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error
getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error)
releaseCollection(dbID UniqueID, collectionID UniqueID) error
releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error
addDmChannels(collectionID UniqueID, channels []string) error
addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]UniqueID) error
getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (UniqueID, error)
}
type segment struct {
......@@ -29,9 +33,10 @@ type partition struct {
}
type collection struct {
id UniqueID
partitions map[UniqueID]*partition
dmChannelNames []string
id UniqueID
partitions map[UniqueID]*partition
dmChannels2Node map[string]UniqueID
schema *schemapb.CollectionSchema
}
type metaReplicaImpl struct {
......@@ -50,22 +55,25 @@ func newMetaReplica() metaReplica {
}
}
func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
//TODO:: assert dbID = 0 exist
if _, ok := mp.db2collections[dbID]; ok {
partitions := make(map[UniqueID]*partition)
channels := make(map[string]UniqueID)
newCollection := &collection{
id: collectionID,
partitions: partitions,
id: collectionID,
partitions: partitions,
schema: schema,
dmChannels2Node: channels,
}
mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection)
return newCollection, nil
return nil
}
return nil, errors.New("addCollection: can't find dbID when add collection")
return errors.New("addCollection: can't find dbID when add collection")
}
func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collection.id == collectionID {
......@@ -77,11 +85,11 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa
segments: segments,
}
partitions[partitionID] = partition
return partition, nil
return nil
}
}
}
return nil, errors.New("addPartition: can't find collection when add partition")
return errors.New("addPartition: can't find collection when add partition")
}
func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) {
......@@ -125,49 +133,31 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par
return nil, errors.New("getSegments: can't find segmentID")
}
func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
var res *collection = nil
if collections, err := mp.getCollections(dbID); err == nil {
func (mp *metaReplicaImpl) getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
res = collection
return collection, nil
}
}
}
if res == nil {
collection, err := mp.addCollection(dbID, collectionID)
if err != nil {
return nil, err
}
res = collection
}
return res, nil
return nil, errors.New("getCollectionByID: can't find collectionID")
}
func (mp *metaReplicaImpl) loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
var collection *collection = nil
var partition *partition = nil
var err error
for _, col := range mp.db2collections[dbID] {
if col.id == collectionID {
collection = col
}
}
if collection == nil {
collection, err = mp.addCollection(dbID, collectionID)
if err != nil {
return partition, err
}
}
if _, ok := collection.partitions[partitionID]; !ok {
partition, err = mp.addPartition(dbID, collectionID, partitionID)
if err != nil {
return partition, err
func (mp *metaReplicaImpl) getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
partitions := collection.partitions
if partition, ok := partitions[partitionID]; ok {
return partition, nil
}
}
}
return partition, nil
}
return nil, nil
return nil, errors.New("getPartitionByID: can't find partitionID")
}
func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
......@@ -235,23 +225,12 @@ func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID
return errors.New("releasePartition: can't find dbID or collectionID or partitionID")
}
func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []string) error {
//TODO :: use dbID
if collections, ok := mp.db2collections[0]; ok {
func (mp *metaReplicaImpl) addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
dmChannels := collection.dmChannelNames
for _, channel := range channels {
match := false
for _, existedChannel := range dmChannels {
if channel == existedChannel {
match = true
break
}
}
if !match {
dmChannels = append(dmChannels, channel)
}
for channel, id := range channels2NodeID {
collection.dmChannels2Node[channel] = id
}
return nil
}
......@@ -259,3 +238,17 @@ func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []strin
}
return errors.New("addDmChannels: can't find dbID or collectionID")
}
func (mp *metaReplicaImpl) getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (UniqueID, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
if id, ok := collection.dmChannels2Node[channel]; ok {
return id, nil
}
}
}
}
return 0, errors.New("getAssignedNodeIDByChannelName: can't find dbID or collectionID")
}
......@@ -182,6 +182,7 @@ func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*qu
func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
dbID := req.DbID
collectionID := req.CollectionID
schema := req.Schema
fn := func(err error) *commonpb.Status {
if err != nil {
return &commonpb.Status{
......@@ -193,30 +194,23 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
collection, err := qs.replica.loadCollection(dbID, collectionID)
if err != nil {
return fn(err), err
}
if len(collection.dmChannelNames) != 0 {
fmt.Println("load collection start, collectionID = ", collectionID)
_, err := qs.replica.getCollectionByID(dbID, collectionID)
if err == nil {
fmt.Println("load collection end, collection already exist, collectionID = ", collectionID)
return fn(nil), nil
}
channelRequest := datapb.InsertChannelRequest{
DbID: req.DbID,
CollectionID: req.CollectionID,
}
resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if resp == nil {
err = errors.New("get insert channels resp is nil")
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
err = errors.New(resp.Status.Reason)
err = qs.replica.addCollection(dbID, collectionID, schema)
if err != nil {
return fn(err), err
}
err = qs.watchDmChannels(dbID, collectionID)
if err != nil {
return fn(err), err
}
dmChannels := resp.Values
// get partitionIDs
showPartitionRequest := &milvuspb.ShowPartitionRequest{
......@@ -235,30 +229,17 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
}
partitionIDs := showPartitionResponse.PartitionIDs
if len(partitionIDs) == 0 {
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
}
for _, node := range qs.queryNodes {
_, err := node.LoadSegments(loadSegmentRequest)
if err != nil {
return fn(err), err
}
}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
return fn(err), err
}
loadPartitionsRequest := &querypb.LoadPartitionRequest{
Base: req.Base,
DbID: dbID,
CollectionID: collectionID,
PartitionIDs: partitionIDs,
Schema: schema,
}
status, err := qs.LoadPartitions(loadPartitionsRequest)
fmt.Println("load collection end, collectionID = ", collectionID)
return status, err
}
......@@ -327,6 +308,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
dbID := req.DbID
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
schema := req.Schema
fn := func(err error) *commonpb.Status {
if err != nil {
......@@ -339,28 +321,35 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
fmt.Println("load partitions start, partitionIDs = ", partitionIDs)
if len(partitionIDs) == 0 {
err := errors.New("partitionIDs are empty")
return fn(err), err
}
var collection *collection = nil
var err error
if collection, err = qs.replica.loadCollection(dbID, collectionID); err != nil {
return fn(err), err
_, err := qs.replica.getCollectionByID(dbID, collectionID)
if err != nil {
err = qs.replica.addCollection(dbID, collectionID, schema)
if err != nil {
return fn(err), err
}
err = qs.watchDmChannels(dbID, collectionID)
if err != nil {
return fn(err), err
}
}
for _, partitionID := range partitionIDs {
partition, err := qs.replica.loadPartition(dbID, collectionID, partitionID)
_, err = qs.replica.getPartitionByID(dbID, collectionID, partitionID)
if err == nil {
continue
}
err = qs.replica.addPartition(dbID, collectionID, partitionID)
if err != nil {
return fn(err), err
}
if partition == nil {
return fn(err), nil
}
showSegmentRequest := &milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowSegment,
......@@ -372,25 +361,34 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
if err != nil {
return fn(err), err
}
segmentIDs := showSegmentResponse.SegmentIDs
if len(segmentIDs) == 0 {
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
Schema: schema,
}
for _, node := range qs.queryNodes {
_, err := node.LoadSegments(loadSegmentRequest)
if err != nil {
return fn(err), nil
}
}
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
channel2segs := make(map[string][]UniqueID)
resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{
SegmentIDs: segmentIDs,
})
if err != nil {
return fn(err), err
}
for _, state := range resp.States {
segmentID := state.SegmentID
segmentStates[segmentID] = state
channelName := state.StartPosition.ChannelName
if _, ok := channel2segs[channelName]; !ok {
segments := make([]UniqueID, 0)
segments = append(segments, segmentID)
......@@ -400,88 +398,39 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
}
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
for channel, segmentIDs := range channel2segs {
sort.Slice(segmentIDs, func(i, j int) bool {
return segmentStates[segmentIDs[i]].StartPosition.Timestamp < segmentStates[segmentIDs[j]].StartPosition.Timestamp
})
var channelLoadDone = false
for _, node := range qs.queryNodes {
channels2node := node.dmChannelNames
for _, ch := range channels2node {
if channel == ch {
channelLoadDone = true
break
}
}
if channelLoadDone {
break
}
}
if !channelLoadDone {
states := make([]*datapb.SegmentStateInfo, 0)
for _, id := range segmentIDs {
states = append(states, segmentStates[id])
}
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
SegmentIDs: segmentIDs,
SegmentStates: states,
}
dmChannels := []string{channel}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
if err != nil {
return fn(err), err
}
queryNode := qs.queryNodes[nodeIDs[0]]
//TODO:: seek when loadSegment may cause more msgs consumed
status, err := queryNode.LoadSegments(loadSegmentRequest)
if err != nil {
return status, err
}
}
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
}
if len(collection.dmChannelNames) == 0 {
channelRequest := datapb.InsertChannelRequest{
DbID: dbID,
CollectionID: collectionID,
}
resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if resp == nil {
err = errors.New("get insert channels resp is nil")
return fn(err), err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
err = errors.New(resp.Status.Reason)
return fn(err), err
}
dmChannels := resp.Values
for _, partitionID := range partitionIDs {
states := make([]*datapb.SegmentStateInfo, 0)
for _, id := range segmentIDs {
states = append(states, segmentStates[id])
}
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
CollectionID: collectionID,
PartitionID: partitionID,
SegmentIDs: segmentIDs,
SegmentStates: states,
Schema: schema,
}
for _, node := range qs.queryNodes {
_, err := node.LoadSegments(loadSegmentRequest)
if err != nil {
return fn(err), nil
}
nodeID, err := qs.replica.getAssignedNodeIDByChannelName(dbID, collectionID, channel)
if err != nil {
return fn(err), err
}
queryNode := qs.queryNodes[nodeID]
//TODO:: seek when loadSegment may cause more msgs consumed
//TODO:: all query node should load partition's msg
status, err := queryNode.LoadSegments(loadSegmentRequest)
if err != nil {
return status, err
}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
}
if err != nil {
return fn(err), err
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
}
fmt.Println("load partitions end, partitionIDs = ", partitionIDs)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
......@@ -622,28 +571,38 @@ func (qs *QueryService) SetEnableGrpc(en bool) {
qs.enableGrpc = en
}
func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []UniqueID, collection *collection) error {
err := qs.replica.addDmChannels(collection.id, dmChannels)
func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) error {
collection, err := qs.replica.getCollectionByID(0, collectionID)
if err != nil {
return err
}
channelRequest := datapb.InsertChannelRequest{
DbID: dbID,
CollectionID: collectionID,
}
resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if err != nil {
return err
}
if len(resp.Values) == 0 {
err = errors.New("haven't assign dm channel to collection")
return err
}
dmChannels := resp.Values
channels2NodeID := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.replica.addDmChannels(dbID, collection.id, channels2NodeID)
if err != nil {
return err
}
node2channels := make(map[UniqueID][]string)
for i, channel := range dmChannels {
nodeID := assignedNodeIDs[i]
findChannel := false
for _, ch := range collection.dmChannelNames {
if channel == ch {
findChannel = true
}
}
if !findChannel {
if _, ok := node2channels[nodeID]; ok {
node2channels[nodeID] = append(node2channels[nodeID], channel)
} else {
channels := make([]string, 0)
channels = append(channels, channel)
node2channels[nodeID] = channels
}
for channel, nodeID := range channels2NodeID {
if _, ok := node2channels[nodeID]; ok {
node2channels[nodeID] = append(node2channels[nodeID], channel)
} else {
channels := make([]string, 0)
channels = append(channels, channel)
node2channels[nodeID] = channels
}
}
......@@ -653,18 +612,19 @@ func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []U
ChannelIDs: channels,
}
_, err := node.WatchDmChannels(request)
node.AddDmChannels(channels)
if err != nil {
return err
}
fmt.Println("query node ", nodeID, "watch channels = ", channels)
node.AddDmChannels(channels)
}
return nil
}
func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []UniqueID {
func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[string]UniqueID {
maxNumDMChannel := 0
res := make([]UniqueID, 0)
res := make(map[string]UniqueID)
node2lens := make(map[UniqueID]int)
for id, node := range qs.queryNodes {
node2lens[id] = len(node.dmChannelNames)
......@@ -676,14 +636,14 @@ func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []Unique
if len >= maxNumDMChannel {
maxNumDMChannel = len
} else {
res = append(res, id)
res[dmChannels[offset]] = id
node2lens[id]++
offset++
}
}
if lastOffset == offset {
for id := range node2lens {
res = append(res, id)
res[dmChannels[offset]] = id
node2lens[id]++
offset++
break
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册