未验证 提交 54c0e640 编写于 作者: B Bingyi Sun 提交者: GitHub

Fix search on empty segments set bug (#26136)

Signed-off-by: Nsunby <sunbingyi1992@gmail.com>
上级 703f0a18
......@@ -175,13 +175,6 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
continue
}
readableVersion := int64(0)
if existInCurrentTarget {
readableVersion = o.target.GetCollectionTargetVersion(s.CollectionID, meta.CurrentTarget)
} else {
readableVersion = o.target.GetCollectionTargetVersion(s.CollectionID, meta.NextTarget)
}
if !ok || version.GetVersion() < s.Version { // Leader misses this segment
ctx := context.Background()
resp, err := o.broker.GetSegmentInfo(ctx, s.GetID())
......@@ -189,7 +182,7 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
continue
}
loadInfo := utils.PackSegmentLoadInfo(resp, nil, readableVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Set,
......
......@@ -127,7 +127,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
loadInfo := utils.PackSegmentLoadInfo(resp, nil, view.TargetVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
......@@ -218,7 +218,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
loadInfo := utils.PackSegmentLoadInfo(resp, nil, view.TargetVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
......@@ -352,7 +352,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(4, view2)
loadInfo := utils.PackSegmentLoadInfo(resp, nil, view.TargetVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
......
......@@ -273,14 +273,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
indexes = nil
}
readableVersion := int64(0)
switch GetTaskType(task) {
case TaskTypeGrow:
readableVersion = ex.targetMgr.GetCollectionTargetVersion(task.CollectionID(), meta.NextTarget)
case TaskTypeMove, TaskTypeUpdate:
readableVersion = ex.targetMgr.GetCollectionTargetVersion(task.CollectionID(), meta.CurrentTarget)
}
loadInfo := utils.PackSegmentLoadInfo(resp, indexes, readableVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, indexes)
// Get shard leader for the given replica and segment
leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), segment.GetInsertChannel())
......
......@@ -64,7 +64,7 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met
// packSegmentLoadInfo packs SegmentLoadInfo for given segment,
// packs with index if withIndex is true, this fetch indexes from IndexCoord
func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb.FieldIndexInfo, readableVersion int64) *querypb.SegmentLoadInfo {
func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo {
var (
deltaPosition *msgpb.MsgPosition
positionSrc string
......@@ -96,18 +96,17 @@ func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb
zap.Duration("tsLag", tsLag))
}
loadInfo := &querypb.SegmentLoadInfo{
SegmentID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
BinlogPaths: segment.Binlogs,
NumOfRows: segment.NumOfRows,
Statslogs: segment.Statslogs,
Deltalogs: segment.Deltalogs,
InsertChannel: segment.InsertChannel,
IndexInfos: indexes,
StartPosition: segment.GetStartPosition(),
DeltaPosition: deltaPosition,
ReadableVersion: readableVersion,
SegmentID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
BinlogPaths: segment.Binlogs,
NumOfRows: segment.NumOfRows,
Statslogs: segment.Statslogs,
Deltalogs: segment.Deltalogs,
InsertChannel: segment.InsertChannel,
IndexInfos: indexes,
StartPosition: segment.GetStartPosition(),
DeltaPosition: deltaPosition,
}
loadInfo.SegmentSize = calculateSegmentSize(loadInfo)
return loadInfo
......
......@@ -55,7 +55,7 @@ func Test_packLoadSegmentRequest(t *testing.T) {
proto.Clone(segmentInfo).(*datapb.SegmentInfo),
},
}
req := PackSegmentLoadInfo(resp, nil, 0)
req := PackSegmentLoadInfo(resp, nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t2, req.GetDeltaPosition().Timestamp)
......@@ -67,7 +67,7 @@ func Test_packLoadSegmentRequest(t *testing.T) {
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{segInfo},
}
req := PackSegmentLoadInfo(resp, nil, 0)
req := PackSegmentLoadInfo(resp, nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t1, req.GetDeltaPosition().Timestamp)
......@@ -79,7 +79,7 @@ func Test_packLoadSegmentRequest(t *testing.T) {
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{segInfo},
}
req := PackSegmentLoadInfo(resp, nil, 0)
req := PackSegmentLoadInfo(resp, nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t0, req.GetDeltaPosition().Timestamp)
......
......@@ -374,11 +374,10 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
// alter distribution
entries := lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) SegmentEntry {
return SegmentEntry{
SegmentID: info.GetSegmentID(),
PartitionID: info.GetPartitionID(),
NodeID: req.GetDstNodeID(),
Version: req.GetVersion(),
TargetVersion: info.GetReadableVersion(),
SegmentID: info.GetSegmentID(),
PartitionID: info.GetPartitionID(),
NodeID: req.GetDstNodeID(),
Version: req.GetVersion(),
}
})
sd.distribution.AddDistributions(entries...)
......
......@@ -171,6 +171,10 @@ func (d *distribution) AddDistributions(entries ...SegmentEntry) {
defer d.mut.Unlock()
for _, entry := range entries {
if s, ok := d.sealedSegments[entry.SegmentID]; ok {
// remain the target version for already loaded segment to void skipping this segment when executing search
entry.TargetVersion = s.TargetVersion
}
d.sealedSegments[entry.SegmentID] = entry
d.offlines.Remove(entry.SegmentID)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册