未验证 提交 6e3a0d2a 编写于 作者: G godchen 提交者: GitHub

Add queryservice grpc retry (#5437)

Add queryservice grpc retry.
Signed-off-by: Ngodchen <qingxiang.chen@zilliz.com>
上级 c6aee9b4
...@@ -961,7 +961,7 @@ func TestRun(t *testing.T) { ...@@ -961,7 +961,7 @@ func TestRun(t *testing.T) {
svr.newIndexServiceClient = func(s, etcdAddress, metaRootPath string, timeout time.Duration) types.IndexService { svr.newIndexServiceClient = func(s, etcdAddress, metaRootPath string, timeout time.Duration) types.IndexService {
return &mockIndex{} return &mockIndex{}
} }
svr.newQueryServiceClient = func(s string) (types.QueryService, error) { svr.newQueryServiceClient = func(s, metaRootPath, etcdAddress string) (types.QueryService, error) {
return &mockQuery{}, nil return &mockQuery{}, nil
} }
......
...@@ -63,7 +63,7 @@ type Server struct { ...@@ -63,7 +63,7 @@ type Server struct {
newProxyServiceClient func(string) types.ProxyService newProxyServiceClient func(string) types.ProxyService
newIndexServiceClient func(string, string, string, time.Duration) types.IndexService newIndexServiceClient func(string, string, string, time.Duration) types.IndexService
newDataServiceClient func(string, string, string, time.Duration) types.DataService newDataServiceClient func(string, string, string, time.Duration) types.DataService
newQueryServiceClient func(string) (types.QueryService, error) newQueryServiceClient func(string, string, string) (types.QueryService, error)
closer io.Closer closer io.Closer
} }
...@@ -113,7 +113,7 @@ func (s *Server) setClient() { ...@@ -113,7 +113,7 @@ func (s *Server) setClient() {
} }
return dsClient return dsClient
} }
s.newIndexServiceClient = func(s, etcdAddress, metaRootPath string, timeout time.Duration) types.IndexService { s.newIndexServiceClient = func(s, metaRootPath, etcdAddress string, timeout time.Duration) types.IndexService {
isClient := isc.NewClient(s, metaRootPath, []string{etcdAddress}, timeout) isClient := isc.NewClient(s, metaRootPath, []string{etcdAddress}, timeout)
if err := isClient.Init(); err != nil { if err := isClient.Init(); err != nil {
panic(err) panic(err)
...@@ -123,8 +123,8 @@ func (s *Server) setClient() { ...@@ -123,8 +123,8 @@ func (s *Server) setClient() {
} }
return isClient return isClient
} }
s.newQueryServiceClient = func(s string) (types.QueryService, error) { s.newQueryServiceClient = func(s, metaRootPath, etcdAddress string) (types.QueryService, error) {
qsClient, err := qsc.NewClient(s, 5*time.Second) qsClient, err := qsc.NewClient(context.Background(), s, metaRootPath, []string{etcdAddress}, 5*time.Second)
if err != nil { if err != nil {
panic(err) panic(err)
} }
...@@ -214,7 +214,7 @@ func (s *Server) init() error { ...@@ -214,7 +214,7 @@ func (s *Server) init() error {
} }
if s.newQueryServiceClient != nil { if s.newQueryServiceClient != nil {
log.Debug("query service", zap.String("address", Params.QueryServiceAddress)) log.Debug("query service", zap.String("address", Params.QueryServiceAddress))
queryService, _ := s.newQueryServiceClient(Params.QueryServiceAddress) queryService, _ := s.newQueryServiceClient(Params.QueryServiceAddress, cms.Params.MetaRootPath, cms.Params.EtcdAddress)
if err := s.masterService.SetQueryService(queryService); err != nil { if err := s.masterService.SetQueryService(queryService); err != nil {
panic(err) panic(err)
} }
......
...@@ -227,7 +227,7 @@ func (s *Server) init() error { ...@@ -227,7 +227,7 @@ func (s *Server) init() error {
queryServiceAddr := Params.QueryServiceAddress queryServiceAddr := Params.QueryServiceAddress
log.Debug("proxynode", zap.String("query server address", queryServiceAddr)) log.Debug("proxynode", zap.String("query server address", queryServiceAddr))
s.queryServiceClient, err = grpcqueryserviceclient.NewClient(queryServiceAddr, timeout) s.queryServiceClient, err = grpcqueryserviceclient.NewClient(ctx, queryServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -106,7 +106,7 @@ func (s *Server) init() error { ...@@ -106,7 +106,7 @@ func (s *Server) init() error {
// --- QueryService --- // --- QueryService ---
log.Debug("QueryService", zap.String("address", Params.QueryServiceAddress)) log.Debug("QueryService", zap.String("address", Params.QueryServiceAddress))
log.Debug("Init Query service client ...") log.Debug("Init Query service client ...")
queryService, err := qsc.NewClient(Params.QueryServiceAddress, 20*time.Second) queryService, err := qsc.NewClient(ctx, Params.QueryServiceAddress, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 20*time.Second)
if err != nil { if err != nil {
panic(err) panic(err)
} }
......
...@@ -13,8 +13,12 @@ package grpcqueryserviceclient ...@@ -13,8 +13,12 @@ package grpcqueryserviceclient
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
otgrpc "github.com/opentracing-contrib/go-grpc" otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
"go.uber.org/zap" "go.uber.org/zap"
...@@ -28,22 +32,43 @@ import ( ...@@ -28,22 +32,43 @@ import (
) )
type Client struct { type Client struct {
ctx context.Context
grpcClient querypb.QueryServiceClient grpcClient querypb.QueryServiceClient
conn *grpc.ClientConn conn *grpc.ClientConn
addr string addr string
timeout time.Duration timeout time.Duration
retry int sess *sessionutil.Session
reconnTry int
recallTry int
} }
func NewClient(address string, timeout time.Duration) (*Client, error) { func getQueryServiceAddress(sess *sessionutil.Session) (string, error) {
key := typeutil.QueryServiceRole
msess, _, err := sess.GetSessions(key)
if err != nil {
return "", err
}
ms, ok := msess[key]
if !ok {
return "", fmt.Errorf("number of master service is incorrect, %d", len(msess))
}
return ms.Address, nil
}
// NewClient creates a client for queryservice grpc call.
func NewClient(ctx context.Context, address, metaRootPath string, etcdAddr []string, timeout time.Duration) (*Client, error) {
sess := sessionutil.NewSession(context.Background(), metaRootPath, etcdAddr)
return &Client{ return &Client{
ctx: ctx,
grpcClient: nil, grpcClient: nil,
conn: nil, conn: nil,
addr: address, addr: address,
timeout: timeout, timeout: timeout,
retry: 300, reconnTry: 10,
recallTry: 3,
sess: sess,
}, nil }, nil
} }
...@@ -51,25 +76,84 @@ func (c *Client) Init() error { ...@@ -51,25 +76,84 @@ func (c *Client) Init() error {
tracer := opentracing.GlobalTracer() tracer := opentracing.GlobalTracer()
ctx, cancel := context.WithTimeout(context.Background(), c.timeout) ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel() defer cancel()
var err error if c.addr != "" {
for i := 0; i < c.retry; i++ { connectGrpcFunc := func() error {
if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), log.Debug("queryservice connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor( grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)), otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor( grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer))); err == nil { otgrpc.OpenTracingStreamClientInterceptor(tracer)))
break if err != nil {
return err
} }
c.conn = conn
return nil
} }
err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
if err != nil { if err != nil {
return err return err
} }
} else {
return c.reconnect()
}
c.grpcClient = querypb.NewQueryServiceClient(c.conn) c.grpcClient = querypb.NewQueryServiceClient(c.conn)
log.Debug("connected to queryService", zap.String("queryService", c.addr)) log.Debug("connected to queryService", zap.String("queryService", c.addr))
return nil return nil
} }
func (c *Client) reconnect() error {
tracer := opentracing.GlobalTracer()
var err error
getQueryServiceAddressFn := func() error {
c.addr, err = getQueryServiceAddress(c.sess)
if err != nil {
return err
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getQueryServiceAddressFn)
if err != nil {
return err
}
connectGrpcFunc := func() error {
log.Debug("QueryService connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
}
c.conn = conn
return nil
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
if err != nil {
return err
}
c.grpcClient = querypb.NewQueryServiceClient(c.conn)
return nil
}
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
ret, err := caller()
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
err = c.reconnect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
}
}
return ret, err
}
func (c *Client) Start() error { func (c *Client) Start() error {
return nil return nil
...@@ -85,53 +169,92 @@ func (c *Client) Register() error { ...@@ -85,53 +169,92 @@ func (c *Client) Register() error {
} }
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
})
return ret.(*internalpb.ComponentStates), err
} }
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{}) return c.grpcClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
})
return ret.(*milvuspb.StringResponse), err
} }
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{}) return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
})
return ret.(*milvuspb.StringResponse), err
} }
func (c *Client) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { func (c *Client) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.RegisterNode(ctx, req) return c.grpcClient.RegisterNode(ctx, req)
})
return ret.(*querypb.RegisterNodeResponse), err
} }
func (c *Client) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { func (c *Client) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.ShowCollections(ctx, req) return c.grpcClient.ShowCollections(ctx, req)
})
return ret.(*querypb.ShowCollectionsResponse), err
} }
func (c *Client) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) { func (c *Client) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.LoadCollection(ctx, req) return c.grpcClient.LoadCollection(ctx, req)
})
return ret.(*commonpb.Status), err
} }
func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.ReleaseCollection(ctx, req) return c.grpcClient.ReleaseCollection(ctx, req)
})
return ret.(*commonpb.Status), err
} }
func (c *Client) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { func (c *Client) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.ShowPartitions(ctx, req) return c.grpcClient.ShowPartitions(ctx, req)
})
return ret.(*querypb.ShowPartitionsResponse), err
} }
func (c *Client) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) { func (c *Client) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.LoadPartitions(ctx, req) return c.grpcClient.LoadPartitions(ctx, req)
})
return ret.(*commonpb.Status), err
} }
func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) { func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.ReleasePartitions(ctx, req) return c.grpcClient.ReleasePartitions(ctx, req)
})
return ret.(*commonpb.Status), err
} }
func (c *Client) CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error) { func (c *Client) CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.CreateQueryChannel(ctx, &querypb.CreateQueryChannelRequest{}) return c.grpcClient.CreateQueryChannel(ctx, &querypb.CreateQueryChannelRequest{})
})
return ret.(*querypb.CreateQueryChannelResponse), err
} }
func (c *Client) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) { func (c *Client) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetPartitionStates(ctx, req) return c.grpcClient.GetPartitionStates(ctx, req)
})
return ret.(*querypb.GetPartitionStatesResponse), err
} }
func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetSegmentInfo(ctx, req) return c.grpcClient.GetSegmentInfo(ctx, req)
})
return ret.(*querypb.GetSegmentInfoResponse), err
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册