From fd0227b69aba4ca282c1dfd4abca5fb10e31b72e Mon Sep 17 00:00:00 2001 From: neza2017 Date: Tue, 26 Jan 2021 15:23:42 +0800 Subject: [PATCH] Add logic of get time tick Signed-off-by: neza2017 --- cmd/masterservice/main.go | 29 +++++++++++++++---- internal/distributed/proxyservice/client.go | 16 ++++++++++ internal/masterservice/master_service.go | 7 ++--- internal/masterservice/master_service_test.go | 7 ++--- 4 files changed, 43 insertions(+), 16 deletions(-) diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go index 00dc38aac..70b12f7e6 100644 --- a/cmd/masterservice/main.go +++ b/cmd/masterservice/main.go @@ -31,15 +31,31 @@ func main() { } log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) + cnt := 0 + psc.Params.Init() log.Printf("proxy service address : %s", psc.Params.NetworkAddress()) - //proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress()) + proxyService := psc.NewClient(psc.Params.NetworkAddress()) - //TODO, test proxy service GetComponentStates, before set + for cnt = 0; cnt < reTryCnt; cnt++ { + pxStates, err := proxyService.GetComponentStates() + if err != nil { + log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, err.Error()) + continue + } + if pxStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, pxStates.Status.Reason) + continue + } + if pxStates.State.StateCode != internalpb2.StateCode_INITIALIZING && pxStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } - //if err = svr.SetProxyService(proxyService); err != nil { - // panic(err) - //} + if err = svr.SetProxyService(proxyService); err != nil { + panic(err) + } ds.Params.Init() log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) @@ -50,7 +66,6 @@ func main() { if err = dataService.Start(); err != nil { panic(err) } - cnt := 0 for cnt = 0; cnt < reTryCnt; cnt++ { dsStates, err := dataService.GetComponentStates() if err != nil { @@ -93,5 +108,7 @@ func main() { syscall.SIGQUIT) sig := <-sc log.Printf("Got %s signal to exit", sig.String()) + _ = indexService.Stop() + _ = dataService.Stop() _ = svr.Stop() } diff --git a/internal/distributed/proxyservice/client.go b/internal/distributed/proxyservice/client.go index 94c29241a..d53b9c83f 100644 --- a/internal/distributed/proxyservice/client.go +++ b/internal/distributed/proxyservice/client.go @@ -3,6 +3,10 @@ package grpcproxyservice import ( "context" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" @@ -44,6 +48,18 @@ func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMe return err } +func (c *Client) GetTimeTickChannel() (string, error) { + response, err := c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{}) + if err != nil { + return "", err + } + return response.Value, nil +} + +func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { + return c.proxyServiceClient.GetComponentStates(c.ctx, &commonpb.Empty{}) +} + func NewClient(address string) *Client { return &Client{ address: address, diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index cc4338d4a..11841cbb8 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -37,7 +37,7 @@ import ( type ProxyServiceInterface interface { GetTimeTickChannel() (string, error) - InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) + InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error } type DataServiceInterface interface { @@ -582,7 +582,7 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { Params.ProxyTimeTickChannel = rsp c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { - status, err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ + err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ MsgType: 0, //TODO,MsgType MsgID: 0, @@ -595,9 +595,6 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { if err != nil { return err } - if status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return errors.Errorf("InvalidateCollectionMetaCache failed, error = %s", status.Reason) - } return nil } return nil diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index d366b4d42..f83091c68 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -32,14 +32,11 @@ type proxyMock struct { func (p *proxyMock) GetTimeTickChannel() (string, error) { return fmt.Sprintf("proxy-time-tick-%d", p.randVal), nil } -func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { +func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error { p.mutex.Lock() defer p.mutex.Unlock() p.collArray = append(p.collArray, request.CollectionName) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - Reason: "", - }, nil + return nil } func (p *proxyMock) GetCollArray() []string { p.mutex.Lock() -- GitLab