diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 38888d46b96be6641974973912e6095ea0dfe405..ed8c5ac9257ffdc440911f0e5841e254e514d5aa 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -365,8 +365,6 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { break } - group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...) - isDiskIndex, err := t.updateSegmentMaxSize(group.segments) if err != nil { log.Warn("failed to update segment max size", zap.Error(err)) diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index b635fb02ab3be222fbb8931d2c5bbd70a72906fb..2379f793244c52086e0dde6388bb3b46c50a50cc 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -438,7 +438,7 @@ func Test_compactionTrigger_force(t *testing.T) { case <-time.After(2 * time.Second): hasPlan = false } - assert.Equal(t, false, hasPlan) + assert.Equal(t, true, hasPlan) }) t.Run(tt.name+" with meta error", func(t *testing.T) { @@ -1033,13 +1033,8 @@ func Test_compactionTrigger_noplan(t *testing.T) { err := tr.triggerCompaction() assert.Equal(t, tt.wantErr, err != nil) spy := (tt.fields.compactionHandler).(*spyCompactionHandler) - select { - case val := <-spy.spyChan: - assert.Fail(t, "we expect no compaction generated", val) - return - case <-time.After(3 * time.Second): - return - } + plan := <-spy.spyChan + assert.Equal(t, len(plan.SegmentBinlogs), 4) }) } } diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 8d6bf0787d7ca10511375127a9968dd6bdccd649..9b384b82d363fc1ea7e24bb4fc9d0c72345d2dda 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -137,14 +137,24 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni unIndexedIDs.Insert(s.GetID()) } } - for id := range unIndexedIDs { - // Indexed segments are compacted to a raw segment, - // replace it with the indexed ones - if len(segmentInfos[id].GetCompactionFrom()) > 0 && - indexed.Contain(segmentInfos[id].GetCompactionFrom()...) { - unIndexedIDs.Remove(id) - indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...) - droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...) + hasUnIndexed := true + for hasUnIndexed { + hasUnIndexed = false + for id := range unIndexedIDs { + // Indexed segments are compacted to a raw segment, + // replace it with the indexed ones + if len(segmentInfos[id].GetCompactionFrom()) > 0 { + unIndexedIDs.Remove(id) + for _, segID := range segmentInfos[id].GetCompactionFrom() { + if indexed.Contain(segID) { + indexedIDs.Insert(segID) + } else { + unIndexedIDs.Insert(segID) + hasUnIndexed = true + } + } + droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...) + } } } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 011c5780c1d34793f89cd9131e56482998b89aea..136ae49b8c9abe0ad8eed06be5ca0bfb50c6bed7 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2644,6 +2644,75 @@ func TestGetRecoveryInfo(t *testing.T) { assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegmentIds()[0]) }) + t.Run("with continuous compaction", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) { + return newMockRootCoordService(), nil + } + + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: newTestSchema(), + }) + + err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{ + ChannelName: "vchan1", + Timestamp: 0, + }) + assert.NoError(t, err) + + seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped) + seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) + seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) + seg3.CompactionFrom = []int64{9, 10} + seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) + seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) + seg5.CompactionFrom = []int64{11, 12} + err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg3)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg4)) + assert.Nil(t, err) + err = svr.meta.AddSegment(NewSegmentInfo(seg5)) + assert.Nil(t, err) + mockResp := &indexpb.GetIndexInfoResponse{ + Status: &commonpb.Status{}, + SegmentInfo: map[int64]*indexpb.SegmentInfo{ + seg4.ID: { + CollectionID: seg4.CollectionID, + SegmentID: seg4.ID, + EnableIndex: true, + IndexInfos: []*indexpb.IndexFilePathInfo{ + { + SegmentID: seg4.ID, + FieldID: 2, + }, + }, + }, + }, + } + svr.indexCoord = mocks.NewMockIndexCoord(t) + svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil) + + req := &datapb.GetRecoveryInfoRequest{ + CollectionID: 0, + PartitionID: 0, + } + resp, err := svr.GetRecoveryInfo(context.TODO(), req) + assert.Nil(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.NotNil(t, resp.GetChannels()[0].SeekPosition) + assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) + assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0) + assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds()) + assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds()) + }) + t.Run("with closed server", func(t *testing.T) { svr := newTestServer(t, nil) closeTestServer(t, svr)