未验证 提交 8b682fde 编写于 作者: W wei liu 提交者: GitHub

fix duplicate watch channel task (#20670)

Signed-off-by: NWei Liu <wei.liu@zilliz.com>
Signed-off-by: NWei Liu <wei.liu@zilliz.com>
上级 38845d96
......@@ -56,7 +56,8 @@ var (
ErrTaskQueueFull = errors.New("TaskQueueFull")
ErrFailedResponse = errors.New("RpcFailed")
ErrFailedResponse = errors.New("RpcFailed")
ErrTaskAlreadyDone = errors.New("TaskAlreadyDone")
)
type Type = int32
......@@ -260,6 +261,13 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
return ErrConflictTaskExisted
}
if GetTaskType(task) == TaskTypeGrow {
nodesWithSegment := scheduler.distMgr.LeaderViewManager.GetSealedSegmentDist(task.SegmentID())
replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithSegment)
if _, ok := replicaNodeMap[task.ReplicaID()]; ok {
return ErrTaskAlreadyDone
}
}
case *ChannelTask:
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
......@@ -280,6 +288,14 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
return ErrConflictTaskExisted
}
if GetTaskType(task) == TaskTypeGrow {
nodesWithChannel := scheduler.distMgr.LeaderViewManager.GetChannelDist(task.Channel())
replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithChannel)
if _, ok := replicaNodeMap[task.ReplicaID()]; ok {
return ErrTaskAlreadyDone
}
}
default:
panic(fmt.Sprintf("preAdd: forget to process task type: %+v", task))
}
......
......@@ -145,7 +145,9 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
"TestLoadSegmentTaskFailed",
"TestSegmentTaskStale",
"TestTaskCanceled",
"TestMoveSegmentTask":
"TestMoveSegmentTask",
"TestSubmitDuplicateLoadSegmentTask",
"TestSubmitDuplicateSubscribeChannelTask":
suite.meta.PutCollection(&meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: suite.collection,
......@@ -158,6 +160,48 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
}
}
func (suite *TaskSuite) TestSubmitDuplicateSubscribeChannelTask() {
ctx := context.Background()
timeout := 10 * time.Second
targetNode := int64(3)
tasks := []Task{}
dmChannels := make([]*datapb.VchannelInfo, 0)
for _, channel := range suite.subChannels {
dmChannels = append(dmChannels, &datapb.VchannelInfo{
CollectionID: suite.collection,
ChannelName: channel,
UnflushedSegmentIds: []int64{suite.growingSegments[channel]},
})
task, err := NewChannelTask(
ctx,
timeout,
0,
suite.collection,
suite.replica,
NewChannelAction(targetNode, ActionTypeGrow, channel),
)
suite.NoError(err)
tasks = append(tasks, task)
}
views := make([]*meta.LeaderView, 0)
for _, channel := range suite.subChannels {
views = append(views, &meta.LeaderView{
ID: targetNode,
CollectionID: suite.collection,
Channel: channel,
})
}
suite.dist.LeaderViewManager.Update(targetNode, views...)
for _, task := range tasks {
err := suite.scheduler.Add(task)
suite.Error(err)
suite.ErrorIs(err, ErrTaskAlreadyDone)
}
}
func (suite *TaskSuite) TestSubscribeChannelTask() {
ctx := context.Background()
timeout := 10 * time.Second
......@@ -391,6 +435,47 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
}
}
func (suite *TaskSuite) TestSubmitDuplicateLoadSegmentTask() {
ctx := context.Background()
timeout := 10 * time.Second
targetNode := int64(3)
channel := &datapb.VchannelInfo{
CollectionID: suite.collection,
ChannelName: Params.CommonCfg.RootCoordDml + "-test",
}
tasks := []Task{}
for _, segment := range suite.loadSegments {
task, err := NewSegmentTask(
ctx,
timeout,
0,
suite.collection,
suite.replica,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
tasks = append(tasks, task)
}
// Process tasks done
// Dist contains channels
view := &meta.LeaderView{
ID: targetNode,
CollectionID: suite.collection,
Segments: map[int64]*querypb.SegmentDist{},
}
for _, segment := range suite.loadSegments {
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
}
suite.dist.LeaderViewManager.Update(targetNode, view)
for _, task := range tasks {
err := suite.scheduler.Add(task)
suite.Error(err)
suite.ErrorIs(err, ErrTaskAlreadyDone)
}
}
func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
ctx := context.Background()
timeout := 10 * time.Second
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册