未验证 提交 7a392d60 编写于 作者: W wayblink 提交者: GitHub

Pass backup flag through Import request options (#20374)

Signed-off-by: Nwayblink <anyang.wang@zilliz.com>
Signed-off-by: Nwayblink <anyang.wang@zilliz.com>
上级 65a2b1c5
......@@ -1097,12 +1097,13 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
saveSegmentFunc(node, req, importResult, ts))
// todo: pass tsStart and tsStart after import_wrapper support
tsStart, tsEnd, err := importutil.ParseTSFromOptions(req.GetImportTask().GetInfos())
isBackup := importutil.IsBackup(req.GetImportTask().GetInfos())
if err != nil {
return returnFailFunc(err)
}
log.Info("import time range", zap.Uint64("start_ts", tsStart), zap.Uint64("end_ts", tsEnd))
err = importWrapper.Import(req.GetImportTask().GetFiles(),
importutil.ImportOptions{OnlyValidate: false, TsStartPoint: tsStart, TsEndPoint: tsEnd})
importutil.ImportOptions{OnlyValidate: false, TsStartPoint: tsStart, TsEndPoint: tsEnd, IsBackup: isBackup})
if err != nil {
return returnFailFunc(err)
}
......
......@@ -20,6 +20,7 @@ import (
"errors"
"math"
"strconv"
"strings"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/util/funcutil"
......@@ -33,6 +34,7 @@ const (
EndTs = "end_ts" // end timestamp to filter data, only data between StartTs and EndTs will be imported
OptionFormat = "start_ts: 10-digit physical timestamp, e.g. 1665995420, default 0 \n" +
"end_ts: 10-digit physical timestamp, e.g. 1665995420, default math.MaxInt \n"
BackupFlag = "backup"
)
// ValidateOptions the options is illegal, return nil if illegal, return error if not.
......@@ -92,3 +94,12 @@ func ParseTSFromOptions(options []*commonpb.KeyValuePair) (uint64, uint64, error
}
return tsStart, tsEnd, nil
}
// IsBackup returns if the request is triggered by backup tool
func IsBackup(options []*commonpb.KeyValuePair) bool {
isBackup, err := funcutil.GetAttrByKeyFromRepeatedKV(BackupFlag, options)
if err != nil || strings.ToLower(isBackup) != "true" {
return false
}
return true
}
......@@ -87,3 +87,22 @@ func TestParseTSFromOptions(t *testing.T) {
assert.Equal(t, uint64(0), tsEnd)
assert.Error(t, err)
}
func TestIsBackup(t *testing.T) {
isBackup := IsBackup([]*commonpb.KeyValuePair{
{Key: "backup", Value: "true"},
})
assert.Equal(t, true, isBackup)
isBackup2 := IsBackup([]*commonpb.KeyValuePair{
{Key: "backup", Value: "True"},
})
assert.Equal(t, true, isBackup2)
falseBackup := IsBackup([]*commonpb.KeyValuePair{
{Key: "backup", Value: "false"},
})
assert.Equal(t, false, falseBackup)
noBackup := IsBackup([]*commonpb.KeyValuePair{
{Key: "backup", Value: "false"},
})
assert.Equal(t, false, noBackup)
}
......@@ -82,6 +82,7 @@ type ImportOptions struct {
OnlyValidate bool
TsStartPoint uint64
TsEndPoint uint64
IsBackup bool // whether is triggered by backup tool
}
func DefaultImportOptions() ImportOptions {
......@@ -312,7 +313,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
log.Info("import wrapper: begin import", zap.Any("filePaths", filePaths), zap.Any("options", options))
// data restore function to import milvus native binlog files(for backup/restore tools)
// the backup/restore tool provide two paths for a partition, the first path is binlog path, the second is deltalog path
if p.isBinlogImport(filePaths) {
if options.IsBackup && p.isBinlogImport(filePaths) {
return p.doBinlogImport(filePaths, options.TsStartPoint, options.TsEndPoint)
}
......@@ -470,17 +471,6 @@ func (p *ImportWrapper) isBinlogImport(filePaths []string) bool {
log.Info("import wrapper: not a path, not binlog import", zap.String("filePath", filePath), zap.String("fileType", fileType))
return false
}
// check path existence
exist, err := p.chunkManager.Exist(p.ctx, filePath)
if err != nil {
log.Error("import wrapper: failed to check the path existence, not binlog import", zap.String("filePath", filePath), zap.Error(err))
return false
}
if !exist {
log.Info("import wrapper: the input path doesn't exist, not binlog import", zap.String("filePath", filePath))
return false
}
return true
}
......
......@@ -922,19 +922,6 @@ func Test_ImportWrapperIsBinlogImport(t *testing.T) {
"path1",
"path2",
}
b = wrapper.isBinlogImport(paths)
assert.False(t, b)
// insert log path is created, but delta log path doesn't exist
err = os.MkdirAll(TempFilesPath+paths[0], os.ModePerm)
assert.NoError(t, err)
b = wrapper.isBinlogImport(paths)
assert.False(t, b)
// both the two path are created, success
err = os.MkdirAll(TempFilesPath+paths[1], os.ModePerm)
assert.NoError(t, err)
b = wrapper.isBinlogImport(paths)
assert.True(t, b)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册