Unverified commit 8c697903, authored by xige-16, committed by GitHub

Fix lost delete msg caused by loadSegment after watchDeltaChannel (#17308)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
Parent commit: 5f44e454
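
For orientation, the change works roughly as follows: when QueryCoord asks a QueryNode to load segments and that node has already watched the collection's delta channel, QueryCoord attaches the delta channels' seek positions to the LoadSegmentsRequest (the new delta_positions field below), and the QueryNode replays the delete log from those positions after loading the segments, so deletes published before the load are not lost. The sketch below is a minimal, self-contained illustration of that flow with simplified stand-in types; it is not the actual Milvus code.

// Minimal sketch of the new load path (stand-in types, not Milvus code):
// the coordinator fills DeltaPositions, the node loads segments and then
// replays deletes from each attached checkpoint.
package main

import "fmt"

type MsgPosition struct { // stands in for internalpb.MsgPosition
	ChannelName string
}

type LoadSegmentsRequest struct { // only the fields relevant to this patch
	CollectionID   int64
	DeltaPositions []*MsgPosition
}

func loadWithDeleteRecovery(
	req *LoadSegmentsRequest,
	loadSegments func(*LoadSegmentsRequest) error,
	replayDeletesFrom func(collectionID int64, pos *MsgPosition) error,
) error {
	if err := loadSegments(req); err != nil {
		return err
	}
	// Replay the delete log from every delta-channel checkpoint so deletes
	// consumed before this load are applied to the freshly loaded segments.
	for _, pos := range req.DeltaPositions {
		if err := replayDeletesFrom(req.CollectionID, pos); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	req := &LoadSegmentsRequest{
		CollectionID:   1,
		DeltaPositions: []*MsgPosition{{ChannelName: "by-dev-rootcoord-delta_1_2021v1"}},
	}
	err := loadWithDeleteRecovery(req,
		func(*LoadSegmentsRequest) error { return nil },
		func(id int64, pos *MsgPosition) error {
			fmt.Printf("replaying deletes for collection %d from %s\n", id, pos.ChannelName)
			return nil
		})
	fmt.Println("err:", err)
}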
@@ -19,7 +19,6 @@ package msgstream
 import (
 	"errors"
 	"fmt"
-	"strconv"

 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 )
@@ -72,9 +71,6 @@ func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
 	if keysLen != timestampLen || int64(keysLen) != deleteRequest.NumRows {
 		return nil, errors.New("the length of hashValue, timestamps, primaryKeys are not equal")
 	}
-	if keysLen != 1 {
-		return nil, errors.New("len(msg.hashValue) must equal 1, but it is: " + strconv.Itoa(keysLen))
-	}
 	key := keys[0]
 	_, ok := result[key]
......
@@ -218,6 +218,7 @@ message LoadSegmentsRequest {
   int64 collectionID = 6;
   LoadMetaInfo load_meta = 7;
   int64 replicaID = 8;
+  repeated internal.MsgPosition delta_positions = 9;
 }

 message ReleaseSegmentsRequest {
......
@@ -33,9 +33,11 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/util/funcutil"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 )
@@ -210,6 +212,48 @@ func (c *queryNodeCluster) LoadSegments(ctx context.Context, nodeID int64, in *q
 	c.RUnlock()

 	if targetNode != nil {
+		collectionID := in.CollectionID
+		// if the node has already watched the collection's deltaChannel,
+		// it should recover part of the delete log from the dmChannel
+		if c.HasWatchedDeltaChannel(ctx, nodeID, collectionID) {
+			// get all deltaChannelInfo of the collection from meta
+			deltaChannelInfos, err := c.clusterMeta.getDeltaChannelsByCollectionID(collectionID)
+			if err != nil {
+				// this case should not happen
+				// deltaChannelInfos should have been set to meta before executing child tasks
+				log.Error("loadSegments: failed to get deltaChannelInfo from meta", zap.Error(err))
+				return err
+			}
+			deltaChannel2Info := make(map[string]*datapb.VchannelInfo, len(deltaChannelInfos))
+			for _, info := range deltaChannelInfos {
+				deltaChannel2Info[info.ChannelName] = info
+			}
+
+			// collect the delta channels whose delete log should be reloaded
+			reloadDeltaChannels := make(map[string]struct{})
+			for _, segment := range in.Infos {
+				// convert vChannel to deltaChannel
+				deltaChannelName, err := funcutil.ConvertChannelName(segment.InsertChannel, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
+				if err != nil {
+					return err
+				}
+				reloadDeltaChannels[deltaChannelName] = struct{}{}
+			}
+			in.DeltaPositions = make([]*internalpb.MsgPosition, 0)
+			for deltaChannelName := range reloadDeltaChannels {
+				if info, ok := deltaChannel2Info[deltaChannelName]; ok {
+					in.DeltaPositions = append(in.DeltaPositions, info.SeekPosition)
+				} else {
+					// this case should not happen
+					err = fmt.Errorf("loadSegments: can't find deltaChannelInfo, channel name = %s", deltaChannelName)
+					log.Error(err.Error())
+					return err
+				}
+			}
+		}
+
 		err := targetNode.loadSegments(ctx, in)
 		if err != nil {
 			log.Warn("loadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
......
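
The vChannel-to-deltaChannel mapping above relies on funcutil.ConvertChannelName with the configured DML and delta prefixes. Under the assumption that this is a simple token substitution inside the virtual channel name (an illustration, not the library implementation), it behaves roughly like the self-contained sketch below; the prefix values are assumed defaults.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// convertChannelName illustrates the assumed behavior of funcutil.ConvertChannelName:
// swap the DML channel token for the delta channel token inside a virtual channel name.
func convertChannelName(chanName, tokenFrom, tokenTo string) (string, error) {
	if !strings.Contains(chanName, tokenFrom) {
		return "", errors.New("cannot find token " + tokenFrom + " in channel name " + chanName)
	}
	return strings.Replace(chanName, tokenFrom, tokenTo, 1), nil
}

func main() {
	name, err := convertChannelName(
		"by-dev-rootcoord-dml_1_2021v1", // segment.InsertChannel
		"by-dev-rootcoord-dml",          // Params.CommonCfg.RootCoordDml (assumed value)
		"by-dev-rootcoord-delta",        // Params.CommonCfg.RootCoordDelta (assumed value)
	)
	fmt.Println(name, err) // by-dev-rootcoord-delta_1_2021v1 <nil>
}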
@@ -27,8 +27,7 @@ import (
 	"sync/atomic"
 	"testing"

-	"github.com/milvus-io/milvus/internal/util/dependency"
-	"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
+	"github.com/stretchr/testify/assert"

 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/log"
@@ -36,13 +35,15 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
+	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/internal/util/dependency"
 	"github.com/milvus-io/milvus/internal/util/etcd"
+	"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
-	"github.com/stretchr/testify/assert"
 )

 const (
@@ -426,6 +427,17 @@ func TestGrpcRequest(t *testing.T) {
 	}
 	meta, err := newMeta(baseCtx, kv, factory, idAllocator)
 	assert.Nil(t, err)
+	deltaChannelInfo := []*datapb.VchannelInfo{
+		{
+			CollectionID: defaultCollectionID,
+			ChannelName:  "by-dev-rootcoord-delta_1_2021v1",
+			SeekPosition: &internalpb.MsgPosition{
+				ChannelName: "by-dev-rootcoord-dml_1",
+			},
+		},
+	}
+	err = meta.setDeltaChannel(defaultCollectionID, deltaChannelInfo)
+	assert.Nil(t, err)

 	handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
 	assert.Nil(t, err)
@@ -471,10 +483,39 @@ func TestGrpcRequest(t *testing.T) {
 	t.Run("Test LoadSegments", func(t *testing.T) {
 		segmentLoadInfo := &querypb.SegmentLoadInfo{
 			SegmentID:     defaultSegmentID,
 			PartitionID:   defaultPartitionID,
+			CollectionID:  defaultCollectionID,
+			InsertChannel: "by-dev-rootcoord-dml_1_2021v1",
+		}
+		loadSegmentReq := &querypb.LoadSegmentsRequest{
+			DstNodeID:    nodeID,
+			Infos:        []*querypb.SegmentLoadInfo{segmentLoadInfo},
+			Schema:       genDefaultCollectionSchema(false),
 			CollectionID: defaultCollectionID,
 		}
+		err := cluster.LoadSegments(baseCtx, nodeID, loadSegmentReq)
+		assert.Equal(t, 0, len(loadSegmentReq.DeltaPositions))
+		assert.Nil(t, err)
+	})
+
+	t.Run("Test WatchDeletaChannel", func(t *testing.T) {
+		watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
+			CollectionID: defaultCollectionID,
+			Infos:        deltaChannelInfo,
+		}
+		err := cluster.WatchDeltaChannels(baseCtx, nodeID, watchDeltaChannelReq)
+		assert.Nil(t, err)
+		assert.Equal(t, true, cluster.HasWatchedDeltaChannel(baseCtx, nodeID, defaultCollectionID))
+	})
+
+	t.Run("Test LoadSegmentsAfterWatchDeltaChannel", func(t *testing.T) {
+		segmentLoadInfo := &querypb.SegmentLoadInfo{
+			SegmentID:     defaultSegmentID,
+			PartitionID:   defaultPartitionID,
+			CollectionID:  defaultCollectionID,
+			InsertChannel: "by-dev-rootcoord-dml_1_2021v1",
+		}
 		loadSegmentReq := &querypb.LoadSegmentsRequest{
 			DstNodeID: nodeID,
 			Infos:     []*querypb.SegmentLoadInfo{segmentLoadInfo},
@@ -482,6 +523,7 @@ func TestGrpcRequest(t *testing.T) {
 			CollectionID: defaultCollectionID,
 		}
 		err := cluster.LoadSegments(baseCtx, nodeID, loadSegmentReq)
+		assert.Equal(t, 1, len(loadSegmentReq.DeltaPositions))
 		assert.Nil(t, err)
 	})
......
@@ -923,6 +923,13 @@ func (m *MetaReplica) getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*
 func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.VchannelInfo) error {
 	m.deltaChannelMu.Lock()
 	defer m.deltaChannelMu.Unlock()
+	if len(infos) == 0 {
+		err := fmt.Errorf("set empty delta channel info to meta of collection %d", collectionID)
+		log.Error(err.Error())
+		return err
+	}
+
 	_, ok := m.deltaChannelInfos[collectionID]
 	if ok {
 		log.Debug("delta channel already exist", zap.Any("collectionID", collectionID))
......
@@ -36,7 +36,6 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/types"
-	"github.com/milvus-io/milvus/internal/util/funcutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 )
@@ -374,9 +373,8 @@ func (data *dataCoordMock) GetRecoveryInfo(ctx context.Context, req *datapb.GetR
 	if _, ok := data.col2DmChannels[collectionID]; !ok {
 		channelInfos := make([]*datapb.VchannelInfo, 0)
 		data.collections = append(data.collections, collectionID)
-		collectionName := funcutil.RandomString(8)
 		for i := int32(0); i < common.DefaultShardsNum; i++ {
-			vChannel := fmt.Sprintf("Dml_%s_%d_%d_v", collectionName, collectionID, i)
+			vChannel := fmt.Sprintf("%s_%d_%dv%d", Params.CommonCfg.RootCoordDml, i, collectionID, i)
 			channelInfo := &datapb.VchannelInfo{
 				CollectionID: collectionID,
 				ChannelName:  vChannel,
......
@@ -54,8 +54,6 @@ func refreshParams() {
 	Params.CommonCfg.QueryNodeStats = Params.CommonCfg.QueryNodeStats + suffix
 	Params.CommonCfg.QueryCoordTimeTick = Params.CommonCfg.QueryCoordTimeTick + suffix
 	Params.EtcdCfg.MetaRootPath = Params.EtcdCfg.MetaRootPath + suffix
-	Params.CommonCfg.RootCoordDml = "Dml"
-	Params.CommonCfg.RootCoordDelta = "delta"
 	GlobalSegmentInfos = make(map[UniqueID]*querypb.SegmentInfo)
 }
......
@@ -519,6 +519,16 @@ func Test_generateDerivedInternalTasks(t *testing.T) {
 	node1, err := startQueryNodeServer(baseCtx)
 	assert.Nil(t, err)
 	waitQueryNodeOnline(queryCoord.cluster, node1.queryNodeID)
+	vChannelInfos, _, err := queryCoord.broker.getRecoveryInfo(baseCtx, defaultCollectionID, defaultPartitionID)
+	assert.NoError(t, err)
+	deltaChannelInfos := make([]*datapb.VchannelInfo, len(vChannelInfos))
+	for i, info := range vChannelInfos {
+		deltaInfo, err := generateWatchDeltaChannelInfo(info)
+		assert.NoError(t, err)
+		deltaChannelInfos[i] = deltaInfo
+	}
+	queryCoord.meta.setDeltaChannel(defaultCollectionID, deltaChannelInfos)

 	loadCollectionTask := genLoadCollectionTask(baseCtx, queryCoord)
 	loadSegmentTask := genLoadSegmentTask(baseCtx, queryCoord, node1.queryNodeID)
......
@@ -37,7 +37,6 @@ import (
 )

 func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *loadCollectionTask {
-	queryCoord.meta.setDeltaChannel(defaultCollectionID, nil)
 	req := &querypb.LoadCollectionRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_LoadCollection,
@@ -58,7 +57,6 @@ func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *loadCol
 }

 func genLoadPartitionTask(ctx context.Context, queryCoord *QueryCoord) *loadPartitionTask {
-	queryCoord.meta.setDeltaChannel(defaultCollectionID, nil)
 	req := &querypb.LoadPartitionsRequest{
 		Base: &commonpb.MsgBase{
 			MsgType: commonpb.MsgType_LoadPartitions,
@@ -195,7 +193,6 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
 	return watchDmChannelTask
 }

 func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int64) *loadSegmentTask {
-	queryCoord.meta.setDeltaChannel(defaultCollectionID, nil)
 	schema := genDefaultCollectionSchema(false)
 	segmentInfo := &querypb.SegmentLoadInfo{
 		SegmentID: defaultSegmentID,
@@ -666,7 +663,6 @@ func Test_RescheduleSegment(t *testing.T) {
 	node1.loadSegment = returnFailedResult

 	loadSegmentTask := genLoadSegmentTask(ctx, queryCoord, node1.queryNodeID)
-	loadSegmentTask.meta.setDeltaChannel(defaultCollectionID, []*datapb.VchannelInfo{})
 	loadCollectionTask := loadSegmentTask.parentTask
 	queryCoord.scheduler.triggerTaskQueue.addTask(loadCollectionTask)
@@ -1445,8 +1441,7 @@ func Test_LoadSegment(t *testing.T) {
 	waitQueryNodeOnline(queryCoord.cluster, node1.queryNodeID)

 	loadSegmentTask := genLoadSegmentTask(ctx, queryCoord, node1.queryNodeID)
-	err = loadSegmentTask.meta.setDeltaChannel(111, []*datapb.VchannelInfo{})
-	assert.Nil(t, err)
 	loadCollectionTask := loadSegmentTask.parentTask
 	queryCoord.scheduler.triggerTaskQueue.addTask(loadCollectionTask)
......
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
+	"path"
 	"strconv"

 	"github.com/milvus-io/milvus/internal/util/dependency"
@@ -298,7 +299,7 @@ func loadIndexForSegment(ctx context.Context, node *QueryNode, segmentID UniqueI
 	schema := genTestCollectionSchema(pkType)

 	// generate insert binlog
-	fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, msgLength, schema)
+	fieldBinlog, _, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, msgLength, schema)
 	if err != nil {
 		return err
 	}
@@ -925,7 +926,7 @@ func genStorageBlob(collectionID UniqueID,
 	partitionID UniqueID,
 	segmentID UniqueID,
 	msgLength int,
-	schema *schemapb.CollectionSchema) ([]*storage.Blob, error) {
+	schema *schemapb.CollectionSchema) ([]*storage.Blob, []*storage.Blob, error) {
 	tmpSchema := &schemapb.CollectionSchema{
 		Name:   schema.Name,
 		AutoID: schema.AutoID,
@@ -936,11 +937,11 @@ func genStorageBlob(collectionID UniqueID,
 	inCodec := storage.NewInsertCodec(collMeta)
 	insertData, err := genInsertData(msgLength, schema)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}

-	binLogs, _, err := inCodec.Serialize(partitionID, segmentID, insertData)
-	return binLogs, err
+	binLogs, statsLogs, err := inCodec.Serialize(partitionID, segmentID, insertData)
+	return binLogs, statsLogs, err
 }

 func genSimpleInsertMsg(schema *schemapb.CollectionSchema, numRows int) (*msgstream.InsertMsg, error) {
@@ -1000,14 +1001,14 @@ func saveBinLog(ctx context.Context,
 	partitionID UniqueID,
 	segmentID UniqueID,
 	msgLength int,
-	schema *schemapb.CollectionSchema) ([]*datapb.FieldBinlog, error) {
-	binLogs, err := genStorageBlob(collectionID,
+	schema *schemapb.CollectionSchema) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, error) {
+	binLogs, statsLogs, err := genStorageBlob(collectionID,
 		partitionID,
 		segmentID,
 		msgLength,
 		schema)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}

 	log.Debug(".. [query node unittest] Saving bin logs to MinIO ..", zap.Int("number", len(binLogs)))
@@ -1019,10 +1020,11 @@ func saveBinLog(ctx context.Context,
 		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
 		log.Debug("[query node unittest] save binlog", zap.Int64("fieldID", fieldID))
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}

-		key := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
+		k := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
+		key := path.Join("insert-log", k)
 		kvs[key] = blob.Value[:]
 		fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
 			FieldID: fieldID,
@@ -1031,9 +1033,28 @@ func saveBinLog(ctx context.Context,
 	}
 	log.Debug("[query node unittest] save binlog file to MinIO/S3")

+	// write stats binlog
+	statsBinlog := make([]*datapb.FieldBinlog, 0)
+	for _, blob := range statsLogs {
+		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
+		log.Debug("[query node unittest] save statLog", zap.Int64("fieldID", fieldID))
+		if err != nil {
+			return nil, nil, err
+		}
+
+		k := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
+		key := path.Join("delta-log", k)
+		kvs[key] = blob.Value[:]
+		statsBinlog = append(statsBinlog, &datapb.FieldBinlog{
+			FieldID: fieldID,
+			Binlogs: []*datapb.Binlog{{LogPath: key}},
+		})
+	}
+	log.Debug("[query node unittest] save statsLog file to MinIO/S3")
+
 	cm := storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage))
 	err = cm.MultiWrite(kvs)
-	return fieldBinlog, err
+	return fieldBinlog, statsBinlog, err
 }

 // saveDeltaLog saves delta logs into MinIO for testing purposes.
......
@@ -41,7 +41,7 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
 	defer cancel()

 	schema := genTestCollectionSchema()
-	fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
+	fieldBinlog, statsLog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
 	assert.NoError(t, err)

 	t.Run("test load segment", func(t *testing.T) {
@@ -65,6 +65,7 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
 				PartitionID:  defaultPartitionID,
 				CollectionID: defaultCollectionID,
 				BinlogPaths:  fieldBinlog,
+				Statslogs:    statsLog,
 			},
 		},
 	}
@@ -178,7 +179,7 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) {
 		segmentTypeSealed)
 	assert.Nil(t, err)

-	binlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
+	binlog, _, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
 	assert.NoError(t, err)

 	err = loader.loadFiledBinlogData(segment, binlog)
@@ -383,7 +384,7 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
 	defer cancel()

 	schema := genTestCollectionSchema()
-	fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
+	fieldBinlog, statsLog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
 	assert.NoError(t, err)

 	deltaLogs, err := saveDeltaLog(defaultCollectionID, defaultPartitionID, defaultSegmentID)
@@ -410,6 +411,7 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
 				PartitionID:  defaultPartitionID,
 				CollectionID: defaultCollectionID,
 				BinlogPaths:  fieldBinlog,
+				Statslogs:    statsLog,
 			},
 		},
 	}
@@ -517,7 +519,7 @@ func TestSegmentLoader_testLoadSealedSegmentWithIndex(t *testing.T) {
 	schema := genTestCollectionSchema()

 	// generate insert binlog
-	fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
+	fieldBinlog, statsLog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
 	assert.NoError(t, err)

 	segmentID := UniqueID(100)
@@ -555,6 +557,7 @@ func TestSegmentLoader_testLoadSealedSegmentWithIndex(t *testing.T) {
 				CollectionID: defaultCollectionID,
 				BinlogPaths:  fieldBinlog,
 				IndexInfos:   []*querypb.FieldIndexInfo{indexInfo},
+				Statslogs:    statsLog,
 			},
 		},
 	}
......
@@ -529,6 +529,19 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
 		log.Warn(err.Error())
 		return err
 	}
+
+	// reload the delete log from the checkpoint (cp) to the latest position
+	for _, deltaPosition := range l.req.DeltaPositions {
+		err = l.node.loader.FromDmlCPLoadDelete(ctx, l.req.CollectionID, deltaPosition)
+		if err != nil {
+			for _, segment := range l.req.Infos {
+				l.node.metaReplica.removeSegment(segment.SegmentID, segmentTypeSealed)
+			}
+			log.Warn("LoadSegmentTask from delta check point load delete failed", zap.Int64("msgID", l.req.Base.MsgID), zap.Error(err))
+			return err
+		}
+	}
+
 	log.Info("LoadSegmentTask Execute done", zap.Int64("msgID", l.req.Base.MsgID))
 	return nil
 }
......
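
For context, FromDmlCPLoadDelete (already part of the segment loader) is expected to seek the DML channel to the given checkpoint, consume delete messages until it catches up with the channel's latest position, and buffer them into the loaded sealed segments. The self-contained sketch below illustrates only the filter-and-apply step of that replay idea, with all names and types simplified; it is not the loader's actual implementation.

package main

import "fmt"

// deleteMsg stands in for the delete messages read back from the channel.
type deleteMsg struct {
	PrimaryKeys []int64
	Timestamps  []uint64
}

// replayDeletes applies every delete published at or after the checkpoint
// timestamp; in Milvus this would feed the segments' delete buffers.
func replayDeletes(msgs []deleteMsg, checkpointTs uint64, apply func(pk int64, ts uint64)) int {
	applied := 0
	for _, m := range msgs {
		for i, pk := range m.PrimaryKeys {
			if m.Timestamps[i] >= checkpointTs {
				apply(pk, m.Timestamps[i])
				applied++
			}
		}
	}
	return applied
}

func main() {
	msgs := []deleteMsg{{PrimaryKeys: []int64{1, 2, 3}, Timestamps: []uint64{110, 110, 110}}}
	n := replayDeletes(msgs, 100, func(pk int64, ts uint64) {
		fmt.Printf("apply delete pk=%d ts=%d\n", pk, ts)
	})
	fmt.Println("applied:", n)
}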
@@ -19,9 +19,9 @@ package querynode
 import (
 	"context"
 	"testing"
+	"time"

 	"github.com/apache/pulsar-client-go/pulsar"
 	"github.com/stretchr/testify/assert"

 	"github.com/milvus-io/milvus/internal/mq/msgstream"
@@ -29,6 +29,8 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/proto/schemapb"
+	"github.com/milvus-io/milvus/internal/util/funcutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
@@ -231,7 +233,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
 		node: node,
 	}

-	fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
+	fieldBinlog, statsLog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
 	assert.NoError(t, err)

 	task.req.Infos = []*datapb.VchannelInfo{
@@ -246,7 +248,8 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
 					ChannelName: defaultDMLChannel,
 					Timestamp:   typeutil.MaxTimestamp,
 				},
 				Binlogs:   fieldBinlog,
+				Statslogs: statsLog,
 			},
 		},
 	},
@@ -384,7 +387,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
 		node, err := genSimpleQueryNode(ctx)
 		assert.NoError(t, err)

-		fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
+		fieldBinlog, statsLog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
 		assert.NoError(t, err)

 		req := &querypb.LoadSegmentsRequest{
@@ -396,6 +399,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
 					PartitionID:  defaultPartitionID,
 					CollectionID: defaultCollectionID,
 					BinlogPaths:  fieldBinlog,
+					Statslogs:    statsLog,
 				},
 			},
 		}
@@ -412,7 +416,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
 		node, err := genSimpleQueryNode(ctx)
 		assert.NoError(t, err)

-		fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
+		fieldBinlog, statsLog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
 		assert.NoError(t, err)

 		req := &querypb.LoadSegmentsRequest{
@@ -424,6 +428,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
 					PartitionID:  defaultPartitionID,
 					CollectionID: defaultCollectionID,
 					BinlogPaths:  fieldBinlog,
+					Statslogs:    statsLog,
 				},
 			},
 		}
@@ -446,42 +451,112 @@ func TestTask_loadSegmentsTask(t *testing.T) {
 		assert.Equal(t, 1, num)
 	})

-	t.Run("test execute grpc error", func(t *testing.T) {
-		node, err := genSimpleQueryNode(ctx)
-		assert.NoError(t, err)
-
-		task := loadSegmentsTask{
-			req:  genLoadEmptySegmentsRequest(),
-			node: node,
-		}
-		task.req.Infos = []*querypb.SegmentLoadInfo{
-			{
-				SegmentID:    defaultSegmentID + 1,
-				PartitionID:  defaultPartitionID + 1,
-				CollectionID: defaultCollectionID + 1,
-			},
-		}
-		err = task.Execute(ctx)
-		assert.Error(t, err)
-	})
-
-	t.Run("test execute node down", func(t *testing.T) {
-		node, err := genSimpleQueryNode(ctx)
-		assert.NoError(t, err)
-
-		task := loadSegmentsTask{
-			req:  genLoadEmptySegmentsRequest(),
-			node: node,
-		}
-		task.req.Infos = []*querypb.SegmentLoadInfo{
-			{
-				SegmentID:    defaultSegmentID + 1,
-				PartitionID:  defaultPartitionID + 1,
-				CollectionID: defaultCollectionID + 1,
-			},
-		}
-		err = task.Execute(ctx)
-		assert.Error(t, err)
-	})
+	t.Run("test FromDmlCPLoadDelete", func(t *testing.T) {
+		node, err := genSimpleQueryNode(ctx)
+		assert.NoError(t, err)
+
+		vDmChannel := "by-dev-rootcoord-dml_1_2021v1"
+		pDmChannel := funcutil.ToPhysicalChannel(vDmChannel)
+		stream, err := node.factory.NewMsgStream(node.queryNodeLoopCtx)
+		assert.Nil(t, err)
+		stream.AsProducer([]string{pDmChannel})
+
+		timeTickMsg := &msgstream.TimeTickMsg{
+			BaseMsg: msgstream.BaseMsg{
+				HashValues: []uint32{1},
+			},
+			TimeTickMsg: internalpb.TimeTickMsg{
+				Base: &commonpb.MsgBase{
+					MsgType:   commonpb.MsgType_TimeTick,
+					Timestamp: 100,
+				},
+			},
+		}
+
+		deleteMsg := &msgstream.DeleteMsg{
+			BaseMsg: msgstream.BaseMsg{
+				HashValues: []uint32{1, 1, 1},
+			},
+			DeleteRequest: internalpb.DeleteRequest{
+				Base: &commonpb.MsgBase{
+					MsgType:   commonpb.MsgType_Delete,
+					Timestamp: 110,
+				},
+				CollectionID: defaultCollectionID,
+				PartitionID:  defaultPartitionID,
+				PrimaryKeys: &schemapb.IDs{
+					IdField: &schemapb.IDs_IntId{
+						IntId: &schemapb.LongArray{
+							Data: []int64{1, 2, 3},
+						},
+					},
+				},
+				Timestamps: []Timestamp{110, 110, 110},
+				NumRows:    3,
+			},
+		}
+
+		pos1, err := stream.ProduceMark(&msgstream.MsgPack{Msgs: []msgstream.TsMsg{timeTickMsg}})
+		assert.NoError(t, err)
+		msgIDs, ok := pos1[pDmChannel]
+		assert.True(t, ok)
+		assert.Equal(t, 1, len(msgIDs))
+		err = stream.Produce(&msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg}})
+		assert.NoError(t, err)
+
+		// keep producing time ticks so that the reader consuming from the checkpoint can catch up and stop
+		go func() {
+			for {
+				select {
+				case <-ctx.Done():
+					break
+				default:
+					timeTickMsg.Base.Timestamp += 100
+					stream.Produce(&msgstream.MsgPack{Msgs: []msgstream.TsMsg{timeTickMsg}})
+					time.Sleep(200 * time.Millisecond)
+				}
+			}
+		}()
+
+		segmentID := defaultSegmentID + 1
+		fieldBinlog, statsLog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, segmentID, defaultMsgLength, schema)
+		assert.NoError(t, err)
+
+		req := &querypb.LoadSegmentsRequest{
+			Base:   genCommonMsgBase(commonpb.MsgType_LoadSegments),
+			Schema: schema,
+			Infos: []*querypb.SegmentLoadInfo{
+				{
+					SegmentID:    segmentID,
+					PartitionID:  defaultPartitionID,
+					CollectionID: defaultCollectionID,
+					BinlogPaths:  fieldBinlog,
+					NumOfRows:    defaultMsgLength,
+					Statslogs:    statsLog,
+				},
+			},
+			DeltaPositions: []*internalpb.MsgPosition{
+				{
+					ChannelName: vDmChannel,
+					MsgID:       msgIDs[0].Serialize(),
+					Timestamp:   100,
+				},
+			},
+		}
+
+		task := loadSegmentsTask{
+			req:  req,
+			node: node,
+		}
+		err = task.PreExecute(ctx)
+		assert.NoError(t, err)
+		err = task.Execute(ctx)
+		assert.NoError(t, err)
+
+		segment, err := node.metaReplica.getSegmentByID(segmentID, segmentTypeSealed)
+		assert.NoError(t, err)
+		// 3 delete logs have been reloaded from the dm channel, so the next delete offset should be 3
+		offset := segment.segmentPreDelete(1)
+		assert.Equal(t, int64(3), offset)
+	})

 	t.Run("test OOM", func(t *testing.T) {
......