Unverified commit 8b695d36 authored by groot, committed by GitHub

Bulkinsert supports partition keys (#24995)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
Parent c14a2511
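
With this change, a collection whose schema declares a partition-key field can be a bulkinsert target: instead of importing into one named partition, every physical partition of the collection becomes a candidate and each row is routed by its partition-key value. The sketch below is a standalone illustration of that routing idea only; the hash function and helper name are assumptions, not the Milvus implementation (which lives in the folded importutil files).

package main

import (
	"fmt"
	"hash/fnv"
)

// routeRow picks a target partition for one row by hashing its partition-key
// value; hypothetical helper, shown only to illustrate key-based routing.
func routeRow(partitionKey string, partitionIDs []int64) int64 {
	h := fnv.New32a()
	h.Write([]byte(partitionKey))
	return partitionIDs[int(h.Sum32())%len(partitionIDs)]
}

func main() {
	partitions := []int64{201, 202, 203, 204} // e.g. IDs of _default_0 .. _default_3
	for _, key := range []string{"tenant-a", "tenant-b", "tenant-c"} {
		fmt.Printf("%s -> partition %d\n", key, routeRow(key, partitions))
	}
}
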
This diff is collapsed.
......@@ -444,12 +444,7 @@ func TestDataNode(t *testing.T) {
zap.String("response", resp.Response))
})
t.Run("Test Import", func(t *testing.T) {
node.rootCoord = &RootCoordFactory{
collectionID: 100,
pkType: schemapb.DataType_Int64,
}
content := []byte(`{
"rows":[
{"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]},
{"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]},
......@@ -459,6 +454,13 @@ func TestDataNode(t *testing.T) {
]
}`)
t.Run("Test Import", func(t *testing.T) {
node.reportImportRetryTimes = 1 // reduce data node test time cost
node.rootCoord = &RootCoordFactory{
collectionID: 100,
pkType: schemapb.DataType_Int64,
}
chName1 := "fake-by-dev-rootcoord-dml-testimport-1"
chName2 := "fake-by-dev-rootcoord-dml-testimport-2"
err := node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{
......@@ -493,16 +495,6 @@ func TestDataNode(t *testing.T) {
RowBased: true,
},
}
node.rootCoord.(*RootCoordFactory).ReportImportErr = true
_, err = node.Import(node.ctx, req)
assert.NoError(t, err)
node.rootCoord.(*RootCoordFactory).ReportImportErr = false
node.rootCoord.(*RootCoordFactory).ReportImportNotSuccess = true
_, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
node.rootCoord.(*RootCoordFactory).ReportImportNotSuccess = false
node.dataCoord.(*DataCoordFactory).AddSegmentError = true
_, err = node.Import(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
......@@ -524,6 +516,42 @@ func TestDataNode(t *testing.T) {
assert.Equal(t, "", stat.GetReason())
})
t.Run("Test Import show partitions error", func(t *testing.T) {
node.rootCoord = &RootCoordFactory{
ShowPartitionsErr: true,
}
_, err := node.getPartitions(context.Background(), "", "")
assert.Error(t, err)
node.rootCoord = &RootCoordFactory{
ShowPartitionsNotSuccess: true,
}
_, err = node.getPartitions(context.Background(), "", "")
assert.Error(t, err)
node.rootCoord = &RootCoordFactory{
ShowPartitionsNames: []string{"a", "b"},
ShowPartitionsIDs: []int64{1},
}
_, err = node.getPartitions(context.Background(), "", "")
assert.Error(t, err)
node.rootCoord = &RootCoordFactory{
ShowPartitionsNames: []string{"a", "b"},
ShowPartitionsIDs: []int64{1, 2},
}
partitions, err := node.getPartitions(context.Background(), "", "")
assert.NoError(t, err)
assert.Contains(t, partitions, "a")
assert.Equal(t, int64(1), partitions["a"])
assert.Contains(t, partitions, "b")
assert.Equal(t, int64(2), partitions["b"])
})
t.Run("Test Import bad flow graph", func(t *testing.T) {
node.rootCoord = &RootCoordFactory{
collectionID: 100,
......@@ -552,16 +580,6 @@ func TestDataNode(t *testing.T) {
_, ok = node.flowgraphManager.getFlowgraphService(chName2)
assert.True(t, ok)
content := []byte(`{
"rows":[
{"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]},
{"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]},
{"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]},
{"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]},
{"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]}
]
}`)
filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json")
err = node.chunkManager.Write(ctx, filePath, content)
assert.NoError(t, err)
......@@ -586,15 +604,6 @@ func TestDataNode(t *testing.T) {
pkType: schemapb.DataType_Int64,
ReportImportErr: true,
}
content := []byte(`{
"rows":[
{"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]},
{"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]},
{"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]},
{"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]},
{"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]}
]
}`)
filePath := filepath.Join(node.chunkManager.RootPath(), "rows_1.json")
err = node.chunkManager.Write(ctx, filePath, content)
......
......@@ -208,6 +208,11 @@ type RootCoordFactory struct {
ReportImportErr bool
ReportImportNotSuccess bool
ShowPartitionsErr bool
ShowPartitionsNotSuccess bool
ShowPartitionsNames []string
ShowPartitionsIDs []int64
}
type DataCoordFactory struct {
......@@ -1135,6 +1140,33 @@ func (m *RootCoordFactory) DescribeCollectionInternal(ctx context.Context, in *m
return resp, nil
}
func (m *RootCoordFactory) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
if m.ShowPartitionsErr {
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, fmt.Errorf("mock show partitions error")
}
if m.ShowPartitionsNotSuccess {
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "not success",
},
}, nil
}
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
PartitionNames: m.ShowPartitionsNames,
PartitionIDs: m.ShowPartitionsIDs,
}, nil
}
func (m *RootCoordFactory) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
return &milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{},
......
......@@ -572,6 +572,7 @@ message ImportTask {
int64 task_id = 6; // id of the task
repeated string files = 7; // file paths to be imported
repeated common.KeyValuePair infos = 8; // extra information about the task, bucket, etc.
string database_name = 16; // Database name
}
message ImportTaskState {
......@@ -598,6 +599,7 @@ message ImportTaskInfo {
string partition_name = 13; // Partition name for the import task.
repeated common.KeyValuePair infos = 14; // extra information about the task, bucket, etc.
int64 start_ts = 15; // Timestamp when the import task is sent to datanode to execute.
string database_name = 16; // Database name
}
message ImportTaskResponse {
......
......@@ -1127,19 +1127,9 @@ func getDefaultPartitionsInPartitionKeyMode(ctx context.Context, dbName string,
}
// Make sure the order of the partition names got every time is the same
partitionNames := make([]string, len(partitions))
for partitionName := range partitions {
splits := strings.Split(partitionName, "_")
if len(splits) < 2 {
err = fmt.Errorf("bad default partion name in partition ket mode: %s", partitionName)
return nil, err
}
index, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
return nil, err
}
partitionNames[index] = partitionName
partitionNames, _, err := typeutil.RearrangePartitionsForPartitionKey(partitions)
if err != nil {
return nil, err
}
return partitionNames, nil
......
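
The inline loop removed above hints at what typeutil.RearrangePartitionsForPartitionKey is expected to do: order the default partitions of a partition-key collection by the numeric suffix of their names and return both the ordered names and IDs. Below is a minimal standalone reconstruction under that assumption; the real helper may differ.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rearrangePartitionsForPartitionKey places a partition named "<anything>_<index>"
// at slot <index>; reconstructed from the inline code removed above.
func rearrangePartitionsForPartitionKey(partitions map[string]int64) ([]string, []int64, error) {
	names := make([]string, len(partitions))
	ids := make([]int64, len(partitions))
	for name, id := range partitions {
		splits := strings.Split(name, "_")
		if len(splits) < 2 {
			return nil, nil, fmt.Errorf("bad default partition name in partition key mode: %s", name)
		}
		index, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
		if err != nil {
			return nil, nil, err
		}
		if index < 0 || index >= int64(len(partitions)) {
			return nil, nil, fmt.Errorf("partition index %d out of range for %q", index, name)
		}
		names[index] = name
		ids[index] = id
	}
	return names, ids, nil
}

func main() {
	parts := map[string]int64{"_default_1": 201, "_default_0": 200}
	names, ids, err := rearrangePartitionsForPartitionKey(parts)
	fmt.Println(names, ids, err) // [_default_0 _default_1] [200 201] <nil>
}
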
......@@ -204,6 +204,7 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
TaskId: task.GetId(),
Files: task.GetFiles(),
Infos: task.GetInfos(),
DatabaseName: task.GetDatabaseName(),
}
// Get all busy dataNodes for reference.
......@@ -460,7 +461,8 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
State: &datapb.ImportTaskState{
StateCode: commonpb.ImportState_ImportPending,
},
Infos: req.Options,
DatabaseName: req.GetDbName(),
}
// Here no need to check error returned by setCollectionPartitionName(),
......@@ -498,7 +500,8 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
State: &datapb.ImportTaskState{
StateCode: commonpb.ImportState_ImportPending,
},
Infos: req.Options,
DatabaseName: req.GetDbName(),
}
// Here no need to check error returned by setCollectionPartitionName(),
// since here we always return task list to client no matter something missed.
......
......@@ -1862,37 +1862,31 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
}
return ret, nil
}
} else {
// In v2.2.9, bulkdinsert cannot support partition key, return error to client.
// Remove the following lines after bulkinsert can support partition key
for _, field := range colInfo.Fields {
if field.IsPartitionKey {
log.Info("partition key is not yet supported by bulkinsert",
zap.String("collection name", req.GetCollectionName()),
zap.String("partition key", field.Name))
ret := &milvuspb.ImportResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError,
fmt.Sprintf("the collection '%s' contains partition key '%s', partition key is not yet supported by bulkinsert",
req.GetCollectionName(), field.Name)),
}
return ret, nil
}
}
// Remove the upper lines after bulkinsert can support partition key
}
cID := colInfo.CollectionID
req.ChannelNames = c.meta.GetCollectionVirtualChannels(cID)
if req.GetPartitionName() == "" {
req.PartitionName = Params.CommonCfg.DefaultPartitionName
hasPartitionKey := false
for _, field := range colInfo.Fields {
if field.IsPartitionKey {
hasPartitionKey = true
break
}
}
var pID UniqueID
if pID, err = c.meta.GetPartitionByName(cID, req.GetPartitionName(), typeutil.MaxTimestamp); err != nil {
log.Error("failed to get partition ID from its name",
zap.String("partition name", req.GetPartitionName()),
zap.Error(err))
return nil, err
if !hasPartitionKey {
if req.GetPartitionName() == "" {
req.PartitionName = Params.CommonCfg.DefaultPartitionName
}
if pID, err = c.meta.GetPartitionByName(cID, req.GetPartitionName(), typeutil.MaxTimestamp); err != nil {
log.Error("failed to get partition ID from its name",
zap.String("partition name", req.GetPartitionName()),
zap.Error(err))
return nil, err
}
}
log.Info("RootCoord receive import request",
zap.String("collection name", req.GetCollectionName()),
zap.Int64("collection ID", cID),
......
......@@ -1024,8 +1024,9 @@ func TestCore_Import(t *testing.T) {
ctx := context.Background()
c := newTestCore(withHealthyCode(),
withMeta(meta))
meta.GetCollectionIDByNameFunc = func(name string) (UniqueID, error) {
return 100, nil
coll := &model.Collection{Name: "a-good-name"}
meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
return coll, nil
}
meta.GetCollectionVirtualChannelsFunc = func(colID int64) []string {
return []string{"ch-1", "ch-2"}
......
......@@ -38,7 +38,7 @@ type BinlogFile struct {
func NewBinlogFile(chunkManager storage.ChunkManager) (*BinlogFile, error) {
if chunkManager == nil {
log.Error("Binlog file: chunk manager pointer is nil")
log.Warn("Binlog file: chunk manager pointer is nil")
return nil, errors.New("chunk manager pointer is nil")
}
......@@ -52,20 +52,20 @@ func NewBinlogFile(chunkManager storage.ChunkManager) (*BinlogFile, error) {
func (p *BinlogFile) Open(filePath string) error {
p.Close()
if len(filePath) == 0 {
log.Error("Binlog file: binlog path is empty")
log.Warn("Binlog file: binlog path is empty")
return errors.New("binlog path is empty")
}
// TODO add context
bytes, err := p.chunkManager.Read(context.TODO(), filePath)
if err != nil {
log.Error("Binlog file: failed to open binlog", zap.String("filePath", filePath), zap.Error(err))
log.Warn("Binlog file: failed to open binlog", zap.String("filePath", filePath), zap.Error(err))
return fmt.Errorf("failed to open binlog %s", filePath)
}
p.reader, err = storage.NewBinlogReader(bytes)
if err != nil {
log.Error("Binlog file: failed to initialize binlog reader", zap.String("filePath", filePath), zap.Error(err))
log.Warn("Binlog file: failed to initialize binlog reader", zap.String("filePath", filePath), zap.Error(err))
return fmt.Errorf("failed to initialize binlog reader for binlog %s, error: %w", filePath, err)
}
......@@ -93,7 +93,7 @@ func (p *BinlogFile) DataType() schemapb.DataType {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadBool() ([]bool, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -101,7 +101,7 @@ func (p *BinlogFile) ReadBool() ([]bool, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -111,18 +111,18 @@ func (p *BinlogFile) ReadBool() ([]bool, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Bool {
log.Error("Binlog file: binlog data type is not bool")
log.Warn("Binlog file: binlog data type is not bool")
return nil, errors.New("binlog data type is not bool")
}
data, err := event.PayloadReaderInterface.GetBoolFromPayload()
if err != nil {
log.Error("Binlog file: failed to read bool data", zap.Error(err))
log.Warn("Binlog file: failed to read bool data", zap.Error(err))
return nil, fmt.Errorf("failed to read bool data, error: %w", err)
}
......@@ -136,7 +136,7 @@ func (p *BinlogFile) ReadBool() ([]bool, error) {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadInt8() ([]int8, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -144,7 +144,7 @@ func (p *BinlogFile) ReadInt8() ([]int8, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -154,18 +154,18 @@ func (p *BinlogFile) ReadInt8() ([]int8, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Int8 {
log.Error("Binlog file: binlog data type is not int8")
log.Warn("Binlog file: binlog data type is not int8")
return nil, errors.New("binlog data type is not int8")
}
data, err := event.PayloadReaderInterface.GetInt8FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int8 data", zap.Error(err))
log.Warn("Binlog file: failed to read int8 data", zap.Error(err))
return nil, fmt.Errorf("failed to read int8 data, error: %w", err)
}
......@@ -179,7 +179,7 @@ func (p *BinlogFile) ReadInt8() ([]int8, error) {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadInt16() ([]int16, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -187,7 +187,7 @@ func (p *BinlogFile) ReadInt16() ([]int16, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -197,18 +197,18 @@ func (p *BinlogFile) ReadInt16() ([]int16, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Int16 {
log.Error("Binlog file: binlog data type is not int16")
log.Warn("Binlog file: binlog data type is not int16")
return nil, errors.New("binlog data type is not int16")
}
data, err := event.PayloadReaderInterface.GetInt16FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int16 data", zap.Error(err))
log.Warn("Binlog file: failed to read int16 data", zap.Error(err))
return nil, fmt.Errorf("failed to read int16 data, error: %w", err)
}
......@@ -222,7 +222,7 @@ func (p *BinlogFile) ReadInt16() ([]int16, error) {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadInt32() ([]int32, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -230,7 +230,7 @@ func (p *BinlogFile) ReadInt32() ([]int32, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -240,18 +240,18 @@ func (p *BinlogFile) ReadInt32() ([]int32, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Int32 {
log.Error("Binlog file: binlog data type is not int32")
log.Warn("Binlog file: binlog data type is not int32")
return nil, errors.New("binlog data type is not int32")
}
data, err := event.PayloadReaderInterface.GetInt32FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int32 data", zap.Error(err))
log.Warn("Binlog file: failed to read int32 data", zap.Error(err))
return nil, fmt.Errorf("failed to read int32 data, error: %w", err)
}
......@@ -265,7 +265,7 @@ func (p *BinlogFile) ReadInt32() ([]int32, error) {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadInt64() ([]int64, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -273,7 +273,7 @@ func (p *BinlogFile) ReadInt64() ([]int64, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -283,18 +283,18 @@ func (p *BinlogFile) ReadInt64() ([]int64, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Int64 {
log.Error("Binlog file: binlog data type is not int64")
log.Warn("Binlog file: binlog data type is not int64")
return nil, errors.New("binlog data type is not int64")
}
data, err := event.PayloadReaderInterface.GetInt64FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int64 data", zap.Error(err))
log.Warn("Binlog file: failed to read int64 data", zap.Error(err))
return nil, fmt.Errorf("failed to read int64 data, error: %w", err)
}
......@@ -308,7 +308,7 @@ func (p *BinlogFile) ReadInt64() ([]int64, error) {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadFloat() ([]float32, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -316,7 +316,7 @@ func (p *BinlogFile) ReadFloat() ([]float32, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -326,18 +326,18 @@ func (p *BinlogFile) ReadFloat() ([]float32, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Float {
log.Error("Binlog file: binlog data type is not float")
log.Warn("Binlog file: binlog data type is not float")
return nil, errors.New("binlog data type is not float")
}
data, err := event.PayloadReaderInterface.GetFloatFromPayload()
if err != nil {
log.Error("Binlog file: failed to read float data", zap.Error(err))
log.Warn("Binlog file: failed to read float data", zap.Error(err))
return nil, fmt.Errorf("failed to read float data, error: %w", err)
}
......@@ -351,7 +351,7 @@ func (p *BinlogFile) ReadFloat() ([]float32, error) {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadDouble() ([]float64, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -359,7 +359,7 @@ func (p *BinlogFile) ReadDouble() ([]float64, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -369,18 +369,18 @@ func (p *BinlogFile) ReadDouble() ([]float64, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_Double {
log.Error("Binlog file: binlog data type is not double")
log.Warn("Binlog file: binlog data type is not double")
return nil, errors.New("binlog data type is not double")
}
data, err := event.PayloadReaderInterface.GetDoubleFromPayload()
if err != nil {
log.Error("Binlog file: failed to read double data", zap.Error(err))
log.Warn("Binlog file: failed to read double data", zap.Error(err))
return nil, fmt.Errorf("failed to read double data, error: %w", err)
}
......@@ -394,7 +394,7 @@ func (p *BinlogFile) ReadDouble() ([]float64, error) {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadVarchar() ([]string, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -402,7 +402,7 @@ func (p *BinlogFile) ReadVarchar() ([]string, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -413,18 +413,18 @@ func (p *BinlogFile) ReadVarchar() ([]string, error) {
// special case: delete event data type is varchar
if event.TypeCode != storage.InsertEventType && event.TypeCode != storage.DeleteEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if (p.DataType() != schemapb.DataType_VarChar) && (p.DataType() != schemapb.DataType_String) {
log.Error("Binlog file: binlog data type is not varchar")
log.Warn("Binlog file: binlog data type is not varchar")
return nil, errors.New("binlog data type is not varchar")
}
data, err := event.PayloadReaderInterface.GetStringFromPayload()
if err != nil {
log.Error("Binlog file: failed to read varchar data", zap.Error(err))
log.Warn("Binlog file: failed to read varchar data", zap.Error(err))
return nil, fmt.Errorf("failed to read varchar data, error: %w", err)
}
......@@ -438,7 +438,7 @@ func (p *BinlogFile) ReadVarchar() ([]string, error) {
// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block.
func (p *BinlogFile) ReadJSON() ([][]byte, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, errors.New("binlog reader not yet initialized")
}
......@@ -446,7 +446,7 @@ func (p *BinlogFile) ReadJSON() ([][]byte, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -456,18 +456,18 @@ func (p *BinlogFile) ReadJSON() ([][]byte, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_JSON {
log.Error("Binlog file: binlog data type is not JSON")
log.Warn("Binlog file: binlog data type is not JSON")
return nil, errors.New("binlog data type is not JSON")
}
data, err := event.PayloadReaderInterface.GetJSONFromPayload()
if err != nil {
log.Error("Binlog file: failed to read JSON data", zap.Error(err))
log.Warn("Binlog file: failed to read JSON data", zap.Error(err))
return nil, fmt.Errorf("failed to read JSON data, error: %w", err)
}
......@@ -482,7 +482,7 @@ func (p *BinlogFile) ReadJSON() ([][]byte, error) {
// return vectors data and the dimension
func (p *BinlogFile) ReadBinaryVector() ([]byte, int, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, 0, errors.New("binlog reader not yet initialized")
}
......@@ -491,7 +491,7 @@ func (p *BinlogFile) ReadBinaryVector() ([]byte, int, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, 0, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -501,18 +501,18 @@ func (p *BinlogFile) ReadBinaryVector() ([]byte, int, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, 0, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_BinaryVector {
log.Error("Binlog file: binlog data type is not binary vector")
log.Warn("Binlog file: binlog data type is not binary vector")
return nil, 0, errors.New("binlog data type is not binary vector")
}
data, dimenson, err := event.PayloadReaderInterface.GetBinaryVectorFromPayload()
if err != nil {
log.Error("Binlog file: failed to read binary vector data", zap.Error(err))
log.Warn("Binlog file: failed to read binary vector data", zap.Error(err))
return nil, 0, fmt.Errorf("failed to read binary vector data, error: %w", err)
}
......@@ -528,7 +528,7 @@ func (p *BinlogFile) ReadBinaryVector() ([]byte, int, error) {
// return vectors data and the dimension
func (p *BinlogFile) ReadFloatVector() ([]float32, int, error) {
if p.reader == nil {
log.Error("Binlog file: binlog reader not yet initialized")
log.Warn("Binlog file: binlog reader not yet initialized")
return nil, 0, errors.New("binlog reader not yet initialized")
}
......@@ -537,7 +537,7 @@ func (p *BinlogFile) ReadFloatVector() ([]float32, int, error) {
for {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
log.Warn("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, 0, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
......@@ -547,18 +547,18 @@ func (p *BinlogFile) ReadFloatVector() ([]float32, int, error) {
}
if event.TypeCode != storage.InsertEventType {
log.Error("Binlog file: binlog file is not insert log")
log.Warn("Binlog file: binlog file is not insert log")
return nil, 0, errors.New("binlog file is not insert log")
}
if p.DataType() != schemapb.DataType_FloatVector {
log.Error("Binlog file: binlog data type is not float vector")
log.Warn("Binlog file: binlog data type is not float vector")
return nil, 0, errors.New("binlog data type is not float vector")
}
data, dimension, err := event.PayloadReaderInterface.GetFloatVectorFromPayload()
if err != nil {
log.Error("Binlog file: failed to read float vector data", zap.Error(err))
log.Warn("Binlog file: failed to read float vector data", zap.Error(err))
return nil, 0, fmt.Errorf("failed to read float vector data, error: %w", err)
}
......
......@@ -25,20 +25,19 @@ import (
"strconv"
"strings"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"go.uber.org/zap"
)
type BinlogParser struct {
ctx context.Context // for canceling parse process
collectionSchema *schemapb.CollectionSchema // collection schema
shardNum int32 // sharding number of the collection
blockSize int64 // maximum size of a read block(unit:byte)
chunkManager storage.ChunkManager // storage interfaces to browse/read the files
callFlushFunc ImportFlushFunc // call back function to flush segment
updateProgressFunc func(percent int64) // update working progress percent value
ctx context.Context // for canceling parse process
collectionInfo *CollectionInfo // collection details including schema
shardNum int32 // sharding number of the collection
blockSize int64 // maximum size of a read block(unit:byte)
chunkManager storage.ChunkManager // storage interfaces to browse/read the files
callFlushFunc ImportFlushFunc // call back function to flush segment
updateProgressFunc func(percent int64) // update working progress percent value
// a timestamp to define the start time point of restore, data before this time point will be ignored
// set this value to 0, all the data will be imported
......@@ -54,44 +53,43 @@ type BinlogParser struct {
}
func NewBinlogParser(ctx context.Context,
collectionSchema *schemapb.CollectionSchema,
shardNum int32,
collectionInfo *CollectionInfo,
blockSize int64,
chunkManager storage.ChunkManager,
flushFunc ImportFlushFunc,
updateProgressFunc func(percent int64),
tsStartPoint uint64,
tsEndPoint uint64) (*BinlogParser, error) {
if collectionSchema == nil {
log.Error("Binlog parser: collection schema is nil")
if collectionInfo == nil {
log.Warn("Binlog parser: collection schema is nil")
return nil, errors.New("collection schema is nil")
}
if chunkManager == nil {
log.Error("Binlog parser: chunk manager pointer is nil")
log.Warn("Binlog parser: chunk manager pointer is nil")
return nil, errors.New("chunk manager pointer is nil")
}
if flushFunc == nil {
log.Error("Binlog parser: flush function is nil")
log.Warn("Binlog parser: flush function is nil")
return nil, errors.New("flush function is nil")
}
if tsStartPoint > tsEndPoint {
log.Error("Binlog parser: the tsStartPoint should be less than tsEndPoint",
log.Warn("Binlog parser: the tsStartPoint should be less than tsEndPoint",
zap.Uint64("tsStartPoint", tsStartPoint), zap.Uint64("tsEndPoint", tsEndPoint))
return nil, fmt.Errorf("Binlog parser: the tsStartPoint %d should be less than tsEndPoint %d", tsStartPoint, tsEndPoint)
}
v := &BinlogParser{
ctx: ctx,
collectionSchema: collectionSchema,
shardNum: shardNum,
blockSize: blockSize,
chunkManager: chunkManager,
callFlushFunc: flushFunc,
tsStartPoint: tsStartPoint,
tsEndPoint: tsEndPoint,
ctx: ctx,
collectionInfo: collectionInfo,
blockSize: blockSize,
chunkManager: chunkManager,
callFlushFunc: flushFunc,
updateProgressFunc: updateProgressFunc,
tsStartPoint: tsStartPoint,
tsEndPoint: tsEndPoint,
}
return v, nil
......@@ -120,7 +118,7 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
// TODO add context
insertlogs, _, err := p.chunkManager.ListWithPrefix(context.TODO(), insertlogRoot, true)
if err != nil {
log.Error("Binlog parser: list insert logs error", zap.Error(err))
log.Warn("Binlog parser: list insert logs error", zap.Error(err))
return nil, fmt.Errorf("failed to list insert logs with root path %s, error: %w", insertlogRoot, err)
}
......@@ -138,7 +136,7 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
fieldStrID := path.Base(fieldPath)
fieldID, err := strconv.ParseInt(fieldStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: failed to parse field id", zap.String("fieldPath", fieldPath), zap.Error(err))
log.Warn("Binlog parser: failed to parse field id", zap.String("fieldPath", fieldPath), zap.Error(err))
return nil, fmt.Errorf("failed to parse field id from insert log path %s, error: %w", insertlog, err)
}
......@@ -146,7 +144,7 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
segmentStrID := path.Base(segmentPath)
segmentID, err := strconv.ParseInt(segmentStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: failed to parse segment id", zap.String("segmentPath", segmentPath), zap.Error(err))
log.Warn("Binlog parser: failed to parse segment id", zap.String("segmentPath", segmentPath), zap.Error(err))
return nil, fmt.Errorf("failed to parse segment id from insert log path %s, error: %w", insertlog, err)
}
......@@ -185,7 +183,7 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
// TODO add context
deltalogs, _, err := p.chunkManager.ListWithPrefix(context.TODO(), deltalogRoot, true)
if err != nil {
log.Error("Binlog parser: failed to list delta logs", zap.Error(err))
log.Warn("Binlog parser: failed to list delta logs", zap.Error(err))
return nil, fmt.Errorf("failed to list delta logs, error: %w", err)
}
......@@ -196,7 +194,7 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
segmentStrID := path.Base(segmentPath)
segmentID, err := strconv.ParseInt(segmentStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: failed to parse segment id", zap.String("segmentPath", segmentPath), zap.Error(err))
log.Warn("Binlog parser: failed to parse segment id", zap.String("segmentPath", segmentPath), zap.Error(err))
return nil, fmt.Errorf("failed to parse segment id from delta log path %s, error: %w", deltalog, err)
}
......@@ -220,14 +218,14 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
func (p *BinlogParser) parseSegmentFiles(segmentHolder *SegmentFilesHolder) error {
if segmentHolder == nil {
log.Error("Binlog parser: segment files holder is nil")
log.Warn("Binlog parser: segment files holder is nil")
return errors.New("segment files holder is nil")
}
adapter, err := NewBinlogAdapter(p.ctx, p.collectionSchema, p.shardNum, p.blockSize,
adapter, err := NewBinlogAdapter(p.ctx, p.collectionInfo, p.blockSize,
MaxTotalSizeInMemory, p.chunkManager, p.callFlushFunc, p.tsStartPoint, p.tsEndPoint)
if err != nil {
log.Error("Binlog parser: failed to create binlog adapter", zap.Error(err))
log.Warn("Binlog parser: failed to create binlog adapter", zap.Error(err))
return fmt.Errorf("failed to create binlog adapter, error: %w", err)
}
......@@ -239,7 +237,7 @@ func (p *BinlogParser) parseSegmentFiles(segmentHolder *SegmentFilesHolder) erro
// 2. the delta log path of a partiion (optional)
func (p *BinlogParser) Parse(filePaths []string) error {
if len(filePaths) != 1 && len(filePaths) != 2 {
log.Error("Binlog parser: illegal paths for binlog import, partition binlog path and delta path are required")
log.Warn("Binlog parser: illegal paths for binlog import, partition binlog path and delta path are required")
return errors.New("illegal paths for binlog import, partition binlog path and delta path are required")
}
......@@ -271,7 +269,6 @@ func (p *BinlogParser) Parse(filePaths []string) error {
if err != nil {
return err
}
updateProgress(i + 1)
// trigger gb after each segment finished
......
......@@ -24,38 +24,40 @@ import (
"testing"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
)
func Test_NewBinlogParser(t *testing.T) {
func Test_BinlogParserNew(t *testing.T) {
ctx := context.Background()
// nil schema
parser, err := NewBinlogParser(ctx, nil, 2, 1024, nil, nil, nil, 0, math.MaxUint64)
parser, err := NewBinlogParser(ctx, nil, 1024, nil, nil, nil, 0, math.MaxUint64)
assert.Nil(t, parser)
assert.NotNil(t, err)
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
// nil chunkmanager
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, nil, nil, nil, 0, math.MaxUint64)
parser, err = NewBinlogParser(ctx, collectionInfo, 1024, nil, nil, nil, 0, math.MaxUint64)
assert.Nil(t, parser)
assert.NotNil(t, err)
// nil flushfunc
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, nil, nil, 0, math.MaxUint64)
parser, err = NewBinlogParser(ctx, collectionInfo, 1024, &MockChunkManager{}, nil, nil, 0, math.MaxUint64)
assert.Nil(t, parser)
assert.NotNil(t, err)
// succeed
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 0, math.MaxUint64)
parser, err = NewBinlogParser(ctx, collectionInfo, 1024, &MockChunkManager{}, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
assert.NoError(t, err)
// tsStartPoint larger than tsEndPoint
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 2, 1)
parser, err = NewBinlogParser(ctx, collectionInfo, 1024, &MockChunkManager{}, flushFunc, nil, 2, 1)
assert.Nil(t, parser)
assert.NotNil(t, err)
}
......@@ -63,7 +65,7 @@ func Test_NewBinlogParser(t *testing.T) {
func Test_BinlogParserConstructHolders(t *testing.T) {
ctx := context.Background()
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
......@@ -127,12 +129,15 @@ func Test_BinlogParserConstructHolders(t *testing.T) {
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009/434574382554415105",
}
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
parser, err := NewBinlogParser(ctx, collectionInfo, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
assert.NoError(t, err)
holders, err := parser.constructSegmentHolders(insertPath, deltaPath)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, 2, len(holders))
// verify the first segment
......@@ -178,7 +183,7 @@ func Test_BinlogParserConstructHolders(t *testing.T) {
func Test_BinlogParserConstructHoldersFailed(t *testing.T) {
ctx := context.Background()
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
......@@ -187,9 +192,12 @@ func Test_BinlogParserConstructHoldersFailed(t *testing.T) {
listResult: make(map[string][]string),
}
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
parser, err := NewBinlogParser(ctx, collectionInfo, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
assert.NoError(t, err)
insertPath := "insertPath"
deltaPath := "deltaPath"
......@@ -229,18 +237,21 @@ func Test_BinlogParserConstructHoldersFailed(t *testing.T) {
func Test_BinlogParserParseFilesFailed(t *testing.T) {
ctx := context.Background()
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 0, math.MaxUint64)
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
parser, err := NewBinlogParser(ctx, collectionInfo, 1024, &MockChunkManager{}, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
assert.NoError(t, err)
err = parser.parseSegmentFiles(nil)
assert.NotNil(t, err)
parser.collectionSchema = nil
parser.collectionInfo = nil
err = parser.parseSegmentFiles(&SegmentFilesHolder{})
assert.NotNil(t, err)
}
......@@ -248,7 +259,7 @@ func Test_BinlogParserParseFilesFailed(t *testing.T) {
func Test_BinlogParserParse(t *testing.T) {
ctx := context.Background()
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
......@@ -267,13 +278,12 @@ func Test_BinlogParserParse(t *testing.T) {
},
},
}
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
updateProgress := func(percent int64) {
assert.Greater(t, percent, int64(0))
}
parser, err := NewBinlogParser(ctx, schema, 2, 1024, chunkManager, flushFunc, updateProgress, 0, math.MaxUint64)
parser, err := NewBinlogParser(ctx, collectionInfo, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
assert.NoError(t, err)
// zero paths
err = parser.Parse(nil)
......@@ -284,12 +294,12 @@ func Test_BinlogParserParse(t *testing.T) {
"insertPath",
}
err = parser.Parse(paths)
assert.Nil(t, err)
assert.NoError(t, err)
// two empty paths
paths = append(paths, "deltaPath")
err = parser.Parse(paths)
assert.Nil(t, err)
assert.NoError(t, err)
// wrong path
chunkManager.listResult = make(map[string][]string)
......@@ -301,18 +311,69 @@ func Test_BinlogParserParse(t *testing.T) {
// file not found
chunkManager.listResult["insertPath"] = []string{
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/0/435978159903735811",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/1/435978159903735811",
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008/101/435978159903735811",
"123/0/a",
"123/1/a",
"123/101/a",
}
err = parser.Parse(paths)
assert.NotNil(t, err)
// progress
rowCount := 100
fieldsData := createFieldsData(sampleSchema(), rowCount)
chunkManager.listResult["deltaPath"] = []string{}
chunkManager.listResult["insertPath"] = []string{
"123/0/a",
"123/1/a",
"123/102/a",
"123/103/a",
"123/104/a",
"123/105/a",
"123/106/a",
"123/107/a",
"123/108/a",
"123/109/a",
"123/110/a",
"123/111/a",
"123/112/a",
}
chunkManager.readBuf = map[string][]byte{
"123/0/a": createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[106].([]int64)),
"123/1/a": createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[106].([]int64)),
"123/102/a": createBinlogBuf(t, schemapb.DataType_Bool, fieldsData[102].([]bool)),
"123/103/a": createBinlogBuf(t, schemapb.DataType_Int8, fieldsData[103].([]int8)),
"123/104/a": createBinlogBuf(t, schemapb.DataType_Int16, fieldsData[104].([]int16)),
"123/105/a": createBinlogBuf(t, schemapb.DataType_Int32, fieldsData[105].([]int32)),
"123/106/a": createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[106].([]int64)), // this is primary key
"123/107/a": createBinlogBuf(t, schemapb.DataType_Float, fieldsData[107].([]float32)),
"123/108/a": createBinlogBuf(t, schemapb.DataType_Double, fieldsData[108].([]float64)),
"123/109/a": createBinlogBuf(t, schemapb.DataType_VarChar, fieldsData[109].([]string)),
"123/110/a": createBinlogBuf(t, schemapb.DataType_BinaryVector, fieldsData[110].([][]byte)),
"123/111/a": createBinlogBuf(t, schemapb.DataType_FloatVector, fieldsData[111].([][]float32)),
"123/112/a": createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)),
}
callTime := 0
updateProgress := func(percent int64) {
assert.GreaterOrEqual(t, percent, int64(0))
assert.LessOrEqual(t, percent, int64(100))
callTime++
}
collectionInfo, err = NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
parser, err = NewBinlogParser(ctx, collectionInfo, 1024, chunkManager, flushFunc, updateProgress, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.NoError(t, err)
err = parser.Parse(paths)
assert.NoError(t, err)
assert.Equal(t, 1, callTime)
}
func Test_BinlogParserSkipFlagFile(t *testing.T) {
ctx := context.Background()
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
......@@ -321,9 +382,12 @@ func Test_BinlogParserSkipFlagFile(t *testing.T) {
listResult: make(map[string][]string),
}
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
parser, err := NewBinlogParser(ctx, collectionInfo, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
assert.NoError(t, err)
insertPath := "insertPath"
deltaPath := "deltaPath"
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
"errors"
"fmt"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type CollectionInfo struct {
Schema *schemapb.CollectionSchema
ShardNum int32
PartitionIDs []int64 // target partitions of bulkinsert, one partition for a non-partition-key collection, or all partitions for a partition-key collection
PrimaryKey *schemapb.FieldSchema
PartitionKey *schemapb.FieldSchema
DynamicField *schemapb.FieldSchema
Name2FieldID map[string]int64 // this member is for Numpy file name validation and JSON row validation
}
func DeduceTargetPartitions(partitions map[string]int64, collectionSchema *schemapb.CollectionSchema, defaultPartition int64) ([]int64, error) {
// if no partition key, return the default partition ID as the target partition
_, err := typeutil.GetPartitionFieldSchema(collectionSchema)
if err != nil {
return []int64{defaultPartition}, nil
}
_, partitionIDs, err := typeutil.RearrangePartitionsForPartitionKey(partitions)
if err != nil {
return nil, err
}
return partitionIDs, nil
}
func NewCollectionInfo(collectionSchema *schemapb.CollectionSchema,
shardNum int32,
partitionIDs []int64) (*CollectionInfo, error) {
if shardNum <= 0 {
return nil, fmt.Errorf("illegal shard number %d", shardNum)
}
if len(partitionIDs) == 0 {
return nil, errors.New("partition list is empty")
}
info := &CollectionInfo{
ShardNum: shardNum,
PartitionIDs: partitionIDs,
}
err := info.resetSchema(collectionSchema)
if err != nil {
return nil, err
}
return info, nil
}
func (c *CollectionInfo) resetSchema(collectionSchema *schemapb.CollectionSchema) error {
if collectionSchema == nil {
return errors.New("collection schema is null")
}
fields := make([]*schemapb.FieldSchema, 0)
name2FieldID := make(map[string]int64)
var primaryKey *schemapb.FieldSchema
var dynamicField *schemapb.FieldSchema
var partitionKey *schemapb.FieldSchema
for i := 0; i < len(collectionSchema.Fields); i++ {
schema := collectionSchema.Fields[i]
// RowIDField and TimeStampField are internal fields, no need to parse
if schema.GetName() == common.RowIDFieldName || schema.GetName() == common.TimeStampFieldName {
continue
}
fields = append(fields, schema)
name2FieldID[schema.GetName()] = schema.GetFieldID()
if schema.GetIsPrimaryKey() {
primaryKey = schema
} else if schema.GetIsDynamic() {
dynamicField = schema
} else if schema.GetIsPartitionKey() {
partitionKey = schema
}
}
if primaryKey == nil {
return errors.New("collection schema has no primary key")
}
if partitionKey == nil && len(c.PartitionIDs) != 1 {
return errors.New("only allow one partition when there is no partition key")
}
c.Schema = &schemapb.CollectionSchema{
Name: collectionSchema.GetName(),
Description: collectionSchema.GetDescription(),
AutoID: collectionSchema.GetAutoID(),
Fields: fields,
EnableDynamicField: collectionSchema.GetEnableDynamicField(),
}
c.PrimaryKey = primaryKey
c.DynamicField = dynamicField
c.PartitionKey = partitionKey
c.Name2FieldID = name2FieldID
return nil
}
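
Taken together, the intended wiring for an import task is roughly: fetch the collection's partitions (the new ShowPartitions/getPartitions path), deduce the target partition IDs, then build a CollectionInfo for the parsers. The glue function below is illustrative only; it assumes it sits inside package importutil with schemapb imported and is not code from this commit.

// buildCollectionInfo shows how the helpers added in this commit are meant to compose.
func buildCollectionInfo(schema *schemapb.CollectionSchema, shardNum int32,
	partitions map[string]int64, defaultPartitionID int64) (*CollectionInfo, error) {
	// all partitions for a partition-key collection, only the default one otherwise
	partitionIDs, err := DeduceTargetPartitions(partitions, schema, defaultPartitionID)
	if err != nil {
		return nil, err
	}
	// CollectionInfo carries the schema, shard number and target partitions to the parsers
	return NewCollectionInfo(schema, shardNum, partitionIDs)
}
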
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil
import (
"testing"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/stretchr/testify/assert"
)
func Test_DeduceTargetPartitions(t *testing.T) {
schema := sampleSchema()
partitions := map[string]int64{
"part_0": 100,
"part_1": 200,
}
partitionIDs, err := DeduceTargetPartitions(partitions, schema, int64(1))
assert.NoError(t, err)
assert.Equal(t, 1, len(partitionIDs))
assert.Equal(t, int64(1), partitionIDs[0])
schema.Fields[7].IsPartitionKey = true
partitionIDs, err = DeduceTargetPartitions(partitions, schema, int64(1))
assert.NoError(t, err)
assert.Equal(t, len(partitions), len(partitionIDs))
partitions = map[string]int64{
"part_a": 100,
}
partitionIDs, err = DeduceTargetPartitions(partitions, schema, int64(1))
assert.Error(t, err)
assert.Nil(t, partitionIDs)
}
func Test_CollectionInfoNew(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
info, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
assert.NotNil(t, info)
assert.Greater(t, len(info.Name2FieldID), 0)
assert.Nil(t, info.PartitionKey)
assert.Nil(t, info.DynamicField)
assert.NotNil(t, info.PrimaryKey)
assert.Equal(t, int32(2), info.ShardNum)
assert.Equal(t, 1, len(info.PartitionIDs))
// has partition key, has dynamic field
schema := &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
Fields: []*schemapb.FieldSchema{
{
FieldID: 0,
Name: "RowID",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 100,
Name: "ID",
IsPrimaryKey: true,
AutoID: false,
DataType: schemapb.DataType_Int64,
},
{
FieldID: 101,
Name: "PartitionKey",
IsPartitionKey: true,
DataType: schemapb.DataType_VarChar,
},
{
FieldID: 102,
Name: "$meta",
IsDynamic: true,
DataType: schemapb.DataType_JSON,
},
},
}
info, err = NewCollectionInfo(schema, 2, []int64{1, 2})
assert.NoError(t, err)
assert.NotNil(t, info)
assert.NotNil(t, info.PrimaryKey)
assert.Equal(t, int64(100), info.PrimaryKey.GetFieldID())
assert.False(t, info.PrimaryKey.GetAutoID())
assert.NotNil(t, info.DynamicField)
assert.Equal(t, int64(102), info.DynamicField.GetFieldID())
assert.NotNil(t, info.PartitionKey)
assert.Equal(t, int64(101), info.PartitionKey.GetFieldID())
})
t.Run("error cases", func(t *testing.T) {
schema := sampleSchema()
// shard number is 0
info, err := NewCollectionInfo(schema, 0, []int64{1})
assert.Error(t, err)
assert.Nil(t, info)
// partition ID list is empty
info, err = NewCollectionInfo(schema, 2, []int64{})
assert.Error(t, err)
assert.Nil(t, info)
// only allow one partition when there is no partition key
info, err = NewCollectionInfo(schema, 2, []int64{1, 2})
assert.Error(t, err)
assert.Nil(t, info)
// collection schema is nil
info, err = NewCollectionInfo(nil, 2, []int64{1})
assert.Error(t, err)
assert.Nil(t, info)
// no primary key
schema = &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: make([]*schemapb.FieldSchema, 0),
}
info, err = NewCollectionInfo(schema, 2, []int64{1})
assert.Error(t, err)
assert.Nil(t, info)
// partition key is nil
info, err = NewCollectionInfo(schema, 2, []int64{1, 2})
assert.Error(t, err)
assert.Nil(t, info)
})
}
......@@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestValidateOptions(t *testing.T) {
func Test_ValidateOptions(t *testing.T) {
assert.NoError(t, ValidateOptions([]*commonpb.KeyValuePair{}))
assert.NoError(t, ValidateOptions([]*commonpb.KeyValuePair{
......@@ -51,9 +51,13 @@ func TestValidateOptions(t *testing.T) {
{Key: "start_ts", Value: "3.14"},
{Key: "end_ts", Value: "1666007457"},
}))
assert.Error(t, ValidateOptions([]*commonpb.KeyValuePair{
{Key: "start_ts", Value: "1666007457"},
{Key: "end_ts", Value: "3.14"},
}))
}
func TestParseTSFromOptions(t *testing.T) {
func Test_ParseTSFromOptions(t *testing.T) {
var tsStart uint64
var tsEnd uint64
var err error
......@@ -88,7 +92,7 @@ func TestParseTSFromOptions(t *testing.T) {
assert.Error(t, err)
}
func TestIsBackup(t *testing.T) {
func Test_IsBackup(t *testing.T) {
isBackup := IsBackup([]*commonpb.KeyValuePair{
{Key: "backup", Value: "true"},
})
......
This diff is collapsed.
This diff is collapsed.