未验证 提交 518b6310 编写于 作者: wei liu 提交者: GitHub

refine retry times on replica (#26043)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
上级 6663e753
......@@ -199,6 +199,7 @@ func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad
for channel, nodes := range dml2leaders {
channel := channel
nodes := lo.Map(nodes, func(node nodeInfo, _ int) int64 { return node.nodeID })
retryOnReplica := Params.ProxyCfg.RetryTimesOnReplica.GetAsInt()
wg.Go(func() error {
err := lb.ExecuteWithRetry(ctx, ChannelWorkload{
db: workload.db,
......@@ -208,7 +209,7 @@ func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad
shardLeaders: nodes,
nq: workload.nq,
exec: workload.exec,
retryTimes: uint(len(nodes)),
retryTimes: uint(len(nodes) * retryOnReplica),
})
return err
})
......
......@@ -366,6 +366,7 @@ func (s *LBPolicySuite) TestExecute() {
collectionID: s.collectionID,
nq: 1,
exec: func(ctx context.Context, ui UniqueID, qn types.QueryNode, s ...string) error {
// succeed on the first execution
if counter.Add(1) == 1 {
return nil
}
......@@ -374,6 +375,7 @@ func (s *LBPolicySuite) TestExecute() {
},
})
s.Error(err)
s.Equal(int64(11), counter.Load())
// test get shard leader failed
s.qc.ExpectedCalls = nil
......
......@@ -50,6 +50,9 @@ type LookAsideBalancer struct {
unreachableQueryNodes *typeutil.ConcurrentSet[int64]
// query node id -> number of consecutive heartbeat failures
failedHeartBeatCounter *typeutil.ConcurrentMap[int64, *atomic.Int64]
closeCh chan struct{}
closeOnce sync.Once
wg sync.WaitGroup
......@@ -57,12 +60,13 @@ type LookAsideBalancer struct {
func NewLookAsideBalancer(clientMgr shardClientMgr) *LookAsideBalancer {
balancer := &LookAsideBalancer{
clientMgr: clientMgr,
metricsMap: typeutil.NewConcurrentMap[int64, *internalpb.CostAggregation](),
metricsUpdateTs: typeutil.NewConcurrentMap[int64, int64](),
executingTaskTotalNQ: typeutil.NewConcurrentMap[int64, *atomic.Int64](),
unreachableQueryNodes: typeutil.NewConcurrentSet[int64](),
closeCh: make(chan struct{}),
clientMgr: clientMgr,
metricsMap: typeutil.NewConcurrentMap[int64, *internalpb.CostAggregation](),
metricsUpdateTs: typeutil.NewConcurrentMap[int64, int64](),
executingTaskTotalNQ: typeutil.NewConcurrentMap[int64, *atomic.Int64](),
unreachableQueryNodes: typeutil.NewConcurrentSet[int64](),
failedHeartBeatCounter: typeutil.NewConcurrentMap[int64, *atomic.Int64](),
closeCh: make(chan struct{}),
}
return balancer
......@@ -198,13 +202,28 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
ctx, cancel := context.WithTimeout(context.Background(), checkInterval)
defer cancel()
setUnreachable := func() bool {
setUnreachable := func(err error) bool {
failures, ok := b.failedHeartBeatCounter.Get(node)
if !ok {
failures = atomic.NewInt64(0)
}
failures.Inc()
b.failedHeartBeatCounter.Insert(node, failures)
if failures.Load() < Params.ProxyCfg.RetryTimesOnHealthCheck.GetAsInt64() {
log.Warn("get component status failed",
zap.Int64("node", node),
zap.Int64("times", failures.Load()),
zap.Error(err))
return false
}
return b.unreachableQueryNodes.Insert(node)
}
qn, err := b.clientMgr.GetClient(ctx, node)
if err != nil {
if setUnreachable() {
if setUnreachable(err) {
log.Warn("get client failed, set node unreachable", zap.Int64("node", node), zap.Error(err))
}
return struct{}{}, nil
......@@ -212,14 +231,14 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
resp, err := qn.GetComponentStates(ctx)
if err != nil {
if setUnreachable() {
if setUnreachable(err) {
log.Warn("get component status failed,set node unreachable", zap.Int64("node", node), zap.Error(err))
}
return struct{}{}, nil
}
if resp.GetState().GetStateCode() != commonpb.StateCode_Healthy {
if setUnreachable() {
if setUnreachable(merr.ErrServiceUnavailable) {
log.Warn("component status unhealthy,set node unreachable", zap.Int64("node", node), zap.Error(err))
}
return struct{}{}, nil
......@@ -228,6 +247,11 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
// check health successfully, update check health ts
b.metricsUpdateTs.Insert(node, time.Now().Local().UnixMilli())
if b.unreachableQueryNodes.TryRemove(node) {
// once a heartbeat succeeds, clear the failed-heartbeat counter
failures, ok := b.failedHeartBeatCounter.Get(node)
if ok {
failures.Store(0)
}
log.Info("component recuperated, set node reachable", zap.Int64("node", node), zap.Error(err))
}
......
......@@ -302,14 +302,14 @@ func (suite *LookAsideBalancerSuite) TestCheckHealthLoop() {
suite.balancer.unreachableQueryNodes.Insert(2)
suite.Eventually(func() bool {
return suite.balancer.unreachableQueryNodes.Contain(1)
}, 3*time.Second, 100*time.Millisecond)
}, 5*time.Second, 100*time.Millisecond)
targetNode, err := suite.balancer.SelectNode(context.Background(), []int64{1}, 1)
suite.ErrorIs(err, merr.ErrServiceUnavailable)
suite.Equal(int64(-1), targetNode)
suite.Eventually(func() bool {
return !suite.balancer.unreachableQueryNodes.Contain(2)
}, 3*time.Second, 100*time.Millisecond)
}, 5*time.Second, 100*time.Millisecond)
}
func (suite *LookAsideBalancerSuite) TestNodeRecover() {
......
......@@ -111,7 +111,7 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
actions := o.findNeedLoadedSegments(leaderView, dists)
actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...)
updateVersionAction := o.checkNeedUpdateTargetVersion(leaderView)
updateVersionAction := o.checkNeedUpdateTargetVersion(ctx, leaderView)
if updateVersionAction != nil {
actions = append(actions, updateVersionAction)
}
......@@ -133,14 +133,15 @@ func (ob *LeaderObserver) CheckTargetVersion(collectionID int64) bool {
return <-notifier
}
func (o *LeaderObserver) checkNeedUpdateTargetVersion(leaderView *meta.LeaderView) *querypb.SyncAction {
func (o *LeaderObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction {
log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60)
targetVersion := o.target.GetCollectionTargetVersion(leaderView.CollectionID, meta.CurrentTarget)
if targetVersion <= leaderView.TargetVersion {
return nil
}
log.Info("Update readable segment version",
log.RatedInfo(10, "Update readable segment version",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channelName", leaderView.Channel),
zap.Int64("nodeID", leaderView.ID),
......@@ -152,14 +153,6 @@ func (o *LeaderObserver) checkNeedUpdateTargetVersion(leaderView *meta.LeaderVie
growingSegments := o.target.GetStreamingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
droppedSegments := o.target.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
log.Info("Update readable segment version",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channelName", leaderView.Channel),
zap.Int64("nodeID", leaderView.ID),
zap.Int64("oldVersion", leaderView.TargetVersion),
zap.Int64("newVersion", targetVersion),
)
return &querypb.SyncAction{
Type: querypb.SyncType_UpdateVersion,
GrowingInTarget: growingSegments.Collect(),
......
......@@ -558,11 +558,11 @@ func (suite *LeaderObserverTestSuite) TestSyncTargetVersion() {
view := utils.CreateTestLeaderView(1, collectionID, "channel-1", nil, nil)
view.TargetVersion = TargetVersion
action := observer.checkNeedUpdateTargetVersion(view)
action := observer.checkNeedUpdateTargetVersion(context.Background(), view)
suite.Nil(action)
view.TargetVersion = TargetVersion - 1
action = observer.checkNeedUpdateTargetVersion(view)
action = observer.checkNeedUpdateTargetVersion(context.Background(), view)
suite.NotNil(action)
suite.Equal(querypb.SyncType_UpdateVersion, action.Type)
suite.Len(action.GrowingInTarget, 2)
......
......@@ -968,6 +968,8 @@ type proxyConfig struct {
ReplicaSelectionPolicy ParamItem `refreshable:"false"`
CheckQueryNodeHealthInterval ParamItem `refreshable:"false"`
CostMetricsExpireTime ParamItem `refreshable:"true"`
RetryTimesOnReplica ParamItem `refreshable:"true"`
RetryTimesOnHealthCheck ParamItem `refreshable:"true"`
}
func (p *proxyConfig) init(base *BaseTable) {
......@@ -984,7 +986,7 @@ func (p *proxyConfig) init(base *BaseTable) {
p.HealthCheckTimetout = ParamItem{
Key: "proxy.healthCheckTimetout",
Version: "2.3.0",
DefaultValue: "500",
DefaultValue: "1000",
PanicIfEmpty: true,
Doc: "ms, the interval that to do component healthy check",
Export: true,
......@@ -1212,6 +1214,21 @@ please adjust in embedded Milvus: false`,
}
p.CostMetricsExpireTime.Init(base.mgr)
p.RetryTimesOnReplica = ParamItem{
Key: "proxy.retryTimesOnReplica",
Version: "2.3.0",
DefaultValue: "2",
Doc: "retry times on each replica",
}
p.RetryTimesOnReplica.Init(base.mgr)
p.RetryTimesOnHealthCheck = ParamItem{
Key: "proxy.retryTimesOnHealthCheck",
Version: "2.3.0",
DefaultValue: "3",
Doc: "set query node unavailable on proxy when heartbeat failures reach this limit",
}
p.RetryTimesOnHealthCheck.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
......
......@@ -185,6 +185,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, Params.ReplicaSelectionPolicy.GetValue(), "look_aside")
assert.Equal(t, Params.CheckQueryNodeHealthInterval.GetAsInt(), 1000)
assert.Equal(t, Params.CostMetricsExpireTime.GetAsInt(), 1000)
assert.Equal(t, Params.RetryTimesOnReplica.GetAsInt(), 2)
})
// t.Run("test proxyConfig panic", func(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册