未验证 提交 279d7702 编写于 作者: G godchen 提交者: GitHub

Change proxy grpc retry (#5431)

Change proxy grpc retry.

Signed-off-by: godchen qingxiang.chen@zilliz.com
上级 7dbe7370
......@@ -177,7 +177,7 @@ func (s *Server) init() error {
s.masterService.SetNewProxyClient(
func(s *sessionutil.Session) (types.ProxyNode, error) {
cli := pnc.NewClient(ctx, s.Address)
cli := pnc.NewClient(ctx, s.Address, 10)
if err := cli.Init(); err != nil {
return nil, err
}
......
......@@ -29,14 +29,22 @@ import (
type Client struct {
grpcClient proxypb.ProxyNodeServiceClient
address string
conn *grpc.ClientConn
ctx context.Context
address string
timeout time.Duration
reconnTry int
recallTry int
}
func NewClient(ctx context.Context, address string) *Client {
func NewClient(ctx context.Context, address string, timeout time.Duration) *Client {
return &Client{
address: address,
ctx: ctx,
address: address,
ctx: ctx,
timeout: timeout,
recallTry: 3,
reconnTry: 10,
}
}
......@@ -52,16 +60,58 @@ func (c *Client) Init() error {
if err != nil {
return err
}
c.grpcClient = proxypb.NewProxyNodeServiceClient(conn)
c.conn = conn
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
err := retry.Retry(c.reconnTry, time.Millisecond*200, connectGrpcFunc)
if err != nil {
return err
}
c.grpcClient = proxypb.NewProxyNodeServiceClient(c.conn)
return nil
}
func (c *Client) reconnect() error {
tracer := opentracing.GlobalTracer()
connectGrpcFunc := func() error {
log.Debug("ProxyNode connect ", zap.String("address", c.address))
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
}
c.conn = conn
return nil
}
err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
if err != nil {
return err
}
c.grpcClient = proxypb.NewProxyNodeServiceClient(c.conn)
return nil
}
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
ret, err := caller()
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
err = c.reconnect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
}
}
return ret, err
}
func (c *Client) Start() error {
return nil
}
......@@ -76,13 +126,22 @@ func (c *Client) Register() error {
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
})
return ret.(*internalpb.ComponentStates), err
}
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
})
return ret.(*milvuspb.StringResponse), err
}
func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return c.grpcClient.InvalidateCollectionMetaCache(ctx, req)
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.InvalidateCollectionMetaCache(ctx, req)
})
return ret.(*commonpb.Status), err
}
......@@ -86,7 +86,7 @@ func (table *globalNodeInfoTable) createClients() error {
for nodeID, info := range table.infos {
_, ok := table.ProxyNodes[nodeID]
if !ok {
table.ProxyNodes[nodeID] = grpcproxynodeclient.NewClient(context.Background(), info.ip+":"+strconv.Itoa(int(info.port)))
table.ProxyNodes[nodeID] = grpcproxynodeclient.NewClient(context.Background(), info.ip+":"+strconv.Itoa(int(info.port)), 10)
var err error
err = table.ProxyNodes[nodeID].Init()
if err != nil {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册