未验证 提交 ba02e703 编写于 作者: M MrPresent-Han 提交者: GitHub

fix bug when syncing distribution without schema and remove unused logic(#23085) (#23216)

Signed-off-by: NMrPresent-Han <jamesharden11122@gmail.com>
上级 fe0d12c5
......@@ -369,6 +369,7 @@ generate-mockery: getdeps
$(PWD)/bin/mockery --name=Manager --dir=$(PWD)/internal/querynodev2/cluster --output=$(PWD)/internal/querynodev2/cluster --filename=mock_manager.go --with-expecter --outpkg=cluster --structname=MockManager --inpackage
$(PWD)/bin/mockery --name=Loader --dir=$(PWD)/internal/querynodev2/segments --output=$(PWD)/internal/querynodev2/segments --filename=mock_loader.go --with-expecter --outpkg=segments --structname=MockLoader --inpackage
$(PWD)/bin/mockery --name=Worker --dir=$(PWD)/internal/querynodev2/cluster --output=$(PWD)/internal/querynodev2/cluster --filename=mock_worker.go --with-expecter --outpkg=worker --structname=MockWorker --inpackage
$(PWD)/bin/mockery --name=ShardDelegator --dir=$(PWD)/internal/querynodev2/delegator/ --output=$(PWD)/internal/querynodev2/delegator/ --filename=mock_delegator.go --with-expecter --outpkg=delegator --structname=MockShardDelegator --inpackage
ci-ut: build-cpp-with-coverage generated-proto-go-without-cpp codecov-cpp codecov-go
......@@ -555,6 +555,10 @@ message SyncDistributionRequest {
int64 collectionID = 2;
string channel = 3;
repeated SyncAction actions = 4;
schema.CollectionSchema schema = 5;
LoadMetaInfo load_meta = 6;
int64 replicaID = 7;
int64 version = 8;
}
message ResourceGroup {
......
......@@ -101,7 +101,7 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
dists := o.dist.SegmentDistManager.GetByShardWithReplica(ch, replica)
needLoaded, needRemoved := o.findNeedLoadedSegments(leaderView, dists),
o.findNeedRemovedSegments(leaderView, dists)
o.sync(ctx, leaderView, append(needLoaded, needRemoved...))
o.sync(ctx, replica.GetID(), leaderView, append(needLoaded, needRemoved...))
}
}
}
......@@ -129,27 +129,6 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
segment := resp.GetInfos()[0]
loadInfo := utils.PackSegmentLoadInfo(segment, nil)
// Fix the leader view with lacks of delta logs
if existInCurrentTarget && s.LastDeltaTimestamp < currentTarget.GetDmlPosition().GetTimestamp() {
log.Info("Fix QueryNode delta logs lag",
zap.Int64("nodeID", s.Node),
zap.Int64("collectionID", s.GetCollectionID()),
zap.Int64("partitionID", s.GetPartitionID()),
zap.Int64("segmentID", s.GetID()),
zap.Uint64("segmentDeltaTimestamp", s.LastDeltaTimestamp),
zap.Uint64("channelTimestamp", currentTarget.GetDmlPosition().GetTimestamp()),
)
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Amend,
PartitionID: s.GetPartitionID(),
SegmentID: s.GetID(),
NodeID: s.Node,
Version: s.Version,
Info: loadInfo,
})
}
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Set,
PartitionID: s.GetPartitionID(),
......@@ -190,7 +169,7 @@ func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, di
return ret
}
func (o *LeaderObserver) sync(ctx context.Context, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) {
func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) {
if len(diffs) == 0 {
return
}
......@@ -200,13 +179,33 @@ func (o *LeaderObserver) sync(ctx context.Context, leaderView *meta.LeaderView,
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
)
schema, err := o.broker.GetCollectionSchema(ctx, leaderView.CollectionID)
if err != nil {
log.Error("sync distribution failed, cannot get schema of collection", zap.Error(err))
return
}
partitions, err := utils.GetPartitions(o.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
log.Error("sync distribution failed, cannot get partitions of collection", zap.Error(err))
return
}
req := &querypb.SyncDistributionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),
),
CollectionID: leaderView.CollectionID,
ReplicaID: replicaID,
Channel: leaderView.Channel,
Actions: diffs,
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
LoadType: o.meta.GetLoadType(leaderView.CollectionID),
CollectionID: leaderView.CollectionID,
PartitionIDs: partitions,
},
Version: time.Now().UnixNano(),
}
resp, err := o.cluster.SyncDistribution(ctx, leaderView.ID, req)
if err != nil {
......
......@@ -21,6 +21,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
......@@ -105,6 +106,8 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
InsertChannel: "test-insert-channel",
}
loadInfo := utils.PackSegmentLoadInfo(info, nil)
schema := utils.CreateTestSchema()
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(
......@@ -114,26 +117,43 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
expectReq := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{},
},
Version: version,
}
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once().
Run(func(args mock.Arguments) { called.Store(true) }).
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2),
mock.AnythingOfType("*querypb.SyncDistributionRequest")).Once().
Run(func(args mock.Arguments) {
inputReq := (args[2]).(*querypb.SyncDistributionRequest)
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{inputReq},
[]*querypb.SyncDistributionRequest{expectReqeustFunc(inputReq.GetVersion())})
called.Store(true)
}).
Return(&commonpb.Status{}, nil)
observer.Start(context.TODO())
......@@ -163,6 +183,8 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
ChannelName: "test-insert-channel",
},
}
schema := utils.CreateTestSchema()
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil)
info := &datapb.SegmentInfo{
ID: 1,
......@@ -182,26 +204,40 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
expectReq := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{},
},
Version: version,
}
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once().
Run(func(args mock.Arguments) { called.Store(true) }).
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), mock.AnythingOfType("*querypb.SyncDistributionRequest")).Once().
Run(func(args mock.Arguments) {
inputReq := (args[2]).(*querypb.SyncDistributionRequest)
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{inputReq},
[]*querypb.SyncDistributionRequest{expectReqeustFunc(inputReq.GetVersion())})
called.Store(true)
}).
Return(&commonpb.Status{}, nil)
observer.Start(context.TODO())
......@@ -279,10 +315,12 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
InsertChannel: "test-insert-channel",
}
loadInfo := utils.PackSegmentLoadInfo(info, nil)
schema := utils.CreateTestSchema()
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
observer.target.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
......@@ -290,26 +328,42 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
observer.dist.LeaderViewManager.Update(4, utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{}))
expectReq := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{},
},
Version: version,
}
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once().
Run(func(args mock.Arguments) { called.Store(true) }).
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2),
mock.AnythingOfType("*querypb.SyncDistributionRequest")).Once().
Run(func(args mock.Arguments) {
inputReq := (args[2]).(*querypb.SyncDistributionRequest)
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{inputReq},
[]*querypb.SyncDistributionRequest{expectReqeustFunc(inputReq.GetVersion())})
called.Store(true)
}).
Return(&commonpb.Status{}, nil)
observer.Start(context.TODO())
......@@ -331,23 +385,41 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{}))
expectReq := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Remove,
SegmentID: 3,
NodeID: 2,
schema := utils.CreateTestSchema()
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Remove,
SegmentID: 3,
NodeID: 2,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{},
},
Version: version,
}
}
ch := make(chan struct{})
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once().
Run(func(args mock.Arguments) { close(ch) }).
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2),
mock.AnythingOfType("*querypb.SyncDistributionRequest")).Once().
Run(func(args mock.Arguments) {
inputReq := (args[2]).(*querypb.SyncDistributionRequest)
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{inputReq},
[]*querypb.SyncDistributionRequest{expectReqeustFunc(inputReq.GetVersion())})
close(ch)
}).
Return(&commonpb.Status{}, nil)
observer.Start(context.TODO())
......@@ -376,6 +448,8 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
ChannelName: "test-insert-channel",
},
}
schema := utils.CreateTestSchema()
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil)
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(1), int64(1)).Return(
channels, segments, nil)
......@@ -384,23 +458,37 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2, 2: 2}, map[int64]*meta.Segment{}))
expectReq := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Remove,
SegmentID: 3,
NodeID: 2,
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Remove,
SegmentID: 3,
NodeID: 2,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{},
},
Version: version,
}
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), expectReq).Once().
Run(func(args mock.Arguments) { called.Store(true) }).
suite.mockCluster.EXPECT().SyncDistribution(context.TODO(), int64(2), mock.AnythingOfType("*querypb.SyncDistributionRequest")).Once().
Run(func(args mock.Arguments) {
inputReq := (args[2]).(*querypb.SyncDistributionRequest)
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{inputReq},
[]*querypb.SyncDistributionRequest{expectReqeustFunc(inputReq.GetVersion())})
called.Store(true)
}).
Return(&commonpb.Status{}, nil)
observer.Start(context.TODO())
......
......@@ -17,6 +17,7 @@
package utils
import (
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
......@@ -63,6 +64,22 @@ func CreateTestReplica(id, collectionID int64, nodes []int64) *meta.Replica {
)
}
func CreateTestSchema() *schemapb.CollectionSchema {
return &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 101,
Name: "id",
IsPrimaryKey: false,
DataType: schemapb.DataType_Int64,
},
},
}
}
func CreateTestCollection(collection int64, replica int32) *meta.Collection {
return &meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
......
......@@ -362,7 +362,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
})
removed := sd.distribution.AddDistributions(entries...)
// call worker release async
// release possible matched growing segments async
if len(removed) > 0 {
go func() {
worker, err := sd.workerManager.GetWorker(paramtable.GetNodeID())
......@@ -537,7 +537,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
return result, nil
}
// ReleaseSegments releases segments local or remotely depends ont the target node.
// ReleaseSegments releases segments local or remotely depending on the target node.
func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error {
log := sd.getLogger(ctx)
......
......@@ -200,7 +200,7 @@ func (d *distribution) RemoveDistributions(sealedSegments []SegmentEntry, growin
}
// getSnapshot converts current distribution to snapshot format.
// in which, user could juse found nodeID=>segmentID list.
// in which, user could use found nodeID=>segmentID list.
// mutex RLock is required before calling this method.
func (d *distribution) genSnapshot() chan struct{} {
......
......@@ -963,6 +963,8 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
}
return status, nil
}
// get shard delegator
shardDelegator, ok := node.delegators.Get(req.GetChannel())
if !ok {
log.Warn("failed to find shard cluster when sync ", zap.String("channel", req.GetChannel()))
......@@ -972,6 +974,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
}, nil
}
//translate segment action
removeActions := make([]*querypb.SyncAction, 0)
addSegments := make(map[int64][]*querypb.SegmentLoadInfo)
for _, action := range req.GetActions() {
......@@ -985,22 +988,6 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
removeActions = append(removeActions, action)
case querypb.SyncType_Set:
addSegments[action.GetNodeID()] = append(addSegments[action.GetNodeID()], action.GetInfo())
case querypb.SyncType_Amend:
err := shardDelegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(),
DstNodeID: action.GetNodeID(),
CollectionID: action.GetInfo().GetCollectionID(),
Version: action.GetVersion(),
Infos: []*querypb.SegmentLoadInfo{action.GetInfo()},
LoadScope: querypb.LoadScope_Delta,
})
if err != nil {
log.Warn("failed to ament segment",
zap.Int64("segmentID", action.SegmentID),
zap.Error(err),
)
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to amend segment", err), nil
}
default:
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -1011,10 +998,18 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
for nodeID, infos := range addSegments {
err := shardDelegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: req.GetBase(),
DstNodeID: nodeID,
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
commonpbutil.WithMsgID(req.Base.GetMsgID()),
),
Infos: infos,
Schema: req.GetSchema(),
LoadMeta: req.GetLoadMeta(),
CollectionID: req.GetCollectionID(),
ReplicaID: req.GetReplicaID(),
DstNodeID: nodeID,
Version: req.GetVersion(),
NeedTransfer: false,
})
if err != nil {
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to sync(load) segment", err), nil
......
......@@ -1168,15 +1168,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Normal() {
PartitionID: suite.partitionIDs[0],
}
amendAction := &querypb.SyncAction{
Type: querypb.SyncType_Amend,
Info: &querypb.SegmentLoadInfo{
SegmentID: suite.validSegmentIDs[0],
PartitionID: suite.partitionIDs[0],
CollectionID: suite.collectionID,
},
}
req.Actions = []*querypb.SyncAction{releaseAction, setAction, amendAction}
req.Actions = []*querypb.SyncAction{releaseAction, setAction}
status, err := suite.node.SyncDistribution(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册