未验证 提交 9258f705 编写于 作者: X Xiaofan 提交者: GitHub

Fix Bulkload ut print too much result (#21821)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 54ec22e8
......@@ -40,8 +40,7 @@ import (
)
const (
MaxPendingCount = 65536 // TODO: Make this configurable.
delimiter = "/"
delimiter = "/"
)
// checkPendingTasksInterval is the default interval to check and send out pending tasks,
......@@ -94,7 +93,7 @@ func newImportManager(ctx context.Context, client kv.TxnKV,
mgr := &importManager{
ctx: ctx,
taskStore: client,
pendingTasks: make([]*datapb.ImportTaskInfo, 0, MaxPendingCount), // currently task queue max size is 32
pendingTasks: make([]*datapb.ImportTaskInfo, 0, Params.RootCoordCfg.ImportMaxPendingTaskCount.GetAsInt()), // currently task queue max size is 32
workingTasks: make(map[int64]*datapb.ImportTaskInfo),
busyNodes: make(map[int64]int64),
pendingLock: sync.RWMutex{},
......
......@@ -542,7 +542,10 @@ func TestImportManager_ImportJob(t *testing.T) {
globalCount++
return globalCount, 0, nil
}
paramtable.Get().Save(Params.RootCoordCfg.ImportTaskSubPath.Key, "test_import_task")
paramtable.Get().Save(Params.RootCoordCfg.ImportMaxPendingTaskCount.Key, "16")
defer paramtable.Get().Remove(Params.RootCoordCfg.ImportMaxPendingTaskCount.Key)
colID := int64(100)
mockKv := memkv.NewMemoryKV()
callMarkSegmentsDropped := func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error) {
......@@ -654,9 +657,9 @@ func TestImportManager_ImportJob(t *testing.T) {
// the pending list already has one task
// once task count exceeds MaxPendingCount, return error
for i := 0; i <= MaxPendingCount; i++ {
for i := 0; i <= Params.RootCoordCfg.ImportMaxPendingTaskCount.GetAsInt(); i++ {
resp = mgr.importJob(context.TODO(), rowReq, colID, 0)
if i < MaxPendingCount-1 {
if i < Params.RootCoordCfg.ImportMaxPendingTaskCount.GetAsInt()-1 {
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
} else {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
......
......@@ -190,9 +190,13 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) {
paramtable.Get().Save(Params.ProxyCfg.TimeTickInterval.Key, "1")
s.Start()
time.Sleep(time.Millisecond * 4)
assert.Greater(t, s.GetMinDdlTs(), Timestamp(100))
for i := 0; i < 100; i++ {
if s.GetMinDdlTs() > Timestamp(100) {
break
}
assert.True(t, i < 100)
time.Sleep(time.Millisecond)
}
// add task to queue.
n := 10
......
......@@ -579,6 +579,7 @@ type rootCoordConfig struct {
MinSegmentSizeToEnableIndex ParamItem `refreshable:"true"`
ImportTaskExpiration ParamItem `refreshable:"true"`
ImportTaskRetention ParamItem `refreshable:"true"`
ImportMaxPendingTaskCount ParamItem `refreshable:"true"`
ImportTaskSubPath ParamItem `refreshable:"true"`
EnableActiveStandby ParamItem `refreshable:"false"`
}
......@@ -626,6 +627,13 @@ func (p *rootCoordConfig) init(base *BaseTable) {
}
p.ImportTaskSubPath.Init(base.mgr)
p.ImportMaxPendingTaskCount = ParamItem{
Key: "rootCoord.importMaxPendingTaskCount",
Version: "2.2.2",
DefaultValue: strconv.Itoa(65535),
}
p.ImportMaxPendingTaskCount.Init(base.mgr)
p.EnableActiveStandby = ParamItem{
Key: "rootCoord.enableActiveStandby",
Version: "2.2.0",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册