From 518b6310a2529a00069e2173df1ec780f3167ac9 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Thu, 3 Aug 2023 15:55:09 +0800
Subject: [PATCH] refine retry times on replica (#26043)

Signed-off-by: Wei Liu
---
 internal/proxy/lb_policy.go                   |  3 +-
 internal/proxy/lb_policy_test.go              |  2 +
 internal/proxy/look_aside_balancer.go         | 44 ++++++++++++++-----
 internal/proxy/look_aside_balancer_test.go    |  4 +-
 .../querycoordv2/observers/leader_observer.go | 15 ++-----
 .../observers/leader_observer_test.go         |  4 +-
 pkg/util/paramtable/component_param.go        | 19 +++++++-
 pkg/util/paramtable/component_param_test.go   |  1 +
 8 files changed, 65 insertions(+), 27 deletions(-)

diff --git a/internal/proxy/lb_policy.go b/internal/proxy/lb_policy.go
index 1d5498778..19082b9ce 100644
--- a/internal/proxy/lb_policy.go
+++ b/internal/proxy/lb_policy.go
@@ -199,6 +199,7 @@ func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad
 	for channel, nodes := range dml2leaders {
 		channel := channel
 		nodes := lo.Map(nodes, func(node nodeInfo, _ int) int64 { return node.nodeID })
+		retryOnReplica := Params.ProxyCfg.RetryTimesOnReplica.GetAsInt()
 		wg.Go(func() error {
 			err := lb.ExecuteWithRetry(ctx, ChannelWorkload{
 				db:             workload.db,
@@ -208,7 +209,7 @@ func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad
 				shardLeaders:   nodes,
 				nq:             workload.nq,
 				exec:           workload.exec,
-				retryTimes:     uint(len(nodes)),
+				retryTimes:     uint(len(nodes) * retryOnReplica),
 			})
 			return err
 		})
diff --git a/internal/proxy/lb_policy_test.go b/internal/proxy/lb_policy_test.go
index 43dbfc0e8..6b9fbb2b1 100644
--- a/internal/proxy/lb_policy_test.go
+++ b/internal/proxy/lb_policy_test.go
@@ -366,6 +366,7 @@ func (s *LBPolicySuite) TestExecute() {
 		collectionID: s.collectionID,
 		nq:           1,
 		exec: func(ctx context.Context, ui UniqueID, qn types.QueryNode, s ...string) error {
+			// succeed on the first execution
 			if counter.Add(1) == 1 {
 				return nil
 			}
@@ -374,6 +375,7 @@ func (s *LBPolicySuite) TestExecute() {
 		},
 	})
 	s.Error(err)
+	s.Equal(int64(11), counter.Load())
 
 	// test get shard leader failed
 	s.qc.ExpectedCalls = nil
diff --git a/internal/proxy/look_aside_balancer.go b/internal/proxy/look_aside_balancer.go
index 48bb2630d..32d5f17a6 100644
--- a/internal/proxy/look_aside_balancer.go
+++ b/internal/proxy/look_aside_balancer.go
@@ -50,6 +50,9 @@ type LookAsideBalancer struct {
 
 	unreachableQueryNodes *typeutil.ConcurrentSet[int64]
 
+	// query node id -> number of consecutive heartbeat failures
+	failedHeartBeatCounter *typeutil.ConcurrentMap[int64, *atomic.Int64]
+
 	closeCh   chan struct{}
 	closeOnce sync.Once
 	wg        sync.WaitGroup
 }
 
 func NewLookAsideBalancer(clientMgr shardClientMgr) *LookAsideBalancer {
 	balancer := &LookAsideBalancer{
-		clientMgr:             clientMgr,
-		metricsMap:            typeutil.NewConcurrentMap[int64, *internalpb.CostAggregation](),
-		metricsUpdateTs:       typeutil.NewConcurrentMap[int64, int64](),
-		executingTaskTotalNQ:  typeutil.NewConcurrentMap[int64, *atomic.Int64](),
-		unreachableQueryNodes: typeutil.NewConcurrentSet[int64](),
-		closeCh:               make(chan struct{}),
+		clientMgr:              clientMgr,
+		metricsMap:             typeutil.NewConcurrentMap[int64, *internalpb.CostAggregation](),
+		metricsUpdateTs:        typeutil.NewConcurrentMap[int64, int64](),
+		executingTaskTotalNQ:   typeutil.NewConcurrentMap[int64, *atomic.Int64](),
+		unreachableQueryNodes:  typeutil.NewConcurrentSet[int64](),
+		failedHeartBeatCounter: typeutil.NewConcurrentMap[int64, *atomic.Int64](),
+		closeCh:                make(chan struct{}),
 	}
 
 	return balancer
@@ -198,13 +202,28 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
         ctx, cancel := context.WithTimeout(context.Background(), checkInterval)
         defer cancel()
 
-        setUnreachable := func() bool {
+        setUnreachable := func(err error) bool {
+            failures, ok := b.failedHeartBeatCounter.Get(node)
+            if !ok {
+                failures = atomic.NewInt64(0)
+            }
+            failures.Inc()
+            b.failedHeartBeatCounter.Insert(node, failures)
+
+            if failures.Load() < Params.ProxyCfg.RetryTimesOnHealthCheck.GetAsInt64() {
+                log.Warn("get component status failed",
+                    zap.Int64("node", node),
+                    zap.Int64("times", failures.Load()),
+                    zap.Error(err))
+                return false
+            }
+
             return b.unreachableQueryNodes.Insert(node)
         }
 
         qn, err := b.clientMgr.GetClient(ctx, node)
         if err != nil {
-            if setUnreachable() {
+            if setUnreachable(err) {
                 log.Warn("get client failed, set node unreachable", zap.Int64("node", node), zap.Error(err))
             }
             return struct{}{}, nil
@@ -212,14 +231,14 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
 
         resp, err := qn.GetComponentStates(ctx)
         if err != nil {
-            if setUnreachable() {
+            if setUnreachable(err) {
                 log.Warn("get component status failed,set node unreachable", zap.Int64("node", node), zap.Error(err))
             }
             return struct{}{}, nil
         }
 
         if resp.GetState().GetStateCode() != commonpb.StateCode_Healthy {
-            if setUnreachable() {
+            if setUnreachable(merr.ErrServiceUnavailable) {
                 log.Warn("component status unhealthy,set node unreachable", zap.Int64("node", node), zap.Error(err))
             }
             return struct{}{}, nil
@@ -228,6 +247,11 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
 
         // check health successfully, update check health ts
         b.metricsUpdateTs.Insert(node, time.Now().Local().UnixMilli())
         if b.unreachableQueryNodes.TryRemove(node) {
+            // once the heartbeat succeeds, clear the failed counter
+            failures, ok := b.failedHeartBeatCounter.Get(node)
+            if ok {
+                failures.Store(0)
+            }
             log.Info("component recuperated, set node reachable", zap.Int64("node", node), zap.Error(err))
         }
diff --git a/internal/proxy/look_aside_balancer_test.go b/internal/proxy/look_aside_balancer_test.go
index 54aedd36c..7d982878a 100644
--- a/internal/proxy/look_aside_balancer_test.go
+++ b/internal/proxy/look_aside_balancer_test.go
@@ -302,14 +302,14 @@ func (suite *LookAsideBalancerSuite) TestCheckHealthLoop() {
 	suite.balancer.unreachableQueryNodes.Insert(2)
 	suite.Eventually(func() bool {
 		return suite.balancer.unreachableQueryNodes.Contain(1)
-	}, 3*time.Second, 100*time.Millisecond)
+	}, 5*time.Second, 100*time.Millisecond)
 
 	targetNode, err := suite.balancer.SelectNode(context.Background(), []int64{1}, 1)
 	suite.ErrorIs(err, merr.ErrServiceUnavailable)
 	suite.Equal(int64(-1), targetNode)
 	suite.Eventually(func() bool {
 		return !suite.balancer.unreachableQueryNodes.Contain(2)
-	}, 3*time.Second, 100*time.Millisecond)
+	}, 5*time.Second, 100*time.Millisecond)
 }
 
 func (suite *LookAsideBalancerSuite) TestNodeRecover() {
diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go
index d53790742..4a55a5115 100644
--- a/internal/querycoordv2/observers/leader_observer.go
+++ b/internal/querycoordv2/observers/leader_observer.go
@@ -111,7 +111,7 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
 	actions := o.findNeedLoadedSegments(leaderView, dists)
 	actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...)
 
-	updateVersionAction := o.checkNeedUpdateTargetVersion(leaderView)
+	updateVersionAction := o.checkNeedUpdateTargetVersion(ctx, leaderView)
 	if updateVersionAction != nil {
 		actions = append(actions, updateVersionAction)
 	}
@@ -133,14 +133,15 @@ func (ob *LeaderObserver) CheckTargetVersion(collectionID int64) bool {
 	return <-notifier
 }
 
-func (o *LeaderObserver) checkNeedUpdateTargetVersion(leaderView *meta.LeaderView) *querypb.SyncAction {
+func (o *LeaderObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction {
+	log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60)
 	targetVersion := o.target.GetCollectionTargetVersion(leaderView.CollectionID, meta.CurrentTarget)
 
 	if targetVersion <= leaderView.TargetVersion {
 		return nil
 	}
 
-	log.Info("Update readable segment version",
+	log.RatedInfo(10, "Update readable segment version",
 		zap.Int64("collectionID", leaderView.CollectionID),
 		zap.String("channelName", leaderView.Channel),
 		zap.Int64("nodeID", leaderView.ID),
@@ -152,14 +153,6 @@ func (o *LeaderObserver) checkNeedUpdateTargetVersion(leaderView *meta.LeaderVie
 	growingSegments := o.target.GetStreamingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
 	droppedSegments := o.target.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
 
-	log.Info("Update readable segment version",
-		zap.Int64("collectionID", leaderView.CollectionID),
-		zap.String("channelName", leaderView.Channel),
-		zap.Int64("nodeID", leaderView.ID),
-		zap.Int64("oldVersion", leaderView.TargetVersion),
-		zap.Int64("newVersion", targetVersion),
-	)
-
 	return &querypb.SyncAction{
 		Type:            querypb.SyncType_UpdateVersion,
 		GrowingInTarget: growingSegments.Collect(),
diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go
index 3ecdebace..6d205a709 100644
--- a/internal/querycoordv2/observers/leader_observer_test.go
+++ b/internal/querycoordv2/observers/leader_observer_test.go
@@ -558,11 +558,11 @@ func (suite *LeaderObserverTestSuite) TestSyncTargetVersion() {
 
 	view := utils.CreateTestLeaderView(1, collectionID, "channel-1", nil, nil)
 	view.TargetVersion = TargetVersion
-	action := observer.checkNeedUpdateTargetVersion(view)
+	action := observer.checkNeedUpdateTargetVersion(context.Background(), view)
 	suite.Nil(action)
 
 	view.TargetVersion = TargetVersion - 1
-	action = observer.checkNeedUpdateTargetVersion(view)
+	action = observer.checkNeedUpdateTargetVersion(context.Background(), view)
 	suite.NotNil(action)
 	suite.Equal(querypb.SyncType_UpdateVersion, action.Type)
 	suite.Len(action.GrowingInTarget, 2)
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index bfb1548cd..d85e95980 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -968,6 +968,8 @@ type proxyConfig struct {
 	ReplicaSelectionPolicy       ParamItem `refreshable:"false"`
 	CheckQueryNodeHealthInterval ParamItem `refreshable:"false"`
 	CostMetricsExpireTime        ParamItem `refreshable:"true"`
+	RetryTimesOnReplica          ParamItem `refreshable:"true"`
+	RetryTimesOnHealthCheck      ParamItem `refreshable:"true"`
 }
 
 func (p *proxyConfig) init(base *BaseTable) {
@@ -984,7 +986,7 @@ func (p *proxyConfig) init(base *BaseTable) {
 	p.HealthCheckTimetout = ParamItem{
 		Key:          "proxy.healthCheckTimetout",
 		Version:      "2.3.0",
-		DefaultValue: "500",
+		DefaultValue: "1000",
 		PanicIfEmpty: true,
 		Doc:          "ms, the interval that to do component healthy check",
 		Export:       true,
@@ -1212,6 +1214,21 @@ please adjust in embedded Milvus: false`,
 	}
 	p.CostMetricsExpireTime.Init(base.mgr)
 
+	p.RetryTimesOnReplica = ParamItem{
+		Key:          "proxy.retryTimesOnReplica",
+		Version:      "2.3.0",
+		DefaultValue: "2",
+		Doc:          "retry times on each replica",
+	}
+	p.RetryTimesOnReplica.Init(base.mgr)
+
+	p.RetryTimesOnHealthCheck = ParamItem{
+		Key:          "proxy.retryTimesOnHealthCheck",
+		Version:      "2.3.0",
+		DefaultValue: "3",
+		Doc:          "set query node unavailable on proxy when heartbeat failures reach this limit",
+	}
+	p.RetryTimesOnHealthCheck.Init(base.mgr)
 }
 
 // /////////////////////////////////////////////////////////////////////////////
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 87240a558..1d05f9255 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -185,6 +185,7 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, Params.ReplicaSelectionPolicy.GetValue(), "look_aside")
 		assert.Equal(t, Params.CheckQueryNodeHealthInterval.GetAsInt(), 1000)
 		assert.Equal(t, Params.CostMetricsExpireTime.GetAsInt(), 1000)
+		assert.Equal(t, Params.RetryTimesOnReplica.GetAsInt(), 2)
 	})
 
 	// t.Run("test proxyConfig panic", func(t *testing.T) {
--
GitLab
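Note for reviewers: the sketch below illustrates the two behaviors this patch tunes; it is not code from the patch itself. The helper names (computeRetryTimes, heartbeatTracker) are invented for the example, and only the parameter keys and their defaults (proxy.retryTimesOnReplica = 2, proxy.retryTimesOnHealthCheck = 3) come from the ParamItem definitions above.

package main

import (
	"fmt"
	"sync/atomic"
)

// computeRetryTimes mirrors the change in lb_policy.go: with N shard leaders on a
// channel and proxy.retryTimesOnReplica = R, the retry budget becomes N * R
// instead of N.
func computeRetryTimes(numShardLeaders, retryTimesOnReplica int) uint {
	return uint(numShardLeaders * retryTimesOnReplica)
}

// heartbeatTracker sketches the consecutive-failure rule added to the look-aside
// balancer: a node is only marked unreachable after proxy.retryTimesOnHealthCheck
// consecutive heartbeat failures, and the counter resets on a healthy heartbeat.
type heartbeatTracker struct {
	failures  atomic.Int64
	threshold int64
}

// onFailure records one failed heartbeat and reports whether the node
// should now be marked unreachable.
func (t *heartbeatTracker) onFailure() bool {
	return t.failures.Add(1) >= t.threshold
}

// onSuccess clears the failure counter after a healthy heartbeat.
func (t *heartbeatTracker) onSuccess() {
	t.failures.Store(0)
}

func main() {
	// With 2 shard leaders and the default retryTimesOnReplica of 2,
	// a channel workload may now be retried up to 4 times.
	fmt.Println(computeRetryTimes(2, 2)) // 4

	// With the default retryTimesOnHealthCheck of 3, only the third
	// consecutive failure marks the node unreachable.
	t := &heartbeatTracker{threshold: 3}
	fmt.Println(t.onFailure(), t.onFailure(), t.onFailure()) // false false true
	t.onSuccess()
	fmt.Println(t.failures.Load()) // 0
}

In short, a transient heartbeat failure no longer excludes a query node immediately, and the per-channel retry budget now scales with both the number of shard leaders and the configured per-replica retry count.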