未验证 提交 0fefc52a 编写于 作者: C congqixia 提交者: GitHub

Make lookAsideBalancer checkHealth in parallel (#25155)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
上级 3a222e97
......@@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
......@@ -149,6 +150,7 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
ticker := time.NewTicker(checkQueryNodeHealthInterval)
defer ticker.Stop()
log.Info("Start check query node health loop")
pool := conc.NewDefaultPool[any]()
for {
select {
case <-b.closeCh:
......@@ -157,44 +159,49 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
case <-ticker.C:
now := time.Now().UnixMilli()
var futures []*conc.Future[any]
b.metricsUpdateTs.Range(func(node int64, lastUpdateTs int64) bool {
if now-lastUpdateTs > checkQueryNodeHealthInterval.Milliseconds() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
checkHealthFailed := func(err error) bool {
log.RatedWarn(30, "query node check health failed, add it to unreachable nodes list",
zap.Int64("nodeID", node),
zap.Error(err))
b.unreachableQueryNodes.Insert(node)
return true
}
qn, err := b.clientMgr.GetClient(ctx, node)
if err != nil {
return checkHealthFailed(err)
}
resp, err := qn.GetComponentStates(ctx)
if err != nil {
return checkHealthFailed(err)
}
if resp.GetState().GetStateCode() != commonpb.StateCode_Healthy {
return checkHealthFailed(merr.WrapErrNodeOffline(node))
}
// check health successfully, update check health ts
b.metricsUpdateTs.Insert(node, time.Now().UnixMilli())
if b.unreachableQueryNodes.Contain(node) {
futures = append(futures, pool.Submit(func() (any, error) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
checkHealthFailed := func(err error) bool {
log.RatedWarn(30, "query node check health failed, add it to unreachable nodes list",
zap.Int64("nodeID", node),
zap.Error(err))
b.unreachableQueryNodes.Insert(node)
return true
}
qn, err := b.clientMgr.GetClient(ctx, node)
if err != nil {
checkHealthFailed(err)
return struct{}{}, nil
}
resp, err := qn.GetComponentStates(ctx)
if err != nil {
checkHealthFailed(err)
return struct{}{}, nil
}
if resp.GetState().GetStateCode() != commonpb.StateCode_Healthy {
checkHealthFailed(merr.WrapErrNodeOffline(node))
return struct{}{}, nil
}
// check health successfully, update check health ts
b.metricsUpdateTs.Insert(node, time.Now().Local().UnixMilli())
b.unreachableQueryNodes.Remove(node)
log.Info("query node check health success, remove it from unreachable nodes list",
zap.Int64("nodeID", node))
}
return struct{}{}, nil
}))
}
return true
})
conc.AwaitAll(futures...)
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册