From b7b249770cc7f0d4df4d76351e0844838ae31fec Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 6 Jul 2023 19:04:25 +0800 Subject: [PATCH] enable metrics expire for replica selection (#25360) Signed-off-by: Wei Liu --- internal/proxy/look_aside_balancer.go | 23 +++++++++++++++++-- internal/proxy/look_aside_balancer_test.go | 26 ++++++++++++++-------- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/internal/proxy/look_aside_balancer.go b/internal/proxy/look_aside_balancer.go index 85343f9c8..ad8a092cf 100644 --- a/internal/proxy/look_aside_balancer.go +++ b/internal/proxy/look_aside_balancer.go @@ -37,6 +37,7 @@ import ( var ( checkQueryNodeHealthInterval = 500 * time.Millisecond + CostMetricsExpireTime = 1000 * time.Millisecond ) type LookAsideBalancer struct { @@ -101,7 +102,7 @@ func (b *LookAsideBalancer) SelectNode(ctx context.Context, availableNodes []int b.executingTaskTotalNQ.Insert(node, executingNQ) } - score := b.calculateScore(cost, executingNQ.Load()) + score := b.calculateScore(node, cost, executingNQ.Load()) metrics.ProxyWorkLoadScore.WithLabelValues(strconv.FormatInt(node, 10)).Set(score) if targetNode == -1 || score < targetScore { @@ -138,10 +139,18 @@ func (b *LookAsideBalancer) UpdateCostMetrics(node int64, cost *internalpb.CostA // calculateScore compute the query node's workload score // https://www.usenix.org/conference/nsdi15/technical-sessions/presentation/suresh -func (b *LookAsideBalancer) calculateScore(cost *internalpb.CostAggregation, executingNQ int64) float64 { +func (b *LookAsideBalancer) calculateScore(node int64, cost *internalpb.CostAggregation, executingNQ int64) float64 { if cost == nil || cost.ResponseTime == 0 || cost.ServiceTime == 0 { return math.Pow(float64(1+executingNQ), 3.0) } + + // for multi-replica cases, when there are no tasks waiting in the queue, + // the response time will affect the score; to avoid basing the score on a stale value, + // we expire the cost metrics once they are older than CostMetricsExpireTime if no task is in the
queue. + if executingNQ == 0 && cost.TotalNQ == 0 && b.isNodeCostMetricsTooOld(node) { + return 0 + } + executeSpeed := float64(cost.ResponseTime) - float64(1)/float64(cost.ServiceTime) workload := math.Pow(float64(1+cost.TotalNQ+executingNQ), 3.0) / float64(cost.ServiceTime) if workload < 0.0 { @@ -151,6 +160,16 @@ func (b *LookAsideBalancer) calculateScore(cost *internalpb.CostAggregation, exe return executeSpeed + workload } +// isNodeCostMetricsTooOld reports whether the node's cost metrics were last updated more than CostMetricsExpireTime ago; a node with no recorded update time is never treated as too old +func (b *LookAsideBalancer) isNodeCostMetricsTooOld(node int64) bool { + lastUpdateTs, ok := b.metricsUpdateTs.Get(node) + if !ok || lastUpdateTs == 0 { + return false + } + + return time.Now().UnixMilli()-lastUpdateTs > CostMetricsExpireTime.Milliseconds() +} + func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) { log := log.Ctx(ctx).WithRateGroup("proxy.LookAsideBalancer", 60, 1) defer b.wg.Done() diff --git a/internal/proxy/look_aside_balancer_test.go b/internal/proxy/look_aside_balancer_test.go index e57359fc3..b2f8605be 100644 --- a/internal/proxy/look_aside_balancer_test.go +++ b/internal/proxy/look_aside_balancer_test.go @@ -93,19 +93,19 @@ func (suite *LookAsideBalancerSuite) TestCalculateScore() { TotalNQ: 0, } - score1 := suite.balancer.calculateScore(costMetrics1, 0) - score2 := suite.balancer.calculateScore(costMetrics2, 0) - score3 := suite.balancer.calculateScore(costMetrics3, 0) - score4 := suite.balancer.calculateScore(costMetrics4, 0) + score1 := suite.balancer.calculateScore(-1, costMetrics1, 0) + score2 := suite.balancer.calculateScore(-1, costMetrics2, 0) + score3 := suite.balancer.calculateScore(-1, costMetrics3, 0) + score4 := suite.balancer.calculateScore(-1, costMetrics4, 0) suite.Equal(float64(12), score1) suite.Equal(float64(8.5), score2) suite.Equal(float64(17), score3) suite.Equal(float64(5), score4) - score5 := suite.balancer.calculateScore(costMetrics1, 5) - score6 := 
suite.balancer.calculateScore(costMetrics2, 5) - score7 := suite.balancer.calculateScore(costMetrics3, 5) - score8 := suite.balancer.calculateScore(costMetrics4, 5) + score5 := suite.balancer.calculateScore(-1, costMetrics1, 5) + score6 := suite.balancer.calculateScore(-1, costMetrics2, 5) + score7 := suite.balancer.calculateScore(-1, costMetrics3, 5) + score8 := suite.balancer.calculateScore(-1, costMetrics4, 5) suite.Equal(float64(347), score5) suite.Equal(float64(176), score6) suite.Equal(float64(352), score7) @@ -118,8 +118,16 @@ func (suite *LookAsideBalancerSuite) TestCalculateScore() { TotalNQ: math.MaxInt64, } - score9 := suite.balancer.calculateScore(costMetrics5, math.MaxInt64) + score9 := suite.balancer.calculateScore(-1, costMetrics5, math.MaxInt64) suite.Equal(math.MaxFloat64, score9) + + // test metrics expire + suite.balancer.metricsUpdateTs.Insert(1, time.Now().UnixMilli()) + score10 := suite.balancer.calculateScore(1, costMetrics4, 0) + suite.Equal(float64(5), score10) + suite.balancer.metricsUpdateTs.Insert(1, time.Now().UnixMilli()-5000) + score11 := suite.balancer.calculateScore(1, costMetrics4, 0) + suite.Equal(float64(0), score11) } func (suite *LookAsideBalancerSuite) TestSelectNode() { -- GitLab