未验证 提交 e245ba87 编写于 作者: B bigsheeper 提交者: GitHub

Add excluded segments for dropped segemnts (#12247)

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 66954ec1
...@@ -39,7 +39,7 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID, ...@@ -39,7 +39,7 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID,
) )
var flushed []*datapb.SegmentInfo var flushed []*datapb.SegmentInfo
var unflushed []*datapb.SegmentInfo var unflushed []*datapb.SegmentInfo
var dropped []UniqueID var dropped []*datapb.SegmentInfo
var seekPosition *internalpb.MsgPosition var seekPosition *internalpb.MsgPosition
for _, s := range segments { for _, s := range segments {
if (partitionID > allPartitionID && s.PartitionID != partitionID) || if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
...@@ -48,7 +48,7 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID, ...@@ -48,7 +48,7 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID,
} }
if s.GetState() == commonpb.SegmentState_Dropped { if s.GetState() == commonpb.SegmentState_Dropped {
dropped = append(dropped, s.GetID()) dropped = append(dropped, trimSegmentInfo(s.SegmentInfo))
continue continue
} }
......
...@@ -1433,7 +1433,8 @@ func TestGetRecoveryInfo(t *testing.T) { ...@@ -1433,7 +1433,8 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.EqualValues(t, 1, len(resp.GetChannels())) assert.EqualValues(t, 1, len(resp.GetChannels()))
assert.NotNil(t, resp.GetChannels()[0].SeekPosition) assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
assert.ElementsMatch(t, []int64{8}, resp.GetChannels()[0].GetDroppedSegments()) assert.Len(t, resp.GetChannels()[0].GetDroppedSegments(), 1)
assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegments()[0].GetID())
}) })
t.Run("with closed server", func(t *testing.T) { t.Run("with closed server", func(t *testing.T) {
......
...@@ -58,7 +58,7 @@ type ddNode struct { ...@@ -58,7 +58,7 @@ type ddNode struct {
segID2SegInfo sync.Map // segment ID to *SegmentInfo segID2SegInfo sync.Map // segment ID to *SegmentInfo
flushedSegments []*datapb.SegmentInfo flushedSegments []*datapb.SegmentInfo
droppedSegments []UniqueID droppedSegments []*datapb.SegmentInfo
vchannelName string vchannelName string
deltaMsgStream msgstream.MsgStream deltaMsgStream msgstream.MsgStream
...@@ -199,8 +199,8 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool { ...@@ -199,8 +199,8 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool {
} }
func (ddn *ddNode) isDropped(segID UniqueID) bool { func (ddn *ddNode) isDropped(segID UniqueID) bool {
for _, sID := range ddn.droppedSegments { for _, droppedSegment := range ddn.droppedSegments {
if sID == segID { if droppedSegment.GetID() == segID {
return true return true
} }
} }
......
...@@ -382,23 +382,29 @@ func TestFlowGraph_DDNode_isFlushed(te *testing.T) { ...@@ -382,23 +382,29 @@ func TestFlowGraph_DDNode_isFlushed(te *testing.T) {
} }
func TestFlowGraph_DDNode_isDropped(te *testing.T) { func TestFlowGraph_DDNode_isDropped(te *testing.T) {
genSegmentInfoByID := func(segmentID UniqueID) *datapb.SegmentInfo {
return &datapb.SegmentInfo{
ID: segmentID,
}
}
tests := []struct { tests := []struct {
indroppedSegment []UniqueID indroppedSegment []*datapb.SegmentInfo
inSeg UniqueID inSeg UniqueID
expectedOut bool expectedOut bool
description string description string
}{ }{
{[]UniqueID{1, 2, 3}, 1, true, {[]*datapb.SegmentInfo{genSegmentInfoByID(1), genSegmentInfoByID(2), genSegmentInfoByID(3)}, 1, true,
"Input seg 1 in droppedSegs{1, 2, 3}"}, "Input seg 1 in droppedSegs{1, 2, 3}"},
{[]UniqueID{1, 2, 3}, 2, true, {[]*datapb.SegmentInfo{genSegmentInfoByID(1), genSegmentInfoByID(2), genSegmentInfoByID(3)}, 2, true,
"Input seg 2 in droppedSegs{1, 2, 3}"}, "Input seg 2 in droppedSegs{1, 2, 3}"},
{[]UniqueID{1, 2, 3}, 3, true, {[]*datapb.SegmentInfo{genSegmentInfoByID(1), genSegmentInfoByID(2), genSegmentInfoByID(3)}, 3, true,
"Input seg 3 in droppedSegs{1, 2, 3}"}, "Input seg 3 in droppedSegs{1, 2, 3}"},
{[]UniqueID{1, 2, 3}, 4, false, {[]*datapb.SegmentInfo{genSegmentInfoByID(1), genSegmentInfoByID(2), genSegmentInfoByID(3)}, 4, false,
"Input seg 4 not in droppedSegs{1, 2, 3}"}, "Input seg 4 not in droppedSegs{1, 2, 3}"},
{[]UniqueID{}, 5, false, {[]*datapb.SegmentInfo{}, 5, false,
"Input seg 5, no droppedSegs {}"}, "Input seg 5, no droppedSegs {}"},
} }
......
...@@ -167,7 +167,7 @@ message VchannelInfo { ...@@ -167,7 +167,7 @@ message VchannelInfo {
internal.MsgPosition seek_position = 3; internal.MsgPosition seek_position = 3;
repeated SegmentInfo unflushedSegments = 4; repeated SegmentInfo unflushedSegments = 4;
repeated SegmentInfo flushedSegments = 5; repeated SegmentInfo flushedSegments = 5;
repeated int64 dropped_segments = 6; repeated SegmentInfo dropped_segments = 6;
} }
message WatchDmChannelsRequest { message WatchDmChannelsRequest {
......
...@@ -249,6 +249,26 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { ...@@ -249,6 +249,26 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
zap.Any("flushedCheckPointInfos", flushedCheckPointInfos), zap.Any("flushedCheckPointInfos", flushedCheckPointInfos),
) )
// add excluded segments for dropped segments,
// dropped segments with later check point than seekPosition should be filtered out.
droppedCheckPointInfos := make([]*datapb.SegmentInfo, 0)
for _, info := range w.req.Infos {
for _, droppedSegment := range info.DroppedSegments {
for _, position := range toSeekChannels {
if droppedSegment != nil &&
droppedSegment.DmlPosition.ChannelName == position.ChannelName &&
droppedSegment.DmlPosition.Timestamp > position.Timestamp {
droppedCheckPointInfos = append(droppedCheckPointInfos, droppedSegment)
}
}
}
}
w.node.streaming.replica.addExcludedSegments(collectionID, droppedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for dropped segments done",
zap.Any("collectionID", collectionID),
zap.Any("droppedCheckPointInfos", droppedCheckPointInfos),
)
// create tSafe // create tSafe
for _, channel := range vChannels { for _, channel := range vChannels {
w.node.tSafeReplica.addTSafe(channel) w.node.tSafeReplica.addTSafe(channel)
......
...@@ -178,6 +178,38 @@ func TestTask_watchDmChannelsTask(t *testing.T) { ...@@ -178,6 +178,38 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
err = task.Execute(ctx) err = task.Execute(ctx)
assert.NoError(t, err) assert.NoError(t, err)
}) })
t.Run("test add excluded segment for dropped segment", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
node: node,
}
tmpChannel := defaultVChannel + "_1"
task.req.Infos = []*datapb.VchannelInfo{
{
CollectionID: defaultCollectionID,
ChannelName: defaultVChannel,
SeekPosition: &msgstream.MsgPosition{
ChannelName: tmpChannel,
Timestamp: 0,
MsgID: []byte{1, 2, 3},
},
DroppedSegments: []*datapb.SegmentInfo{
{
DmlPosition: &internalpb.MsgPosition{
ChannelName: tmpChannel,
Timestamp: typeutil.MaxTimestamp,
},
},
},
},
}
err = task.Execute(ctx)
assert.NoError(t, err)
})
} }
func TestTask_loadSegmentsTask(t *testing.T) { func TestTask_loadSegmentsTask(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册