未验证 提交 0cdbf0ec 编写于 作者: C congqixia 提交者: GitHub

Make datacoord handle GetIndexedSegment error from indexcoord (#22673)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 2c0fd23b
......@@ -630,7 +630,11 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int)
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
segments := t.meta.GetSegmentsByChannel(channel)
segments = FilterInIndexedSegments(t.handler, t.indexCoord, segments...)
segments, err := FilterInIndexedSegments(t.handler, t.indexCoord, segments...)
if err != nil {
log.Warn("failed to get indexed segments getting compaction candidates", zap.Error(err))
return nil
}
var res []*SegmentInfo
for _, s := range segments {
if !isSegmentHealthy(s) ||
......
......@@ -265,7 +265,11 @@ func (gc *garbageCollector) clearEtcd() {
droppedCompactTo[to] = struct{}{}
}
}
indexedSegments := FilterInIndexedSegments(gc.handler, gc.indexCoord, lo.Keys(droppedCompactTo)...)
indexedSegments, err := FilterInIndexedSegments(gc.handler, gc.indexCoord, lo.Keys(droppedCompactTo)...)
if err != nil {
log.Warn("failed to get indexed segments doing garbage collection", zap.Error(err))
return
}
indexedSet := make(typeutil.UniqueSet)
for _, segment := range indexedSegments {
indexedSet.Insert(segment.GetID())
......
......@@ -32,7 +32,7 @@ import (
// Handler handles some channel method for ChannelManager
type Handler interface {
// GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord
GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error)
// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
CheckShouldDropChannel(channel string) bool
......@@ -98,13 +98,17 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
// GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord,
// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments,
// the unflushed segments are actually the segments without index, even they are flushed.
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error) {
// cannot use GetSegmentsByChannel since dropped segments are needed here
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
return s.InsertChannel == channel.Name
})
segmentInfos := make(map[int64]*SegmentInfo)
indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
indexedSegments, err := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
if err != nil {
log.Warn("filter indexed segment failed", zap.String("channel", channel.Name), zap.Error(err))
return nil, err
}
indexed := make(typeutil.UniqueSet)
for _, segment := range indexedSegments {
indexed.Insert(segment.GetID())
......@@ -165,7 +169,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
FlushedSegmentIds: indexedIDs.Collect(),
UnflushedSegmentIds: unIndexedIDs.Collect(),
DroppedSegmentIds: droppedIDs.Collect(),
}
}, nil
}
// getEarliestSegmentDMLPos returns the earliest dml position of segments,
......
......@@ -830,11 +830,11 @@ func newMockHandler() *mockHandler {
return &mockHandler{}
}
func (h *mockHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
func (h *mockHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error) {
return &datapb.VchannelInfo{
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
}
}, nil
}
func (h *mockHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
......
......@@ -2307,13 +2307,15 @@ func TestGetQueryVChanPositions(t *testing.T) {
svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil)
t.Run("get unexisted channel", func(t *testing.T) {
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID)
vchan, err := svr.handler.GetQueryVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID)
assert.NoError(t, err)
assert.Empty(t, vchan.UnflushedSegmentIds)
assert.Empty(t, vchan.FlushedSegmentIds)
})
t.Run("get existed channel", func(t *testing.T) {
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan, err := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0])
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
......@@ -2321,14 +2323,16 @@ func TestGetQueryVChanPositions(t *testing.T) {
})
t.Run("empty collection", func(t *testing.T) {
infos := svr.handler.GetQueryVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID)
infos, err := svr.handler.GetQueryVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID)
assert.NoError(t, err)
assert.EqualValues(t, 1, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds))
})
t.Run("filter partition", func(t *testing.T) {
infos := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1)
infos, err := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1)
assert.NoError(t, err)
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds))
......@@ -2337,11 +2341,13 @@ func TestGetQueryVChanPositions(t *testing.T) {
t.Run("empty collection with passed positions", func(t *testing.T) {
vchannel := "ch_no_segment_1"
pchannel := funcutil.ToPhysicalChannel(vchannel)
infos := svr.handler.GetQueryVChanPositions(&channel{
infos, err := svr.handler.GetQueryVChanPositions(&channel{
Name: vchannel,
CollectionID: 0,
StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}},
}, allPartitionID)
assert.NoError(t, err)
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, vchannel, infos.ChannelName)
})
......@@ -2351,7 +2357,8 @@ func TestGetQueryVChanPositions(t *testing.T) {
svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(
&indexpb.GetIndexInfoResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan, err := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.NoError(t, err)
assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 3, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{s1.ID, s2.ID, s3.ID}, vchan.UnflushedSegmentIds)
......
......@@ -637,7 +637,11 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
flushedIDs := make(typeutil.UniqueSet)
for _, c := range channels {
channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID)
channelInfo, err := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID)
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
channelInfos = append(channelInfos, channelInfo)
log.Info("datacoord append channelInfo in GetRecoveryInfo",
zap.Any("channelInfo", channelInfo),
......
......@@ -19,11 +19,12 @@ package datacoord
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
......@@ -103,9 +104,9 @@ func GetCompactTime(ctx context.Context, allocator allocator) (*compactTime, err
return &compactTime{ttRetentionLogic, 0, 0}, nil
}
func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segments ...*SegmentInfo) []*SegmentInfo {
func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segments ...*SegmentInfo) ([]*SegmentInfo, error) {
if len(segments) == 0 {
return nil
return nil, nil
}
segmentMap := make(map[int64]*SegmentInfo)
......@@ -124,7 +125,7 @@ func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segme
cancel()
if err != nil {
log.Warn("failed to get collection schema", zap.Error(err))
continue
return nil, err
}
for _, field := range coll.Schema.GetFields() {
if field.GetDataType() == schemapb.DataType_BinaryVector ||
......@@ -135,33 +136,50 @@ func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segme
}
}
wg := sync.WaitGroup{}
indexedSegmentCh := make(chan []int64, len(segments))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group, ctx := errgroup.WithContext(ctx)
for _, segment := range segments {
segment := segment
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
group.Go(func() error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
resp, err := indexCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{
CollectionID: segment.GetCollectionID(),
SegmentIDs: []int64{segment.GetID()},
})
if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
if err != nil {
log.Warn("failed to get index of collection",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("segmentID", segment.GetID()))
return
zap.Int64("segmentID", segment.GetID()),
zap.Error(err),
)
return err
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("indexcoord return error when get index",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("segmentID", segment.GetID()),
zap.String("status", resp.GetStatus().String()),
zap.String("reason", resp.GetStatus().GetReason()),
)
return fmt.Errorf("GetIndex failed for segment %d, status: %s, reason: %s", segment.GetID(), resp.GetStatus().GetErrorCode().String(), resp.GetStatus().GetReason())
}
indexed := extractSegmentsWithVectorIndex(vecFieldID, resp.GetSegmentInfo())
if len(indexed) == 0 {
return
if len(indexed) > 0 {
indexedSegmentCh <- indexed
}
indexedSegmentCh <- indexed
}()
return nil
})
}
wg.Wait()
err := group.Wait()
if err != nil {
return nil, err
}
close(indexedSegmentCh)
indexedSegments := make([]*SegmentInfo, 0)
......@@ -174,7 +192,7 @@ func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segme
}
}
return indexedSegments
return indexedSegments, nil
}
func extractSegmentsWithVectorIndex(vecFieldID map[int64]int64, segentIndexInfo map[int64]*indexpb.SegmentInfo) []int64 {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册