未验证 提交 9eeec4a2 编写于 作者: B bigsheeper 提交者: GitHub

Add collection load cache and InvalidateCollMetaCache by collID (#16882)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
上级 619bab21
......@@ -404,6 +404,11 @@ func (m *mockRootCoordService) UpdateChannelTimeTick(ctx context.Context, req *i
func (m *mockRootCoordService) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
// InvalidateCollectionMetaCache is an unimplemented stub on this mock; any
// test path that reaches it panics by design.
func (m *mockRootCoordService) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	panic("not implemented") // TODO: Implement
}
// SegmentFlushCompleted always reports success on this mock; the request is ignored.
func (m *mockRootCoordService) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
......
......@@ -194,6 +194,10 @@ func (m *MockRootCoord) ReleaseDQLMessageStream(ctx context.Context, in *proxypb
return nil, nil
}
// InvalidateCollectionMetaCache is a no-op on this mock: it returns a nil
// status and a nil error regardless of input.
func (m *MockRootCoord) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	return nil, nil
}
// SegmentFlushCompleted is a no-op on this mock: it returns a nil status and a
// nil error regardless of input.
func (m *MockRootCoord) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
	return nil, nil
}
......
......@@ -422,6 +422,20 @@ func (c *Client) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.Releas
return ret.(*commonpb.Status), err
}
// InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.
// The call goes through the retrying gRPC wrapper; a cancelled or expired
// context short-circuits with ctx.Err() instead of issuing the RPC.
func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
		if funcutil.CheckCtxValid(ctx) {
			return client.(rootcoordpb.RootCoordClient).InvalidateCollectionMetaCache(ctx, in)
		}
		return nil, ctx.Err()
	})
	if err == nil && ret != nil {
		return ret.(*commonpb.Status), err
	}
	return nil, err
}
// SegmentFlushCompleted check whether segment flush is completed
func (c *Client) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
......
......@@ -161,6 +161,9 @@ func Test_NewClient(t *testing.T) {
r34, err := client.ListCredUsers(ctx, nil)
retCheck(retNotNil, r34, err)
r35, err := client.InvalidateCollectionMetaCache(ctx, nil)
retCheck(retNotNil, r35, err)
}
client.grpcClient = &mock.ClientBase{
......@@ -313,6 +316,9 @@ func Test_NewClient(t *testing.T) {
r35Timeout, err := client.ListImportTasks(shortCtx, nil)
retCheck(r35Timeout, err)
r36Timeout, err := client.InvalidateCollectionMetaCache(shortCtx, nil)
retCheck(r36Timeout, err)
// clean up
err = client.Stop()
assert.Nil(t, err)
......
......@@ -429,6 +429,11 @@ func (s *Server) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.Releas
return s.rootCoord.ReleaseDQLMessageStream(ctx, in)
}
// InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.
// This is a thin gRPC-server shim that forwards the request to the embedded
// rootCoord component unchanged.
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	return s.rootCoord.InvalidateCollectionMetaCache(ctx, in)
}
// SegmentFlushCompleted notifies RootCoord that specified segment has been flushed.
func (s *Server) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
return s.rootCoord.SegmentFlushCompleted(ctx, in)
......
......@@ -58,6 +58,12 @@ func (p *proxyMock) InvalidateCollectionMetaCache(ctx context.Context, request *
return p.invalidateCollectionMetaCache(ctx, request)
}
// ReleaseDQLMessageStream always succeeds on this mock; the request is ignored.
func (p *proxyMock) ReleaseDQLMessageStream(ctx context.Context, request *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
	return status, nil
}
func TestGrpcService(t *testing.T) {
const (
dbName = "testDB"
......@@ -287,7 +293,9 @@ func TestGrpcService(t *testing.T) {
t.Run("release DQL msg stream", func(t *testing.T) {
req := &proxypb.ReleaseDQLMessageStreamRequest{}
assert.Panics(t, func() { svr.ReleaseDQLMessageStream(ctx, req) })
rsp, err := svr.ReleaseDQLMessageStream(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
})
t.Run("get metrics", func(t *testing.T) {
......@@ -764,7 +772,6 @@ func TestGrpcService(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Equal(t, collName, dropCollectionArray[0].CollectionName)
assert.Equal(t, 3, len(collectionMetaCache))
assert.Equal(t, collName, collectionMetaCache[0])
req = &milvuspb.DropCollectionRequest{
Base: &commonpb.MsgBase{
......
......@@ -28,6 +28,7 @@ message InvalidateCollMetaCacheRequest {
common.MsgBase base = 1;
string db_name = 2;
string collection_name = 3;
int64 collectionID = 4;
}
message ReleaseDQLMessageStreamRequest {
......
......@@ -31,6 +31,7 @@ type InvalidateCollMetaCacheRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbName string `protobuf:"bytes,2,opt,name=db_name,json=dbName,proto3" json:"db_name,omitempty"`
CollectionName string `protobuf:"bytes,3,opt,name=collection_name,json=collectionName,proto3" json:"collection_name,omitempty"`
CollectionID int64 `protobuf:"varint,4,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
......@@ -82,6 +83,13 @@ func (m *InvalidateCollMetaCacheRequest) GetCollectionName() string {
return ""
}
// GetCollectionID returns the CollectionID field, or 0 when the receiver is nil
// (standard nil-safe protobuf getter contract).
func (m *InvalidateCollMetaCacheRequest) GetCollectionID() int64 {
	if m == nil {
		return 0
	}
	return m.CollectionID
}
type ReleaseDQLMessageStreamRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
......@@ -249,42 +257,43 @@ func init() {
func init() { proto.RegisterFile("proxy.proto", fileDescriptor_700b50b08ed8dbaf) }
var fileDescriptor_700b50b08ed8dbaf = []byte{
// 560 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x94, 0xdd, 0x6e, 0xd3, 0x30,
0x14, 0xc7, 0x17, 0x3a, 0xc6, 0x38, 0xab, 0x06, 0x32, 0x1f, 0x1b, 0x01, 0xa6, 0x29, 0x20, 0x98,
0x26, 0xd1, 0x8e, 0xc2, 0x13, 0xac, 0x95, 0xaa, 0x4a, 0x14, 0x41, 0xaa, 0x09, 0x09, 0x2e, 0x90,
0x93, 0x1c, 0xb5, 0x9e, 0x1c, 0x3b, 0xb3, 0x9d, 0x02, 0xb7, 0x5c, 0x72, 0xcd, 0x03, 0xf0, 0xa8,
0x28, 0x1f, 0x4d, 0x9b, 0xb6, 0x59, 0xf8, 0xd0, 0xee, 0x72, 0xec, 0xff, 0xf1, 0xef, 0x1c, 0x1f,
0xff, 0x03, 0x3b, 0x91, 0x92, 0x5f, 0xbf, 0xb5, 0x22, 0x25, 0x8d, 0x24, 0x24, 0x64, 0x7c, 0x1a,
0xeb, 0x2c, 0x6a, 0xa5, 0x3b, 0x76, 0xd3, 0x97, 0x61, 0x28, 0x45, 0xb6, 0x66, 0xef, 0x32, 0x61,
0x50, 0x09, 0xca, 0xf3, 0xb8, 0xb9, 0x98, 0xe1, 0xfc, 0xb4, 0xe0, 0x60, 0x20, 0xa6, 0x94, 0xb3,
0x80, 0x1a, 0xec, 0x4a, 0xce, 0x87, 0x68, 0x68, 0x97, 0xfa, 0x13, 0x74, 0xf1, 0x22, 0x46, 0x6d,
0xc8, 0x09, 0x6c, 0x7a, 0x54, 0xe3, 0xbe, 0x75, 0x68, 0x1d, 0xed, 0x74, 0x1e, 0xb5, 0x4a, 0xc4,
0x1c, 0x35, 0xd4, 0xe3, 0x53, 0xaa, 0xd1, 0x4d, 0x95, 0x64, 0x0f, 0x6e, 0x04, 0xde, 0x67, 0x41,
0x43, 0xdc, 0xbf, 0x76, 0x68, 0x1d, 0xdd, 0x74, 0xb7, 0x02, 0xef, 0x2d, 0x0d, 0x91, 0x3c, 0x87,
0x5b, 0xbe, 0xe4, 0x1c, 0x7d, 0xc3, 0xa4, 0xc8, 0x04, 0x8d, 0x54, 0xb0, 0x3b, 0x5f, 0x4e, 0x84,
0xce, 0x0f, 0x0b, 0x0e, 0x5c, 0xe4, 0x48, 0x35, 0xf6, 0xde, 0xbf, 0x19, 0xa2, 0xd6, 0x74, 0x8c,
0x23, 0xa3, 0x90, 0x86, 0xff, 0x5e, 0x16, 0x81, 0xcd, 0xc0, 0x1b, 0xf4, 0xd2, 0x9a, 0x1a, 0x6e,
0xfa, 0x4d, 0x1c, 0x68, 0xce, 0xd1, 0x83, 0x5e, 0x5a, 0x4e, 0xc3, 0x2d, 0xad, 0x39, 0xe7, 0x60,
0x2f, 0x5c, 0x91, 0xc2, 0xe0, 0x3f, 0xaf, 0xc7, 0x86, 0xed, 0x58, 0x27, 0x23, 0x29, 0xee, 0xa7,
0x88, 0x9d, 0xef, 0x16, 0xdc, 0x3f, 0x8b, 0xae, 0x1e, 0x94, 0xec, 0x45, 0x54, 0xeb, 0x2f, 0x52,
0x05, 0xf9, 0x0c, 0x8a, 0xb8, 0xf3, 0x6b, 0x1b, 0xae, 0xbf, 0x4b, 0x9e, 0x12, 0x89, 0x80, 0xf4,
0xd1, 0x74, 0x65, 0x18, 0x49, 0x81, 0xc2, 0x8c, 0x0c, 0x35, 0xa8, 0xc9, 0x49, 0x99, 0x5d, 0x3c,
0xb0, 0x55, 0x69, 0x5e, 0xbb, 0xfd, 0xac, 0x22, 0x63, 0x49, 0xee, 0x6c, 0x90, 0x0b, 0xb8, 0xdb,
0xc7, 0x34, 0x64, 0xda, 0x30, 0x5f, 0x77, 0x27, 0x54, 0x08, 0xe4, 0xa4, 0x53, 0xcd, 0x5c, 0x11,
0xcf, 0xa8, 0x4f, 0xca, 0x39, 0x79, 0x30, 0x32, 0x8a, 0x89, 0xb1, 0x8b, 0x3a, 0x92, 0x42, 0xa3,
0xb3, 0x41, 0x14, 0x3c, 0x2e, 0x5b, 0x20, 0x9b, 0x7c, 0x61, 0x84, 0x65, 0x76, 0xe6, 0xbf, 0xcb,
0x5d, 0x63, 0x3f, 0x5c, 0x3b, 0x9f, 0xa4, 0xd4, 0x38, 0x69, 0x93, 0x42, 0xb3, 0x8f, 0xa6, 0x17,
0xcc, 0xda, 0x3b, 0xae, 0x6e, 0xaf, 0x10, 0xfd, 0x65, 0x5b, 0x1c, 0xf6, 0x2a, 0x2c, 0xb4, 0xbe,
0xa1, 0xcb, 0xfd, 0x56, 0xd7, 0xd0, 0x07, 0xb8, 0x3d, 0x42, 0x11, 0x8c, 0x90, 0x2a, 0x7f, 0xe2,
0xa2, 0x8e, 0xb9, 0x21, 0x4f, 0x2b, 0x9a, 0x5a, 0x14, 0xe9, 0xba, 0x83, 0x3f, 0x01, 0x49, 0x0e,
0x76, 0xd1, 0x28, 0x86, 0x53, 0xcc, 0x8f, 0xae, 0x7a, 0x50, 0x65, 0x59, 0xed, 0xe1, 0xe7, 0xf0,
0xa0, 0x6c, 0x6d, 0x14, 0x86, 0x51, 0x9e, 0x8d, 0xbd, 0x55, 0x33, 0xf6, 0x25, 0x83, 0xd6, 0xb1,
0x3c, 0xb8, 0x37, 0x77, 0xf6, 0x22, 0xe7, 0x78, 0x1d, 0x67, 0xfd, 0x4f, 0xa0, 0x8e, 0x31, 0x86,
0x3b, 0x5d, 0x8e, 0x54, 0x25, 0x79, 0x67, 0x1a, 0x95, 0xce, 0x08, 0x2f, 0xab, 0xec, 0xb7, 0xaa,
0xfd, 0x33, 0xd0, 0xe9, 0xeb, 0x8f, 0x9d, 0x31, 0x33, 0x93, 0xd8, 0x4b, 0x76, 0xda, 0x99, 0xf4,
0x05, 0x93, 0xf9, 0x57, 0x7b, 0x46, 0x68, 0xa7, 0xd9, 0xed, 0xb4, 0xa5, 0xc8, 0xf3, 0xb6, 0xd2,
0xf0, 0xd5, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf2, 0x58, 0x7c, 0x7f, 0xc3, 0x06, 0x00, 0x00,
// 566 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xdb, 0x6e, 0xd3, 0x40,
0x10, 0xad, 0x49, 0x28, 0x65, 0x1a, 0x15, 0xb4, 0x5c, 0x1a, 0x0c, 0x54, 0x91, 0x41, 0x10, 0x55,
0x22, 0x29, 0x81, 0x2f, 0x68, 0x22, 0x45, 0x91, 0x08, 0x02, 0x47, 0x15, 0x12, 0x3c, 0xa0, 0xb5,
0x3d, 0x4a, 0x5c, 0xad, 0x77, 0xdd, 0xdd, 0x75, 0x80, 0x57, 0x1e, 0xf9, 0x0a, 0x3e, 0x83, 0xcf,
0x43, 0xbe, 0xc4, 0x89, 0x93, 0xb8, 0xe6, 0xa2, 0xbe, 0x79, 0x66, 0xcf, 0xcc, 0x99, 0xb3, 0xb3,
0xc7, 0xb0, 0x1f, 0x4a, 0xf1, 0xf5, 0x5b, 0x27, 0x94, 0x42, 0x0b, 0x42, 0x02, 0x9f, 0xcd, 0x23,
0x95, 0x46, 0x9d, 0xe4, 0xc4, 0x6c, 0xb8, 0x22, 0x08, 0x04, 0x4f, 0x73, 0xe6, 0x81, 0xcf, 0x35,
0x4a, 0x4e, 0x59, 0x16, 0x37, 0x56, 0x2b, 0xac, 0x5f, 0x06, 0x1c, 0x8d, 0xf8, 0x9c, 0x32, 0xdf,
0xa3, 0x1a, 0xfb, 0x82, 0xb1, 0x31, 0x6a, 0xda, 0xa7, 0xee, 0x0c, 0x6d, 0xbc, 0x88, 0x50, 0x69,
0x72, 0x02, 0x75, 0x87, 0x2a, 0x6c, 0x1a, 0x2d, 0xa3, 0xbd, 0xdf, 0x7b, 0xd4, 0x29, 0x30, 0x66,
0x54, 0x63, 0x35, 0x3d, 0xa5, 0x0a, 0xed, 0x04, 0x49, 0x0e, 0xe1, 0x86, 0xe7, 0x7c, 0xe6, 0x34,
0xc0, 0xe6, 0xb5, 0x96, 0xd1, 0xbe, 0x69, 0xef, 0x7a, 0xce, 0x5b, 0x1a, 0x20, 0x79, 0x0e, 0xb7,
0x5c, 0xc1, 0x18, 0xba, 0xda, 0x17, 0x3c, 0x05, 0xd4, 0x12, 0xc0, 0xc1, 0x32, 0x9d, 0x00, 0x2d,
0x68, 0x2c, 0x33, 0xa3, 0x41, 0xb3, 0xde, 0x32, 0xda, 0x35, 0xbb, 0x90, 0xb3, 0x7e, 0x18, 0x70,
0x64, 0x23, 0x43, 0xaa, 0x70, 0xf0, 0xfe, 0xcd, 0x18, 0x95, 0xa2, 0x53, 0x9c, 0x68, 0x89, 0x34,
0xf8, 0xf7, 0xd1, 0x09, 0xd4, 0x3d, 0x67, 0x34, 0x48, 0xe6, 0xae, 0xd9, 0xc9, 0xf7, 0xc6, 0x30,
0xb5, 0x2d, 0xc3, 0x9c, 0x83, 0xb9, 0x72, 0x8d, 0x12, 0xbd, 0xff, 0xbc, 0x42, 0x13, 0xf6, 0x22,
0x15, 0xaf, 0x2d, 0xbf, 0xc3, 0x3c, 0xb6, 0xbe, 0x1b, 0x70, 0xff, 0x2c, 0xbc, 0x7a, 0xa2, 0xf8,
0x2c, 0xa4, 0x4a, 0x7d, 0x11, 0xd2, 0xcb, 0xf6, 0x94, 0xc7, 0xbd, 0x9f, 0x7b, 0x70, 0xfd, 0x5d,
0xfc, 0xdc, 0x48, 0x08, 0x64, 0x88, 0xba, 0x2f, 0x82, 0x50, 0x70, 0xe4, 0x7a, 0xa2, 0xa9, 0x46,
0x45, 0x4e, 0x8a, 0xdc, 0xf9, 0x23, 0xdc, 0x84, 0x66, 0xb3, 0x9b, 0xcf, 0x4a, 0x2a, 0xd6, 0xe0,
0xd6, 0x0e, 0xb9, 0x80, 0xbb, 0x43, 0x4c, 0x42, 0x5f, 0x69, 0xdf, 0x55, 0xfd, 0x19, 0xe5, 0x1c,
0x19, 0xe9, 0x95, 0x73, 0x6e, 0x80, 0x17, 0xac, 0x4f, 0x8a, 0x35, 0x59, 0x30, 0xd1, 0xd2, 0xe7,
0x53, 0x1b, 0x55, 0x28, 0xb8, 0x42, 0x6b, 0x87, 0x48, 0x78, 0x5c, 0xb4, 0x49, 0xba, 0xf9, 0xdc,
0x2c, 0xeb, 0xdc, 0xa9, 0x47, 0x2f, 0x77, 0x96, 0xf9, 0x70, 0xeb, 0x7e, 0xe2, 0x51, 0xa3, 0x58,
0x26, 0x85, 0xc6, 0x10, 0xf5, 0xc0, 0x5b, 0xc8, 0x3b, 0x2e, 0x97, 0x97, 0x83, 0xfe, 0x52, 0x16,
0x83, 0xc3, 0x12, 0x0b, 0x6d, 0x17, 0x74, 0xb9, 0xdf, 0xaa, 0x04, 0x7d, 0x80, 0xdb, 0x13, 0xe4,
0xde, 0x04, 0xa9, 0x74, 0x67, 0x36, 0xaa, 0x88, 0x69, 0xf2, 0xb4, 0x44, 0xd4, 0x2a, 0x48, 0x55,
0x35, 0xfe, 0x04, 0x24, 0x6e, 0x6c, 0xa3, 0x96, 0x3e, 0xce, 0x31, 0x6b, 0x5d, 0xf6, 0xa0, 0x8a,
0xb0, 0xca, 0xe6, 0xe7, 0xf0, 0xa0, 0x68, 0x6d, 0xe4, 0xda, 0xa7, 0x2c, 0x5d, 0x7b, 0xa7, 0x62,
0xed, 0x6b, 0x06, 0xad, 0xe2, 0x72, 0xe0, 0xde, 0xd2, 0xd9, 0xab, 0x3c, 0xc7, 0xdb, 0x78, 0xb6,
0xff, 0x04, 0xaa, 0x38, 0xa6, 0x70, 0xa7, 0xcf, 0x90, 0xca, 0xb8, 0xee, 0x4c, 0xa1, 0x54, 0x29,
0xc3, 0xcb, 0x32, 0xfb, 0x6d, 0x62, 0xff, 0x8c, 0xe8, 0xf4, 0xf5, 0xc7, 0xde, 0xd4, 0xd7, 0xb3,
0xc8, 0x89, 0x4f, 0xba, 0x29, 0xf4, 0x85, 0x2f, 0xb2, 0xaf, 0xee, 0x82, 0xa1, 0x9b, 0x54, 0x77,
0x13, 0x49, 0xa1, 0xe3, 0xec, 0x26, 0xe1, 0xab, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xfb, 0xee,
0x9c, 0xaf, 0xe7, 0x06, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
......
......@@ -103,6 +103,7 @@ service RootCoord {
rpc AllocID(AllocIDRequest) returns (AllocIDResponse) {}
rpc UpdateChannelTimeTick(internal.ChannelTimeTickMsg) returns (common.Status) {}
rpc ReleaseDQLMessageStream(proxy.ReleaseDQLMessageStreamRequest) returns (common.Status) {}
rpc InvalidateCollectionMetaCache(proxy.InvalidateCollMetaCacheRequest) returns (common.Status) {}
rpc SegmentFlushCompleted(data.SegmentFlushCompletedMsg) returns (common.Status) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
......
......@@ -22,8 +22,9 @@ func TestValidAuth(t *testing.T) {
res = validAuth(ctx, []string{"xxx"})
assert.False(t, res)
// normal metadata
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err := InitMetaCache(rootCoord, queryCoord)
assert.Nil(t, err)
res = validAuth(ctx, []string{crypto.Base64Encode("mockUser:mockPass")})
assert.True(t, res)
......@@ -49,8 +50,9 @@ func TestAuthenticationInterceptor(t *testing.T) {
_, err := AuthenticationInterceptor(ctx)
assert.NotNil(t, err)
// mock metacache
client := &MockRootCoordClientInterface{}
err = InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err = InitMetaCache(rootCoord, queryCoord)
assert.Nil(t, err)
// with invalid metadata
md := metadata.Pairs("xxx", "yyy")
......
......@@ -100,19 +100,27 @@ func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRe
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
ctx = logutil.WithModule(ctx, moduleName)
logutil.Logger(ctx).Debug("received request to invalidate collection meta cache",
logutil.Logger(ctx).Info("received request to invalidate collection meta cache",
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName))
zap.String("collectionName", request.CollectionName),
zap.Int64("collectionID", request.CollectionID))
collectionName := request.CollectionName
collectionID := request.CollectionID
if globalMetaCache != nil {
globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
}
if request.CollectionID != UniqueID(0) {
globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
}
}
logutil.Logger(ctx).Debug("complete to invalidate collection meta cache",
logutil.Logger(ctx).Info("complete to invalidate collection meta cache",
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName))
zap.String("collection", collectionName),
zap.Int64("collectionID", collectionID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
......
......@@ -56,6 +56,7 @@ type Cache interface {
GetShards(ctx context.Context, withCache bool, collectionName string, qc types.QueryCoord) (map[string][]queryNode, error)
ClearShards(collectionName string)
RemoveCollection(ctx context.Context, collectionName string)
RemoveCollectionsByID(ctx context.Context, collectionID UniqueID)
RemovePartition(ctx context.Context, collectionName string, partitionName string)
// GetCredentialInfo operate credential cache
......@@ -73,6 +74,7 @@ type collectionInfo struct {
shardLeaders map[string][]queryNode
createdTimestamp uint64
createdUtcTimestamp uint64
isLoaded bool
}
type partitionInfo struct {
......@@ -86,7 +88,8 @@ var _ Cache = (*MetaCache)(nil)
// MetaCache implements Cache, provides collection meta cache based on internal RootCoord
type MetaCache struct {
client types.RootCoord
rootCoord types.RootCoord
queryCoord types.QueryCoord
collInfo map[string]*collectionInfo
credMap map[string]*internalpb.CredentialInfo // cache for credential, lazy load
......@@ -99,21 +102,22 @@ type MetaCache struct {
var globalMetaCache Cache
// InitMetaCache initializes globalMetaCache
func InitMetaCache(client types.RootCoord) error {
func InitMetaCache(rootCoord types.RootCoord, queryCoord types.QueryCoord) error {
var err error
globalMetaCache, err = NewMetaCache(client)
globalMetaCache, err = NewMetaCache(rootCoord, queryCoord)
if err != nil {
return err
}
return nil
}
// NewMetaCache creates a MetaCache with provided RootCoord
func NewMetaCache(client types.RootCoord) (*MetaCache, error) {
// NewMetaCache creates a MetaCache with provided RootCoord and QueryNode
func NewMetaCache(rootCoord types.RootCoord, queryCoord types.QueryCoord) (*MetaCache, error) {
return &MetaCache{
client: client,
collInfo: map[string]*collectionInfo{},
credMap: map[string]*internalpb.CredentialInfo{},
rootCoord: rootCoord,
queryCoord: queryCoord,
collInfo: map[string]*collectionInfo{},
credMap: map[string]*internalpb.CredentialInfo{},
}, nil
}
......@@ -159,12 +163,45 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string
return nil, err
}
m.mu.Lock()
defer m.mu.Unlock()
m.updateCollection(coll, collectionName)
collInfo = m.collInfo[collectionName]
m.mu.Unlock()
metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
}
if !collInfo.isLoaded {
// check if collection was loaded
showResp, err := m.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
SourceID: Params.ProxyCfg.GetNodeID(),
},
})
if err != nil {
return nil, err
}
if showResp.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, errors.New(showResp.Status.Reason)
}
log.Debug("QueryCoord show collections",
zap.Int64("collID", collInfo.collID),
zap.Int64s("collections", showResp.GetCollectionIDs()),
zap.Int64s("collectionsInMemoryPercentages", showResp.GetInMemoryPercentages()),
)
loaded := false
for index, collID := range showResp.CollectionIDs {
if collID == collInfo.collID && showResp.GetInMemoryPercentages()[index] >= int64(100) {
loaded = true
break
}
}
if loaded {
m.mu.Lock()
m.collInfo[collectionName].isLoaded = true
m.mu.Unlock()
}
}
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc()
return &collectionInfo{
collID: collInfo.collID,
......@@ -173,6 +210,7 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string
createdTimestamp: collInfo.createdTimestamp,
createdUtcTimestamp: collInfo.createdUtcTimestamp,
shardLeaders: collInfo.shardLeaders,
isLoaded: collInfo.isLoaded,
}, nil
}
......@@ -334,7 +372,7 @@ func (m *MetaCache) describeCollection(ctx context.Context, collectionName strin
},
CollectionName: collectionName,
}
coll, err := m.client.DescribeCollection(ctx, req)
coll, err := m.rootCoord.DescribeCollection(ctx, req)
if err != nil {
return nil, err
}
......@@ -371,7 +409,7 @@ func (m *MetaCache) showPartitions(ctx context.Context, collectionName string) (
CollectionName: collectionName,
}
partitions, err := m.client.ShowPartitions(ctx, req)
partitions, err := m.rootCoord.ShowPartitions(ctx, req)
if err != nil {
return nil, err
}
......@@ -423,6 +461,16 @@ func (m *MetaCache) RemoveCollection(ctx context.Context, collectionName string)
delete(m.collInfo, collectionName)
}
// RemoveCollectionsByID evicts every cached entry whose collection ID equals
// collectionID. The whole map is scanned because the same ID can be cached
// under more than one name; deleting during range is safe in Go.
func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for name, info := range m.collInfo {
		if info.collID != collectionID {
			continue
		}
		delete(m.collInfo, name)
	}
}
func (m *MetaCache) RemovePartition(ctx context.Context, collectionName, partitionName string) {
m.mu.Lock()
defer m.mu.Unlock()
......@@ -452,7 +500,7 @@ func (m *MetaCache) GetCredentialInfo(ctx context.Context, username string) (*in
},
Username: username,
}
resp, err := m.client.GetCredential(ctx, req)
resp, err := m.rootCoord.GetCredential(ctx, req)
if err != nil {
return &internalpb.CredentialInfo{}, err
}
......@@ -516,7 +564,7 @@ func (m *MetaCache) GetCredUsernames(ctx context.Context) ([]string, error) {
MsgType: commonpb.MsgType_ListCredUsernames,
},
}
resp, err := m.client.ListCredUsers(ctx, req)
resp, err := m.rootCoord.ListCredUsers(ctx, req)
if err != nil {
return nil, err
}
......
......@@ -22,17 +22,18 @@ import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/crypto"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type MockRootCoordClientInterface struct {
......@@ -41,6 +42,12 @@ type MockRootCoordClientInterface struct {
AccessCount int
}
// MockQueryCoordClientInterface mocks types.QueryCoord for meta-cache tests.
type MockQueryCoordClientInterface struct {
	types.QueryCoord
	// Error, when true, makes the mocked calls return an error instead of a response.
	Error bool
	// AccessCount counts successful mocked calls, letting tests assert cache hits/misses.
	AccessCount int
}
func (m *MockRootCoordClientInterface) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
if m.Error {
return nil, errors.New("mocked error")
......@@ -171,32 +178,48 @@ func (m *MockRootCoordClientInterface) ListCredUsers(ctx context.Context, req *m
}, nil
}
// ShowCollections mocks QueryCoord.ShowCollections. When Error is set it fails
// with a mocked error; otherwise it bumps AccessCount and reports collection 1
// as fully loaded (100%) and collection 2 as partially loaded (50%).
func (m *MockQueryCoordClientInterface) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
	if m.Error {
		return nil, errors.New("mocked error")
	}
	m.AccessCount++
	return &querypb.ShowCollectionsResponse{
		Status:              &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
		CollectionIDs:       []UniqueID{1, 2},
		InMemoryPercentages: []int64{100, 50},
	}, nil
}
// Simulate the cache-hit path and the cache-miss path that falls back to the remote RootCoord.
func TestMetaCache_GetCollection(t *testing.T) {
ctx := context.Background()
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err := InitMetaCache(rootCoord, queryCoord)
assert.Nil(t, err)
id, err := globalMetaCache.GetCollectionID(ctx, "collection1")
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(1))
assert.Equal(t, client.AccessCount, 1)
assert.Equal(t, rootCoord.AccessCount, 1)
	// shouldn't access the remote root coord again
schema, err := globalMetaCache.GetCollectionSchema(ctx, "collection1")
assert.Equal(t, client.AccessCount, 1)
assert.Equal(t, rootCoord.AccessCount, 1)
assert.Nil(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
})
id, err = globalMetaCache.GetCollectionID(ctx, "collection2")
assert.Equal(t, client.AccessCount, 2)
assert.Equal(t, rootCoord.AccessCount, 2)
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(2))
schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection2")
assert.Equal(t, client.AccessCount, 2)
assert.Equal(t, rootCoord.AccessCount, 2)
assert.Nil(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
AutoID: true,
......@@ -205,11 +228,11 @@ func TestMetaCache_GetCollection(t *testing.T) {
// test to get from cache, this should trigger root request
id, err = globalMetaCache.GetCollectionID(ctx, "collection1")
assert.Equal(t, client.AccessCount, 2)
assert.Equal(t, rootCoord.AccessCount, 2)
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(1))
schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection1")
assert.Equal(t, client.AccessCount, 2)
assert.Equal(t, rootCoord.AccessCount, 2)
assert.Nil(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
AutoID: true,
......@@ -220,16 +243,17 @@ func TestMetaCache_GetCollection(t *testing.T) {
func TestMetaCache_GetCollectionFailure(t *testing.T) {
ctx := context.Background()
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err := InitMetaCache(rootCoord, queryCoord)
assert.Nil(t, err)
client.Error = true
rootCoord.Error = true
schema, err := globalMetaCache.GetCollectionSchema(ctx, "collection1")
assert.NotNil(t, err)
assert.Nil(t, schema)
client.Error = false
rootCoord.Error = false
schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection1")
assert.Nil(t, err)
......@@ -238,7 +262,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {
Fields: []*schemapb.FieldSchema{},
})
client.Error = true
rootCoord.Error = true
// should be cached with no error
assert.Nil(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
......@@ -249,8 +273,9 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {
func TestMetaCache_GetNonExistCollection(t *testing.T) {
ctx := context.Background()
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err := InitMetaCache(rootCoord, queryCoord)
assert.Nil(t, err)
id, err := globalMetaCache.GetCollectionID(ctx, "collection3")
......@@ -263,8 +288,9 @@ func TestMetaCache_GetNonExistCollection(t *testing.T) {
func TestMetaCache_GetPartitionID(t *testing.T) {
ctx := context.Background()
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err := InitMetaCache(rootCoord, queryCoord)
assert.Nil(t, err)
id, err := globalMetaCache.GetPartitionID(ctx, "collection1", "par1")
......@@ -283,8 +309,9 @@ func TestMetaCache_GetPartitionID(t *testing.T) {
func TestMetaCache_GetPartitionError(t *testing.T) {
ctx := context.Background()
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err := InitMetaCache(rootCoord, queryCoord)
assert.Nil(t, err)
// Test the case where ShowPartitionsResponse is not aligned
......@@ -312,8 +339,9 @@ func TestMetaCache_GetPartitionError(t *testing.T) {
}
func TestMetaCache_GetShards(t *testing.T) {
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err := InitMetaCache(rootCoord, queryCoord)
require.Nil(t, err)
var (
......@@ -358,8 +386,9 @@ func TestMetaCache_GetShards(t *testing.T) {
}
func TestMetaCache_ClearShards(t *testing.T) {
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
err := InitMetaCache(rootCoord, queryCoord)
require.Nil(t, err)
var (
......@@ -397,3 +426,82 @@ func TestMetaCache_ClearShards(t *testing.T) {
})
}
// TestMetaCache_LoadCache verifies that the collection "isLoaded" flag is
// cached: the first GetCollectionInfo consults both RootCoord and QueryCoord,
// repeat lookups are served from cache, and RemoveCollection forces a fresh
// QueryCoord lookup. The AccessCount assertions are order-sensitive.
func TestMetaCache_LoadCache(t *testing.T) {
	ctx := context.Background()
	rootCoord := &MockRootCoordClientInterface{}
	queryCoord := &MockQueryCoordClientInterface{}
	err := InitMetaCache(rootCoord, queryCoord)
	assert.Nil(t, err)
	t.Run("test IsCollectionLoaded", func(t *testing.T) {
		info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1")
		assert.NoError(t, err)
		assert.True(t, info.isLoaded)
		// no collectionInfo of collection1, should access RootCoord
		assert.Equal(t, rootCoord.AccessCount, 1)
		// not loaded, should access QueryCoord
		assert.Equal(t, queryCoord.AccessCount, 1)
		info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
		assert.NoError(t, err)
		assert.True(t, info.isLoaded)
		// shouldn't access QueryCoord or RootCoord again
		assert.Equal(t, rootCoord.AccessCount, 1)
		assert.Equal(t, queryCoord.AccessCount, 1)
		// test collection2 not fully loaded (the mock reports 50% in memory)
		info, err = globalMetaCache.GetCollectionInfo(ctx, "collection2")
		assert.NoError(t, err)
		assert.False(t, info.isLoaded)
		// no collectionInfo of collection2, should access RootCoord
		assert.Equal(t, rootCoord.AccessCount, 2)
		// not loaded, should access QueryCoord
		assert.Equal(t, queryCoord.AccessCount, 2)
	})
	t.Run("test RemoveCollectionLoadCache", func(t *testing.T) {
		globalMetaCache.RemoveCollection(ctx, "collection1")
		info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1")
		assert.NoError(t, err)
		assert.True(t, info.isLoaded)
		// cache entry was evicted, so QueryCoord is consulted again
		assert.Equal(t, queryCoord.AccessCount, 3)
	})
}
// TestMetaCache_RemoveCollection checks that both eviction paths —
// RemoveCollection (by name) and RemoveCollectionsByID (by ID) — drop the
// cached entry for collection1, forcing the next lookup back to RootCoord.
func TestMetaCache_RemoveCollection(t *testing.T) {
	ctx := context.Background()
	rootCoord := &MockRootCoordClientInterface{}
	queryCoord := &MockQueryCoordClientInterface{}
	err := InitMetaCache(rootCoord, queryCoord)
	assert.Nil(t, err)
	info, err := globalMetaCache.GetCollectionInfo(ctx, "collection1")
	assert.NoError(t, err)
	assert.True(t, info.isLoaded)
	// no collectionInfo of collection1, should access RootCoord
	assert.Equal(t, rootCoord.AccessCount, 1)
	info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
	assert.NoError(t, err)
	assert.True(t, info.isLoaded)
	// shouldn't access RootCoord again
	assert.Equal(t, rootCoord.AccessCount, 1)
	globalMetaCache.RemoveCollection(ctx, "collection1")
	// collection1 was evicted by name, should access RootCoord again
	info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
	assert.NoError(t, err)
	assert.True(t, info.isLoaded)
	// the eviction forced one more RootCoord access
	assert.Equal(t, rootCoord.AccessCount, 2)
	globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1))
	// collection1 was evicted by ID, should access RootCoord again
	info, err = globalMetaCache.GetCollectionInfo(ctx, "collection1")
	assert.NoError(t, err)
	assert.True(t, info.isLoaded)
	// the eviction forced one more RootCoord access
	assert.Equal(t, rootCoord.AccessCount, 3)
}
......@@ -245,7 +245,7 @@ func (node *Proxy) Init() error {
log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole))
log.Debug("init meta cache", zap.String("role", typeutil.ProxyRole))
if err := InitMetaCache(node.rootCoord); err != nil {
if err := InitMetaCache(node.rootCoord, node.queryCoord); err != nil {
log.Warn("failed to init meta cache", zap.Error(err), zap.String("role", typeutil.ProxyRole))
return err
}
......
......@@ -1632,6 +1632,15 @@ func TestProxy(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, "", resp.Reason)
// release collection cache
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: nil,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, "", resp.Reason)
})
wg.Add(1)
......@@ -1916,6 +1925,14 @@ func TestProxy(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// release collection load cache
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: nil,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
wg.Add(1)
......@@ -3016,7 +3033,10 @@ func TestProxy_Import(t *testing.T) {
msgStreamFactory := newSimpleMockMsgStreamFactory()
rc.Start()
defer rc.Stop()
err := InitMetaCache(rc)
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
err := InitMetaCache(rc, qc)
assert.NoError(t, err)
rc.CreateCollection(context.TODO(), &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
......
......@@ -894,6 +894,20 @@ func (coord *RootCoordMock) ReleaseDQLMessageStream(ctx context.Context, in *pro
}, nil
}
// InvalidateCollectionMetaCache mocks the RootCoord RPC of the same name:
// it answers Success while the mocked coordinator is healthy, and an
// UnexpectedError status (with the state name in the reason) otherwise.
// The transport error is always nil.
func (coord *RootCoordMock) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	stateCode := coord.state.Load().(internalpb.StateCode)
	if stateCode == internalpb.StateCode_Healthy {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		}, nil
	}
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
		Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(stateCode)]),
	}, nil
}
func (coord *RootCoordMock) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
code := coord.state.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
......
......@@ -765,6 +765,7 @@ func (dct *dropCollectionTask) Execute(ctx context.Context) error {
_ = dct.chMgr.removeDMLStream(collID)
_ = dct.chMgr.removeDQLStream(collID)
globalMetaCache.RemoveCollection(ctx, dct.CollectionName)
return nil
}
......@@ -2859,6 +2860,7 @@ func (rct *releaseCollectionTask) Execute(ctx context.Context) (err error) {
rct.result, err = rct.queryCoord.ReleaseCollection(ctx, request)
_ = rct.chMgr.removeDQLStream(collID)
globalMetaCache.RemoveCollection(ctx, rct.CollectionName)
return err
}
......
......@@ -50,6 +50,7 @@ func TestGetIndexStateTask_Execute(t *testing.T) {
rootCoord := newMockRootCoord()
indexCoord := newMockIndexCoord()
queryCoord := NewQueryCoordMock()
gist := &getIndexStateTask{
GetIndexStateRequest: &milvuspb.GetIndexStateRequest{
......@@ -70,7 +71,7 @@ func TestGetIndexStateTask_Execute(t *testing.T) {
}
// failed to get collection id.
_ = InitMetaCache(rootCoord)
_ = InitMetaCache(rootCoord, queryCoord)
assert.Error(t, gist.Execute(ctx))
rootCoord.DescribeCollectionFunc = func(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
......
......@@ -133,8 +133,12 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
}
}
if !t.checkIfLoaded(collID, t.PartitionIDs) {
return fmt.Errorf("collection:%v or partition:%v not loaded into memory", collectionName, t.request.GetPartitionNames())
loaded, err := t.checkIfLoaded(collID, t.PartitionIDs)
if err != nil {
return fmt.Errorf("checkIfLoaded failed when query, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err)
}
if !loaded {
return fmt.Errorf("collection:%v or partition:%v not loaded into memory when query", collectionName, t.request.GetPartitionNames())
}
schema, _ := globalMetaCache.GetCollectionSchema(ctx, collectionName)
......@@ -379,9 +383,18 @@ func (t *queryTask) queryShard(ctx context.Context, leaders []queryNode, channel
return nil
}
func (t *queryTask) checkIfLoaded(collectionID UniqueID, searchPartitionIDs []UniqueID) bool {
// If request to search partitions
if len(searchPartitionIDs) > 0 {
func (t *queryTask) checkIfLoaded(collectionID UniqueID, queryPartitionIDs []UniqueID) (bool, error) {
// check if collection was loaded into QueryNode
info, err := globalMetaCache.GetCollectionInfo(t.ctx, t.collectionName)
if err != nil {
return false, fmt.Errorf("GetCollectionInfo failed, collectionID = %d, err = %s", collectionID, err)
}
if info.isLoaded {
return true, nil
}
// If request to query partitions
if len(queryPartitionIDs) > 0 {
resp, err := t.qc.ShowPartitions(t.ctx, &querypb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
......@@ -390,95 +403,43 @@ func (t *queryTask) checkIfLoaded(collectionID UniqueID, searchPartitionIDs []Un
SourceID: Params.ProxyCfg.GetNodeID(),
},
CollectionID: collectionID,
PartitionIDs: searchPartitionIDs,
PartitionIDs: queryPartitionIDs,
})
if err != nil {
log.Warn("fail to show partitions by QueryCoord",
zap.Int64("requestID", t.Base.MsgID),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", searchPartitionIDs),
zap.String("requestType", "search"),
zap.Error(err))
return false
return false, fmt.Errorf("showPartitions failed, collectionID = %d, partitionIDs = %v, err = %s", collectionID, queryPartitionIDs, err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("fail to show partitions by QueryCoord",
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", searchPartitionIDs),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "search"),
zap.String("reason", resp.GetStatus().GetReason()))
return false
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return false, fmt.Errorf("showPartitions failed, collectionID = %d, partitionIDs = %v, reason = %s", collectionID, queryPartitionIDs, resp.GetStatus().GetReason())
}
// Current logic: show partitions won't return error if the given partitions are all loaded
return true
return true, nil
}
// If request to search collection
resp, err := t.qc.ShowCollections(t.ctx, &querypb.ShowCollectionsRequest{
// If request to query collection and collection is not fully loaded
resp, err := t.qc.ShowPartitions(t.ctx, &querypb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
MsgID: t.Base.MsgID,
Timestamp: t.Base.Timestamp,
SourceID: Params.ProxyCfg.GetNodeID(),
},
CollectionID: collectionID,
})
if err != nil {
log.Warn("fail to show collections by QueryCoord",
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "search"),
zap.Error(err))
return false
return false, fmt.Errorf("showPartitions failed, collectionID = %d, partitionIDs = %v, err = %s", collectionID, queryPartitionIDs, err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("fail to show collections by QueryCoord",
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "search"),
zap.String("reason", resp.GetStatus().GetReason()))
return false
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return false, fmt.Errorf("showPartitions failed, collectionID = %d, partitionIDs = %v, reason = %s", collectionID, queryPartitionIDs, resp.GetStatus().GetReason())
}
loaded := false
for index, collID := range resp.CollectionIDs {
if collID == collectionID && resp.GetInMemoryPercentages()[index] >= int64(100) {
loaded = true
break
}
}
if !loaded {
resp, err := t.qc.ShowPartitions(t.ctx, &querypb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
MsgID: t.Base.MsgID,
Timestamp: t.Base.Timestamp,
SourceID: Params.ProxyCfg.GetNodeID(),
},
CollectionID: collectionID,
})
if err != nil {
log.Warn("fail to show partitions by QueryCoord",
zap.Int64("requestID", t.Base.MsgID),
zap.Int64("collectionID", collectionID),
zap.String("requestType", "search"),
zap.Error(err))
return false
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("fail to show partitions by QueryCoord",
zap.Int64("collectionID", collectionID),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "search"),
zap.String("reason", resp.GetStatus().GetReason()))
return false
}
if len(resp.GetPartitionIDs()) > 0 {
log.Warn("collection not fully loaded, search on these partitions", zap.Int64s("partitionIDs", resp.GetPartitionIDs()))
return true
}
if len(resp.GetPartitionIDs()) > 0 {
log.Warn("collection not fully loaded, query on these partitions",
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", resp.GetPartitionIDs()))
return true, nil
}
return loaded
return false, nil
}
// IDs2Expr converts ids slices to bool expresion with specified field name
......
......@@ -50,7 +50,7 @@ func TestQueryTask_all(t *testing.T) {
qc.Start()
defer qc.Stop()
err = InitMetaCache(rc)
err = InitMetaCache(rc, qc)
assert.NoError(t, err)
fieldName2Types := map[string]schemapb.DataType{
......@@ -167,3 +167,158 @@ func TestQueryTask_all(t *testing.T) {
assert.NoError(t, task.PostExecute(ctx))
}
// TestCheckIfLoaded exercises queryTask.checkIfLoaded against mocked
// coordinators, covering the partition-targeted path (explicit partition IDs
// -> ShowPartitions) and the whole-collection path (ShowCollections, with a
// ShowPartitions fallback when the collection is not fully loaded).
// NOTE(review): checkIfLoaded first consults globalMetaCache.GetCollectionInfo,
// which presumably queries the QueryCoord mock's ShowCollections to compute
// isLoaded — confirm against meta_cache.go before changing these subtests.
func TestCheckIfLoaded(t *testing.T) {
	var err error
	Params.Init()
	var (
		rc  = NewRootCoordMock()
		qc  = NewQueryCoordMock()
		ctx = context.TODO()
	)
	// Bring up both coordinator mocks and wire the global meta cache to them.
	err = rc.Start()
	defer rc.Stop()
	require.NoError(t, err)
	err = InitMetaCache(rc, qc)
	require.NoError(t, err)
	err = qc.Start()
	defer qc.Stop()
	require.NoError(t, err)

	// getQueryTask builds a minimal, already-enqueued queryTask that targets
	// collName and talks to the QueryCoord mock above.
	getQueryTask := func(t *testing.T, collName string) *queryTask {
		task := &queryTask{
			ctx: ctx,
			RetrieveRequest: &internalpb.RetrieveRequest{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_Retrieve,
				},
			},
			request: &milvuspb.QueryRequest{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_Retrieve,
				},
				CollectionName: collName,
			},
			qc: qc,
		}
		require.NoError(t, task.OnEnqueue())
		return task
	}

	t.Run("test checkIfLoaded error", func(t *testing.T) {
		collName := "test_checkIfLoaded_error" + funcutil.GenRandomStr()
		createColl(t, collName, rc)
		collID, err := globalMetaCache.GetCollectionID(context.TODO(), collName)
		require.NoError(t, err)
		task := getQueryTask(t, collName)
		task.collectionName = collName

		// Transport-level failure from ShowCollections must surface as an error.
		t.Run("show collection err", func(t *testing.T) {
			qc.SetShowCollectionsFunc(func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
				return nil, fmt.Errorf("mock")
			})
			loaded, err := task.checkIfLoaded(collID, []UniqueID{})
			assert.Error(t, err)
			assert.False(t, loaded)
		})

		// A non-Success status from ShowCollections must surface as an error,
		// and PreExecute must fail for the same reason.
		t.Run("show collection status unexpected error", func(t *testing.T) {
			qc.SetShowCollectionsFunc(func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
				return &querypb.ShowCollectionsResponse{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_UnexpectedError,
						Reason:    "mock",
					},
				}, nil
			})
			loaded, err := task.checkIfLoaded(collID, []UniqueID{})
			assert.Error(t, err)
			assert.False(t, loaded)
			assert.Error(t, task.PreExecute(ctx))
			qc.ResetShowCollectionsFunc()
		})

		// Partition path: a non-Success status from ShowPartitions is an error.
		t.Run("show partition error", func(t *testing.T) {
			qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
				return &querypb.ShowPartitionsResponse{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_UnexpectedError,
						Reason:    "mock",
					},
				}, nil
			})
			loaded, err := task.checkIfLoaded(collID, []UniqueID{1})
			assert.Error(t, err)
			assert.False(t, loaded)
		})

		// Partition path: a transport-level ShowPartitions failure is an error.
		t.Run("show partition status unexpected error", func(t *testing.T) {
			qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
				return nil, fmt.Errorf("mock error")
			})
			loaded, err := task.checkIfLoaded(collID, []UniqueID{1})
			assert.Error(t, err)
			assert.False(t, loaded)
		})

		// Partition path: a Success status means the requested partitions are
		// loaded (ShowPartitions does not error when all are loaded).
		t.Run("show partitions success", func(t *testing.T) {
			qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
				return &querypb.ShowPartitionsResponse{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_Success,
					},
				}, nil
			})
			loaded, err := task.checkIfLoaded(collID, []UniqueID{1})
			assert.NoError(t, err)
			assert.True(t, loaded)
			qc.ResetShowPartitionsFunc()
		})

		// Collection reported with InMemoryPercentages == 0: checkIfLoaded
		// falls back to ShowPartitions; it errors while that fallback fails,
		// and reports loaded once partition IDs are returned.
		t.Run("show collection success but not loaded", func(t *testing.T) {
			qc.SetShowCollectionsFunc(func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
				return &querypb.ShowCollectionsResponse{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_Success,
					},
					CollectionIDs:       []UniqueID{collID},
					InMemoryPercentages: []int64{0},
				}, nil
			})
			qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
				return nil, fmt.Errorf("mock error")
			})
			loaded, err := task.checkIfLoaded(collID, []UniqueID{})
			assert.Error(t, err)
			assert.False(t, loaded)

			qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
				return nil, fmt.Errorf("mock error")
			})
			loaded, err = task.checkIfLoaded(collID, []UniqueID{})
			assert.Error(t, err)
			assert.False(t, loaded)

			qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
				return &querypb.ShowPartitionsResponse{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_Success,
					},
					PartitionIDs: []UniqueID{1},
				}, nil
			})
			loaded, err = task.checkIfLoaded(collID, []UniqueID{})
			assert.NoError(t, err)
			assert.True(t, loaded)
		})
		qc.ResetShowCollectionsFunc()
		qc.ResetShowPartitionsFunc()
	})
}
......@@ -115,8 +115,12 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
}
// check if collection/partitions are loaded into query node
if !t.checkIfLoaded(collID, t.PartitionIDs) {
return fmt.Errorf("collection:%v or partitions:%v not loaded into memory", collectionName, t.request.GetPartitionNames())
loaded, err := t.checkIfLoaded(collID, t.PartitionIDs)
if err != nil {
return fmt.Errorf("checkIfLoaded failed when search, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err)
}
if !loaded {
return fmt.Errorf("collection:%v or partition:%v not loaded into memory when search", collectionName, t.request.GetPartitionNames())
}
// TODO(dragondriver): necessary to check if partition was loaded into query node?
......@@ -428,7 +432,16 @@ func (t *searchTask) searchShard(ctx context.Context, leaders []queryNode, chann
return nil
}
func (t *searchTask) checkIfLoaded(collectionID UniqueID, searchPartitionIDs []UniqueID) bool {
func (t *searchTask) checkIfLoaded(collectionID UniqueID, searchPartitionIDs []UniqueID) (bool, error) {
// check if collection was loaded into QueryNode
info, err := globalMetaCache.GetCollectionInfo(t.ctx, t.collectionName)
if err != nil {
return false, fmt.Errorf("GetCollectionInfo failed, collectionID = %d, err = %s", collectionID, err)
}
if info.isLoaded {
return true, nil
}
// If request to search partitions
if len(searchPartitionIDs) > 0 {
resp, err := t.qc.ShowPartitions(t.ctx, &querypb.ShowPartitionsRequest{
......@@ -442,92 +455,42 @@ func (t *searchTask) checkIfLoaded(collectionID UniqueID, searchPartitionIDs []U
PartitionIDs: searchPartitionIDs,
})
if err != nil {
log.Warn("fail to show partitions by QueryCoord",
zap.Int64("requestID", t.Base.MsgID),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", searchPartitionIDs),
zap.String("requestType", "search"),
zap.Error(err))
return false
return false, fmt.Errorf("showPartitions failed, collectionID = %d, partitionIDs = %v, err = %s", collectionID, searchPartitionIDs, err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("fail to show partitions by QueryCoord",
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", searchPartitionIDs),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "search"),
zap.String("reason", resp.GetStatus().GetReason()))
return false
return false, fmt.Errorf("showPartitions failed, collectionID = %d, partitionIDs = %v, reason = %s", collectionID, searchPartitionIDs, resp.GetStatus().GetReason())
}
// Current logic: show partitions won't return error if the given partitions are all loaded
return true
return true, nil
}
// If request to search collection
resp, err := t.qc.ShowCollections(t.ctx, &querypb.ShowCollectionsRequest{
// If request to search collection and collection is not fully loaded
resp, err := t.qc.ShowPartitions(t.ctx, &querypb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
MsgID: t.Base.MsgID,
Timestamp: t.Base.Timestamp,
SourceID: Params.ProxyCfg.GetNodeID(),
},
CollectionID: collectionID,
})
if err != nil {
log.Warn("fail to show collections by QueryCoord",
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "search"),
zap.Error(err))
return false
return false, fmt.Errorf("showPartitions failed, collectionID = %d, partitionIDs = %v, err = %s", collectionID, searchPartitionIDs, err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("fail to show collections by QueryCoord",
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "search"),
zap.String("reason", resp.GetStatus().GetReason()))
return false
return false, fmt.Errorf("showPartitions failed, collectionID = %d, partitionIDs = %v, reason = %s", collectionID, searchPartitionIDs, resp.GetStatus().GetReason())
}
loaded := false
for index, collID := range resp.CollectionIDs {
if collID == collectionID && resp.GetInMemoryPercentages()[index] >= int64(100) {
loaded = true
break
}
}
if !loaded {
resp, err := t.qc.ShowPartitions(t.ctx, &querypb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
MsgID: t.Base.MsgID,
Timestamp: t.Base.Timestamp,
SourceID: Params.ProxyCfg.GetNodeID(),
},
CollectionID: collectionID,
})
if err != nil {
log.Warn("fail to show partitions by QueryCoord",
zap.Int64("requestID", t.Base.MsgID),
zap.Int64("collectionID", collectionID),
zap.String("requestType", "search"),
zap.Error(err))
return false
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("fail to show partitions by QueryCoord",
zap.Int64("collectionID", collectionID),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "search"),
zap.String("reason", resp.GetStatus().GetReason()))
return false
}
if len(resp.GetPartitionIDs()) > 0 {
log.Warn("collection not fully loaded, search on these partitions", zap.Int64s("partitionIDs", resp.GetPartitionIDs()))
return true
}
if len(resp.GetPartitionIDs()) > 0 {
log.Warn("collection not fully loaded, search on these partitions",
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", resp.GetPartitionIDs()))
return true, nil
}
return loaded
return false, nil
}
func decodeSearchResults(searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) {
......
......@@ -124,7 +124,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
err = rc.Start()
defer rc.Stop()
require.NoError(t, err)
err = InitMetaCache(rc)
err = InitMetaCache(rc, qc)
require.NoError(t, err)
err = qc.Start()
......@@ -198,13 +198,16 @@ func TestSearchTask_PreExecute(t *testing.T) {
collID, err := globalMetaCache.GetCollectionID(context.TODO(), collName)
require.NoError(t, err)
task := getSearchTask(t, collName)
task.collectionName = collName
t.Run("show collection err", func(t *testing.T) {
qc.SetShowCollectionsFunc(func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
return nil, errors.New("mock")
})
assert.False(t, task.checkIfLoaded(collID, []UniqueID{}))
loaded, err := task.checkIfLoaded(collID, []UniqueID{})
assert.Error(t, err)
assert.False(t, loaded)
})
t.Run("show collection status unexpected error", func(t *testing.T) {
......@@ -217,7 +220,9 @@ func TestSearchTask_PreExecute(t *testing.T) {
}, nil
})
assert.False(t, task.checkIfLoaded(collID, []UniqueID{}))
loaded, err := task.checkIfLoaded(collID, []UniqueID{})
assert.Error(t, err)
assert.False(t, loaded)
assert.Error(t, task.PreExecute(ctx))
qc.ResetShowCollectionsFunc()
})
......@@ -231,14 +236,18 @@ func TestSearchTask_PreExecute(t *testing.T) {
},
}, nil
})
assert.False(t, task.checkIfLoaded(collID, []UniqueID{1}))
loaded, err := task.checkIfLoaded(collID, []UniqueID{1})
assert.Error(t, err)
assert.False(t, loaded)
})
t.Run("show partition status unexpected error", func(t *testing.T) {
qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return nil, errors.New("mock error")
})
assert.False(t, task.checkIfLoaded(collID, []UniqueID{1}))
loaded, err := task.checkIfLoaded(collID, []UniqueID{1})
assert.Error(t, err)
assert.False(t, loaded)
})
t.Run("show partitions success", func(t *testing.T) {
......@@ -249,7 +258,9 @@ func TestSearchTask_PreExecute(t *testing.T) {
},
}, nil
})
assert.True(t, task.checkIfLoaded(collID, []UniqueID{1}))
loaded, err := task.checkIfLoaded(collID, []UniqueID{1})
assert.NoError(t, err)
assert.True(t, loaded)
qc.ResetShowPartitionsFunc()
})
......@@ -267,12 +278,16 @@ func TestSearchTask_PreExecute(t *testing.T) {
qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return nil, errors.New("mock error")
})
assert.False(t, task.checkIfLoaded(collID, []UniqueID{}))
loaded, err := task.checkIfLoaded(collID, []UniqueID{})
assert.Error(t, err)
assert.False(t, loaded)
qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return nil, errors.New("mock error")
})
assert.False(t, task.checkIfLoaded(collID, []UniqueID{}))
loaded, err = task.checkIfLoaded(collID, []UniqueID{})
assert.Error(t, err)
assert.False(t, loaded)
qc.SetShowPartitionsFunc(func(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return &querypb.ShowPartitionsResponse{
......@@ -282,7 +297,9 @@ func TestSearchTask_PreExecute(t *testing.T) {
PartitionIDs: []UniqueID{1},
}, nil
})
assert.True(t, task.checkIfLoaded(collID, []UniqueID{}))
loaded, err = task.checkIfLoaded(collID, []UniqueID{})
assert.NoError(t, err)
assert.True(t, loaded)
})
qc.ResetShowCollectionsFunc()
......@@ -406,7 +423,7 @@ func TestSearchTaskV2_Execute(t *testing.T) {
err = rc.Start()
require.NoError(t, err)
defer rc.Stop()
err = InitMetaCache(rc)
err = InitMetaCache(rc, qc)
require.NoError(t, err)
err = qc.Start()
......
......@@ -1090,8 +1090,11 @@ func TestDropCollectionTask(t *testing.T) {
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
ctx := context.Background()
InitMetaCache(rc)
InitMetaCache(rc, qc)
master := newMockGetChannelsService()
query := newMockGetChannelsService()
......@@ -1175,8 +1178,11 @@ func TestHasCollectionTask(t *testing.T) {
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
ctx := context.Background()
InitMetaCache(rc)
InitMetaCache(rc, qc)
prefix := "TestHasCollectionTask"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
......@@ -1257,8 +1263,11 @@ func TestDescribeCollectionTask(t *testing.T) {
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
ctx := context.Background()
InitMetaCache(rc)
InitMetaCache(rc, qc)
prefix := "TestDescribeCollectionTask"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
......@@ -1316,8 +1325,11 @@ func TestDescribeCollectionTask_ShardsNum1(t *testing.T) {
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
ctx := context.Background()
InitMetaCache(rc)
InitMetaCache(rc, qc)
prefix := "TestDescribeCollectionTask"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
......@@ -1377,8 +1389,11 @@ func TestDescribeCollectionTask_ShardsNum2(t *testing.T) {
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
ctx := context.Background()
InitMetaCache(rc)
InitMetaCache(rc, qc)
prefix := "TestDescribeCollectionTask"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
......@@ -1639,10 +1654,13 @@ func TestTask_Int64PrimaryKey(t *testing.T) {
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
ctx := context.Background()
err = InitMetaCache(rc)
err = InitMetaCache(rc, qc)
assert.NoError(t, err)
shardsNum := int32(2)
......@@ -1892,10 +1910,13 @@ func TestTask_VarCharPrimaryKey(t *testing.T) {
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
ctx := context.Background()
err = InitMetaCache(rc)
err = InitMetaCache(rc, qc)
assert.NoError(t, err)
shardsNum := int32(2)
......
......@@ -73,6 +73,32 @@ func (broker *globalMetaBroker) releaseDQLMessageStream(ctx context.Context, col
return nil
}
// invalidateCollectionMetaCache asks RootCoord to evict, from every Proxy,
// all cached collection meta entries for the given collectionID. Both a
// transport error and a non-Success status are logged and returned as errors.
func (broker *globalMetaBroker) invalidateCollectionMetaCache(ctx context.Context, collectionID UniqueID) error {
	rpcCtx, cancel := context.WithTimeout(ctx, timeoutForRPC)
	defer cancel()

	request := &proxypb.InvalidateCollMetaCacheRequest{
		Base: &commonpb.MsgBase{
			MsgType: 0, // TODO: msg type?
		},
		CollectionID: collectionID,
	}

	// Fold the RPC failure and the bad-status case into a single error path.
	res, err := broker.rootCoord.InvalidateCollectionMetaCache(rpcCtx, request)
	if err == nil && res.ErrorCode != commonpb.ErrorCode_Success {
		err = errors.New(res.Reason)
	}
	if err != nil {
		log.Error("InvalidateCollMetaCacheRequest failed", zap.Int64("collectionID", collectionID), zap.Error(err))
		return err
	}
	log.Info("InvalidateCollMetaCacheRequest successfully", zap.Int64("collectionID", collectionID))
	return nil
}
func (broker *globalMetaBroker) showPartitionIDs(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
ctx2, cancel2 := context.WithTimeout(ctx, timeoutForRPC)
defer cancel2()
......
......@@ -41,6 +41,8 @@ func TestGlobalMetaBroker_RootCoord(t *testing.T) {
t.Run("successCase", func(t *testing.T) {
err = handler.releaseDQLMessageStream(ctx, defaultCollectionID)
assert.Nil(t, err)
err = handler.invalidateCollectionMetaCache(ctx, defaultCollectionID)
assert.NoError(t, err)
enableIndex, _, err := handler.getIndexBuildID(ctx, defaultCollectionID, defaultSegmentID)
assert.Nil(t, err)
_, err = handler.showPartitionIDs(ctx, defaultCollectionID)
......@@ -52,6 +54,8 @@ func TestGlobalMetaBroker_RootCoord(t *testing.T) {
rootCoord.returnError = true
err = handler.releaseDQLMessageStream(ctx, defaultCollectionID)
assert.Error(t, err)
err = handler.invalidateCollectionMetaCache(ctx, defaultCollectionID)
assert.Error(t, err)
_, _, err = handler.getIndexBuildID(ctx, defaultCollectionID, defaultSegmentID)
assert.Error(t, err)
_, err = handler.showPartitionIDs(ctx, defaultCollectionID)
......@@ -63,6 +67,8 @@ func TestGlobalMetaBroker_RootCoord(t *testing.T) {
rootCoord.returnGrpcError = true
err = handler.releaseDQLMessageStream(ctx, defaultCollectionID)
assert.Error(t, err)
err = handler.invalidateCollectionMetaCache(ctx, defaultCollectionID)
assert.Error(t, err)
_, _, err = handler.getIndexBuildID(ctx, defaultCollectionID, defaultSegmentID)
assert.Error(t, err)
_, err = handler.showPartitionIDs(ctx, defaultCollectionID)
......
......@@ -134,6 +134,8 @@ type rootCoordMock struct {
returnError bool
returnGrpcError bool
enableIndex bool
invalidateCollMetaCacheFailed bool
}
func newRootCoordMock(ctx context.Context) *rootCoordMock {
......@@ -238,6 +240,30 @@ func (rc *rootCoordMock) ReleaseDQLMessageStream(ctx context.Context, in *proxyp
}, nil
}
// InvalidateCollectionMetaCache mocks the RootCoord RPC of the same name.
// Failure modes, in order of precedence:
//   - returnGrpcError: simulate a transport-level failure (nil status, non-nil error)
//   - returnError / invalidateCollMetaCacheFailed: simulate a server-side
//     failure (UnexpectedError status, nil error)
func (rc *rootCoordMock) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	if rc.returnGrpcError {
		return nil, errors.New("InvalidateCollectionMetaCache failed")
	}
	// returnError and invalidateCollMetaCacheFailed produced byte-identical
	// status branches; fold them into one to avoid the duplication.
	if rc.returnError || rc.invalidateCollMetaCacheFailed {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "InvalidateCollectionMetaCache failed",
		}, nil
	}
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
func (rc *rootCoordMock) DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
if rc.returnGrpcError {
return nil, errors.New("describe segment failed")
......
......@@ -688,6 +688,14 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
return err
}
// invalidate all the collection meta cache with the specified collectionID
err = rct.broker.invalidateCollectionMetaCache(ctx, collectionID)
if err != nil {
log.Error("releaseCollectionTask: release collection end, invalidateCollectionMetaCache occur error", zap.Int64("collectionID", rct.CollectionID), zap.Int64("msgID", rct.Base.MsgID), zap.Error(err))
rct.setResultInfo(err)
return err
}
// TODO(yah01): broadcast to all nodes? Or only nodes serve the collection
onlineNodeIDs := rct.cluster.onlineNodeIDs()
for _, nodeID := range onlineNodeIDs {
......
......@@ -304,6 +304,9 @@ func TestTriggerTask(t *testing.T) {
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
err = queryCoord.meta.addCollection(defaultCollectionID, querypb.LoadType_LoadCollection, genDefaultCollectionSchema(false))
assert.NoError(t, err)
node1, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
node2, err := startQueryNodeServer(ctx)
......@@ -354,6 +357,14 @@ func TestTriggerTask(t *testing.T) {
assert.Nil(t, err)
})
t.Run("Test ReleaseCollection with invalidateCollectionMetaCache error", func(t *testing.T) {
releaseCollectionTask := genReleaseCollectionTask(ctx, queryCoord)
queryCoord.scheduler.broker.rootCoord.(*rootCoordMock).invalidateCollMetaCacheFailed = true
err = queryCoord.scheduler.processTask(releaseCollectionTask)
assert.Error(t, err)
queryCoord.scheduler.broker.rootCoord.(*rootCoordMock).invalidateCollMetaCacheFailed = false
})
t.Run("Test LoadPartition With Replicas", func(t *testing.T) {
loadPartitionTask := genLoadPartitionTask(ctx, queryCoord)
loadPartitionTask.ReplicaNumber = 3
......@@ -587,6 +598,9 @@ func Test_ReleaseCollectionExecuteFail(t *testing.T) {
assert.Nil(t, err)
node.setRPCInterface(&node.releaseCollection, returnFailedResult)
err = queryCoord.meta.addCollection(defaultCollectionID, querypb.LoadType_LoadCollection, genDefaultCollectionSchema(false))
assert.NoError(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
releaseCollectionTask := genReleaseCollectionTask(ctx, queryCoord)
err = queryCoord.scheduler.Enqueue(releaseCollectionTask)
......
......@@ -21,14 +21,15 @@ import (
"fmt"
"sync"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"go.uber.org/zap"
)
type proxyClientManager struct {
......@@ -104,151 +105,132 @@ func (p *proxyClientManager) DelProxyClient(s *sessionutil.Session) {
log.Debug("remove proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID))
}
func (p *proxyClientManager) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) {
func (p *proxyClientManager) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) error {
p.lock.Lock()
defer p.lock.Unlock()
if len(p.proxyClient) == 0 {
log.Debug("proxy client is empty,InvalidateCollectionMetaCache will not send to any client")
return
log.Warn("proxy client is empty, InvalidateCollectionMetaCache will not send to any client")
return nil
}
for k, f := range p.proxyClient {
err := func() error {
defer func() {
if err := recover(); err != nil {
log.Debug("call InvalidateCollectionMetaCache panic", zap.Int64("proxy id", k), zap.Any("msg", err))
}
}()
sta, err := f.InvalidateCollectionMetaCache(ctx, request)
group := &errgroup.Group{}
for k, v := range p.proxyClient {
k, v := k, v
group.Go(func() error {
sta, err := v.InvalidateCollectionMetaCache(ctx, request)
if err != nil {
return fmt.Errorf("grpc fail,error=%w", err)
return fmt.Errorf("InvalidateCollectionMetaCache failed, proxyID = %d, err = %s", k, err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("message = %s", sta.Reason)
return fmt.Errorf("InvalidateCollectionMetaCache failed, proxyID = %d, err = %s", k, sta.Reason)
}
return nil
}()
if err != nil {
log.Error("Failed to call invalidate collection meta", zap.Int64("proxy id", k), zap.Error(err))
} else {
log.Debug("send invalidate collection meta cache to proxy node", zap.Int64("node id", k))
}
})
}
return group.Wait()
}
func (p *proxyClientManager) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
func (p *proxyClientManager) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) error {
p.lock.Lock()
defer p.lock.Unlock()
if len(p.proxyClient) == 0 {
log.Debug("proxy client is empty,ReleaseDQLMessageStream will not send to any client")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
for k, f := range p.proxyClient {
sta, err := func() (retSta *commonpb.Status, retErr error) {
defer func() {
if err := recover(); err != nil {
log.Debug("call proxy node ReleaseDQLMessageStream panic", zap.Int64("proxy node id", k), zap.Any("error", err))
retSta.ErrorCode = commonpb.ErrorCode_UnexpectedError
retSta.Reason = fmt.Sprintf("call proxy node ReleaseDQLMessageStream panic, proxy node id =%d, error = %v", k, err)
retErr = nil
}
}()
retSta, retErr = f.ReleaseDQLMessageStream(ctx, in)
return
}()
if err != nil {
return sta, err
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return sta, err
}
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
log.Warn("proxy client is empty, ReleaseDQLMessageStream will not send to any client")
return nil
}
group := &errgroup.Group{}
for k, v := range p.proxyClient {
k, v := k, v
group.Go(func() error {
sta, err := v.ReleaseDQLMessageStream(ctx, in)
if err != nil {
return fmt.Errorf("ReleaseDQLMessageStream failed, proxyID = %d, err = %s", k, err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("ReleaseDQLMessageStream failed, proxyID = %d, err = %s", k, sta.Reason)
}
return nil
})
}
return group.Wait()
}
func (p *proxyClientManager) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) error {
p.lock.Lock()
defer p.lock.Unlock()
defer func() {
if err := recover(); err != nil {
log.Debug("call InvalidateCredentialCache panic", zap.Any("msg", err))
}
}()
if len(p.proxyClient) == 0 {
log.Warn("proxy client is empty, InvalidateCredentialCache will not send to any client")
return nil
}
for _, f := range p.proxyClient {
sta, err := f.InvalidateCredentialCache(ctx, request)
if err != nil {
return fmt.Errorf("grpc fail, error=%w", err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("message = %s", sta.Reason)
}
group := &errgroup.Group{}
for k, v := range p.proxyClient {
k, v := k, v
group.Go(func() error {
sta, err := v.InvalidateCredentialCache(ctx, request)
if err != nil {
return fmt.Errorf("InvalidateCredentialCache failed, proxyID = %d, err = %s", k, err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("InvalidateCredentialCache failed, proxyID = %d, err = %s", k, sta.Reason)
}
return nil
})
}
return nil
return group.Wait()
}
func (p *proxyClientManager) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) error {
p.lock.Lock()
defer p.lock.Unlock()
defer func() {
if err := recover(); err != nil {
log.Debug("call UpdateCredentialCache panic", zap.Any("msg", err))
}
}()
if len(p.proxyClient) == 0 {
log.Warn("proxy client is empty, UpdateCredentialCache will not send to any client")
return nil
}
for _, f := range p.proxyClient {
sta, err := f.UpdateCredentialCache(ctx, request)
if err != nil {
return fmt.Errorf("grpc fail, error=%w", err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("message = %s", sta.Reason)
}
group := &errgroup.Group{}
for k, v := range p.proxyClient {
k, v := k, v
group.Go(func() error {
sta, err := v.UpdateCredentialCache(ctx, request)
if err != nil {
return fmt.Errorf("UpdateCredentialCache failed, proxyID = %d, err = %s", k, err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("UpdateCredentialCache failed, proxyID = %d, err = %s", k, sta.Reason)
}
return nil
})
}
return nil
return group.Wait()
}
func (p *proxyClientManager) ClearCredUsersCache(ctx context.Context, request *internalpb.ClearCredUsersCacheRequest) error {
p.lock.Lock()
defer p.lock.Unlock()
defer func() {
if err := recover(); err != nil {
log.Debug("call ClearCredUsersCache panic", zap.Any("msg", err))
}
}()
if len(p.proxyClient) == 0 {
log.Warn("proxy client is empty, ClearCredUsersCache will not send to any client")
return nil
}
for _, f := range p.proxyClient {
sta, err := f.ClearCredUsersCache(ctx, request)
if err != nil {
return fmt.Errorf("grpc fail, error=%w", err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("message = %s", sta.Reason)
}
group := &errgroup.Group{}
for k, v := range p.proxyClient {
k, v := k, v
group.Go(func() error {
sta, err := v.ClearCredUsersCache(ctx, request)
if err != nil {
return fmt.Errorf("ClearCredUsersCache failed, proxyID = %d, err = %s", k, err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("ClearCredUsersCache failed, proxyID = %d, err = %s", k, sta.Reason)
}
return nil
})
}
return nil
return group.Wait()
}
......@@ -21,10 +21,12 @@ import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/stretchr/testify/assert"
)
func TestProxyClientManager_GetProxyClients(t *testing.T) {
......@@ -93,11 +95,17 @@ func TestProxyClientManager_InvalidateCollectionMetaCache(t *testing.T) {
pcm := newProxyClientManager(core)
pcm.InvalidateCollectionMetaCache(ctx, nil)
ch := make(chan struct{})
pcm.helper = proxyClientManagerHelper{
afterConnect: func() { ch <- struct{}{} },
}
err = pcm.InvalidateCollectionMetaCache(ctx, nil)
assert.NoError(t, err)
core.SetNewProxyClient(
func(se *sessionutil.Session) (types.Proxy, error) {
return nil, nil
return &proxyMock{}, nil
},
)
......@@ -107,8 +115,25 @@ func TestProxyClientManager_InvalidateCollectionMetaCache(t *testing.T) {
}
pcm.AddProxyClient(session)
<-ch
pcm.InvalidateCollectionMetaCache(ctx, nil)
err = pcm.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
CollectionName: "collection0",
})
assert.NoError(t, err)
// test InvalidateCollectionMetaCache failed
for _, v := range pcm.proxyClient {
v.(*proxyMock).returnError = true
}
err = pcm.InvalidateCollectionMetaCache(ctx, nil)
assert.Error(t, err)
for _, v := range pcm.proxyClient {
v.(*proxyMock).returnGrpcError = true
}
err = pcm.InvalidateCollectionMetaCache(ctx, nil)
assert.Error(t, err)
}
func TestProxyClientManager_ReleaseDQLMessageStream(t *testing.T) {
......@@ -129,11 +154,12 @@ func TestProxyClientManager_ReleaseDQLMessageStream(t *testing.T) {
afterConnect: func() { ch <- struct{}{} },
}
pcm.ReleaseDQLMessageStream(ctx, nil)
err = pcm.ReleaseDQLMessageStream(ctx, nil)
assert.NoError(t, err)
core.SetNewProxyClient(
func(se *sessionutil.Session) (types.Proxy, error) {
return nil, nil
return &proxyMock{}, nil
},
)
......@@ -145,7 +171,21 @@ func TestProxyClientManager_ReleaseDQLMessageStream(t *testing.T) {
pcm.AddProxyClient(session)
<-ch
assert.Panics(t, func() { pcm.ReleaseDQLMessageStream(ctx, nil) })
err = pcm.ReleaseDQLMessageStream(ctx, nil)
assert.NoError(t, err)
// test releaseDQLMessageStream failed
for _, v := range pcm.proxyClient {
v.(*proxyMock).returnError = true
}
err = pcm.ReleaseDQLMessageStream(ctx, nil)
assert.Error(t, err)
for _, v := range pcm.proxyClient {
v.(*proxyMock).returnGrpcError = true
}
err = pcm.ReleaseDQLMessageStream(ctx, nil)
assert.Error(t, err)
}
func TestProxyClientManager_InvalidateCredentialCache(t *testing.T) {
......@@ -161,11 +201,17 @@ func TestProxyClientManager_InvalidateCredentialCache(t *testing.T) {
pcm := newProxyClientManager(core)
pcm.InvalidateCredentialCache(ctx, nil)
ch := make(chan struct{})
pcm.helper = proxyClientManagerHelper{
afterConnect: func() { ch <- struct{}{} },
}
err = pcm.InvalidateCredentialCache(ctx, nil)
assert.NoError(t, err)
core.SetNewProxyClient(
func(se *sessionutil.Session) (types.Proxy, error) {
return nil, nil
return &proxyMock{}, nil
},
)
......@@ -175,6 +221,21 @@ func TestProxyClientManager_InvalidateCredentialCache(t *testing.T) {
}
pcm.AddProxyClient(session)
<-ch
err = pcm.InvalidateCredentialCache(ctx, nil)
assert.NoError(t, err)
pcm.InvalidateCredentialCache(ctx, nil)
// test InvalidateCredentialCache failed
for _, v := range pcm.proxyClient {
v.(*proxyMock).returnError = true
}
err = pcm.InvalidateCredentialCache(ctx, nil)
assert.Error(t, err)
for _, v := range pcm.proxyClient {
v.(*proxyMock).returnGrpcError = true
}
err = pcm.InvalidateCredentialCache(ctx, nil)
assert.Error(t, err)
}
......@@ -69,6 +69,8 @@ import (
// UniqueID is an alias of typeutil.UniqueID.
type UniqueID = typeutil.UniqueID
const InvalidCollectionID = UniqueID(0)
// ------------------ struct -----------------------
// DdOperation used to save ddMsg into etcd
......@@ -965,7 +967,20 @@ func (c *Core) RemoveIndex(ctx context.Context, collName string, indexName strin
}
// ExpireMetaCache will call invalidate collection meta cache
func (c *Core) ExpireMetaCache(ctx context.Context, collNames []string, ts typeutil.Timestamp) {
func (c *Core) ExpireMetaCache(ctx context.Context, collNames []string, collectionID UniqueID, ts typeutil.Timestamp) error {
// if collectionID is specified, invalidate all the collection meta cache with the specified collectionID and return
if collectionID != InvalidCollectionID {
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
Timestamp: ts,
SourceID: c.session.ServerID,
},
CollectionID: collectionID,
}
return c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
}
// if only collNames are specified, invalidate the collection meta cache with the specified collectionName
for _, collName := range collNames {
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
......@@ -976,8 +991,13 @@ func (c *Core) ExpireMetaCache(ctx context.Context, collNames []string, ts typeu
},
CollectionName: collName,
}
c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
if err != nil {
// TODO: try to expire all or directly return err?
return err
}
}
return nil
}
// Register register rootcoord at etcd
......@@ -1187,6 +1207,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
invalidateCache := false
var ts typeutil.Timestamp
var collName string
var collectionID UniqueID
switch ddOp.Type {
// TODO remove create collection resend
......@@ -1216,6 +1237,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
return err
}
invalidateCache = true
collectionID = ddReq.CollectionID
} else {
log.Debug("collection has been removed, skip re-send DropCollection",
zap.String("collection name", collName))
......@@ -1236,6 +1258,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
return err
}
invalidateCache = true
collectionID = ddReq.CollectionID
} else {
log.Debug("partition has been created, skip re-send CreatePartition",
zap.String("collection name", collName), zap.String("partition name", ddReq.PartitionName))
......@@ -1256,6 +1279,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
return err
}
invalidateCache = true
collectionID = ddReq.CollectionID
} else {
log.Debug("partition has been removed, skip re-send DropPartition",
zap.String("collection name", collName), zap.String("partition name", ddReq.PartitionName))
......@@ -1265,7 +1289,9 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
}
if invalidateCache {
c.ExpireMetaCache(ctx, []string{collName}, ts)
if err = c.ExpireMetaCache(ctx, nil, collectionID, ts); err != nil {
return err
}
}
// Update DDOperation in etcd
......@@ -2041,7 +2067,23 @@ func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseD
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
return c.proxyClientManager.ReleaseDQLMessageStream(ctx, in)
err := c.proxyClientManager.ReleaseDQLMessageStream(ctx, in)
if err != nil {
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
return succStatus(), nil
}
// InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.
// RootCoord fans the request out to all registered proxies via proxyClientManager; any
// failure is reported through the returned Status, while the error return is always nil.
func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
// Refuse the request when RootCoord is not in a healthy state.
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, in)
if err != nil {
// Surface proxy-side failures in the Status reason; keep the error return nil.
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
return succStatus(), nil
}
// SegmentFlushCompleted check whether segment flush has completed
......
......@@ -72,7 +72,11 @@ type ctxKey struct{}
// proxyMock is a test double for types.Proxy that records the cache-invalidation
// requests it receives and can be switched into two failure modes.
type proxyMock struct {
types.Proxy
collArray []string // collection names seen by InvalidateCollectionMetaCache
collIDs []UniqueID // collection IDs seen by InvalidateCollectionMetaCache
mutex sync.Mutex // guards collArray and collIDs
returnError bool // when true, mocked RPCs return a non-Success status
returnGrpcError bool // when true, mocked RPCs return a transport-level error
}
func (p *proxyMock) Stop() error {
......@@ -82,11 +86,21 @@ func (p *proxyMock) Stop() error {
// InvalidateCollectionMetaCache mocks the proxy-side cache-invalidation RPC.
// Unless one of the failure switches is set, it records the collection name
// and collection ID carried by the request and reports success.
func (p *proxyMock) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	switch {
	case p.returnError:
		// Simulate an application-level failure reported via the status.
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		}, nil
	case p.returnGrpcError:
		// Simulate a transport-level failure.
		return nil, fmt.Errorf("grpc error")
	}

	p.collArray = append(p.collArray, request.CollectionName)
	p.collIDs = append(p.collIDs, request.CollectionID)
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}
func (p *proxyMock) GetCollArray() []string {
p.mutex.Lock()
defer p.mutex.Unlock()
......@@ -95,7 +109,37 @@ func (p *proxyMock) GetCollArray() []string {
return ret
}
// GetCollIDs returns the collection IDs recorded by InvalidateCollectionMetaCache.
// NOTE(review): like the original, the slice header is handed out without a copy,
// so callers share the backing array with the mock.
func (p *proxyMock) GetCollIDs() []UniqueID {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	return p.collIDs
}
// ReleaseDQLMessageStream mocks the proxy-side DQL-stream release RPC,
// honoring the same failure switches as the other mocked calls.
func (p *proxyMock) ReleaseDQLMessageStream(ctx context.Context, request *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
	switch {
	case p.returnError:
		// Application-level failure reported via the status.
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		}, nil
	case p.returnGrpcError:
		// Transport-level failure.
		return nil, fmt.Errorf("grpc error")
	default:
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		}, nil
	}
}
func (p *proxyMock) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
if p.returnError {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, nil
}
if p.returnGrpcError {
return nil, fmt.Errorf("grpc error")
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -1060,8 +1104,8 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, collMeta.ID, partMsg.CollectionID)
assert.Equal(t, collMeta.PartitionIDs[1], partMsg.PartitionID)
assert.Equal(t, 1, len(pnm.GetCollArray()))
assert.Equal(t, collName, pnm.GetCollArray()[0])
assert.Equal(t, 1, len(pnm.GetCollIDs()))
assert.Equal(t, collMeta.ID, pnm.GetCollIDs()[0])
// check DD operation info
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
......@@ -1647,8 +1691,8 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, collMeta.ID, dmsg.CollectionID)
assert.Equal(t, dropPartID, dmsg.PartitionID)
assert.Equal(t, 2, len(pnm.GetCollArray()))
assert.Equal(t, collName, pnm.GetCollArray()[1])
assert.Equal(t, 2, len(pnm.GetCollIDs()))
assert.Equal(t, collMeta.ID, pnm.GetCollIDs()[1])
// check DD operation info
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
......@@ -1716,9 +1760,9 @@ func TestRootCoord_Base(t *testing.T) {
dmsg, ok := (msgs[0]).(*msgstream.DropCollectionMsg)
assert.True(t, ok)
assert.Equal(t, collMeta.ID, dmsg.CollectionID)
collArray := pnm.GetCollArray()
assert.Equal(t, 3, len(collArray))
assert.Equal(t, collName, collArray[2])
collIDs := pnm.GetCollIDs()
assert.Equal(t, 3, len(collIDs))
assert.Equal(t, collMeta.ID, collIDs[2])
time.Sleep(100 * time.Millisecond)
qm.mutex.Lock()
......@@ -1740,9 +1784,9 @@ func TestRootCoord_Base(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
time.Sleep(100 * time.Millisecond)
collArray = pnm.GetCollArray()
assert.Equal(t, 3, len(collArray))
assert.Equal(t, collName, collArray[2])
collIDs = pnm.GetCollIDs()
assert.Equal(t, 3, len(collIDs))
assert.Equal(t, collMeta.ID, collIDs[2])
// check DD operation info
flag, err := core.MetaTable.txn.Load(DDMsgSendPrefix)
......
......@@ -324,9 +324,6 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("encodeDdOperation fail, error = %w", err)
}
// get all aliases before meta table updated
aliases := t.core.MetaTable.ListAliases(collMeta.ID)
// use lambda function here to guarantee all resources to be released
dropCollectionFn := func() error {
// lock for ddl operation
......@@ -375,8 +372,11 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return err
}
t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts)
t.core.ExpireMetaCache(ctx, aliases, ts)
// invalidate all the collection meta cache with the specified collectionID
err = t.core.ExpireMetaCache(ctx, nil, collMeta.ID, ts)
if err != nil {
return err
}
// Update DDOperation in etcd
return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
......@@ -571,7 +571,11 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
return err
}
t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts)
// invalidate all the collection meta cache with the specified collectionID
err = t.core.ExpireMetaCache(ctx, nil, collMeta.ID, ts)
if err != nil {
return err
}
// Update DDOperation in etcd
return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
......@@ -658,7 +662,11 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
return err
}
t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts)
// invalidate all the collection meta cache with the specified collectionID
err = t.core.ExpireMetaCache(ctx, nil, collInfo.ID, ts)
if err != nil {
return err
}
//notify query service to release partition
// TODO::xige-16, reOpen when queryCoord support release partitions after load collection
......@@ -1111,9 +1119,7 @@ func (t *DropAliasReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("meta table drop alias failed, error = %w", err)
}
t.core.ExpireMetaCache(ctx, []string{t.Req.Alias}, ts)
return nil
return t.core.ExpireMetaCache(ctx, []string{t.Req.Alias}, InvalidCollectionID, ts)
}
// AlterAliasReqTask alter alias request task
......@@ -1142,7 +1148,5 @@ func (t *AlterAliasReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("meta table alter alias failed, error = %w", err)
}
t.core.ExpireMetaCache(ctx, []string{t.Req.Alias}, ts)
return nil
return t.core.ExpireMetaCache(ctx, []string{t.Req.Alias}, InvalidCollectionID, ts)
}
......@@ -585,6 +585,21 @@ type RootCoord interface {
// RootCoord just forwards this request to Proxy client
ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error)
// InvalidateCollectionMetaCache notifies RootCoord to clear the meta cache of specific collection in Proxies.
// If `CollectionID` is specified in request, all the collection meta cache with the specified collectionID will be
// invalidated, if only the `CollectionName` is specified in request, only the collection meta cache with the
// specified collectionName will be invalidated.
//
// ctx is the request to control request deadline and cancellation.
// request contains the request params, which are the database name (not used), the collection name and the collection ID.
//
// The `ErrorCode` of `Status` is `Success` if the cache is invalidated successfully;
// otherwise, the `ErrorCode` of `Status` will be `Error`, and the `Reason` of `Status` will record the fail cause.
// error is always nil
//
// RootCoord just forwards this request to Proxy client
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
// SegmentFlushCompleted notifies RootCoord that specified segment has been flushed
//
// ctx is the context to control request deadline and cancellation
......@@ -693,7 +708,10 @@ type RootCoordComponent interface {
type Proxy interface {
Component
// InvalidateCollectionMetaCache notifies Proxy to clear all the meta cache of specific collection.
// InvalidateCollectionMetaCache notifies Proxy to clear the meta cache of specific collection.
// If `CollectionID` is specified in request, all the collection meta cache with the specified collectionID will be
// invalidated, if only the `CollectionName` is specified in request, only the collection meta cache with the
// specified collectionName will be invalidated.
//
// InvalidateCollectionMetaCache should be called when there are any meta changes in specific collection.
// Such as `DropCollection`, `CreatePartition`, `DropPartition`, etc.
......
......@@ -131,6 +131,10 @@ func (m *RootCoordClient) ReleaseDQLMessageStream(ctx context.Context, in *proxy
return &commonpb.Status{}, m.Err
}
// InvalidateCollectionMetaCache is a mock client implementation that returns an
// empty status together with the client's preset error m.Err.
func (m *RootCoordClient) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
func (m *RootCoordClient) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
......
......@@ -828,7 +828,7 @@ class TestPartitionOperations(TestcaseBase):
params={"nprobe": 32}, limit=1,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0,
ct.err_msg: "not loaded into memory"})
ct.err_msg: "not been loaded"})
# release partition
partition_w.release()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册