未验证 提交 8b4d60b0 编写于 作者: C congqixia 提交者: GitHub

Add guarantee and serviceable lag too large check (#21441)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
上级 3a58ac9f
......@@ -230,6 +230,10 @@ queryNode:
# Max read concurrency must greater than or equal to 1, and less than or equal to runtime.NumCPU * 100.
maxReadConcurrentRatio: 2.0 # (0, 100]
cpuRatio: 10.0 # ratio used to estimate read task cpu usage.
# maxTimestampLag is the max ts lag between serviceable and guarantee timestamp.
# if the lag is larger than this config, scheduler will return error without waiting.
# The valid value range is [3600, +inf), in seconds.
maxTimestampLag: 86400
grouping:
enabled: true
......
......@@ -19,16 +19,26 @@ package querynode
import (
"errors"
"fmt"
"time"
)
// Sentinel errors for query-shard scheduling. Callers match them with
// errors.Is; the WrapErr* helpers attach call-site detail while keeping
// the sentinel in the wrap chain.
var (
	// ErrShardNotAvailable shard not available error base.
	ErrShardNotAvailable = errors.New("ShardNotAvailable")
	// ErrTsLagTooLarge serviceable and guarantee lag too large.
	// Returned by the read-task Ready check when the guarantee timestamp
	// runs ahead of the serviceable timestamp by more than
	// queryNode.scheduler.maxTimestampLag.
	ErrTsLagTooLarge = errors.New("Timestamp lag too large")
)
// WrapErrShardNotAvailable wraps ErrShardNotAvailable with replica id and
// channel name. The returned error still matches ErrShardNotAvailable
// under errors.Is.
func WrapErrShardNotAvailable(replicaID int64, shard string) error {
	detail := fmt.Sprintf("(replica=%d, shard=%s)", replicaID, shard)
	return fmt.Errorf("%w%s", ErrShardNotAvailable, detail)
}
// WrapErrTsLagTooLarge wraps ErrTsLagTooLarge with the observed lag and the
// configured maximum. The returned error still matches ErrTsLagTooLarge
// under errors.Is.
func WrapErrTsLagTooLarge(duration time.Duration, maxLag time.Duration) error {
	detail := fmt.Sprintf(" lag(%s) max(%s)", duration, maxLag)
	return fmt.Errorf("%w%s", ErrTsLagTooLarge, detail)
}
// msgQueryNodeIsUnhealthy is the error msg of unhealthy query node
func msgQueryNodeIsUnhealthy(nodeID UniqueID) string {
return fmt.Sprintf("query node %d is not ready", nodeID)
......
......@@ -168,15 +168,26 @@ func (b *baseReadTask) Ready() (bool, error) {
gt, _ := tsoutil.ParseTS(guaranteeTs)
st, _ := tsoutil.ParseTS(serviceTime)
if guaranteeTs > serviceTime {
lag := gt.Sub(st)
maxLag := Params.QueryNodeCfg.MaxTimestampLag.GetAsDuration(time.Second)
if lag > maxLag {
log.Warn("guarantee and servicable ts larger than MaxLag",
zap.Time("guaranteeTime", gt),
zap.Time("serviceableTime", st),
zap.Duration("lag", lag),
zap.Duration("maxTsLag", maxLag),
)
return false, WrapErrTsLagTooLarge(lag, maxLag)
}
return false, nil
}
log.Debug("query msg can do",
zap.Any("collectionID", b.CollectionID),
zap.Any("sm.GuaranteeTimestamp", gt),
zap.Any("serviceTime", st),
zap.Any("delta milliseconds", gt.Sub(st).Milliseconds()),
zap.Any("channel", channel),
zap.Any("msgID", b.ID()))
zap.Int64("collectionID", b.CollectionID),
zap.Time("sm.GuaranteeTimestamp", gt),
zap.Time("serviceTime", st),
zap.Int64("delta milliseconds", gt.Sub(st).Milliseconds()),
zap.String("channel", channel),
zap.Int64("msgID", b.ID()))
b.waitTsDur = b.waitTSafeTr.Elapse("wait for tsafe done")
return true, nil
}
......@@ -6,7 +6,10 @@ import (
"time"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
......@@ -30,6 +33,7 @@ func (s *baseReadTaskSuite) SetupSuite() {
lcm := &mocks.ChunkManager{}
tsafe := &MockTSafeReplicaInterface{}
s.tsafe = tsafe
qs, err := newQueryShard(context.Background(), defaultCollectionID, defaultDMLChannel, defaultReplicaID, nil, meta, tsafe, lcm, rcm, false)
s.Require().NoError(err)
......@@ -107,7 +111,49 @@ func (s *baseReadTaskSuite) TestTimeoutError() {
s.Assert().ErrorIs(s.task.TimeoutError(), context.DeadlineExceeded)
})
}
// TestReady covers the three outcomes of baseReadTask.Ready():
// failing fast when the ts lag exceeds the configured maximum, returning
// not-ready while the lag is within bounds, and ready once the guarantee
// timestamp is behind the serviceable timestamp.
func (s *baseReadTaskSuite) TestReady() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s.task.ctx = ctx

	// Pin the serviceable (tsafe) timestamp to "now"; each sub-test picks a
	// guarantee timestamp relative to it. The mock expectation is installed
	// once, before the sub-tests run.
	baseTime := time.Now()
	serviceable := tsoutil.ComposeTSByTime(baseTime, 0)
	s.tsafe.EXPECT().getTSafe(mock.AnythingOfType("string")).Return(serviceable, nil)

	s.Run("lag too large", func() {
		// Guarantee is maxTimestampLag + 1s ahead of serviceable, so Ready
		// must return an error wrapping ErrTsLagTooLarge instead of waiting.
		tooLargeGuarantee := baseTime.Add(Params.QueryNodeCfg.MaxTimestampLag.GetAsDuration(time.Second)).Add(time.Second)
		guaranteeTs := tsoutil.ComposeTSByTime(tooLargeGuarantee, 0)
		s.task.GuaranteeTimestamp = guaranteeTs
		s.task.DataScope = querypb.DataScope_Historical
		ready, err := s.task.Ready()
		s.False(ready)
		s.Error(err)
		s.ErrorIs(err, ErrTsLagTooLarge)
	})
	s.Run("not ready", func() {
		// Guarantee is ahead of serviceable but within maxTimestampLag:
		// not ready yet, and no error (caller keeps waiting).
		guarantee := baseTime.Add(Params.QueryNodeCfg.MaxTimestampLag.GetAsDuration(time.Second)).Add(-time.Second)
		guaranteeTs := tsoutil.ComposeTSByTime(guarantee, 0)
		s.task.GuaranteeTimestamp = guaranteeTs
		s.task.DataScope = querypb.DataScope_Historical
		ready, err := s.task.Ready()
		s.False(ready)
		s.NoError(err)
	})
	s.Run("ready", func() {
		// Guarantee is behind serviceable, so the task can proceed.
		guarantee := baseTime.Add(-time.Second)
		guaranteeTs := tsoutil.ComposeTSByTime(guarantee, 0)
		s.task.GuaranteeTimestamp = guaranteeTs
		s.task.DataScope = querypb.DataScope_Historical
		ready, err := s.task.Ready()
		s.True(ready)
		s.NoError(err)
	})
}
func TestBaseReadTask(t *testing.T) {
......
......@@ -1041,6 +1041,7 @@ type queryNodeConfig struct {
MaxGroupNQ ParamItem `refreshable:"true"`
TopKMergeRatio ParamItem `refreshable:"true"`
CPURatio ParamItem `refreshable:"true"`
MaxTimestampLag ParamItem `refreshable:"true"`
GCHelperEnabled ParamItem `refreshable:"false"`
MinimumGOGCConfig ParamItem `refreshable:"false"`
......@@ -1254,6 +1255,13 @@ func (p *queryNodeConfig) init(base *BaseTable) {
}
p.MaxDiskUsagePercentage.Init(base.mgr)
p.MaxTimestampLag = ParamItem{
Key: "queryNode.scheduler.maxTimestampLag",
Version: "2.2.3",
DefaultValue: "86400",
}
p.MaxTimestampLag.Init(base.mgr)
p.GCHelperEnabled = ParamItem{
Key: "queryNode.gchelper.enabled",
Version: "2.0.0",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册