diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go
index 3bb812713ad9263ef1d431472c5ad912cf7018c5..175f1400011e57f53a9ce1f606582076f91fe8e8 100644
--- a/internal/distributed/proxy/service.go
+++ b/internal/distributed/proxy/service.go
@@ -50,6 +50,7 @@ import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/keepalive"
 )
 
@@ -157,7 +158,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
 		grpc.StreamInterceptor(ot.StreamServerInterceptor(opts...)))
 	proxypb.RegisterProxyServer(s.grpcServer, s)
 	milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
-
+	grpc_health_v1.RegisterHealthServer(s.grpcServer, s)
 	log.Debug("create Proxy grpc server",
 		zap.Any("enforcement policy", kaep),
 		zap.Any("server parameters", kasp))
@@ -645,3 +646,42 @@ func (s *Server) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milv
 func (s *Server) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
 	return s.proxy.GetImportState(ctx, req)
 }
+
+// servingStatus converts the outcome of a GetComponentStates call into a gRPC
+// health status. It yields SERVING only when err is nil, the response carries
+// both a Status and a State, the status code is Success and the state code is
+// Healthy. Every other outcome, including a nil response or missing fields
+// (which would otherwise cause a nil dereference), yields NOT_SERVING.
+func servingStatus(states *internalpb.ComponentStates, err error) grpc_health_v1.HealthCheckResponse_ServingStatus {
+	if err != nil || states == nil || states.Status == nil || states.State == nil {
+		return grpc_health_v1.HealthCheckResponse_NOT_SERVING
+	}
+	if states.Status.ErrorCode != commonpb.ErrorCode_Success {
+		return grpc_health_v1.HealthCheckResponse_NOT_SERVING
+	}
+	if states.State.StateCode != internalpb.StateCode_Healthy {
+		return grpc_health_v1.HealthCheckResponse_NOT_SERVING
+	}
+	return grpc_health_v1.HealthCheckResponse_SERVING
+}
+
+// Check implements the unary RPC of the gRPC health checking protocol.
+// The returned response is never nil: when GetComponentStates fails, the
+// NOT_SERVING response is returned together with that error.
+func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
+	state, err := s.proxy.GetComponentStates(ctx)
+	return &grpc_health_v1.HealthCheckResponse{Status: servingStatus(state, err)}, err
+}
+
+// Watch implements the streaming RPC of the gRPC health checking protocol.
+// It sends exactly one response describing the current status and returns the
+// result of that Send; an error from GetComponentStates is reported as
+// NOT_SERVING rather than returned.
+//
+// NOTE(review): the proxy is queried with the server's own s.ctx, not with
+// server.Context(), and the stream is not kept open to publish later status
+// changes; confirm both are intended.
+func (s *Server) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
+	state, err := s.proxy.GetComponentStates(s.ctx)
+	return server.Send(&grpc_health_v1.HealthCheckResponse{Status: servingStatus(state, err)})
+}
diff --git a/internal/distributed/proxy/service_test.go b/internal/distributed/proxy/service_test.go
index cb610045c8c19a1f02d5f2ed79bee0355b6427bf..a5665d44c53c5d35042eca6fb57c0db1e0e28f7e 100644
--- a/internal/distributed/proxy/service_test.go
+++ b/internal/distributed/proxy/service_test.go
@@ -18,6 +18,7 @@ package grpcproxy
 
 import (
 	"context"
+	"fmt"
 	"testing"
 
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
@@ -30,16 +31,42 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/internal/proxy"
 	"github.com/milvus-io/milvus/internal/types"
+	milvusmock "github.com/milvus-io/milvus/internal/util/mock"
 	"github.com/milvus-io/milvus/internal/util/uniquegenerator"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	clientv3 "go.etcd.io/etcd/client/v3"
+	"google.golang.org/grpc/health/grpc_health_v1"
 )
 
 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 type MockBase struct {
+	mock.Mock
+	isMockGetComponentStatesOn bool
+}
+
+func (m *MockBase) On(methodName string, arguments ...interface{}) *mock.Call {
+	if methodName == "GetComponentStates" {
+		m.isMockGetComponentStatesOn = true
+	}
+	return m.Mock.On(methodName, arguments...)
 }
 
 func (m *MockBase) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
+	if m.isMockGetComponentStatesOn {
+		ret1 := &internalpb.ComponentStates{}
+		var ret2 error
+		args := m.Called(ctx)
+		arg1 := args.Get(0)
+		arg2 := args.Get(1)
+		if arg1 != nil {
+			ret1 = arg1.(*internalpb.ComponentStates)
+		}
+		if arg2 != nil {
+			ret2 = arg2.(error)
+		}
+		return ret1, ret2
+	}
 	return &internalpb.ComponentStates{
 		State:  &internalpb.ComponentInfo{StateCode: internalpb.StateCode_Healthy},
 		Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
@@ -431,6 +458,7 @@ type MockProxy struct {
 	startErr error
 	stopErr  error
 	regErr   error
+	isMockOn bool
 }
 
 func (m *MockProxy) Init() error {
@@ -901,6 +929,128 @@ func Test_NewServer(t *testing.T) {
 	assert.Nil(t, err)
 }
 
+func TestServer_Check(t *testing.T) {
+	ctx := context.Background()
+	server, err := NewServer(ctx, nil)
+	assert.NotNil(t, server)
+	assert.Nil(t, err)
+
+	mockProxy := &MockProxy{}
+	server.proxy = mockProxy
+	server.rootCoordClient = &MockRootCoord{}
+	server.indexCoordClient = &MockIndexCoord{}
+	server.queryCoordClient = &MockQueryCoord{}
+	server.dataCoordClient = &MockDataCoord{}
+
+	req := &grpc_health_v1.HealthCheckRequest{Service: ""}
+	ret, err := server.Check(ctx, req)
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
+
+	mockProxy.On("GetComponentStates", ctx).Return(nil, fmt.Errorf("mock grpc unexpected error")).Once()
+
+	ret, err = server.Check(ctx, req)
+	assert.NotNil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
+
+	componentInfo := &internalpb.ComponentInfo{
+		NodeID:    0,
+		Role:      "proxy",
+		StateCode: internalpb.StateCode_Abnormal,
+	}
+	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
+	componentState := &internalpb.ComponentStates{
+		State:  componentInfo,
+		Status: status,
+	}
+	mockProxy.On("GetComponentStates", ctx).Return(componentState, nil)
+
+	ret, err = server.Check(ctx, req)
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
+
+	status.ErrorCode = commonpb.ErrorCode_Success
+	ret, err = server.Check(ctx, req)
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
+
+	componentInfo.StateCode = internalpb.StateCode_Initializing
+	ret, err = server.Check(ctx, req)
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
+
+	componentInfo.StateCode = internalpb.StateCode_Healthy
+	ret, err = server.Check(ctx, req)
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
+}
+
+func TestServer_Watch(t *testing.T) {
+	ctx := context.Background()
+	server, err := NewServer(ctx, nil)
+	assert.NotNil(t, server)
+	assert.Nil(t, err)
+
+	mockProxy := &MockProxy{}
+	server.proxy = mockProxy
+	server.rootCoordClient = &MockRootCoord{}
+	server.indexCoordClient = &MockIndexCoord{}
+	server.queryCoordClient = &MockQueryCoord{}
+	server.dataCoordClient = &MockDataCoord{}
+
+	watchServer := milvusmock.NewGrpcHealthWatchServer()
+	resultChan := watchServer.Chan()
+	req := &grpc_health_v1.HealthCheckRequest{Service: ""}
+	// Watch is called synchronously; each response is read from resultChan right after it returns.
+	err = server.Watch(req, watchServer)
+	ret := <-resultChan
+
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
+
+	mockProxy.On("GetComponentStates", ctx).Return(nil, fmt.Errorf("mock grpc unexpected error")).Once()
+
+	err = server.Watch(req, watchServer)
+	ret = <-resultChan
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
+
+	componentInfo := &internalpb.ComponentInfo{
+		NodeID:    0,
+		Role:      "proxy",
+		StateCode: internalpb.StateCode_Abnormal,
+	}
+	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
+	componentState := &internalpb.ComponentStates{
+		State:  componentInfo,
+		Status: status,
+	}
+	mockProxy.On("GetComponentStates", ctx).Return(componentState, nil)
+
+	err = server.Watch(req, watchServer)
+	ret = <-resultChan
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
+
+	status.ErrorCode = commonpb.ErrorCode_Success
+	err = server.Watch(req, watchServer)
+	ret = <-resultChan
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
+
+	componentInfo.StateCode = internalpb.StateCode_Initializing
+	err = server.Watch(req, watchServer)
+	ret = <-resultChan
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
+
+	componentInfo.StateCode = internalpb.StateCode_Healthy
+	err = server.Watch(req, watchServer)
+	ret = <-resultChan
+	assert.Nil(t, err)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
+}
+
 func Test_NewServer_HTTPServerDisabled(t *testing.T) {
 	ctx := context.Background()
 	server, err := NewServer(ctx, nil)
diff --git a/internal/util/mock/health_watch_server.go b/internal/util/mock/health_watch_server.go
new file mode 100644
index 0000000000000000000000000000000000000000..83a19753e31526890c48d277285c10b9dda3e759
--- /dev/null
+++ b/internal/util/mock/health_watch_server.go
@@ -0,0 +1,66 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mock
+
+import (
+	"context"
+
+	"google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/metadata"
+)
+
+// GrpcHealthWatchServer is a test double for grpc_health_v1.Health_WatchServer.
+type GrpcHealthWatchServer struct {
+	chanResult chan *grpc_health_v1.HealthCheckResponse
+}
+
+// NewGrpcHealthWatchServer returns a watch server whose channel buffers one response.
+func NewGrpcHealthWatchServer() *GrpcHealthWatchServer {
+	return &GrpcHealthWatchServer{chanResult: make(chan *grpc_health_v1.HealthCheckResponse, 1)}
+}
+
+// Send publishes response on the result channel and always returns nil. It
+// blocks while a previously sent response has not been received yet.
+func (m GrpcHealthWatchServer) Send(response *grpc_health_v1.HealthCheckResponse) error {
+	m.chanResult <- response
+	return nil
+}
+
+// Chan returns the receive side of the channel that Send writes to.
+func (m GrpcHealthWatchServer) Chan() <-chan *grpc_health_v1.HealthCheckResponse {
+	return m.chanResult
+}
+
+// SetHeader is a no-op that always returns nil.
+func (m GrpcHealthWatchServer) SetHeader(md metadata.MD) error { return nil }
+
+// SendHeader is a no-op that always returns nil.
+func (m GrpcHealthWatchServer) SendHeader(md metadata.MD) error { return nil }
+
+// SetTrailer is a no-op.
+func (m GrpcHealthWatchServer) SetTrailer(md metadata.MD) {}
+
+// Context returns a background context; it is never nil, so callers may use it directly.
+func (m GrpcHealthWatchServer) Context() context.Context {
+	return context.Background()
+}
+
+// SendMsg is a no-op that always returns nil.
+func (m GrpcHealthWatchServer) SendMsg(msg interface{}) error { return nil }
+
+// RecvMsg is a no-op that always returns nil.
+func (m GrpcHealthWatchServer) RecvMsg(msg interface{}) error { return nil }