未验证 提交 ae0f467c 编写于 作者: Y yah01 提交者: GitHub

Fix segment/channel may be re-loaded/subscribed (#22969)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 ef57ba3b
......@@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/merr"
. "github.com/milvus-io/milvus/internal/util/typeutil"
......@@ -222,6 +223,7 @@ func (scheduler *taskScheduler) Add(task Task) error {
err := scheduler.preAdd(task)
if err != nil {
task.Cancel(err)
return err
}
......@@ -265,6 +267,14 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
return merr.WrapErrServiceInternal("task with the same segment exists")
}
if GetTaskType(task) == TaskTypeGrow {
nodesWithSegment := scheduler.distMgr.LeaderViewManager.GetSealedSegmentDist(task.SegmentID())
replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithSegment)
if _, ok := replicaNodeMap[task.ReplicaID()]; ok {
return merr.WrapErrServiceInternal("segment loaded, it can be only balanced")
}
}
case *ChannelTask:
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
if old, ok := scheduler.channelTasks[index]; ok {
......@@ -283,6 +293,14 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
return merr.WrapErrServiceInternal("task with the same channel exists")
}
if GetTaskType(task) == TaskTypeGrow {
nodesWithChannel := scheduler.distMgr.LeaderViewManager.GetChannelDist(task.Channel())
replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithChannel)
if _, ok := replicaNodeMap[task.ReplicaID()]; ok {
return merr.WrapErrServiceInternal("channel subscribed, it can be only balanced")
}
}
default:
panic(fmt.Sprintf("preAdd: forget to process task type: %+v", task))
}
......
......@@ -185,14 +185,6 @@ func (suite *TaskSuite) TestSubscribeChannelTask() {
Return(&schemapb.CollectionSchema{
Name: "TestSubscribeChannelTask",
}, nil)
channels := make([]*datapb.VchannelInfo, 0, len(suite.subChannels))
for _, channel := range suite.subChannels {
channels = append(channels, &datapb.VchannelInfo{
CollectionID: suite.collection,
ChannelName: channel,
UnflushedSegmentIds: []int64{suite.growingSegments[channel]},
})
}
for channel, segment := range suite.growingSegments {
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).
Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{
......@@ -262,6 +254,42 @@ func (suite *TaskSuite) TestSubscribeChannelTask() {
}
}
func (suite *TaskSuite) TestSubmitDuplicateSubscribeChannelTask() {
ctx := context.Background()
timeout := 10 * time.Second
targetNode := int64(3)
tasks := []Task{}
for _, channel := range suite.subChannels {
task, err := NewChannelTask(
ctx,
timeout,
0,
suite.collection,
suite.replica,
NewChannelAction(targetNode, ActionTypeGrow, channel),
)
suite.NoError(err)
tasks = append(tasks, task)
}
views := make([]*meta.LeaderView, 0)
for _, channel := range suite.subChannels {
views = append(views, &meta.LeaderView{
ID: targetNode,
CollectionID: suite.collection,
Channel: channel,
})
}
suite.dist.LeaderViewManager.Update(targetNode, views...)
for _, task := range tasks {
err := suite.scheduler.Add(task)
suite.Equal(TaskStatusCanceled, task.Status())
suite.Error(err)
}
}
func (suite *TaskSuite) TestUnsubscribeChannelTask() {
ctx := context.Background()
timeout := 10 * time.Second
......@@ -1045,7 +1073,6 @@ func (suite *TaskSuite) TestNoExecutor() {
CollectionID: suite.collection,
ChannelName: channel.ChannelName,
}))
tasks := []Task{}
segments := make([]*datapb.SegmentBinlogs, 0)
for _, segment := range suite.loadSegments {
segments = append(segments, &datapb.SegmentBinlogs{
......@@ -1061,7 +1088,6 @@ func (suite *TaskSuite) TestNoExecutor() {
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
tasks = append(tasks, task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册