From dfc6670f0f755ba01a83cd677643d8483e3ab571 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Mon, 17 Jan 2022 17:37:37 +0800 Subject: [PATCH] Unsubscribe channel after query node down (#15230) Signed-off-by: xige-16 --- internal/proto/query_coord.proto | 10 + internal/proto/querypb/query_coord.pb.go | 392 +++++++++++------- internal/querycoord/channel_unsubscribe.go | 174 ++++++++ .../querycoord/channel_unsubscribe_test.go | 131 ++++++ internal/querycoord/cluster.go | 27 +- internal/querycoord/cluster_test.go | 82 ++++ internal/querycoord/meta.go | 72 ++++ internal/querycoord/query_coord.go | 18 +- internal/querycoord/querynode.go | 6 +- internal/querycoord/segment_allocator_test.go | 7 +- internal/querycoord/task.go | 69 ++- internal/querycoord/task_test.go | 9 +- internal/querynode/mock_test.go | 6 +- internal/querynode/task.go | 18 +- internal/util/funcutil/func.go | 5 + internal/util/paramtable/global_param.go | 2 +- internal/util/paramtable/global_param_test.go | 2 +- 17 files changed, 832 insertions(+), 198 deletions(-) create mode 100644 internal/querycoord/channel_unsubscribe.go create mode 100644 internal/querycoord/channel_unsubscribe_test.go diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 9ddb9d972..33942887b 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -308,6 +308,16 @@ message CollectionInfo { int64 inMemory_percentage = 7; } +message UnsubscribeChannels { + int64 collectionID = 1; + repeated string channels = 2; +} + +message UnsubscribeChannelInfo { + int64 nodeID = 1; + repeated UnsubscribeChannels collection_channels = 2; +} + //---- synchronize messages proto between QueryCoord and QueryNode ----- message SegmentChangeInfo { int64 online_nodeID = 1; diff --git a/internal/proto/querypb/query_coord.pb.go b/internal/proto/querypb/query_coord.pb.go index 51530b3c8..3940dfcb4 100644 --- a/internal/proto/querypb/query_coord.pb.go +++ b/internal/proto/querypb/query_coord.pb.go @@ -2156,6 +2156,100 @@ func (m *CollectionInfo) GetInMemoryPercentage() int64 { return 0 } +type UnsubscribeChannels struct { + CollectionID int64 `protobuf:"varint,1,opt,name=collectionID,proto3" json:"collectionID,omitempty"` + Channels []string `protobuf:"bytes,2,rep,name=channels,proto3" json:"channels,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnsubscribeChannels) Reset() { *m = UnsubscribeChannels{} } +func (m *UnsubscribeChannels) String() string { return proto.CompactTextString(m) } +func (*UnsubscribeChannels) ProtoMessage() {} +func (*UnsubscribeChannels) Descriptor() ([]byte, []int) { + return fileDescriptor_aab7cc9a69ed26e8, []int{29} +} + +func (m *UnsubscribeChannels) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnsubscribeChannels.Unmarshal(m, b) +} +func (m *UnsubscribeChannels) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnsubscribeChannels.Marshal(b, m, deterministic) +} +func (m *UnsubscribeChannels) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnsubscribeChannels.Merge(m, src) +} +func (m *UnsubscribeChannels) XXX_Size() int { + return xxx_messageInfo_UnsubscribeChannels.Size(m) +} +func (m *UnsubscribeChannels) XXX_DiscardUnknown() { + xxx_messageInfo_UnsubscribeChannels.DiscardUnknown(m) +} + +var xxx_messageInfo_UnsubscribeChannels proto.InternalMessageInfo + +func (m *UnsubscribeChannels) GetCollectionID() int64 { + if m != nil { + return m.CollectionID + } + return 0 +} + +func (m *UnsubscribeChannels) GetChannels() []string { + if m != nil { + return m.Channels + } + return nil +} + +type UnsubscribeChannelInfo struct { + NodeID int64 `protobuf:"varint,1,opt,name=nodeID,proto3" json:"nodeID,omitempty"` + CollectionChannels []*UnsubscribeChannels `protobuf:"bytes,2,rep,name=collection_channels,json=collectionChannels,proto3" json:"collection_channels,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnsubscribeChannelInfo) Reset() { *m = UnsubscribeChannelInfo{} } +func (m *UnsubscribeChannelInfo) String() string { return proto.CompactTextString(m) } +func (*UnsubscribeChannelInfo) ProtoMessage() {} +func (*UnsubscribeChannelInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_aab7cc9a69ed26e8, []int{30} +} + +func (m *UnsubscribeChannelInfo) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnsubscribeChannelInfo.Unmarshal(m, b) +} +func (m *UnsubscribeChannelInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnsubscribeChannelInfo.Marshal(b, m, deterministic) +} +func (m *UnsubscribeChannelInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnsubscribeChannelInfo.Merge(m, src) +} +func (m *UnsubscribeChannelInfo) XXX_Size() int { + return xxx_messageInfo_UnsubscribeChannelInfo.Size(m) +} +func (m *UnsubscribeChannelInfo) XXX_DiscardUnknown() { + xxx_messageInfo_UnsubscribeChannelInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_UnsubscribeChannelInfo proto.InternalMessageInfo + +func (m *UnsubscribeChannelInfo) GetNodeID() int64 { + if m != nil { + return m.NodeID + } + return 0 +} + +func (m *UnsubscribeChannelInfo) GetCollectionChannels() []*UnsubscribeChannels { + if m != nil { + return m.CollectionChannels + } + return nil +} + //---- synchronize messages proto between QueryCoord and QueryNode ----- type SegmentChangeInfo struct { OnlineNodeID int64 `protobuf:"varint,1,opt,name=online_nodeID,json=onlineNodeID,proto3" json:"online_nodeID,omitempty"` @@ -2171,7 +2265,7 @@ func (m *SegmentChangeInfo) Reset() { *m = SegmentChangeInfo{} } func (m *SegmentChangeInfo) String() string { return proto.CompactTextString(m) } func (*SegmentChangeInfo) ProtoMessage() {} func (*SegmentChangeInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_aab7cc9a69ed26e8, []int{29} + return fileDescriptor_aab7cc9a69ed26e8, []int{31} } func (m *SegmentChangeInfo) XXX_Unmarshal(b []byte) error { @@ -2232,7 +2326,7 @@ func (m *SealedSegmentsChangeInfo) Reset() { *m = SealedSegmentsChangeIn func (m *SealedSegmentsChangeInfo) String() string { return proto.CompactTextString(m) } func (*SealedSegmentsChangeInfo) ProtoMessage() {} func (*SealedSegmentsChangeInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_aab7cc9a69ed26e8, []int{30} + return fileDescriptor_aab7cc9a69ed26e8, []int{32} } func (m *SealedSegmentsChangeInfo) XXX_Unmarshal(b []byte) error { @@ -2300,6 +2394,8 @@ func init() { proto.RegisterType((*PartitionStates)(nil), "milvus.proto.query.PartitionStates") proto.RegisterType((*SegmentInfo)(nil), "milvus.proto.query.SegmentInfo") proto.RegisterType((*CollectionInfo)(nil), "milvus.proto.query.CollectionInfo") + proto.RegisterType((*UnsubscribeChannels)(nil), "milvus.proto.query.UnsubscribeChannels") + proto.RegisterType((*UnsubscribeChannelInfo)(nil), "milvus.proto.query.UnsubscribeChannelInfo") proto.RegisterType((*SegmentChangeInfo)(nil), "milvus.proto.query.SegmentChangeInfo") proto.RegisterType((*SealedSegmentsChangeInfo)(nil), "milvus.proto.query.SealedSegmentsChangeInfo") } @@ -2307,150 +2403,154 @@ func init() { func init() { proto.RegisterFile("query_coord.proto", fileDescriptor_aab7cc9a69ed26e8) } var fileDescriptor_aab7cc9a69ed26e8 = []byte{ - // 2282 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x39, 0x4f, 0x73, 0x1b, 0x49, - 0xf5, 0x1e, 0xfd, 0xb3, 0xf5, 0x24, 0x4b, 0xe3, 0x8e, 0xed, 0x9f, 0xac, 0x5f, 0x36, 0x9b, 0x9d, - 0x5d, 0x67, 0x83, 0x97, 0xb5, 0x83, 0x03, 0x55, 0x6c, 0x01, 0x87, 0xd8, 0xc6, 0x5e, 0x93, 0x8d, - 0xd7, 0xc8, 0x4e, 0x28, 0x52, 0xa9, 0x12, 0x23, 0x4d, 0x4b, 0x9e, 0xca, 0xcc, 0xb4, 0x32, 0x3d, - 0x5a, 0xdb, 0xe1, 0xc2, 0x81, 0xc3, 0x72, 0xa0, 0xb8, 0x53, 0x14, 0x27, 0x28, 0xc8, 0x61, 0x2f, - 0x7c, 0x82, 0x5c, 0xf8, 0x1a, 0x14, 0x07, 0xf8, 0x08, 0x1c, 0xa9, 0xa2, 0xfa, 0xcf, 0x8c, 0x66, - 0x46, 0x3d, 0xb6, 0x6c, 0x97, 0x37, 0x29, 0x8a, 0xdb, 0xcc, 0xeb, 0xd7, 0xef, 0xbd, 0x7e, 0xff, - 0xbb, 0x1f, 0xcc, 0xbd, 0x18, 0x62, 0xff, 0xb4, 0xdd, 0x25, 0xc4, 0xb7, 0x56, 0x07, 0x3e, 0x09, - 0x08, 0x42, 0xae, 0xed, 0x7c, 0x31, 0xa4, 0xe2, 0x6f, 0x95, 0xaf, 0x37, 0xab, 0x5d, 0xe2, 0xba, - 0xc4, 0x13, 0xb0, 0x66, 0x35, 0x8e, 0xd1, 0xac, 0xd9, 0x5e, 0x80, 0x7d, 0xcf, 0x74, 0xc2, 0x55, - 0xda, 0x3d, 0xc2, 0xae, 0x29, 0xff, 0x74, 0xcb, 0x0c, 0xcc, 0x38, 0x7d, 0xe3, 0x97, 0x1a, 0x2c, - 0x1e, 0x1c, 0x91, 0xe3, 0x4d, 0xe2, 0x38, 0xb8, 0x1b, 0xd8, 0xc4, 0xa3, 0x2d, 0xfc, 0x62, 0x88, - 0x69, 0x80, 0xee, 0x41, 0xa1, 0x63, 0x52, 0xdc, 0xd0, 0x6e, 0x6b, 0x77, 0x2b, 0xeb, 0x37, 0x57, - 0x13, 0x92, 0x48, 0x11, 0x1e, 0xd1, 0xfe, 0x86, 0x49, 0x71, 0x8b, 0x63, 0x22, 0x04, 0x05, 0xab, - 0xb3, 0xbb, 0xd5, 0xc8, 0xdd, 0xd6, 0xee, 0xe6, 0x5b, 0xfc, 0x1b, 0x7d, 0x00, 0xb3, 0xdd, 0x88, - 0xf6, 0xee, 0x16, 0x6d, 0xe4, 0x6f, 0xe7, 0xef, 0xe6, 0x5b, 0x49, 0xa0, 0xf1, 0x27, 0x0d, 0xfe, - 0x6f, 0x4c, 0x0c, 0x3a, 0x20, 0x1e, 0xc5, 0xe8, 0x3e, 0x94, 0x68, 0x60, 0x06, 0x43, 0x2a, 0x25, - 0xf9, 0x7f, 0xa5, 0x24, 0x07, 0x1c, 0xa5, 0x25, 0x51, 0xc7, 0xd9, 0xe6, 0x14, 0x6c, 0xd1, 0xb7, - 0x60, 0xde, 0xf6, 0x1e, 0x61, 0x97, 0xf8, 0xa7, 0xed, 0x01, 0xf6, 0xbb, 0xd8, 0x0b, 0xcc, 0x3e, - 0x0e, 0x65, 0xbc, 0x11, 0xae, 0xed, 0x8f, 0x96, 0x8c, 0x3f, 0x6a, 0xb0, 0xc0, 0x24, 0xdd, 0x37, - 0xfd, 0xc0, 0xbe, 0x06, 0x7d, 0x19, 0x50, 0x8d, 0xcb, 0xd8, 0xc8, 0xf3, 0xb5, 0x04, 0x8c, 0xe1, - 0x0c, 0x42, 0xf6, 0xec, 0x6c, 0x05, 0x2e, 0x6e, 0x02, 0x66, 0xfc, 0x41, 0x1a, 0x36, 0x2e, 0xe7, - 0x55, 0x14, 0x9a, 0xe6, 0x99, 0x1b, 0xe7, 0x79, 0x19, 0x75, 0xbe, 0xd6, 0x60, 0xe1, 0x33, 0x62, - 0x5a, 0x23, 0xc3, 0x7f, 0xfd, 0xea, 0xfc, 0x01, 0x94, 0x44, 0x94, 0x34, 0x0a, 0x9c, 0xd7, 0x72, - 0x92, 0x97, 0x8c, 0xa0, 0x91, 0x84, 0x07, 0x1c, 0xd0, 0x92, 0x9b, 0x8c, 0xdf, 0x69, 0xd0, 0x68, - 0x61, 0x07, 0x9b, 0x14, 0xbf, 0xc9, 0x53, 0x2c, 0x42, 0xc9, 0x23, 0x16, 0xde, 0xdd, 0xe2, 0xa7, - 0xc8, 0xb7, 0xe4, 0x9f, 0xf1, 0x0f, 0xa9, 0xe1, 0xb7, 0xdc, 0x61, 0x63, 0x56, 0x28, 0x5e, 0xc6, - 0x0a, 0xaf, 0x47, 0x56, 0x78, 0xdb, 0x4f, 0x3a, 0xb2, 0x54, 0x31, 0x61, 0xa9, 0x9f, 0xc2, 0xd2, - 0xa6, 0x8f, 0xcd, 0x00, 0xff, 0x98, 0xa5, 0xf9, 0xcd, 0x23, 0xd3, 0xf3, 0xb0, 0x13, 0x1e, 0x21, - 0xcd, 0x5c, 0x53, 0x30, 0x6f, 0xc0, 0xf4, 0xc0, 0x27, 0x27, 0xa7, 0x91, 0xdc, 0xe1, 0xaf, 0xf1, - 0x67, 0x0d, 0x9a, 0x2a, 0xda, 0x57, 0xc9, 0x08, 0xef, 0xc3, 0xac, 0xac, 0x57, 0x82, 0x1a, 0xe7, - 0x59, 0x6e, 0x55, 0x5f, 0xc4, 0x38, 0xa0, 0x7b, 0x30, 0x2f, 0x90, 0x7c, 0x4c, 0x87, 0x4e, 0x10, - 0xe1, 0xe6, 0x39, 0x2e, 0xe2, 0x6b, 0x2d, 0xbe, 0x24, 0x77, 0x18, 0xaf, 0x34, 0x58, 0xda, 0xc1, - 0x41, 0x64, 0x44, 0xc6, 0x15, 0xbf, 0xa5, 0x49, 0xf6, 0x2b, 0x0d, 0x9a, 0x2a, 0x59, 0xaf, 0xa2, - 0xd6, 0xa7, 0xb0, 0x18, 0xf1, 0x68, 0x5b, 0x98, 0x76, 0x7d, 0x7b, 0xc0, 0x9d, 0x99, 0xa7, 0xdc, - 0xca, 0xfa, 0xfb, 0xab, 0xe3, 0x2d, 0xc1, 0x6a, 0x5a, 0x82, 0x85, 0x88, 0xc4, 0x56, 0x8c, 0x82, - 0xf1, 0x6b, 0x0d, 0x16, 0x76, 0x70, 0x70, 0x80, 0xfb, 0x2e, 0xf6, 0x82, 0x5d, 0xaf, 0x47, 0x2e, - 0xaf, 0xd7, 0x5b, 0x00, 0x54, 0xd2, 0x89, 0xca, 0x41, 0x0c, 0x32, 0x89, 0x8e, 0x79, 0xf7, 0x91, - 0x96, 0xe7, 0x2a, 0xba, 0xfb, 0x0e, 0x14, 0x6d, 0xaf, 0x47, 0x42, 0x55, 0xbd, 0xab, 0x52, 0x55, - 0x9c, 0x99, 0xc0, 0x36, 0xfe, 0x9d, 0x83, 0xc5, 0x07, 0x96, 0xa5, 0x0a, 0xbb, 0x8b, 0xeb, 0x65, - 0x14, 0xdd, 0xb9, 0x78, 0x74, 0x4f, 0xe4, 0x73, 0x63, 0x21, 0x55, 0xb8, 0x40, 0x48, 0x15, 0xb3, - 0x42, 0x0a, 0xed, 0xc0, 0x2c, 0xc5, 0xf8, 0x79, 0x7b, 0x40, 0x28, 0xf7, 0x89, 0x46, 0x89, 0x9f, - 0xc6, 0x48, 0x9e, 0x26, 0xea, 0x1c, 0x1f, 0xd1, 0xfe, 0xbe, 0xc4, 0x6c, 0x55, 0xd9, 0xc6, 0xf0, - 0x0f, 0x3d, 0x86, 0xc5, 0xbe, 0x43, 0x3a, 0xa6, 0xd3, 0xa6, 0xd8, 0x74, 0xb0, 0xd5, 0x96, 0xf6, - 0xa6, 0x8d, 0xe9, 0xc9, 0x14, 0x3e, 0x2f, 0xb6, 0x1f, 0xf0, 0xdd, 0x72, 0x81, 0x1a, 0x7f, 0xd7, - 0x60, 0xa9, 0x85, 0x5d, 0xf2, 0x05, 0xfe, 0x6f, 0x35, 0x81, 0xf1, 0xcf, 0x1c, 0x2c, 0xfe, 0xc4, - 0x0c, 0xba, 0x47, 0x5b, 0xae, 0x04, 0xd1, 0x37, 0x73, 0xbe, 0x49, 0x0a, 0x54, 0x14, 0x46, 0x45, - 0x95, 0x55, 0xd9, 0x1d, 0x62, 0xf5, 0x89, 0x3c, 0x72, 0x2c, 0x8c, 0x62, 0x15, 0xbc, 0x74, 0x89, - 0x0a, 0x8e, 0x36, 0x61, 0x16, 0x9f, 0x74, 0x9d, 0xa1, 0x85, 0xdb, 0x82, 0xbb, 0xf0, 0xa9, 0x5b, - 0x0a, 0xee, 0x71, 0x97, 0xaa, 0xca, 0x4d, 0xbb, 0x3c, 0x94, 0x5f, 0x6b, 0xb0, 0x24, 0xf4, 0x8c, - 0x9d, 0xc0, 0x7c, 0xb3, 0xaa, 0x8e, 0xd4, 0x58, 0xb8, 0x88, 0x1a, 0x8d, 0xdf, 0x16, 0xa0, 0x2e, - 0x0f, 0xc8, 0xfa, 0x36, 0xb6, 0x84, 0x6e, 0x42, 0x39, 0x4a, 0xad, 0xb2, 0xf4, 0x8f, 0x00, 0xe8, - 0x36, 0x54, 0x62, 0xf6, 0x93, 0x92, 0xc6, 0x41, 0x13, 0x89, 0x1b, 0x16, 0xca, 0x42, 0xac, 0x50, - 0xbe, 0x03, 0xd0, 0x73, 0x86, 0xf4, 0xa8, 0x1d, 0xd8, 0x2e, 0x96, 0xed, 0x4a, 0x99, 0x43, 0x0e, - 0x6d, 0x17, 0xa3, 0x07, 0x50, 0xed, 0xd8, 0x9e, 0x43, 0xfa, 0xed, 0x81, 0x19, 0x1c, 0xd1, 0x46, - 0x29, 0xd3, 0x62, 0xdb, 0x36, 0x76, 0xac, 0x0d, 0x8e, 0xdb, 0xaa, 0x88, 0x3d, 0xfb, 0x6c, 0x0b, - 0xba, 0x05, 0x15, 0x6f, 0xe8, 0xb6, 0x49, 0xaf, 0xed, 0x93, 0x63, 0x66, 0x73, 0xce, 0xc2, 0x1b, - 0xba, 0x9f, 0xf7, 0x5a, 0xe4, 0x98, 0xa2, 0xef, 0x43, 0x99, 0x25, 0x77, 0xea, 0x90, 0x3e, 0x6d, - 0xcc, 0x4c, 0x44, 0x7f, 0xb4, 0x81, 0xed, 0xb6, 0x98, 0x23, 0xf0, 0xdd, 0xe5, 0xc9, 0x76, 0x47, - 0x1b, 0xd0, 0x1d, 0xa8, 0x75, 0x89, 0x3b, 0x30, 0xb9, 0x86, 0xb6, 0x7d, 0xe2, 0x36, 0x80, 0x47, - 0x4b, 0x0a, 0x8a, 0xb6, 0xa1, 0x62, 0x7b, 0x16, 0x3e, 0x91, 0x7e, 0x5b, 0xe1, 0x7c, 0x96, 0x55, - 0xb9, 0xf0, 0x09, 0xee, 0x72, 0x5e, 0xbb, 0x0c, 0x9d, 0x1b, 0x1d, 0xec, 0xf0, 0x93, 0xa2, 0xf7, - 0xa0, 0x2a, 0x8d, 0xda, 0xa6, 0xf6, 0x4b, 0xdc, 0xa8, 0x0a, 0x43, 0x4a, 0xd8, 0x81, 0xfd, 0x12, - 0x1b, 0x7f, 0xc9, 0xc1, 0xdc, 0x18, 0x11, 0xd6, 0xf8, 0xf5, 0x38, 0x24, 0x74, 0x8e, 0xf0, 0x97, - 0x91, 0xc4, 0x9e, 0xd9, 0x71, 0x58, 0x4c, 0x59, 0xf8, 0x84, 0xfb, 0xc6, 0x4c, 0xab, 0x22, 0x60, - 0x9c, 0x00, 0xb3, 0xb1, 0x90, 0xde, 0x33, 0x5d, 0x2c, 0x1b, 0xb3, 0x32, 0x87, 0xec, 0x99, 0x2e, - 0x66, 0xb4, 0x85, 0x88, 0xa1, 0x67, 0x84, 0xbf, 0x6c, 0xa5, 0x33, 0xb4, 0x39, 0x57, 0xe1, 0x19, - 0xe1, 0x2f, 0xda, 0x82, 0xaa, 0x20, 0x39, 0x30, 0x7d, 0xd3, 0x0d, 0xfd, 0xe2, 0x3d, 0x65, 0xbc, - 0x3d, 0xc4, 0xa7, 0x4f, 0x4c, 0x67, 0x88, 0xf7, 0x4d, 0xdb, 0x6f, 0x09, 0x3d, 0xee, 0xf3, 0x5d, - 0xe8, 0x2e, 0xe8, 0x82, 0x4a, 0xcf, 0x76, 0xb0, 0xf4, 0x30, 0x96, 0x13, 0xca, 0xad, 0x1a, 0x87, - 0x6f, 0xdb, 0x0e, 0x16, 0x4e, 0x14, 0x1d, 0x81, 0xab, 0x6d, 0x46, 0xf8, 0x10, 0x87, 0x70, 0xa5, - 0xbd, 0xca, 0xc1, 0x0d, 0x16, 0x4a, 0x61, 0xc1, 0xb9, 0x7c, 0x3a, 0x78, 0x07, 0xc0, 0xa2, 0x41, - 0x3b, 0x91, 0x12, 0xca, 0x16, 0x0d, 0xf6, 0x44, 0x56, 0xf8, 0x24, 0x8c, 0xf8, 0x7c, 0x76, 0xab, - 0x96, 0x0a, 0xed, 0xf1, 0xe4, 0x79, 0x99, 0x4b, 0x28, 0x2b, 0x5b, 0x94, 0x0c, 0xfd, 0x2e, 0x6e, - 0x27, 0xae, 0x16, 0x55, 0x01, 0xdc, 0x53, 0x27, 0xad, 0x92, 0xa2, 0x25, 0xfb, 0x9b, 0x06, 0x8b, - 0xf2, 0x1e, 0x75, 0x75, 0x75, 0x65, 0x65, 0xcf, 0x30, 0xd5, 0xe4, 0xcf, 0xe8, 0xc9, 0x0b, 0x13, - 0x14, 0xaf, 0xa2, 0xa2, 0x78, 0x25, 0xfb, 0xd2, 0x52, 0xba, 0x2f, 0x35, 0x7e, 0xa3, 0xc1, 0xe2, - 0xa7, 0xa6, 0x67, 0x91, 0x5e, 0xef, 0xea, 0x07, 0xdc, 0x8c, 0x22, 0x76, 0xf7, 0x22, 0x7d, 0x67, - 0x62, 0x93, 0xf1, 0x65, 0x0e, 0x10, 0x73, 0x87, 0x0d, 0xd3, 0x31, 0xbd, 0x2e, 0xbe, 0xbc, 0x34, - 0xcb, 0x50, 0x4b, 0x38, 0x41, 0xf4, 0xea, 0x15, 0xf7, 0x02, 0x8a, 0x1e, 0x42, 0xad, 0x23, 0x58, - 0xb5, 0x7d, 0x6c, 0x52, 0xe2, 0x71, 0x3b, 0xd4, 0xd6, 0x3f, 0x50, 0x89, 0x7d, 0xe8, 0xdb, 0xfd, - 0x3e, 0xf6, 0x37, 0x89, 0x67, 0x89, 0x8e, 0x70, 0xb6, 0x13, 0x8a, 0xc9, 0xb6, 0xa2, 0x77, 0xa1, - 0x32, 0x8a, 0x88, 0xb0, 0x9d, 0x80, 0x28, 0x24, 0x28, 0xfa, 0x08, 0xe6, 0x92, 0xcd, 0xe2, 0xc8, - 0x70, 0x3a, 0x8d, 0xf7, 0x81, 0xcc, 0x38, 0x3f, 0x07, 0x14, 0x35, 0x48, 0xbc, 0x8c, 0xf3, 0xf4, - 0x36, 0xc9, 0xdd, 0xf7, 0x26, 0x94, 0xad, 0x70, 0xa7, 0xbc, 0x89, 0x8e, 0x00, 0x2c, 0x3c, 0x84, - 0x84, 0x6d, 0x87, 0x98, 0x16, 0xb6, 0xc2, 0x02, 0x28, 0x80, 0x9f, 0x71, 0x98, 0xf1, 0x55, 0x0e, - 0xf4, 0x78, 0x03, 0x3a, 0x31, 0xef, 0xeb, 0xb9, 0x09, 0x9f, 0xd1, 0x6d, 0x17, 0xae, 0xd0, 0x6d, - 0x8f, 0xdf, 0x06, 0x8a, 0x97, 0xbb, 0x0d, 0x18, 0xbf, 0xd7, 0xa0, 0x9e, 0xba, 0x78, 0xa6, 0x5b, - 0x11, 0x6d, 0xbc, 0x15, 0xf9, 0x2e, 0x14, 0x59, 0x7d, 0xc6, 0x5c, 0x49, 0xb5, 0x34, 0x5b, 0xd5, - 0x75, 0xb6, 0x25, 0x36, 0xa0, 0x35, 0xb8, 0xa1, 0x78, 0x5e, 0x94, 0xa6, 0x44, 0xe3, 0xaf, 0x8b, - 0xc6, 0x2f, 0x0a, 0x50, 0x89, 0xe9, 0xe3, 0x9c, 0x2e, 0x2a, 0x6d, 0xe9, 0x9c, 0xc2, 0xd2, 0xa9, - 0xe3, 0xe5, 0xc7, 0x8f, 0x97, 0xf1, 0x0c, 0x87, 0x96, 0x60, 0xc6, 0xc5, 0xae, 0x28, 0x50, 0xb2, - 0x5a, 0xba, 0xd8, 0x65, 0xe5, 0x89, 0x2d, 0xb1, 0x16, 0x88, 0xf7, 0x3f, 0x22, 0x25, 0x4f, 0x7b, - 0x43, 0x97, 0x77, 0x3f, 0xc9, 0xda, 0x3c, 0x7d, 0x46, 0x6d, 0x9e, 0x49, 0xd6, 0xe6, 0x44, 0x38, - 0x94, 0xd3, 0xe1, 0x30, 0x69, 0x63, 0x73, 0x0f, 0x6e, 0x74, 0xf9, 0xab, 0x91, 0xb5, 0x71, 0xba, - 0x19, 0x2d, 0x35, 0x2a, 0xbc, 0x89, 0x50, 0x2d, 0xa1, 0x6d, 0xe6, 0x5c, 0xb2, 0x85, 0xe1, 0x56, - 0xae, 0x72, 0x2b, 0xab, 0x4b, 0xbf, 0xb4, 0x8d, 0x30, 0x72, 0x98, 0x13, 0xf9, 0x5f, 0xba, 0xa5, - 0x9a, 0xbd, 0x64, 0x4b, 0x65, 0x7c, 0x99, 0x87, 0xda, 0xa8, 0x68, 0x4e, 0x1c, 0xd1, 0x93, 0xbc, - 0x76, 0xef, 0x81, 0x3e, 0x7a, 0xa8, 0xe1, 0x87, 0x3d, 0xb3, 0xee, 0xa7, 0x9f, 0x68, 0xea, 0x83, - 0x54, 0xe8, 0x7c, 0x02, 0x65, 0x96, 0x9c, 0xda, 0xc1, 0xe9, 0x00, 0x73, 0xe7, 0xa9, 0xa5, 0x93, - 0xbe, 0x20, 0xc4, 0xb2, 0xd5, 0xe1, 0xe9, 0x00, 0xb7, 0x66, 0x1c, 0xf9, 0x75, 0xc5, 0xb7, 0x53, - 0x74, 0x1f, 0x16, 0x7c, 0x51, 0xf2, 0xad, 0x76, 0xe2, 0xd8, 0xa2, 0x7a, 0xce, 0x87, 0x8b, 0xfb, - 0xf1, 0xe3, 0x67, 0x44, 0xe3, 0x74, 0x66, 0x34, 0xfe, 0x4b, 0x83, 0x39, 0x69, 0x71, 0xe6, 0x87, - 0x7d, 0x7e, 0x63, 0x63, 0xb9, 0x93, 0x78, 0x8e, 0xed, 0x45, 0x8d, 0x8b, 0x34, 0x87, 0x00, 0xca, - 0xc6, 0xe5, 0x53, 0xa8, 0x4b, 0xa4, 0x28, 0x05, 0x4e, 0x58, 0x69, 0x6b, 0x62, 0x5f, 0x94, 0xfc, - 0x96, 0xa1, 0x46, 0x7a, 0xbd, 0x38, 0x3f, 0x11, 0xc3, 0xb3, 0x12, 0x2a, 0x19, 0xfe, 0x08, 0xf4, - 0x10, 0xed, 0xa2, 0x49, 0xb7, 0x2e, 0x37, 0x46, 0xaf, 0x1b, 0xbf, 0xd2, 0xa0, 0x91, 0x4c, 0xc1, - 0xb1, 0xe3, 0x5f, 0xbc, 0xc8, 0x7f, 0x2f, 0xf9, 0xc6, 0xb5, 0x7c, 0x86, 0x3c, 0x23, 0x3e, 0xb2, - 0xcb, 0x5c, 0x79, 0x09, 0xb5, 0xa4, 0x1f, 0xa2, 0x2a, 0xcc, 0xec, 0x91, 0xe0, 0x87, 0x27, 0x36, - 0x0d, 0xf4, 0x29, 0x54, 0x03, 0xd8, 0x23, 0xc1, 0xbe, 0x8f, 0x29, 0xf6, 0x02, 0x5d, 0x43, 0x00, - 0xa5, 0xcf, 0xbd, 0x2d, 0x9b, 0x3e, 0xd7, 0x73, 0xe8, 0x86, 0xcc, 0xf6, 0xa6, 0xb3, 0x2b, 0x8d, - 0xab, 0xe7, 0xd9, 0xf6, 0xe8, 0xaf, 0x80, 0x74, 0xa8, 0x46, 0x28, 0x3b, 0xfb, 0x8f, 0xf5, 0x22, - 0x2a, 0x43, 0x51, 0x7c, 0x96, 0x56, 0x2c, 0xd0, 0xd3, 0xcd, 0x04, 0xa3, 0xf9, 0xd8, 0x7b, 0xe8, - 0x91, 0xe3, 0x08, 0xa4, 0x4f, 0xa1, 0x0a, 0x4c, 0xcb, 0x06, 0x4d, 0xd7, 0x50, 0x1d, 0x2a, 0xb1, - 0xde, 0x48, 0xcf, 0x31, 0xc0, 0x8e, 0x3f, 0xe8, 0xca, 0x2e, 0x49, 0x88, 0xc0, 0xac, 0xb6, 0x45, - 0x8e, 0x3d, 0xbd, 0xb0, 0xf2, 0x00, 0x66, 0xc2, 0x00, 0x61, 0xa7, 0x11, 0xd4, 0xd9, 0x9f, 0x3e, - 0x85, 0xe6, 0x60, 0x36, 0x31, 0x09, 0xd1, 0x35, 0x84, 0xa0, 0xe6, 0x24, 0xc6, 0x4f, 0x7a, 0x6e, - 0xfd, 0xaf, 0x15, 0x00, 0xd1, 0x07, 0x10, 0xe2, 0x5b, 0x68, 0x00, 0x68, 0x07, 0x07, 0x2c, 0xc7, - 0x11, 0x2f, 0xcc, 0x4f, 0x14, 0xdd, 0xcb, 0x28, 0x97, 0xe3, 0xa8, 0x52, 0xd2, 0xe6, 0x9d, 0x8c, - 0x1d, 0x29, 0x74, 0x63, 0x0a, 0xb9, 0x9c, 0x23, 0xbb, 0x61, 0x1f, 0xda, 0xdd, 0xe7, 0x51, 0x03, - 0x91, 0xcd, 0x31, 0x85, 0x1a, 0x72, 0x4c, 0xe5, 0x21, 0xf9, 0x73, 0x10, 0xf8, 0xb6, 0xd7, 0x0f, - 0x1f, 0x5a, 0x8d, 0x29, 0xf4, 0x02, 0xe6, 0x77, 0x30, 0xe7, 0x6e, 0xd3, 0xc0, 0xee, 0xd2, 0x90, - 0xe1, 0x7a, 0x36, 0xc3, 0x31, 0xe4, 0x0b, 0xb2, 0x74, 0xa0, 0x9e, 0x1a, 0xf7, 0xa2, 0x15, 0xa5, - 0x23, 0x2b, 0x47, 0xd3, 0xcd, 0x8f, 0x26, 0xc2, 0x8d, 0xb8, 0xd9, 0x50, 0x4b, 0x8e, 0x42, 0xd1, - 0x37, 0xb2, 0x08, 0x8c, 0xcd, 0x8e, 0x9a, 0x2b, 0x93, 0xa0, 0x46, 0xac, 0x9e, 0x42, 0x2d, 0x39, - 0x6c, 0x53, 0xb3, 0x52, 0x0e, 0xe4, 0x9a, 0x67, 0xbd, 0x71, 0x1b, 0x53, 0xe8, 0x67, 0x30, 0x37, - 0x36, 0xe1, 0x42, 0xdf, 0x54, 0x91, 0xcf, 0x1a, 0x84, 0x9d, 0xc7, 0x41, 0x4a, 0x3f, 0xd2, 0x62, - 0xb6, 0xf4, 0x63, 0xa3, 0xce, 0xc9, 0xa5, 0x8f, 0x91, 0x3f, 0x4b, 0xfa, 0x0b, 0x73, 0x18, 0x02, - 0x1a, 0x9f, 0x71, 0xa1, 0x8f, 0x55, 0x2c, 0x32, 0xe7, 0x6c, 0xcd, 0xd5, 0x49, 0xd1, 0x23, 0x93, - 0x0f, 0x79, 0xb4, 0xa6, 0x1b, 0x61, 0x25, 0xdb, 0xcc, 0xb9, 0x96, 0x9a, 0x6d, 0xf6, 0x68, 0x49, - 0x38, 0x75, 0x72, 0x74, 0xa2, 0xb6, 0x95, 0x72, 0xdc, 0xa3, 0x76, 0x6a, 0xf5, 0x24, 0xc6, 0x98, - 0x42, 0x87, 0x89, 0x1c, 0x8c, 0xee, 0x64, 0xf9, 0x44, 0xf2, 0x02, 0x7b, 0x9e, 0xb9, 0xda, 0x00, - 0x3b, 0x38, 0x78, 0x84, 0x03, 0xdf, 0xee, 0xd2, 0x34, 0x51, 0xf9, 0x33, 0x42, 0x08, 0x89, 0x7e, - 0x78, 0x2e, 0x5e, 0x28, 0xf6, 0xfa, 0x2b, 0x80, 0x32, 0xb7, 0x19, 0x2b, 0x0f, 0xff, 0x4b, 0xe3, - 0xd7, 0x90, 0xc6, 0x9f, 0x41, 0x3d, 0x35, 0x37, 0x53, 0xa7, 0x71, 0xf5, 0x70, 0xed, 0x3c, 0x07, - 0xe9, 0x00, 0x1a, 0x9f, 0x0a, 0xa9, 0x03, 0x2b, 0x73, 0x7a, 0x74, 0x1e, 0x8f, 0x67, 0x50, 0x4f, - 0x8d, 0x65, 0xd4, 0x27, 0x50, 0xcf, 0x6e, 0x26, 0x38, 0xc1, 0xf8, 0x30, 0x42, 0x7d, 0x82, 0xcc, - 0xa1, 0xc5, 0x79, 0x3c, 0x9e, 0x40, 0x35, 0xfe, 0xb6, 0x89, 0x3e, 0xcc, 0x8a, 0xce, 0xd4, 0x6b, - 0xd7, 0x9b, 0xcf, 0xd7, 0xd7, 0x5f, 0xcf, 0x9e, 0x41, 0x3d, 0xf5, 0x96, 0xa9, 0xb6, 0xae, 0xfa, - 0xc1, 0xf3, 0x3c, 0xea, 0x5f, 0x63, 0x06, 0xbe, 0xee, 0x5c, 0xb9, 0xf1, 0xed, 0xa7, 0xeb, 0x7d, - 0x3b, 0x38, 0x1a, 0x76, 0xd8, 0x29, 0xd7, 0x04, 0xe6, 0xc7, 0x36, 0x91, 0x5f, 0x6b, 0x61, 0xd2, - 0x58, 0xe3, 0x94, 0xd6, 0xb8, 0xb4, 0x83, 0x4e, 0xa7, 0xc4, 0x7f, 0xef, 0xff, 0x27, 0x00, 0x00, - 0xff, 0xff, 0x7b, 0x37, 0x4e, 0x07, 0xb9, 0x28, 0x00, 0x00, + // 2345 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x39, 0xcd, 0x6f, 0x1c, 0x49, + 0xf5, 0xee, 0x99, 0xf1, 0xd8, 0xf3, 0x66, 0x3c, 0x1e, 0x97, 0x13, 0xff, 0x26, 0xf3, 0xcb, 0x66, + 0xb3, 0xbd, 0x9b, 0x0f, 0xb2, 0xac, 0x13, 0x1c, 0x90, 0x58, 0x01, 0x87, 0xc4, 0x26, 0x5e, 0x93, + 0x8d, 0xd7, 0xb4, 0x9d, 0x00, 0x51, 0xa4, 0xa6, 0x67, 0xba, 0x66, 0xdc, 0x4a, 0x77, 0xd7, 0xa4, + 0xab, 0x67, 0x13, 0x87, 0x0b, 0x07, 0x0e, 0x0b, 0x12, 0xe2, 0x8e, 0x10, 0x27, 0x10, 0xe4, 0xb0, + 0x17, 0xfe, 0x82, 0x5c, 0xf8, 0x37, 0x10, 0x07, 0xf8, 0x13, 0x38, 0x22, 0xa1, 0xfa, 0xe8, 0x9e, + 0xfe, 0xa8, 0xb6, 0xdb, 0xb6, 0xbc, 0x89, 0x10, 0xb7, 0xae, 0xd7, 0xaf, 0xde, 0x7b, 0xf5, 0xbe, + 0xab, 0x1e, 0x2c, 0x3d, 0x9b, 0xe0, 0xe0, 0xc0, 0x1c, 0x10, 0x12, 0xd8, 0xab, 0xe3, 0x80, 0x84, + 0x04, 0x21, 0xcf, 0x71, 0x3f, 0x9f, 0x50, 0xb1, 0x5a, 0xe5, 0xff, 0x7b, 0xad, 0x01, 0xf1, 0x3c, + 0xe2, 0x0b, 0x58, 0xaf, 0x95, 0xc4, 0xe8, 0xb5, 0x1d, 0x3f, 0xc4, 0x81, 0x6f, 0xb9, 0xd1, 0x5f, + 0x3a, 0xd8, 0xc7, 0x9e, 0x25, 0x57, 0x1d, 0xdb, 0x0a, 0xad, 0x24, 0x7d, 0xfd, 0x17, 0x1a, 0xac, + 0xec, 0xee, 0x93, 0xe7, 0xeb, 0xc4, 0x75, 0xf1, 0x20, 0x74, 0x88, 0x4f, 0x0d, 0xfc, 0x6c, 0x82, + 0x69, 0x88, 0x6e, 0x41, 0xad, 0x6f, 0x51, 0xdc, 0xd5, 0x2e, 0x6b, 0xd7, 0x9b, 0x6b, 0x17, 0x57, + 0x53, 0x92, 0x48, 0x11, 0x1e, 0xd0, 0xd1, 0x5d, 0x8b, 0x62, 0x83, 0x63, 0x22, 0x04, 0x35, 0xbb, + 0xbf, 0xb5, 0xd1, 0xad, 0x5c, 0xd6, 0xae, 0x57, 0x0d, 0xfe, 0x8d, 0x3e, 0x80, 0x85, 0x41, 0x4c, + 0x7b, 0x6b, 0x83, 0x76, 0xab, 0x97, 0xab, 0xd7, 0xab, 0x46, 0x1a, 0xa8, 0xff, 0x49, 0x83, 0xff, + 0xcb, 0x89, 0x41, 0xc7, 0xc4, 0xa7, 0x18, 0xdd, 0x86, 0x3a, 0x0d, 0xad, 0x70, 0x42, 0xa5, 0x24, + 0xff, 0xaf, 0x94, 0x64, 0x97, 0xa3, 0x18, 0x12, 0x35, 0xcf, 0xb6, 0xa2, 0x60, 0x8b, 0xbe, 0x01, + 0xe7, 0x1c, 0xff, 0x01, 0xf6, 0x48, 0x70, 0x60, 0x8e, 0x71, 0x30, 0xc0, 0x7e, 0x68, 0x8d, 0x70, + 0x24, 0xe3, 0x72, 0xf4, 0x6f, 0x67, 0xfa, 0x4b, 0xff, 0xa3, 0x06, 0xe7, 0x99, 0xa4, 0x3b, 0x56, + 0x10, 0x3a, 0x67, 0xa0, 0x2f, 0x1d, 0x5a, 0x49, 0x19, 0xbb, 0x55, 0xfe, 0x2f, 0x05, 0x63, 0x38, + 0xe3, 0x88, 0x3d, 0x3b, 0x5b, 0x8d, 0x8b, 0x9b, 0x82, 0xe9, 0x7f, 0x90, 0x86, 0x4d, 0xca, 0x79, + 0x1a, 0x85, 0x66, 0x79, 0x56, 0xf2, 0x3c, 0x4f, 0xa2, 0xce, 0xd7, 0x1a, 0x9c, 0xff, 0x94, 0x58, + 0xf6, 0xd4, 0xf0, 0x5f, 0xbd, 0x3a, 0xbf, 0x07, 0x75, 0x11, 0x25, 0xdd, 0x1a, 0xe7, 0x75, 0x25, + 0xcd, 0x4b, 0x46, 0xd0, 0x54, 0xc2, 0x5d, 0x0e, 0x30, 0xe4, 0x26, 0xfd, 0x77, 0x1a, 0x74, 0x0d, + 0xec, 0x62, 0x8b, 0xe2, 0x37, 0x79, 0x8a, 0x15, 0xa8, 0xfb, 0xc4, 0xc6, 0x5b, 0x1b, 0xfc, 0x14, + 0x55, 0x43, 0xae, 0xf4, 0x7f, 0x48, 0x0d, 0xbf, 0xe5, 0x0e, 0x9b, 0xb0, 0xc2, 0xec, 0x49, 0xac, + 0xf0, 0x7a, 0x6a, 0x85, 0xb7, 0xfd, 0xa4, 0x53, 0x4b, 0xcd, 0xa6, 0x2c, 0xf5, 0x13, 0xb8, 0xb0, + 0x1e, 0x60, 0x2b, 0xc4, 0x3f, 0x64, 0x69, 0x7e, 0x7d, 0xdf, 0xf2, 0x7d, 0xec, 0x46, 0x47, 0xc8, + 0x32, 0xd7, 0x14, 0xcc, 0xbb, 0x30, 0x37, 0x0e, 0xc8, 0x8b, 0x83, 0x58, 0xee, 0x68, 0xa9, 0xff, + 0x59, 0x83, 0x9e, 0x8a, 0xf6, 0x69, 0x32, 0xc2, 0xfb, 0xb0, 0x20, 0xeb, 0x95, 0xa0, 0xc6, 0x79, + 0x36, 0x8c, 0xd6, 0xb3, 0x04, 0x07, 0x74, 0x0b, 0xce, 0x09, 0xa4, 0x00, 0xd3, 0x89, 0x1b, 0xc6, + 0xb8, 0x55, 0x8e, 0x8b, 0xf8, 0x3f, 0x83, 0xff, 0x92, 0x3b, 0xf4, 0x57, 0x1a, 0x5c, 0xd8, 0xc4, + 0x61, 0x6c, 0x44, 0xc6, 0x15, 0xbf, 0xa5, 0x49, 0xf6, 0x4b, 0x0d, 0x7a, 0x2a, 0x59, 0x4f, 0xa3, + 0xd6, 0xc7, 0xb0, 0x12, 0xf3, 0x30, 0x6d, 0x4c, 0x07, 0x81, 0x33, 0xe6, 0xce, 0xcc, 0x53, 0x6e, + 0x73, 0xed, 0xfd, 0xd5, 0x7c, 0x4b, 0xb0, 0x9a, 0x95, 0xe0, 0x7c, 0x4c, 0x62, 0x23, 0x41, 0x41, + 0xff, 0xb5, 0x06, 0xe7, 0x37, 0x71, 0xb8, 0x8b, 0x47, 0x1e, 0xf6, 0xc3, 0x2d, 0x7f, 0x48, 0x4e, + 0xae, 0xd7, 0x4b, 0x00, 0x54, 0xd2, 0x89, 0xcb, 0x41, 0x02, 0x52, 0x46, 0xc7, 0xbc, 0xfb, 0xc8, + 0xca, 0x73, 0x1a, 0xdd, 0x7d, 0x0b, 0x66, 0x1d, 0x7f, 0x48, 0x22, 0x55, 0xbd, 0xab, 0x52, 0x55, + 0x92, 0x99, 0xc0, 0xd6, 0xff, 0x5d, 0x81, 0x95, 0x3b, 0xb6, 0xad, 0x0a, 0xbb, 0xe3, 0xeb, 0x65, + 0x1a, 0xdd, 0x95, 0x64, 0x74, 0x97, 0xf2, 0xb9, 0x5c, 0x48, 0xd5, 0x8e, 0x11, 0x52, 0xb3, 0x45, + 0x21, 0x85, 0x36, 0x61, 0x81, 0x62, 0xfc, 0xd4, 0x1c, 0x13, 0xca, 0x7d, 0xa2, 0x5b, 0xe7, 0xa7, + 0xd1, 0xd3, 0xa7, 0x89, 0x3b, 0xc7, 0x07, 0x74, 0xb4, 0x23, 0x31, 0x8d, 0x16, 0xdb, 0x18, 0xad, + 0xd0, 0x43, 0x58, 0x19, 0xb9, 0xa4, 0x6f, 0xb9, 0x26, 0xc5, 0x96, 0x8b, 0x6d, 0x53, 0xda, 0x9b, + 0x76, 0xe7, 0xca, 0x29, 0xfc, 0x9c, 0xd8, 0xbe, 0xcb, 0x77, 0xcb, 0x1f, 0x54, 0xff, 0xbb, 0x06, + 0x17, 0x0c, 0xec, 0x91, 0xcf, 0xf1, 0x7f, 0xab, 0x09, 0xf4, 0x7f, 0x56, 0x60, 0xe5, 0x47, 0x56, + 0x38, 0xd8, 0xdf, 0xf0, 0x24, 0x88, 0xbe, 0x99, 0xf3, 0x95, 0x29, 0x50, 0x71, 0x18, 0xcd, 0xaa, + 0xac, 0xca, 0xee, 0x10, 0xab, 0x8f, 0xe4, 0x91, 0x13, 0x61, 0x94, 0xa8, 0xe0, 0xf5, 0x13, 0x54, + 0x70, 0xb4, 0x0e, 0x0b, 0xf8, 0xc5, 0xc0, 0x9d, 0xd8, 0xd8, 0x14, 0xdc, 0x85, 0x4f, 0x5d, 0x52, + 0x70, 0x4f, 0xba, 0x54, 0x4b, 0x6e, 0xda, 0xe2, 0xa1, 0xfc, 0x5a, 0x83, 0x0b, 0x42, 0xcf, 0xd8, + 0x0d, 0xad, 0x37, 0xab, 0xea, 0x58, 0x8d, 0xb5, 0xe3, 0xa8, 0x51, 0xff, 0x6d, 0x0d, 0x16, 0xe5, + 0x01, 0x59, 0xdf, 0xc6, 0x7e, 0xa1, 0x8b, 0xd0, 0x88, 0x53, 0xab, 0x2c, 0xfd, 0x53, 0x00, 0xba, + 0x0c, 0xcd, 0x84, 0xfd, 0xa4, 0xa4, 0x49, 0x50, 0x29, 0x71, 0xa3, 0x42, 0x59, 0x4b, 0x14, 0xca, + 0x77, 0x00, 0x86, 0xee, 0x84, 0xee, 0x9b, 0xa1, 0xe3, 0x61, 0xd9, 0xae, 0x34, 0x38, 0x64, 0xcf, + 0xf1, 0x30, 0xba, 0x03, 0xad, 0xbe, 0xe3, 0xbb, 0x64, 0x64, 0x8e, 0xad, 0x70, 0x9f, 0x76, 0xeb, + 0x85, 0x16, 0xbb, 0xe7, 0x60, 0xd7, 0xbe, 0xcb, 0x71, 0x8d, 0xa6, 0xd8, 0xb3, 0xc3, 0xb6, 0xa0, + 0x4b, 0xd0, 0xf4, 0x27, 0x9e, 0x49, 0x86, 0x66, 0x40, 0x9e, 0x33, 0x9b, 0x73, 0x16, 0xfe, 0xc4, + 0xfb, 0x6c, 0x68, 0x90, 0xe7, 0x14, 0x7d, 0x17, 0x1a, 0x2c, 0xb9, 0x53, 0x97, 0x8c, 0x68, 0x77, + 0xbe, 0x14, 0xfd, 0xe9, 0x06, 0xb6, 0xdb, 0x66, 0x8e, 0xc0, 0x77, 0x37, 0xca, 0xed, 0x8e, 0x37, + 0xa0, 0xab, 0xd0, 0x1e, 0x10, 0x6f, 0x6c, 0x71, 0x0d, 0xdd, 0x0b, 0x88, 0xd7, 0x05, 0x1e, 0x2d, + 0x19, 0x28, 0xba, 0x07, 0x4d, 0xc7, 0xb7, 0xf1, 0x0b, 0xe9, 0xb7, 0x4d, 0xce, 0xe7, 0x8a, 0x2a, + 0x17, 0x3e, 0xc2, 0x03, 0xce, 0x6b, 0x8b, 0xa1, 0x73, 0xa3, 0x83, 0x13, 0x7d, 0x52, 0xf4, 0x1e, + 0xb4, 0xa4, 0x51, 0x4d, 0xea, 0xbc, 0xc4, 0xdd, 0x96, 0x30, 0xa4, 0x84, 0xed, 0x3a, 0x2f, 0xb1, + 0xfe, 0x97, 0x0a, 0x2c, 0xe5, 0x88, 0xb0, 0xc6, 0x6f, 0xc8, 0x21, 0x91, 0x73, 0x44, 0x4b, 0x46, + 0x12, 0xfb, 0x56, 0xdf, 0x65, 0x31, 0x65, 0xe3, 0x17, 0xdc, 0x37, 0xe6, 0x8d, 0xa6, 0x80, 0x71, + 0x02, 0xcc, 0xc6, 0x42, 0x7a, 0xdf, 0xf2, 0xb0, 0x6c, 0xcc, 0x1a, 0x1c, 0xb2, 0x6d, 0x79, 0x98, + 0xd1, 0x16, 0x22, 0x46, 0x9e, 0x11, 0x2d, 0xd9, 0x9f, 0xfe, 0xc4, 0xe1, 0x5c, 0x85, 0x67, 0x44, + 0x4b, 0xb4, 0x01, 0x2d, 0x41, 0x72, 0x6c, 0x05, 0x96, 0x17, 0xf9, 0xc5, 0x7b, 0xca, 0x78, 0xbb, + 0x8f, 0x0f, 0x1e, 0x59, 0xee, 0x04, 0xef, 0x58, 0x4e, 0x60, 0x08, 0x3d, 0xee, 0xf0, 0x5d, 0xe8, + 0x3a, 0x74, 0x04, 0x95, 0xa1, 0xe3, 0x62, 0xe9, 0x61, 0x2c, 0x27, 0x34, 0x8c, 0x36, 0x87, 0xdf, + 0x73, 0x5c, 0x2c, 0x9c, 0x28, 0x3e, 0x02, 0x57, 0xdb, 0xbc, 0xf0, 0x21, 0x0e, 0xe1, 0x4a, 0x7b, + 0x55, 0x81, 0x65, 0x16, 0x4a, 0x51, 0xc1, 0x39, 0x79, 0x3a, 0x78, 0x07, 0xc0, 0xa6, 0xa1, 0x99, + 0x4a, 0x09, 0x0d, 0x9b, 0x86, 0xdb, 0x22, 0x2b, 0x7c, 0x1c, 0x45, 0x7c, 0xb5, 0xb8, 0x55, 0xcb, + 0x84, 0x76, 0x3e, 0x79, 0x9e, 0xe4, 0x12, 0xca, 0xca, 0x16, 0x25, 0x93, 0x60, 0x80, 0xcd, 0xd4, + 0xd5, 0xa2, 0x25, 0x80, 0xdb, 0xea, 0xa4, 0x55, 0x57, 0xb4, 0x64, 0x7f, 0xd3, 0x60, 0x45, 0xde, + 0xa3, 0x4e, 0xaf, 0xae, 0xa2, 0xec, 0x19, 0xa5, 0x9a, 0xea, 0x21, 0x3d, 0x79, 0xad, 0x44, 0xf1, + 0x9a, 0x55, 0x14, 0xaf, 0x74, 0x5f, 0x5a, 0xcf, 0xf6, 0xa5, 0xfa, 0x6f, 0x34, 0x58, 0xf9, 0xc4, + 0xf2, 0x6d, 0x32, 0x1c, 0x9e, 0xfe, 0x80, 0xeb, 0x71, 0xc4, 0x6e, 0x1d, 0xa7, 0xef, 0x4c, 0x6d, + 0xd2, 0xbf, 0xa8, 0x00, 0x62, 0xee, 0x70, 0xd7, 0x72, 0x2d, 0x7f, 0x80, 0x4f, 0x2e, 0xcd, 0x15, + 0x68, 0xa7, 0x9c, 0x20, 0x7e, 0xf5, 0x4a, 0x7a, 0x01, 0x45, 0xf7, 0xa1, 0xdd, 0x17, 0xac, 0xcc, + 0x00, 0x5b, 0x94, 0xf8, 0xdc, 0x0e, 0xed, 0xb5, 0x0f, 0x54, 0x62, 0xef, 0x05, 0xce, 0x68, 0x84, + 0x83, 0x75, 0xe2, 0xdb, 0xa2, 0x23, 0x5c, 0xe8, 0x47, 0x62, 0xb2, 0xad, 0xe8, 0x5d, 0x68, 0x4e, + 0x23, 0x22, 0x6a, 0x27, 0x20, 0x0e, 0x09, 0x8a, 0x3e, 0x84, 0xa5, 0x74, 0xb3, 0x38, 0x35, 0x5c, + 0x87, 0x26, 0xfb, 0x40, 0x66, 0x9c, 0x9f, 0x01, 0x8a, 0x1b, 0x24, 0x5e, 0xc6, 0x79, 0x7a, 0x2b, + 0x73, 0xf7, 0xbd, 0x08, 0x0d, 0x3b, 0xda, 0x29, 0x6f, 0xa2, 0x53, 0x00, 0x0b, 0x0f, 0x21, 0xa1, + 0xe9, 0x12, 0xcb, 0xc6, 0x76, 0x54, 0x00, 0x05, 0xf0, 0x53, 0x0e, 0xd3, 0xbf, 0xac, 0x40, 0x27, + 0xd9, 0x80, 0x96, 0xe6, 0x7d, 0x36, 0x37, 0xe1, 0x43, 0xba, 0xed, 0xda, 0x29, 0xba, 0xed, 0xfc, + 0x6d, 0x60, 0xf6, 0x64, 0xb7, 0x01, 0xfd, 0xf7, 0x1a, 0x2c, 0x66, 0x2e, 0x9e, 0xd9, 0x56, 0x44, + 0xcb, 0xb7, 0x22, 0xdf, 0x86, 0x59, 0x56, 0x9f, 0x31, 0x57, 0x52, 0x3b, 0xcb, 0x56, 0x75, 0x9d, + 0x35, 0xc4, 0x06, 0x74, 0x13, 0x96, 0x15, 0xcf, 0x8b, 0xd2, 0x94, 0x28, 0xff, 0xba, 0xa8, 0xff, + 0xbc, 0x06, 0xcd, 0x84, 0x3e, 0x8e, 0xe8, 0xa2, 0xb2, 0x96, 0xae, 0x28, 0x2c, 0x9d, 0x39, 0x5e, + 0x35, 0x7f, 0xbc, 0x82, 0x67, 0x38, 0x74, 0x01, 0xe6, 0x3d, 0xec, 0x89, 0x02, 0x25, 0xab, 0xa5, + 0x87, 0x3d, 0x56, 0x9e, 0xd8, 0x2f, 0xd6, 0x02, 0xf1, 0xfe, 0x47, 0xa4, 0xe4, 0x39, 0x7f, 0xe2, + 0xf1, 0xee, 0x27, 0x5d, 0x9b, 0xe7, 0x0e, 0xa9, 0xcd, 0xf3, 0xe9, 0xda, 0x9c, 0x0a, 0x87, 0x46, + 0x36, 0x1c, 0xca, 0x36, 0x36, 0xb7, 0x60, 0x79, 0xc0, 0x5f, 0x8d, 0xec, 0xbb, 0x07, 0xeb, 0xf1, + 0xaf, 0x6e, 0x93, 0x37, 0x11, 0xaa, 0x5f, 0xe8, 0x1e, 0x73, 0x2e, 0xd9, 0xc2, 0x70, 0x2b, 0xb7, + 0xb8, 0x95, 0xd5, 0xa5, 0x5f, 0xda, 0x46, 0x18, 0x39, 0xca, 0x89, 0x7c, 0x95, 0x6d, 0xa9, 0x16, + 0x4e, 0xd8, 0x52, 0xe9, 0x5f, 0x54, 0xa1, 0x3d, 0x2d, 0x9a, 0xa5, 0x23, 0xba, 0xcc, 0x6b, 0xf7, + 0x36, 0x74, 0xa6, 0x0f, 0x35, 0xfc, 0xb0, 0x87, 0xd6, 0xfd, 0xec, 0x13, 0xcd, 0xe2, 0x38, 0x13, + 0x3a, 0x1f, 0x43, 0x83, 0x25, 0x27, 0x33, 0x3c, 0x18, 0x63, 0xee, 0x3c, 0xed, 0x6c, 0xd2, 0x17, + 0x84, 0x58, 0xb6, 0xda, 0x3b, 0x18, 0x63, 0x63, 0xde, 0x95, 0x5f, 0xa7, 0x7c, 0x3b, 0x45, 0xb7, + 0xe1, 0x7c, 0x20, 0x4a, 0xbe, 0x6d, 0xa6, 0x8e, 0x2d, 0xaa, 0xe7, 0xb9, 0xe8, 0xe7, 0x4e, 0xf2, + 0xf8, 0x05, 0xd1, 0x38, 0x57, 0x18, 0x8d, 0x0f, 0x61, 0xf9, 0xa1, 0x4f, 0x27, 0x7d, 0x3a, 0x08, + 0x9c, 0x3e, 0x8e, 0xae, 0x66, 0xa5, 0xcc, 0xd1, 0x83, 0x79, 0x99, 0x2e, 0x85, 0x29, 0x1a, 0x46, + 0xbc, 0xd6, 0x7f, 0xa5, 0xc1, 0x4a, 0x9e, 0x2e, 0xb7, 0xf4, 0x34, 0x16, 0xb5, 0x54, 0x2c, 0xfe, + 0x18, 0x96, 0xa7, 0xe4, 0xcd, 0x14, 0xe5, 0xe6, 0xda, 0x35, 0x95, 0xce, 0x15, 0x82, 0x1b, 0x68, + 0x4a, 0x23, 0x82, 0xe9, 0xff, 0xd2, 0x60, 0x49, 0x7a, 0x35, 0x83, 0x8d, 0xf8, 0xad, 0x94, 0xd5, + 0x07, 0xe2, 0xbb, 0x8e, 0x1f, 0x37, 0x67, 0xf2, 0x8c, 0x02, 0x28, 0x9b, 0xb3, 0x4f, 0x60, 0x51, + 0x22, 0xc5, 0x69, 0xbe, 0x64, 0x37, 0xd1, 0x16, 0xfb, 0xe2, 0x04, 0x7f, 0x05, 0xda, 0x64, 0x38, + 0x4c, 0xf2, 0x13, 0x79, 0x6a, 0x41, 0x42, 0x25, 0xc3, 0x1f, 0x40, 0x27, 0x42, 0x3b, 0x6e, 0x61, + 0x59, 0x94, 0x1b, 0xe3, 0x17, 0x9c, 0x5f, 0x6a, 0xd0, 0x4d, 0x97, 0x99, 0xc4, 0xf1, 0x8f, 0xdf, + 0xc8, 0x7c, 0x27, 0xfd, 0x8e, 0x77, 0xe5, 0x10, 0x79, 0xa6, 0x7c, 0x64, 0x27, 0x7d, 0xe3, 0x25, + 0xb4, 0xd3, 0xb1, 0x86, 0x5a, 0x30, 0xbf, 0x4d, 0xc2, 0xef, 0xbf, 0x70, 0x68, 0xd8, 0x99, 0x41, + 0x6d, 0x80, 0x6d, 0x12, 0xee, 0x04, 0x98, 0x62, 0x3f, 0xec, 0x68, 0x08, 0xa0, 0xfe, 0x99, 0xbf, + 0xe1, 0xd0, 0xa7, 0x9d, 0x0a, 0x5a, 0x96, 0x15, 0xcd, 0x72, 0xb7, 0xa4, 0x03, 0x77, 0xaa, 0x6c, + 0x7b, 0xbc, 0xaa, 0xa1, 0x0e, 0xb4, 0x62, 0x94, 0xcd, 0x9d, 0x87, 0x9d, 0x59, 0xd4, 0x80, 0x59, + 0xf1, 0x59, 0xbf, 0x61, 0x43, 0x27, 0xdb, 0x30, 0x31, 0x9a, 0x0f, 0xfd, 0xfb, 0x3e, 0x79, 0x1e, + 0x83, 0x3a, 0x33, 0xa8, 0x09, 0x73, 0xb2, 0x09, 0xed, 0x68, 0x68, 0x11, 0x9a, 0x89, 0xfe, 0xaf, + 0x53, 0x61, 0x80, 0xcd, 0x60, 0x3c, 0x90, 0x9d, 0xa0, 0x10, 0x81, 0x59, 0x6d, 0x83, 0x3c, 0xf7, + 0x3b, 0xb5, 0x1b, 0x77, 0x60, 0x3e, 0x4a, 0x02, 0xec, 0x34, 0x82, 0x3a, 0x5b, 0x75, 0x66, 0xd0, + 0x12, 0x2c, 0xa4, 0xa6, 0x3d, 0x1d, 0x0d, 0x21, 0x68, 0xbb, 0xa9, 0x11, 0x5b, 0xa7, 0xb2, 0xf6, + 0xd7, 0x26, 0x80, 0xe8, 0x75, 0x08, 0x09, 0x6c, 0x34, 0x06, 0xb4, 0x89, 0x43, 0x96, 0xc7, 0x89, + 0x1f, 0xe5, 0x60, 0x8a, 0x6e, 0x15, 0xb4, 0x04, 0x79, 0x54, 0x29, 0x69, 0xef, 0x6a, 0xc1, 0x8e, + 0x0c, 0xba, 0x3e, 0x83, 0x3c, 0xce, 0x71, 0xcf, 0xf1, 0xf0, 0x9e, 0x33, 0x78, 0x1a, 0x37, 0x49, + 0xc5, 0x1c, 0x33, 0xa8, 0x11, 0xc7, 0x4c, 0xae, 0x95, 0x8b, 0xdd, 0x30, 0x70, 0xfc, 0x51, 0xf4, + 0x98, 0xac, 0xcf, 0xa0, 0x67, 0x70, 0x6e, 0x13, 0x73, 0xee, 0x0e, 0x0d, 0x9d, 0x01, 0x8d, 0x18, + 0xae, 0x15, 0x33, 0xcc, 0x21, 0x1f, 0x93, 0xa5, 0x0b, 0x8b, 0x99, 0x91, 0x36, 0xba, 0xa1, 0x74, + 0x64, 0xe5, 0xf8, 0xbd, 0xf7, 0x61, 0x29, 0xdc, 0x98, 0x9b, 0x03, 0xed, 0xf4, 0xb8, 0x17, 0x7d, + 0xad, 0x88, 0x40, 0x6e, 0x3e, 0xd6, 0xbb, 0x51, 0x06, 0x35, 0x66, 0xf5, 0x18, 0xda, 0xe9, 0x81, + 0xa2, 0x9a, 0x95, 0x72, 0xe8, 0xd8, 0x3b, 0xec, 0x1d, 0x5f, 0x9f, 0x41, 0x3f, 0x85, 0xa5, 0xdc, + 0x14, 0x0f, 0x7d, 0x5d, 0x45, 0xbe, 0x68, 0xd8, 0x77, 0x14, 0x07, 0x29, 0xfd, 0x54, 0x8b, 0xc5, + 0xd2, 0xe7, 0xc6, 0xb9, 0xe5, 0xa5, 0x4f, 0x90, 0x3f, 0x4c, 0xfa, 0x63, 0x73, 0x98, 0x00, 0xca, + 0xcf, 0xf1, 0xd0, 0x47, 0x2a, 0x16, 0x85, 0xb3, 0xc4, 0xde, 0x6a, 0x59, 0xf4, 0xd8, 0xe4, 0x13, + 0x1e, 0xad, 0xd9, 0x66, 0x5f, 0xc9, 0xb6, 0x70, 0x76, 0xa7, 0x66, 0x5b, 0x3c, 0x3e, 0x13, 0x4e, + 0x9d, 0x1e, 0x0f, 0xa9, 0x6d, 0xa5, 0x1c, 0x69, 0xa9, 0x9d, 0x5a, 0x3d, 0x6d, 0xd2, 0x67, 0xd0, + 0x5e, 0x2a, 0x07, 0xa3, 0xab, 0x45, 0x3e, 0x91, 0xbe, 0xa4, 0x1f, 0x65, 0x2e, 0x13, 0x60, 0x13, + 0x87, 0x0f, 0x70, 0x18, 0x38, 0x03, 0x9a, 0x25, 0x2a, 0x17, 0x53, 0x84, 0x88, 0xe8, 0xb5, 0x23, + 0xf1, 0x22, 0xb1, 0xd7, 0x5e, 0x01, 0x34, 0xb8, 0xcd, 0x58, 0x79, 0xf8, 0x5f, 0x1a, 0x3f, 0x83, + 0x34, 0xfe, 0x04, 0x16, 0x33, 0xb3, 0x41, 0x75, 0x1a, 0x57, 0x0f, 0x10, 0x8f, 0x72, 0x90, 0x3e, + 0xa0, 0xfc, 0xe4, 0x4b, 0x1d, 0x58, 0x85, 0x13, 0xb2, 0xa3, 0x78, 0x3c, 0x81, 0xc5, 0xcc, 0xe8, + 0x49, 0x7d, 0x02, 0xf5, 0x7c, 0xaa, 0xc4, 0x09, 0xf2, 0x03, 0x17, 0xf5, 0x09, 0x0a, 0x07, 0x33, + 0x47, 0xf1, 0x78, 0x04, 0xad, 0xe4, 0xfb, 0x2d, 0xba, 0x56, 0x14, 0x9d, 0x99, 0x17, 0xbd, 0x37, + 0x9f, 0xaf, 0xcf, 0xbe, 0x9e, 0x3d, 0x81, 0xc5, 0xcc, 0x7b, 0xad, 0xda, 0xba, 0xea, 0x47, 0xdd, + 0xa3, 0xa8, 0x7f, 0x85, 0x19, 0xf8, 0xac, 0x73, 0xe5, 0xdd, 0x6f, 0x3e, 0x5e, 0x1b, 0x39, 0xe1, + 0xfe, 0xa4, 0xcf, 0x4e, 0x79, 0x53, 0x60, 0x7e, 0xe4, 0x10, 0xf9, 0x75, 0x33, 0x4a, 0x1a, 0x37, + 0x39, 0xa5, 0x9b, 0x5c, 0xda, 0x71, 0xbf, 0x5f, 0xe7, 0xcb, 0xdb, 0xff, 0x09, 0x00, 0x00, 0xff, + 0xff, 0x0a, 0x67, 0x23, 0x4a, 0x9d, 0x29, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/querycoord/channel_unsubscribe.go b/internal/querycoord/channel_unsubscribe.go new file mode 100644 index 000000000..8ada8686f --- /dev/null +++ b/internal/querycoord/channel_unsubscribe.go @@ -0,0 +1,174 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querycoord + +import ( + "container/list" + "context" + "fmt" + "sync" + "time" + + "github.com/golang/protobuf/proto" + "go.uber.org/zap" + + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/funcutil" +) + +const ( + unsubscribeChannelInfoPrefix = "queryCoord-unsubscribeChannelInfo" + unsubscribeChannelCheckInterval = time.Second +) + +type channelUnsubscribeHandler struct { + ctx context.Context + cancel context.CancelFunc + kvClient *etcdkv.EtcdKV + factory msgstream.Factory + + channelInfos *list.List + downNodeChan chan int64 + + wg sync.WaitGroup +} + +// newChannelUnsubscribeHandler create a new handler service to unsubscribe channels +func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factory msgstream.Factory) (*channelUnsubscribeHandler, error) { + childCtx, cancel := context.WithCancel(ctx) + handler := &channelUnsubscribeHandler{ + ctx: childCtx, + cancel: cancel, + kvClient: kv, + factory: factory, + + channelInfos: list.New(), + //TODO:: if the query nodes that are down exceed 1024, query coord will not be able to restart + downNodeChan: make(chan int64, 1024), + } + + err := handler.reloadFromKV() + if err != nil { + return nil, err + } + + return handler, nil +} + +// reloadFromKV reload unsolved channels to unsubscribe +func (csh *channelUnsubscribeHandler) reloadFromKV() error { + log.Debug("start reload unsubscribe channelInfo from kv") + _, channelInfoValues, err := csh.kvClient.LoadWithPrefix(unsubscribeChannelInfoPrefix) + if err != nil { + return err + } + for _, value := range channelInfoValues { + channelInfo := &querypb.UnsubscribeChannelInfo{} + err = proto.Unmarshal([]byte(value), channelInfo) + if err != nil { + return err + } + csh.channelInfos.PushBack(channelInfo) + csh.downNodeChan <- channelInfo.NodeID + } + + return nil +} + +// addUnsubscribeChannelInfo add channel info to handler service, and persistent to etcd +func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.UnsubscribeChannelInfo) { + nodeID := info.NodeID + channelInfoValue, err := proto.Marshal(info) + if err != nil { + panic(err) + } + // when queryCoord is restarted multiple times, the nodeID of added channelInfo may be the same + hasEnqueue := false + for e := csh.channelInfos.Back(); e != nil; e = e.Prev() { + if e.Value.(*querypb.UnsubscribeChannelInfo).NodeID == nodeID { + hasEnqueue = true + } + } + + if !hasEnqueue { + channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID) + err = csh.kvClient.Save(channelInfoKey, string(channelInfoValue)) + if err != nil { + panic(err) + } + csh.channelInfos.PushBack(info) + csh.downNodeChan <- info.NodeID + log.Debug("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID)) + } +} + +// handleChannelUnsubscribeLoop handle the unsubscription of channels which query node has watched +func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() { + defer csh.wg.Done() + for { + select { + case <-csh.ctx.Done(): + log.Debug("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end") + return + case <-csh.downNodeChan: + channelInfo := csh.channelInfos.Front().Value.(*querypb.UnsubscribeChannelInfo) + nodeID := channelInfo.NodeID + for _, collectionChannels := range channelInfo.CollectionChannels { + collectionID := collectionChannels.CollectionID + subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, nodeID) + err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels) + if err != nil { + log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID)) + panic(err) + } + } + + channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID) + err := csh.kvClient.Remove(channelInfoKey) + if err != nil { + log.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID)) + panic(err) + } + log.Debug("unsubscribe channels success", zap.Int64("nodeID", nodeID)) + } + } +} + +func (csh *channelUnsubscribeHandler) start() { + csh.wg.Add(1) + go csh.handleChannelUnsubscribeLoop() +} + +func (csh *channelUnsubscribeHandler) close() { + csh.cancel() + csh.wg.Wait() +} + +// unsubscribeChannels create consumer fist, and unsubscribe channel through msgStream.close() +func unsubscribeChannels(ctx context.Context, factory msgstream.Factory, subName string, channels []string) error { + msgStream, err := factory.NewMsgStream(ctx) + if err != nil { + return err + } + + msgStream.AsConsumer(channels, subName) + msgStream.Close() + return nil +} diff --git a/internal/querycoord/channel_unsubscribe_test.go b/internal/querycoord/channel_unsubscribe_test.go new file mode 100644 index 000000000..a81b2babd --- /dev/null +++ b/internal/querycoord/channel_unsubscribe_test.go @@ -0,0 +1,131 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querycoord + +import ( + "context" + "fmt" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/etcd" +) + +func Test_HandlerReloadFromKV(t *testing.T) { + refreshParams() + baseCtx, cancel := context.WithCancel(context.Background()) + etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + assert.Nil(t, err) + defer etcdCli.Close() + kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + + channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, defaultQueryNodeID) + unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{ + NodeID: defaultQueryNodeID, + } + channelInfoBytes, err := proto.Marshal(unsubscribeChannelInfo) + assert.Nil(t, err) + + err = kv.Save(channelInfoKey, string(channelInfoBytes)) + assert.Nil(t, err) + + factory := msgstream.NewPmsFactory() + handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) + assert.Nil(t, err) + assert.Equal(t, 1, len(handler.downNodeChan)) + + cancel() +} + +func Test_AddUnsubscribeChannelInfo(t *testing.T) { + refreshParams() + baseCtx, cancel := context.WithCancel(context.Background()) + etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + assert.Nil(t, err) + defer etcdCli.Close() + kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + factory := msgstream.NewPmsFactory() + handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) + assert.Nil(t, err) + + collectionChannels := &querypb.UnsubscribeChannels{ + CollectionID: defaultCollectionID, + Channels: []string{"test-channel"}, + } + unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{ + NodeID: defaultQueryNodeID, + CollectionChannels: []*querypb.UnsubscribeChannels{collectionChannels}, + } + + handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo) + frontValue := handler.channelInfos.Front() + assert.NotNil(t, frontValue) + assert.Equal(t, defaultQueryNodeID, frontValue.Value.(*querypb.UnsubscribeChannelInfo).NodeID) + + // repeat nodeID which has down + handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo) + assert.Equal(t, 1, len(handler.downNodeChan)) + + cancel() +} + +func Test_HandleChannelUnsubscribeLoop(t *testing.T) { + refreshParams() + baseCtx, cancel := context.WithCancel(context.Background()) + etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + assert.Nil(t, err) + defer etcdCli.Close() + kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + factory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarCfg.Address, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + factory.SetParams(m) + handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) + assert.Nil(t, err) + + collectionChannels := &querypb.UnsubscribeChannels{ + CollectionID: defaultCollectionID, + Channels: []string{"test-channel"}, + } + unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{ + NodeID: defaultQueryNodeID, + CollectionChannels: []*querypb.UnsubscribeChannels{collectionChannels}, + } + + handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo) + channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, defaultQueryNodeID) + _, err = kv.Load(channelInfoKey) + assert.Nil(t, err) + + handler.start() + + for { + _, err = kv.Load(channelInfoKey) + if err != nil { + break + } + } + + cancel() +} diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go index d287c5ecb..c01e90d56 100644 --- a/internal/querycoord/cluster.go +++ b/internal/querycoord/cluster.go @@ -103,6 +103,7 @@ type queryNodeCluster struct { sync.RWMutex clusterMeta Meta + handler *channelUnsubscribeHandler nodes map[int64]Node newNodeFn newQueryNodeFn segmentAllocator SegmentAllocatePolicy @@ -110,7 +111,7 @@ type queryNodeCluster struct { segSizeEstimator func(request *querypb.LoadSegmentsRequest, dataKV kv.DataKV) (int64, error) } -func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session) (Cluster, error) { +func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session, handler *channelUnsubscribeHandler) (Cluster, error) { childCtx, cancel := context.WithCancel(ctx) nodes := make(map[int64]Node) c := &queryNodeCluster{ @@ -119,6 +120,7 @@ func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdK client: kv, session: session, clusterMeta: clusterMeta, + handler: handler, nodes: nodes, newNodeFn: newNodeFn, segmentAllocator: defaultSegAllocatePolicy(), @@ -546,6 +548,26 @@ func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetri return ret } +// setNodeState update queryNode state, which may be offline, disconnect, online +// when queryCoord restart, it will call setNodeState via the registerNode function +// when the new queryNode starts, queryCoord calls setNodeState via the registerNode function +// when the new queryNode down, queryCoord calls setNodeState via the stopNode function +func (c *queryNodeCluster) setNodeState(nodeID int64, node Node, state nodeState) { + // if query node down, should unsubscribe all channel the node has watched + // if not unsubscribe channel, may result in pulsar having too many backlogs + if state == offline { + // 1. find all the search/dmChannel/deltaChannel the node has watched + unsubscribeChannelInfo := c.clusterMeta.getWatchedChannelsByNodeID(nodeID) + + // 2.add unsubscribed channels to handler, handler will auto unsubscribe channel + if len(unsubscribeChannelInfo.CollectionChannels) != 0 { + c.handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo) + } + } + + node.setState(state) +} + func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error { c.Lock() defer c.Unlock() @@ -566,7 +588,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti log.Debug("registerNode: create a new QueryNode failed", zap.Int64("nodeID", id), zap.Error(err)) return err } - node.setState(state) + c.setNodeState(id, node, state) if state < online { go node.start() } @@ -614,6 +636,7 @@ func (c *queryNodeCluster) stopNode(nodeID int64) { if node, ok := c.nodes[nodeID]; ok { node.stop() + c.setNodeState(nodeID, node, offline) log.Debug("stopNode: queryNode offline", zap.Int64("nodeID", nodeID)) } } diff --git a/internal/querycoord/cluster_test.go b/internal/querycoord/cluster_test.go index 12d1a164f..9f7096099 100644 --- a/internal/querycoord/cluster_test.go +++ b/internal/querycoord/cluster_test.go @@ -424,12 +424,21 @@ func TestReloadClusterFromKV(t *testing.T) { t.Run("Test LoadOfflineNodes", func(t *testing.T) { refreshParams() + ctx, cancel := context.WithCancel(context.Background()) kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Register() + factory := msgstream.NewPmsFactory() + handler, err := newChannelUnsubscribeHandler(ctx, kv, factory) + assert.Nil(t, err) + meta, err := newMeta(ctx, kv, factory, nil) + assert.Nil(t, err) + cluster := &queryNodeCluster{ client: kv, + handler: handler, + clusterMeta: meta, nodes: make(map[int64]Node), newNodeFn: newQueryNodeTest, session: clusterSession, @@ -454,6 +463,7 @@ func TestReloadClusterFromKV(t *testing.T) { err = removeAllSession() assert.Nil(t, err) + cancel() }) } @@ -480,11 +490,15 @@ func TestGrpcRequest(t *testing.T) { meta, err := newMeta(baseCtx, kv, factory, idAllocator) assert.Nil(t, err) + handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) + assert.Nil(t, err) + cluster := &queryNodeCluster{ ctx: baseCtx, cancel: cancel, client: kv, clusterMeta: meta, + handler: handler, nodes: make(map[int64]Node), newNodeFn: newQueryNodeTest, session: clusterSession, @@ -686,3 +700,71 @@ func TestEstimateSegmentSize(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(2048), size) } + +func TestSetNodeState(t *testing.T) { + refreshParams() + baseCtx, cancel := context.WithCancel(context.Background()) + etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + assert.Nil(t, err) + defer etcdCli.Close() + kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) + clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) + clusterSession.Register() + factory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarCfg.Address, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + err = factory.SetParams(m) + assert.Nil(t, err) + idAllocator := func() (UniqueID, error) { + return 0, nil + } + meta, err := newMeta(baseCtx, kv, factory, idAllocator) + assert.Nil(t, err) + + handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) + assert.Nil(t, err) + + cluster := &queryNodeCluster{ + ctx: baseCtx, + cancel: cancel, + client: kv, + clusterMeta: meta, + handler: handler, + nodes: make(map[int64]Node), + newNodeFn: newQueryNodeTest, + session: clusterSession, + segSizeEstimator: segSizeEstimateForTest, + } + + node, err := startQueryNodeServer(baseCtx) + assert.Nil(t, err) + err = cluster.registerNode(baseCtx, node.session, node.queryNodeID, disConnect) + assert.Nil(t, err) + waitQueryNodeOnline(cluster, node.queryNodeID) + + dmChannelWatchInfo := &querypb.DmChannelWatchInfo{ + CollectionID: defaultCollectionID, + DmChannel: "test-dmChannel", + NodeIDLoaded: node.queryNodeID, + } + err = meta.setDmChannelInfos([]*querypb.DmChannelWatchInfo{dmChannelWatchInfo}) + assert.Nil(t, err) + deltaChannelInfo := &datapb.VchannelInfo{ + CollectionID: defaultCollectionID, + ChannelName: "test-deltaChannel", + } + err = meta.setDeltaChannel(defaultCollectionID, []*datapb.VchannelInfo{deltaChannelInfo}) + assert.Nil(t, err) + + nodeInfo, err := cluster.getNodeInfoByID(node.queryNodeID) + assert.Nil(t, err) + cluster.setNodeState(node.queryNodeID, nodeInfo, offline) + assert.Equal(t, 1, len(handler.downNodeChan)) + + node.stop() + removeAllSession() + cancel() +} diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index b36718702..fd106d6aa 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/util" ) @@ -86,6 +87,8 @@ type Meta interface { saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error) removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error) sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error) + + getWatchedChannelsByNodeID(nodeID int64) *querypb.UnsubscribeChannelInfo } // MetaReplica records the current load information on all querynodes @@ -995,6 +998,75 @@ func (m *MetaReplica) setLoadPercentage(collectionID UniqueID, partitionID Uniqu return nil } +func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.UnsubscribeChannelInfo { + // 1. find all the search/dmChannel/deltaChannel the node has watched + colID2DmChannels := make(map[UniqueID][]string) + colID2DeltaChannels := make(map[UniqueID][]string) + colID2QueryChannel := make(map[UniqueID]string) + dmChannelInfos := m.getDmChannelInfosByNodeID(nodeID) + // get dmChannel/search channel the node has watched + for _, channelInfo := range dmChannelInfos { + collectionID := channelInfo.CollectionID + dmChannel := rootcoord.ToPhysicalChannel(channelInfo.DmChannel) + if _, ok := colID2DmChannels[collectionID]; !ok { + colID2DmChannels[collectionID] = []string{} + } + colID2DmChannels[collectionID] = append(colID2DmChannels[collectionID], dmChannel) + if _, ok := colID2QueryChannel[collectionID]; !ok { + queryChannelInfo := m.getQueryChannelInfoByID(collectionID) + colID2QueryChannel[collectionID] = queryChannelInfo.QueryChannel + } + } + segmentInfos := m.getSegmentInfosByNode(nodeID) + // get delta/search channel the node has watched + for _, segmentInfo := range segmentInfos { + collectionID := segmentInfo.CollectionID + if _, ok := colID2DeltaChannels[collectionID]; !ok { + deltaChanelInfos, err := m.getDeltaChannelsByCollectionID(collectionID) + if err != nil { + // all nodes succeeded in releasing the Data, but queryCoord hasn't cleaned up the meta in time, and a Node went down + // and meta was cleaned after m.getSegmentInfosByNode(nodeID) + continue + } + deltaChannels := make([]string, len(deltaChanelInfos)) + for offset, channelInfo := range deltaChanelInfos { + deltaChannels[offset] = rootcoord.ToPhysicalChannel(channelInfo.ChannelName) + } + colID2DeltaChannels[collectionID] = deltaChannels + } + if _, ok := colID2QueryChannel[collectionID]; !ok { + queryChannelInfo := m.getQueryChannelInfoByID(collectionID) + colID2QueryChannel[collectionID] = queryChannelInfo.QueryChannel + } + } + + // creating unsubscribeChannelInfo, which will be written to etcd + colID2Channels := make(map[UniqueID][]string) + for collectionID, channels := range colID2DmChannels { + colID2Channels[collectionID] = append(colID2Channels[collectionID], channels...) + } + for collectionID, channels := range colID2DeltaChannels { + colID2Channels[collectionID] = append(colID2Channels[collectionID], channels...) + } + for collectionID, channel := range colID2QueryChannel { + colID2Channels[collectionID] = append(colID2Channels[collectionID], channel) + } + + unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{ + NodeID: nodeID, + } + + for collectionID, channels := range colID2Channels { + unsubscribeChannelInfo.CollectionChannels = append(unsubscribeChannelInfo.CollectionChannels, + &querypb.UnsubscribeChannels{ + CollectionID: collectionID, + Channels: channels, + }) + } + + return unsubscribeChannelInfo +} + //func (m *MetaReplica) printMeta() { // m.RLock() // defer m.RUnlock() diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 7566fd0af..96790c535 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -78,6 +78,7 @@ type QueryCoord struct { queryCoordID uint64 meta Meta cluster Cluster + handler *channelUnsubscribeHandler newNodeFn newQueryNodeFn scheduler *TaskScheduler idAllocator func() (UniqueID, error) @@ -161,8 +162,15 @@ func (qc *QueryCoord) Init() error { return } + // init channelUnsubscribeHandler + qc.handler, initError = newChannelUnsubscribeHandler(qc.loopCtx, qc.kvClient, qc.msFactory) + if initError != nil { + log.Error("query coordinator init channelUnsubscribeHandler failed", zap.Error(initError)) + return + } + // init cluster - qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session) + qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session, qc.handler) if initError != nil { log.Error("query coordinator init cluster failed", zap.Error(initError)) return @@ -204,6 +212,9 @@ func (qc *QueryCoord) Start() error { qc.indexChecker.start() log.Debug("start index checker ...") + qc.handler.start() + log.Debug("start channel unsubscribe loop ...") + Params.QueryCoordCfg.CreatedTime = time.Now() Params.QueryCoordCfg.UpdatedTime = time.Now() @@ -237,6 +248,11 @@ func (qc *QueryCoord) Stop() error { log.Debug("close index checker ...") } + if qc.handler != nil { + qc.handler.close() + log.Debug("close channel unsubscribe loop ...") + } + if qc.loopCancel != nil { qc.loopCancel() log.Info("cancel the loop of QueryCoord") diff --git a/internal/querycoord/querynode.go b/internal/querycoord/querynode.go index 079a402ca..77cd95b1f 100644 --- a/internal/querycoord/querynode.go +++ b/internal/querycoord/querynode.go @@ -132,9 +132,9 @@ func (qn *queryNode) start() error { } func (qn *queryNode) stop() { - qn.stateLock.Lock() - defer qn.stateLock.Unlock() - qn.state = offline + //qn.stateLock.Lock() + //defer qn.stateLock.Unlock() + //qn.state = offline if qn.client != nil { qn.client.Stop() } diff --git a/internal/querycoord/segment_allocator_test.go b/internal/querycoord/segment_allocator_test.go index b6edc33df..ca56bc3d7 100644 --- a/internal/querycoord/segment_allocator_test.go +++ b/internal/querycoord/segment_allocator_test.go @@ -24,6 +24,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" minioKV "github.com/milvus-io/milvus/internal/kv/minio" + "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -39,13 +40,17 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) { kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) - meta, err := newMeta(baseCtx, kv, nil, nil) + factory := msgstream.NewPmsFactory() + meta, err := newMeta(baseCtx, kv, factory, nil) + assert.Nil(t, err) + handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) assert.Nil(t, err) cluster := &queryNodeCluster{ ctx: baseCtx, cancel: cancel, client: kv, clusterMeta: meta, + handler: handler, nodes: make(map[int64]Node), newNodeFn: newQueryNodeTest, session: clusterSession, diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 8aaedd9b9..56e5210c9 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -538,6 +538,19 @@ func (rct *releaseCollectionTask) timestamp() Timestamp { return rct.Base.Timestamp } +func (rct *releaseCollectionTask) updateTaskProcess() { + collectionID := rct.CollectionID + parentTask := rct.getParentTask() + if parentTask == nil { + // all queryNodes have successfully released the data, clean up collectionMeta + err := rct.meta.releaseCollection(collectionID) + if err != nil { + log.Error("releaseCollectionTask: release collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rct.Base.MsgID), zap.Error(err)) + panic(err) + } + } +} + func (rct *releaseCollectionTask) preExecute(context.Context) error { collectionID := rct.CollectionID rct.setResultInfo(nil) @@ -548,7 +561,8 @@ func (rct *releaseCollectionTask) preExecute(context.Context) error { } func (rct *releaseCollectionTask) execute(ctx context.Context) error { - defer rct.reduceRetryCount() + // cancel the maximum number of retries for queryNode cleaning data until the data is completely freed + // defer rct.reduceRetryCount() collectionID := rct.CollectionID // if nodeID ==0, it means that the release request has not been assigned to the specified query node @@ -593,20 +607,12 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error { rct.addChildTask(releaseCollectionTask) log.Debug("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask)) } - - //TODO::xige-16 delete collection info from should wait all internal release task execute done - // if some query nodes release collection failed, the collection data will can't be removed from query node - err = rct.meta.releaseCollection(collectionID) - if err != nil { - log.Error("releaseCollectionTask: release collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rct.Base.MsgID), zap.Error(err)) - rct.setResultInfo(err) - return err - } } else { err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest) if err != nil { - log.Error("releaseCollectionTask: release collection end, node occur error", zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID)) - rct.setResultInfo(err) + log.Warn("releaseCollectionTask: release collection end, node occur error", zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID)) + // after release failed, the task will always redo + // if the query node happens to be down, the node release was judged to have succeeded return err } } @@ -911,6 +917,21 @@ func (rpt *releasePartitionTask) timestamp() Timestamp { return rpt.Base.Timestamp } +func (rpt *releasePartitionTask) updateTaskProcess() { + collectionID := rpt.CollectionID + partitionIDs := rpt.PartitionIDs + parentTask := rpt.getParentTask() + if parentTask == nil { + // all queryNodes have successfully released the data, clean up collectionMeta + err := rpt.meta.releasePartitions(collectionID, partitionIDs) + if err != nil { + log.Error("releasePartitionTask: release collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID), zap.Error(err)) + panic(err) + } + + } +} + func (rpt *releasePartitionTask) preExecute(context.Context) error { collectionID := rpt.CollectionID partitionIDs := rpt.PartitionIDs @@ -923,7 +944,8 @@ func (rpt *releasePartitionTask) preExecute(context.Context) error { } func (rpt *releasePartitionTask) execute(ctx context.Context) error { - defer rpt.reduceRetryCount() + // cancel the maximum number of retries for queryNode cleaning data until the data is completely freed + // defer rpt.reduceRetryCount() collectionID := rpt.CollectionID partitionIDs := rpt.PartitionIDs @@ -944,20 +966,12 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error { rpt.addChildTask(releasePartitionTask) log.Debug("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID)) } - - //TODO::xige-16 delete partition info from meta should wait all internal release task execute done - // if some query nodes release partitions failed, the partition data will can't be removed from query node - err := rpt.meta.releasePartitions(collectionID, partitionIDs) - if err != nil { - log.Error("releasePartitionTask: release collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID), zap.Error(err)) - rpt.setResultInfo(err) - return err - } } else { err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest) if err != nil { log.Warn("ReleasePartitionsTask: release partition end, node occur error", zap.Int64("collectionID", collectionID), zap.String("nodeID", fmt.Sprintln(rpt.NodeID))) - rpt.setResultInfo(err) + // after release failed, the task will always redo + // if the query node happens to be down, the node release was judged to have succeeded return err } } @@ -1288,6 +1302,15 @@ func (wdt *watchDeltaChannelTask) marshal() ([]byte, error) { return proto.Marshal(wdt.WatchDeltaChannelsRequest) } +func (wdt *watchDeltaChannelTask) isValid() bool { + online, err := wdt.cluster.isOnline(wdt.NodeID) + if err != nil { + return false + } + + return wdt.ctx != nil && online +} + func (wdt *watchDeltaChannelTask) msgType() commonpb.MsgType { return wdt.Base.MsgType } diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go index e0646e552..e512c8dbd 100644 --- a/internal/querycoord/task_test.go +++ b/internal/querycoord/task_test.go @@ -520,7 +520,14 @@ func Test_ReleaseCollectionExecuteFail(t *testing.T) { err = queryCoord.scheduler.Enqueue(releaseCollectionTask) assert.Nil(t, err) - waitTaskFinalState(releaseCollectionTask, taskFailed) + for { + if releaseCollectionTask.getState() == taskDone { + break + } + } + node.releaseCollection = returnSuccessResult + + waitTaskFinalState(releaseCollectionTask, taskExpired) node.stop() queryCoord.Stop() diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 552bdc739..7f4dfea80 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -43,7 +43,6 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/etcd" - "github.com/milvus-io/milvus/internal/util/sessionutil" ) // ---------- unittest util functions ---------- @@ -1296,10 +1295,7 @@ func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) { return nil, err } node.etcdCli = etcdCli - session := &sessionutil.Session{ - ServerID: 1, - } - node.session = session + node.initSession() etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) node.etcdKV = etcdKV diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 7f2f8fb15..dd1f525e6 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -22,7 +22,6 @@ import ( "fmt" "math/rand" "runtime/debug" - "strconv" "time" "go.uber.org/zap" @@ -32,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" queryPb "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/rootcoord" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/mqclient" ) @@ -158,7 +158,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error { return err } consumeChannels := []string{r.req.QueryChannel} - consumeSubName := Params.QueryNodeCfg.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int()) + consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) if Params.QueryNodeCfg.SkipQueryChannelRecovery { log.Debug("Skip query channel seek back ", zap.Strings("channels", consumeChannels), @@ -310,12 +310,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID)) - // get subscription name - getUniqueSubName := func() string { - prefixName := Params.QueryNodeCfg.MsgChannelSubName - return prefixName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int()) - } - consumeSubName := getUniqueSubName() + consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) // group channels by to seeking or consuming toSeekChannels := make([]*internalpb.MsgPosition, 0) @@ -574,12 +569,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error { sCol.addVDeltaChannels(vDeltaChannels) sCol.addPDeltaChannels(pDeltaChannels) - // get subscription name - getUniqueSubName := func() string { - prefixName := Params.QueryNodeCfg.MsgChannelSubName - return prefixName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int()) - } - consumeSubName := getUniqueSubName() + consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) // group channels by to seeking or consuming toSubChannels := make([]Channel, 0) diff --git a/internal/util/funcutil/func.go b/internal/util/funcutil/func.go index afc6431af..b37f8b22f 100644 --- a/internal/util/funcutil/func.go +++ b/internal/util/funcutil/func.go @@ -175,6 +175,11 @@ func CheckCtxValid(ctx context.Context) bool { return ctx.Err() != context.DeadlineExceeded && ctx.Err() != context.Canceled } +// GenChannelSubName generate subName to watch channel +func GenChannelSubName(prefix string, collectionID int64, nodeID int64) string { + return fmt.Sprintf("%s-%d-%d", prefix, collectionID, nodeID) +} + // CheckPortAvailable check if a port is available to be listened on func CheckPortAvailable(port int) bool { addr := ":" + strconv.Itoa(port) diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index 8d900a7c9..e81ae8f1f 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -1028,7 +1028,7 @@ func (p *queryNodeConfig) initMsgChannelSubName() { log.Warn(err.Error()) } - s := []string{p.ClusterChannelPrefix, namePrefix, strconv.FormatInt(p.QueryNodeID, 10)} + s := []string{p.ClusterChannelPrefix, namePrefix} p.MsgChannelSubName = strings.Join(s, "-") } diff --git a/internal/util/paramtable/global_param_test.go b/internal/util/paramtable/global_param_test.go index 4c6d79712..9645a21ca 100644 --- a/internal/util/paramtable/global_param_test.go +++ b/internal/util/paramtable/global_param_test.go @@ -230,7 +230,7 @@ func TestGlobalParamTable(t *testing.T) { Params.QueryNodeID = 3 Params.initMsgChannelSubName() name := Params.MsgChannelSubName - assert.Equal(t, name, "by-dev-queryNode-3") + assert.Equal(t, name, "by-dev-queryNode") name = Params.StatsChannelName assert.Equal(t, name, "by-dev-query-node-stats") -- GitLab