未验证 提交 8e0c8feb 编写于 作者: C congqixia 提交者: GitHub

Add dropped segment ids in vchaninfo (#12230)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 3fa5b4a9
......@@ -28,7 +28,10 @@ func newServerHandler(s *Server) *ServerHandler {
// GetVChanPositions get vchannel latest postitions with provided dml channel names
func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
segments := h.s.meta.GetSegmentsByChannel(channel)
// cannot use GetSegmentsByChannel since dropped segments are needed here
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
return s.InsertChannel == channel
})
log.Debug("GetSegmentsByChannel",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel),
......@@ -36,6 +39,7 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID,
)
var flushed []*datapb.SegmentInfo
var unflushed []*datapb.SegmentInfo
var dropped []UniqueID
var seekPosition *internalpb.MsgPosition
for _, s := range segments {
if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
......@@ -43,6 +47,11 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID,
continue
}
if s.GetState() == commonpb.SegmentState_Dropped {
dropped = append(dropped, s.GetID())
continue
}
if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
} else {
......@@ -74,6 +83,7 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID,
SeekPosition: seekPosition,
FlushedSegments: flushed,
UnflushedSegments: unflushed,
DroppedSegments: dropped,
}
}
......
......@@ -1407,6 +1407,35 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.ElementsMatch(t, []string{"/binlog/file1", "/binlog/file2"}, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs())
})
t.Run("with dropped segments", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
seg1 := createSegment(7, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing)
seg2 := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
err := svr.meta.AddSegment(NewSegmentInfo(seg1))
assert.Nil(t, err)
err = svr.meta.AddSegment(NewSegmentInfo(seg2))
assert.Nil(t, err)
req := &datapb.GetRecoveryInfoRequest{
CollectionID: 0,
PartitionID: 0,
}
resp, err := svr.GetRecoveryInfo(context.TODO(), req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, 0, len(resp.GetBinlogs()))
assert.EqualValues(t, 1, len(resp.GetChannels()))
assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
assert.ElementsMatch(t, []int64{8}, resp.GetChannels()[0].GetDroppedSegments())
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
......
......@@ -167,6 +167,7 @@ message VchannelInfo {
internal.MsgPosition seek_position = 3;
repeated SegmentInfo unflushedSegments = 4;
repeated SegmentInfo flushedSegments = 5;
repeated int64 dropped_segments = 6;
}
message WatchDmChannelsRequest {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册