From af8f7acb7b90ba4a2d940ed97538c5cb7b61276f Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 14 Feb 2023 09:56:33 +0800 Subject: [PATCH] Remove contraint that compaction based on indexed segment (#22145) Signed-off-by: cai.zhang --- internal/datacoord/compaction_trigger.go | 2 - internal/datacoord/compaction_trigger_test.go | 11 +-- internal/datacoord/handler.go | 26 ++++--- internal/datacoord/server_test.go | 69 +++++++++++++++++++ 4 files changed, 90 insertions(+), 18 deletions(-) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 38888d46b..ed8c5ac92 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -365,8 +365,6 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { break } - group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...) - isDiskIndex, err := t.updateSegmentMaxSize(group.segments) if err != nil { log.Warn("failed to update segment max size", zap.Error(err)) diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index b635fb02a..2379f7932 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -438,7 +438,7 @@ func Test_compactionTrigger_force(t *testing.T) { case <-time.After(2 * time.Second): hasPlan = false } - assert.Equal(t, false, hasPlan) + assert.Equal(t, true, hasPlan) }) t.Run(tt.name+" with meta error", func(t *testing.T) { @@ -1033,13 +1033,8 @@ func Test_compactionTrigger_noplan(t *testing.T) { err := tr.triggerCompaction() assert.Equal(t, tt.wantErr, err != nil) spy := (tt.fields.compactionHandler).(*spyCompactionHandler) - select { - case val := <-spy.spyChan: - assert.Fail(t, "we expect no compaction generated", val) - return - case <-time.After(3 * time.Second): - return - } + plan := <-spy.spyChan + assert.Equal(t, len(plan.SegmentBinlogs), 4) }) } } diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 8d6bf0787..9b384b82d 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -137,14 +137,24 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni unIndexedIDs.Insert(s.GetID()) } } - for id := range unIndexedIDs { - // Indexed segments are compacted to a raw segment, - // replace it with the indexed ones - if len(segmentInfos[id].GetCompactionFrom()) > 0 && - indexed.Contain(segmentInfos[id].GetCompactionFrom()...) { - unIndexedIDs.Remove(id) - indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...) - droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...) + hasUnIndexed := true + for hasUnIndexed { + hasUnIndexed = false + for id := range unIndexedIDs { + // Indexed segments are compacted to a raw segment, + // replace it with the indexed ones + if len(segmentInfos[id].GetCompactionFrom()) > 0 { + unIndexedIDs.Remove(id) + for _, segID := range segmentInfos[id].GetCompactionFrom() { + if indexed.Contain(segID) { + indexedIDs.Insert(segID) + } else { + unIndexedIDs.Insert(segID) + hasUnIndexed = true + } + } + droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...) + } } } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 011c5780c..136ae49b8 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2644,6 +2644,75 @@ func TestGetRecoveryInfo(t *testing.T) { assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegmentIds()[0]) }) + t.Run("with continuous compaction", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) { + return newMockRootCoordService(), nil + } + + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: newTestSchema(), + }) + + err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{ + ChannelName: "vchan1", + Timestamp: 0, + }) + assert.NoError(t, err) + + seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped) + seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) + seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) + seg3.CompactionFrom = []int64{9, 10} + seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) + seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) + seg5.CompactionFrom = []int64{11, 12} + err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg3)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg4)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg5)) + assert.Nil(t, err) + mockResp := &indexpb.GetIndexInfoResponse{ + Status: &commonpb.Status{}, + SegmentInfo: map[int64]*indexpb.SegmentInfo{ + seg4.ID: { + CollectionID: seg4.CollectionID, + SegmentID: seg4.ID, + EnableIndex: true, + IndexInfos: []*indexpb.IndexFilePathInfo{ + { + SegmentID: seg4.ID, + FieldID: 2, + }, + }, + }, + }, + } + svr.indexCoord = mocks.NewMockIndexCoord(t) + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil) + + req := &datapb.GetRecoveryInfoRequest{ + CollectionID: 0, + PartitionID: 0, + } + resp, err := svr.GetRecoveryInfo(context.TODO(), req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.NotNil(t, resp.GetChannels()[0].SeekPosition) + assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) + assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0) + assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds()) + assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds()) + }) + t.Run("with closed server", func(t *testing.T) { svr := newTestServer(t, nil) closeTestServer(t, svr) -- GitLab