From 0cdbf0ec7169f71a5b3fbdf1516da2811c158489 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 10 Mar 2023 10:57:53 +0800 Subject: [PATCH] Make datacoord handle GetIndexedSegment error from indexcoord (#22673) Signed-off-by: Congqi Xia --- internal/datacoord/compaction_trigger.go | 6 ++- internal/datacoord/garbage_collector.go | 6 ++- internal/datacoord/handler.go | 12 ++++-- internal/datacoord/mock_test.go | 4 +- internal/datacoord/server_test.go | 19 ++++++--- internal/datacoord/services.go | 6 ++- internal/datacoord/util.go | 54 ++++++++++++++++-------- 7 files changed, 74 insertions(+), 33 deletions(-) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index ca958a276..79eb5097c 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -630,7 +630,11 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int) func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo { segments := t.meta.GetSegmentsByChannel(channel) - segments = FilterInIndexedSegments(t.handler, t.indexCoord, segments...) + segments, err := FilterInIndexedSegments(t.handler, t.indexCoord, segments...) + if err != nil { + log.Warn("failed to get indexed segments getting compaction candidates", zap.Error(err)) + return nil + } var res []*SegmentInfo for _, s := range segments { if !isSegmentHealthy(s) || diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 5f854995b..e293e483a 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -265,7 +265,11 @@ func (gc *garbageCollector) clearEtcd() { droppedCompactTo[to] = struct{}{} } } - indexedSegments := FilterInIndexedSegments(gc.handler, gc.indexCoord, lo.Keys(droppedCompactTo)...) + indexedSegments, err := FilterInIndexedSegments(gc.handler, gc.indexCoord, lo.Keys(droppedCompactTo)...) + if err != nil { + log.Warn("failed to get indexed segments doing garbage collection", zap.Error(err)) + return + } indexedSet := make(typeutil.UniqueSet) for _, segment := range indexedSegments { indexedSet.Insert(segment.GetID()) diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index a700aac23..073b8201a 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -32,7 +32,7 @@ import ( // Handler handles some channel method for ChannelManager type Handler interface { // GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord - GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo + GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error) // GetDataVChanPositions gets the information recovery needed of a channel for DataNode GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo CheckShouldDropChannel(channel string) bool @@ -98,13 +98,17 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq // GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord, // we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments, // the unflushed segments are actually the segments without index, even they are flushed. -func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { +func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error) { // cannot use GetSegmentsByChannel since dropped segments are needed here segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { return s.InsertChannel == channel.Name }) segmentInfos := make(map[int64]*SegmentInfo) - indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...) + indexedSegments, err := FilterInIndexedSegments(h, h.s.indexCoord, segments...) + if err != nil { + log.Warn("filter indexed segment failed", zap.String("channel", channel.Name), zap.Error(err)) + return nil, err + } indexed := make(typeutil.UniqueSet) for _, segment := range indexedSegments { indexed.Insert(segment.GetID()) @@ -165,7 +169,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni FlushedSegmentIds: indexedIDs.Collect(), UnflushedSegmentIds: unIndexedIDs.Collect(), DroppedSegmentIds: droppedIDs.Collect(), - } + }, nil } // getEarliestSegmentDMLPos returns the earliest dml position of segments, diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index a22321ac6..1ed30fd87 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -830,11 +830,11 @@ func newMockHandler() *mockHandler { return &mockHandler{} } -func (h *mockHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { +func (h *mockHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error) { return &datapb.VchannelInfo{ CollectionID: channel.CollectionID, ChannelName: channel.Name, - } + }, nil } func (h *mockHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 36f9a459f..69827a738 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2307,13 +2307,15 @@ func TestGetQueryVChanPositions(t *testing.T) { svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil) t.Run("get unexisted channel", func(t *testing.T) { - vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID) + vchan, err := svr.handler.GetQueryVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID) + assert.NoError(t, err) assert.Empty(t, vchan.UnflushedSegmentIds) assert.Empty(t, vchan.FlushedSegmentIds) }) t.Run("get existed channel", func(t *testing.T) { - vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + vchan, err := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + assert.NoError(t, err) assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds)) assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0]) assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds)) @@ -2321,14 +2323,16 @@ func TestGetQueryVChanPositions(t *testing.T) { }) t.Run("empty collection", func(t *testing.T) { - infos := svr.handler.GetQueryVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID) + infos, err := svr.handler.GetQueryVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID) + assert.NoError(t, err) assert.EqualValues(t, 1, infos.CollectionID) assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds)) }) t.Run("filter partition", func(t *testing.T) { - infos := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1) + infos, err := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1) + assert.NoError(t, err) assert.EqualValues(t, 0, infos.CollectionID) assert.EqualValues(t, 0, len(infos.FlushedSegmentIds)) assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds)) @@ -2337,11 +2341,13 @@ func TestGetQueryVChanPositions(t *testing.T) { t.Run("empty collection with passed positions", func(t *testing.T) { vchannel := "ch_no_segment_1" pchannel := funcutil.ToPhysicalChannel(vchannel) - infos := svr.handler.GetQueryVChanPositions(&channel{ + infos, err := svr.handler.GetQueryVChanPositions(&channel{ Name: vchannel, CollectionID: 0, StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}}, }, allPartitionID) + + assert.NoError(t, err) assert.EqualValues(t, 0, infos.CollectionID) assert.EqualValues(t, vchannel, infos.ChannelName) }) @@ -2351,7 +2357,8 @@ func TestGetQueryVChanPositions(t *testing.T) { svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return( &indexpb.GetIndexInfoResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil) - vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + vchan, err := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) + assert.NoError(t, err) assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds)) assert.EqualValues(t, 3, len(vchan.UnflushedSegmentIds)) assert.ElementsMatch(t, []int64{s1.ID, s2.ID, s3.ID}, vchan.UnflushedSegmentIds) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index acf3dcf0e..4664ebb85 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -637,7 +637,11 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) flushedIDs := make(typeutil.UniqueSet) for _, c := range channels { - channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID) + channelInfo, err := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID) + if err != nil { + resp.Status.Reason = err.Error() + return resp, nil + } channelInfos = append(channelInfos, channelInfo) log.Info("datacoord append channelInfo in GetRecoveryInfo", zap.Any("channelInfo", channelInfo), diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 445c3e28c..aea20f358 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -19,11 +19,12 @@ package datacoord import ( "context" "errors" + "fmt" "strconv" - "sync" "time" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" @@ -103,9 +104,9 @@ func GetCompactTime(ctx context.Context, allocator allocator) (*compactTime, err return &compactTime{ttRetentionLogic, 0, 0}, nil } -func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segments ...*SegmentInfo) []*SegmentInfo { +func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segments ...*SegmentInfo) ([]*SegmentInfo, error) { if len(segments) == 0 { - return nil + return nil, nil } segmentMap := make(map[int64]*SegmentInfo) @@ -124,7 +125,7 @@ func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segme cancel() if err != nil { log.Warn("failed to get collection schema", zap.Error(err)) - continue + return nil, err } for _, field := range coll.Schema.GetFields() { if field.GetDataType() == schemapb.DataType_BinaryVector || @@ -135,33 +136,50 @@ func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segme } } - wg := sync.WaitGroup{} indexedSegmentCh := make(chan []int64, len(segments)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + group, ctx := errgroup.WithContext(ctx) for _, segment := range segments { segment := segment - wg.Add(1) - go func() { - defer wg.Done() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + + group.Go(func() error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() + resp, err := indexCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{ CollectionID: segment.GetCollectionID(), SegmentIDs: []int64{segment.GetID()}, }) - if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + if err != nil { log.Warn("failed to get index of collection", zap.Int64("collectionID", segment.GetCollectionID()), - zap.Int64("segmentID", segment.GetID())) - return + zap.Int64("segmentID", segment.GetID()), + zap.Error(err), + ) + return err + } + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + log.Warn("indexcoord return error when get index", + zap.Int64("collectionID", segment.GetCollectionID()), + zap.Int64("segmentID", segment.GetID()), + zap.String("status", resp.GetStatus().String()), + zap.String("reason", resp.GetStatus().GetReason()), + ) + return fmt.Errorf("GetIndex failed for segment %d, status: %s, reason: %s", segment.GetID(), resp.GetStatus().GetErrorCode().String(), resp.GetStatus().GetReason()) } indexed := extractSegmentsWithVectorIndex(vecFieldID, resp.GetSegmentInfo()) - if len(indexed) == 0 { - return + if len(indexed) > 0 { + indexedSegmentCh <- indexed } - indexedSegmentCh <- indexed - }() + return nil + }) } - wg.Wait() + err := group.Wait() + if err != nil { + return nil, err + } + close(indexedSegmentCh) indexedSegments := make([]*SegmentInfo, 0) @@ -174,7 +192,7 @@ func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segme } } - return indexedSegments + return indexedSegments, nil } func extractSegmentsWithVectorIndex(vecFieldID map[int64]int64, segentIndexInfo map[int64]*indexpb.SegmentInfo) []int64 { -- GitLab