未验证 提交 387ad98b 编写于 作者: W wayblink 提交者: GitHub

Check and reset if grpc client serverID mismatch with session (#26473)

Signed-off-by: Nwayblink <anyang.wang@zilliz.com>
上级 2c32d0db
......@@ -78,6 +78,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
client.grpcClient.SetRole(typeutil.DataCoordRole)
client.grpcClient.SetGetAddrFunc(client.getDataCoordAddr)
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)
return client, nil
}
......
......@@ -74,6 +74,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
client.grpcClient.SetRole(typeutil.IndexCoordRole)
client.grpcClient.SetGetAddrFunc(client.getIndexCoordAddr)
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)
return client, nil
}
......
......@@ -73,6 +73,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
client.grpcClient.SetRole(typeutil.QueryCoordRole)
client.grpcClient.SetGetAddrFunc(client.getQueryCoordAddr)
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)
return client, nil
}
......
......@@ -81,6 +81,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
client.grpcClient.SetRole(typeutil.RootCoordRole)
client.grpcClient.SetGetAddrFunc(client.getRootCoordAddr)
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)
return client, nil
}
......
......@@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/util/generic"
"github.com/milvus-io/milvus/internal/util/interceptor"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
)
......@@ -60,6 +61,7 @@ type GrpcClient[T interface {
Close() error
SetNodeID(int64)
GetNodeID() int64
SetSession(sess *sessionutil.Session)
}
// ClientBase is a base of grpc client
......@@ -87,6 +89,7 @@ type ClientBase[T interface {
MaxBackoff float32
BackoffMultiplier float32
NodeID int64
sess *sessionutil.Session
sf singleflight.Group
}
......@@ -291,11 +294,6 @@ func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any
return ret, nil
}
if !funcutil.CheckCtxValid(ctx) {
// start bg check in case of https://github.com/milvus-io/milvus/issues/22435
go c.bgHealthCheck(client)
return generic.Zero[T](), err
}
if IsCrossClusterRoutingErr(err) {
log.Warn("CrossClusterRoutingErr, start to reset connection", zap.Error(err))
c.resetConnection(client)
......@@ -306,6 +304,28 @@ func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any
c.resetConnection(client)
return ret, err
}
if !funcutil.CheckCtxValid(ctx) {
// check if server ID matches coord session, if not, reset connection
if c.sess != nil {
sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole())
if getSessionErr != nil {
// Only log but not handle this error as it is an auxiliary logic
log.Warn("Fail to GetSessions", zap.Error(getSessionErr))
}
if coordSess, exist := sessions[c.GetRole()]; exist {
if c.GetNodeID() != coordSess.ServerID {
log.Warn("Server ID mismatch, may connected to a old server, start to reset connection", zap.Error(err))
c.resetConnection(client)
return ret, err
}
}
}
// start bg check in case of https://github.com/milvus-io/milvus/issues/22435
go c.bgHealthCheck(client)
return generic.Zero[T](), err
}
if !funcutil.IsGrpcErr(err) {
log.Warn("ClientBase:isNotGrpcErr", zap.Error(err))
return generic.Zero[T](), err
......@@ -398,6 +418,11 @@ func (c *ClientBase[T]) GetNodeID() int64 {
return c.NodeID
}
// SetSession set session role of client
func (c *ClientBase[T]) SetSession(sess *sessionutil.Session) {
c.sess = sess
}
func IsCrossClusterRoutingErr(err error) bool {
// GRPC utilizes `status.Status` to encapsulate errors,
// hence it is not viable to employ the `errors.Is` for assessment.
......
......@@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/generic"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
)
......@@ -43,6 +44,7 @@ type GRPCClientBase[T any] struct {
GetGrpcClientErr error
role string
nodeID int64
sess *sessionutil.Session
}
func (c *GRPCClientBase[T]) SetGetAddrFunc(f func() (string, error)) {
......@@ -169,3 +171,7 @@ func (c *GRPCClientBase[T]) SetNodeID(nodeID int64) {
func SuccessStatus() *commonpb.Status {
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
}
func (c *GRPCClientBase[T]) SetSession(sess *sessionutil.Session) {
c.sess = sess
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册