提交 fd0227b6 编写于 作者: N neza2017 提交者: yefu.chen

Add logic of get time tick

Signed-off-by: Nneza2017 <yefu.chen@zilliz.com>
上级 3b081620
...@@ -31,15 +31,31 @@ func main() { ...@@ -31,15 +31,31 @@ func main() {
} }
log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port)
cnt := 0
psc.Params.Init() psc.Params.Init()
log.Printf("proxy service address : %s", psc.Params.NetworkAddress()) log.Printf("proxy service address : %s", psc.Params.NetworkAddress())
//proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress()) proxyService := psc.NewClient(psc.Params.NetworkAddress())
//TODO, test proxy service GetComponentStates, before set for cnt = 0; cnt < reTryCnt; cnt++ {
pxStates, err := proxyService.GetComponentStates()
if err != nil {
log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, err.Error())
continue
}
if pxStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, pxStates.Status.Reason)
continue
}
if pxStates.State.StateCode != internalpb2.StateCode_INITIALIZING && pxStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
//if err = svr.SetProxyService(proxyService); err != nil { if err = svr.SetProxyService(proxyService); err != nil {
// panic(err) panic(err)
//} }
ds.Params.Init() ds.Params.Init()
log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port)
...@@ -50,7 +66,6 @@ func main() { ...@@ -50,7 +66,6 @@ func main() {
if err = dataService.Start(); err != nil { if err = dataService.Start(); err != nil {
panic(err) panic(err)
} }
cnt := 0
for cnt = 0; cnt < reTryCnt; cnt++ { for cnt = 0; cnt < reTryCnt; cnt++ {
dsStates, err := dataService.GetComponentStates() dsStates, err := dataService.GetComponentStates()
if err != nil { if err != nil {
...@@ -93,5 +108,7 @@ func main() { ...@@ -93,5 +108,7 @@ func main() {
syscall.SIGQUIT) syscall.SIGQUIT)
sig := <-sc sig := <-sc
log.Printf("Got %s signal to exit", sig.String()) log.Printf("Got %s signal to exit", sig.String())
_ = indexService.Stop()
_ = dataService.Stop()
_ = svr.Stop() _ = svr.Stop()
} }
...@@ -3,6 +3,10 @@ package grpcproxyservice ...@@ -3,6 +3,10 @@ package grpcproxyservice
import ( import (
"context" "context"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
...@@ -44,6 +48,18 @@ func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMe ...@@ -44,6 +48,18 @@ func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMe
return err return err
} }
func (c *Client) GetTimeTickChannel() (string, error) {
response, err := c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{})
if err != nil {
return "", err
}
return response.Value, nil
}
func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
return c.proxyServiceClient.GetComponentStates(c.ctx, &commonpb.Empty{})
}
func NewClient(address string) *Client { func NewClient(address string) *Client {
return &Client{ return &Client{
address: address, address: address,
......
...@@ -37,7 +37,7 @@ import ( ...@@ -37,7 +37,7 @@ import (
type ProxyServiceInterface interface { type ProxyServiceInterface interface {
GetTimeTickChannel() (string, error) GetTimeTickChannel() (string, error)
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error
} }
type DataServiceInterface interface { type DataServiceInterface interface {
...@@ -582,7 +582,7 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { ...@@ -582,7 +582,7 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
Params.ProxyTimeTickChannel = rsp Params.ProxyTimeTickChannel = rsp
c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error {
status, err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{ Base: &commonpb.MsgBase{
MsgType: 0, //TODO,MsgType MsgType: 0, //TODO,MsgType
MsgID: 0, MsgID: 0,
...@@ -595,9 +595,6 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { ...@@ -595,9 +595,6 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
if err != nil { if err != nil {
return err return err
} }
if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.Errorf("InvalidateCollectionMetaCache failed, error = %s", status.Reason)
}
return nil return nil
} }
return nil return nil
......
...@@ -32,14 +32,11 @@ type proxyMock struct { ...@@ -32,14 +32,11 @@ type proxyMock struct {
func (p *proxyMock) GetTimeTickChannel() (string, error) { func (p *proxyMock) GetTimeTickChannel() (string, error) {
return fmt.Sprintf("proxy-time-tick-%d", p.randVal), nil return fmt.Sprintf("proxy-time-tick-%d", p.randVal), nil
} }
func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
p.mutex.Lock() p.mutex.Lock()
defer p.mutex.Unlock() defer p.mutex.Unlock()
p.collArray = append(p.collArray, request.CollectionName) p.collArray = append(p.collArray, request.CollectionName)
return &commonpb.Status{ return nil
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}, nil
} }
func (p *proxyMock) GetCollArray() []string { func (p *proxyMock) GetCollArray() []string {
p.mutex.Lock() p.mutex.Lock()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册