未验证 提交 b7b24977 编写于 作者: W wei liu 提交者: GitHub

enable metrics expire for replica selection (#25360)

Signed-off-by: NWei Liu <wei.liu@zilliz.com>
上级 fe22720f
......@@ -37,6 +37,7 @@ import (
var (
checkQueryNodeHealthInterval = 500 * time.Millisecond
CostMetricsExpireTime = 1000 * time.Millisecond
)
type LookAsideBalancer struct {
......@@ -101,7 +102,7 @@ func (b *LookAsideBalancer) SelectNode(ctx context.Context, availableNodes []int
b.executingTaskTotalNQ.Insert(node, executingNQ)
}
score := b.calculateScore(cost, executingNQ.Load())
score := b.calculateScore(node, cost, executingNQ.Load())
metrics.ProxyWorkLoadScore.WithLabelValues(strconv.FormatInt(node, 10)).Set(score)
if targetNode == -1 || score < targetScore {
......@@ -138,10 +139,18 @@ func (b *LookAsideBalancer) UpdateCostMetrics(node int64, cost *internalpb.CostA
// calculateScore compute the query node's workload score
// https://www.usenix.org/conference/nsdi15/technical-sessions/presentation/suresh
func (b *LookAsideBalancer) calculateScore(cost *internalpb.CostAggregation, executingNQ int64) float64 {
func (b *LookAsideBalancer) calculateScore(node int64, cost *internalpb.CostAggregation, executingNQ int64) float64 {
if cost == nil || cost.ResponseTime == 0 || cost.ServiceTime == 0 {
return math.Pow(float64(1+executingNQ), 3.0)
}
// for multi-replica cases, when there are no task which waiting in queue,
// the response time will effect the score, to prevent the score based on a too old value
// we expire the cost metrics by second if no task in queue.
if executingNQ == 0 && cost.TotalNQ == 0 && b.isNodeCostMetricsTooOld(node) {
return 0
}
executeSpeed := float64(cost.ResponseTime) - float64(1)/float64(cost.ServiceTime)
workload := math.Pow(float64(1+cost.TotalNQ+executingNQ), 3.0) / float64(cost.ServiceTime)
if workload < 0.0 {
......@@ -151,6 +160,16 @@ func (b *LookAsideBalancer) calculateScore(cost *internalpb.CostAggregation, exe
return executeSpeed + workload
}
// if the node cost metrics hasn't been updated for a second, we think the metrics is too old
func (b *LookAsideBalancer) isNodeCostMetricsTooOld(node int64) bool {
lastUpdateTs, ok := b.metricsUpdateTs.Get(node)
if !ok || lastUpdateTs == 0 {
return false
}
return time.Now().UnixMilli()-lastUpdateTs > CostMetricsExpireTime.Milliseconds()
}
func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
log := log.Ctx(ctx).WithRateGroup("proxy.LookAsideBalancer", 60, 1)
defer b.wg.Done()
......
......@@ -93,19 +93,19 @@ func (suite *LookAsideBalancerSuite) TestCalculateScore() {
TotalNQ: 0,
}
score1 := suite.balancer.calculateScore(costMetrics1, 0)
score2 := suite.balancer.calculateScore(costMetrics2, 0)
score3 := suite.balancer.calculateScore(costMetrics3, 0)
score4 := suite.balancer.calculateScore(costMetrics4, 0)
score1 := suite.balancer.calculateScore(-1, costMetrics1, 0)
score2 := suite.balancer.calculateScore(-1, costMetrics2, 0)
score3 := suite.balancer.calculateScore(-1, costMetrics3, 0)
score4 := suite.balancer.calculateScore(-1, costMetrics4, 0)
suite.Equal(float64(12), score1)
suite.Equal(float64(8.5), score2)
suite.Equal(float64(17), score3)
suite.Equal(float64(5), score4)
score5 := suite.balancer.calculateScore(costMetrics1, 5)
score6 := suite.balancer.calculateScore(costMetrics2, 5)
score7 := suite.balancer.calculateScore(costMetrics3, 5)
score8 := suite.balancer.calculateScore(costMetrics4, 5)
score5 := suite.balancer.calculateScore(-1, costMetrics1, 5)
score6 := suite.balancer.calculateScore(-1, costMetrics2, 5)
score7 := suite.balancer.calculateScore(-1, costMetrics3, 5)
score8 := suite.balancer.calculateScore(-1, costMetrics4, 5)
suite.Equal(float64(347), score5)
suite.Equal(float64(176), score6)
suite.Equal(float64(352), score7)
......@@ -118,8 +118,16 @@ func (suite *LookAsideBalancerSuite) TestCalculateScore() {
TotalNQ: math.MaxInt64,
}
score9 := suite.balancer.calculateScore(costMetrics5, math.MaxInt64)
score9 := suite.balancer.calculateScore(-1, costMetrics5, math.MaxInt64)
suite.Equal(math.MaxFloat64, score9)
// test metrics expire
suite.balancer.metricsUpdateTs.Insert(1, time.Now().UnixMilli())
score10 := suite.balancer.calculateScore(1, costMetrics4, 0)
suite.Equal(float64(5), score10)
suite.balancer.metricsUpdateTs.Insert(1, time.Now().UnixMilli()-5000)
score11 := suite.balancer.calculateScore(1, costMetrics4, 0)
suite.Equal(float64(0), score11)
}
func (suite *LookAsideBalancerSuite) TestSelectNode() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册