From 8204546bcd6972f388648cd99de6938155128d64 Mon Sep 17 00:00:00 2001 From: godchen Date: Mon, 12 Apr 2021 14:07:46 +0800 Subject: [PATCH] Add unit test for binlog module Signed-off-by: godchen --- internal/storage/binlog_writer_test.go | 5 +- internal/storage/print_binglog_test.go | 359 +++++++++++++++++++++++++ internal/storage/print_binlog.go | 19 +- 3 files changed, 373 insertions(+), 10 deletions(-) diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go index 5b2f2ce40..613e41e83 100644 --- a/internal/storage/binlog_writer_test.go +++ b/internal/storage/binlog_writer_test.go @@ -23,10 +23,13 @@ func TestBinlogWriterReader(t *testing.T) { assert.NotNil(t, err) eventWriter.SetStartTimestamp(1000) eventWriter.SetEndTimestamp(2000) + nums, err := binlogWriter.GetRowNums() + assert.Nil(t, err) + assert.EqualValues(t, 3, nums) err = binlogWriter.Close() assert.Nil(t, err) assert.EqualValues(t, 1, binlogWriter.GetEventNums()) - nums, err := binlogWriter.GetRowNums() + nums, err = binlogWriter.GetRowNums() assert.Nil(t, err) assert.EqualValues(t, 3, nums) err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}) diff --git a/internal/storage/print_binglog_test.go b/internal/storage/print_binglog_test.go index 3f4b8bf77..376dd3194 100644 --- a/internal/storage/print_binglog_test.go +++ b/internal/storage/print_binglog_test.go @@ -1,11 +1,16 @@ package storage import ( + "fmt" "os" "testing" "time" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" ) @@ -55,4 +60,358 @@ func TestPrintBinlogFilesInt64(t *testing.T) { assert.Equal(t, num, len(buf)) err = fd.Close() assert.Nil(t, err) + +} + +func TestPrintBinlogFiles(t *testing.T) { + Schema := &etcdpb.CollectionMeta{ + ID: 1, + CreateTime: 1, + SegmentIDs: []int64{0, 1}, + PartitionTags: []string{"partition_0", "partition_1"}, + Schema: &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + AutoID: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 0, + Name: "row_id", + IsPrimaryKey: false, + Description: "row_id", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 1, + Name: "Ts", + IsPrimaryKey: false, + Description: "Ts", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, + Name: "field_bool", + IsPrimaryKey: false, + Description: "description_2", + DataType: schemapb.DataType_Bool, + }, + { + FieldID: 101, + Name: "field_int8", + IsPrimaryKey: false, + Description: "description_3", + DataType: schemapb.DataType_Int8, + }, + { + FieldID: 102, + Name: "field_int16", + IsPrimaryKey: false, + Description: "description_4", + DataType: schemapb.DataType_Int16, + }, + { + FieldID: 103, + Name: "field_int32", + IsPrimaryKey: false, + Description: "description_5", + DataType: schemapb.DataType_Int32, + }, + { + FieldID: 104, + Name: "field_int64", + IsPrimaryKey: false, + Description: "description_6", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 105, + Name: "field_float", + IsPrimaryKey: false, + Description: "description_7", + DataType: schemapb.DataType_Float, + }, + { + FieldID: 106, + Name: "field_double", + IsPrimaryKey: false, + Description: "description_8", + DataType: schemapb.DataType_Double, + }, + { + FieldID: 107, + Name: "field_string", + IsPrimaryKey: false, + Description: "description_9", + DataType: schemapb.DataType_String, + }, + { + FieldID: 108, + Name: "field_binary_vector", + IsPrimaryKey: false, + Description: "description_10", + DataType: schemapb.DataType_BinaryVector, + }, + { + FieldID: 109, + Name: "field_float_vector", + IsPrimaryKey: false, + Description: "description_11", + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + } + insertCodec := NewInsertCodec(Schema) + insertDataFirst := &InsertData{ + Data: map[int64]FieldData{ + 0: &Int64FieldData{ + NumRows: 2, + Data: []int64{3, 4}, + }, + 1: &Int64FieldData{ + NumRows: 2, + Data: []int64{3, 4}, + }, + 100: &BoolFieldData{ + NumRows: 2, + Data: []bool{true, false}, + }, + 101: &Int8FieldData{ + NumRows: 2, + Data: []int8{3, 4}, + }, + 102: &Int16FieldData{ + NumRows: 2, + Data: []int16{3, 4}, + }, + 103: &Int32FieldData{ + NumRows: 2, + Data: []int32{3, 4}, + }, + 104: &Int64FieldData{ + NumRows: 2, + Data: []int64{3, 4}, + }, + 105: &FloatFieldData{ + NumRows: 2, + Data: []float32{3, 4}, + }, + 106: &DoubleFieldData{ + NumRows: 2, + Data: []float64{3, 4}, + }, + 107: &StringFieldData{ + NumRows: 2, + Data: []string{"3", "4"}, + }, + 108: &BinaryVectorFieldData{ + NumRows: 2, + Data: []byte{0, 255}, + Dim: 8, + }, + 109: &FloatVectorFieldData{ + NumRows: 2, + Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}, + Dim: 8, + }, + }, + } + + insertDataSecond := &InsertData{ + Data: map[int64]FieldData{ + 0: &Int64FieldData{ + NumRows: 2, + Data: []int64{1, 2}, + }, + 1: &Int64FieldData{ + NumRows: 2, + Data: []int64{1, 2}, + }, + 100: &BoolFieldData{ + NumRows: 2, + Data: []bool{true, false}, + }, + 101: &Int8FieldData{ + NumRows: 2, + Data: []int8{1, 2}, + }, + 102: &Int16FieldData{ + NumRows: 2, + Data: []int16{1, 2}, + }, + 103: &Int32FieldData{ + NumRows: 2, + Data: []int32{1, 2}, + }, + 104: &Int64FieldData{ + NumRows: 2, + Data: []int64{1, 2}, + }, + 105: &FloatFieldData{ + NumRows: 2, + Data: []float32{1, 2}, + }, + 106: &DoubleFieldData{ + NumRows: 2, + Data: []float64{1, 2}, + }, + 107: &StringFieldData{ + NumRows: 2, + Data: []string{"1", "2"}, + }, + 108: &BinaryVectorFieldData{ + NumRows: 2, + Data: []byte{0, 255}, + Dim: 8, + }, + 109: &FloatVectorFieldData{ + NumRows: 2, + Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}, + Dim: 8, + }, + }, + } + firstBlobs, err := insertCodec.Serialize(1, 1, insertDataFirst) + assert.Nil(t, err) + var binlogFiles []string + for index, blob := range firstBlobs { + blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 100) + fileName := fmt.Sprintf("/tmp/firstblob_%d.db", index) + binlogFiles = append(binlogFiles, fileName) + fd, err := os.Create(fileName) + assert.Nil(t, err) + num, err := fd.Write(blob.GetValue()) + assert.Nil(t, err) + assert.Equal(t, num, len(blob.GetValue())) + err = fd.Close() + assert.Nil(t, err) + } + secondBlobs, err := insertCodec.Serialize(1, 1, insertDataSecond) + assert.Nil(t, err) + for index, blob := range secondBlobs { + blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99) + fileName := fmt.Sprintf("/tmp/secondblob_%d.db", index) + binlogFiles = append(binlogFiles, fileName) + fd, err := os.Create(fileName) + assert.Nil(t, err) + num, err := fd.Write(blob.GetValue()) + assert.Nil(t, err) + assert.Equal(t, num, len(blob.GetValue())) + err = fd.Close() + assert.Nil(t, err) + } + + PrintBinlogFiles(binlogFiles) +} + +func TestPrintDDFiles(t *testing.T) { + dataDefinitionCodec := NewDataDefinitionCodec(int64(1)) + ts := []Timestamp{ + 1, + 2, + 3, + 4, + } + collID := int64(1) + partitionID := int64(1) + collName := "test" + partitionName := "test" + createCollReq := internalpb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreateCollection, + MsgID: 1, + Timestamp: 1, + SourceID: 1, + }, + CollectionID: collID, + Schema: make([]byte, 0), + CollectionName: collName, + DbName: "DbName", + DbID: UniqueID(0), + } + createCollString, err := proto.Marshal(&createCollReq) + assert.Nil(t, err) + + dropCollReq := internalpb.DropCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropCollection, + MsgID: 2, + Timestamp: 2, + SourceID: 2, + }, + CollectionID: collID, + CollectionName: collName, + DbName: "DbName", + DbID: UniqueID(0), + } + dropCollString, err := proto.Marshal(&dropCollReq) + assert.Nil(t, err) + + createPartitionReq := internalpb.CreatePartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreatePartition, + MsgID: 3, + Timestamp: 3, + SourceID: 3, + }, + CollectionID: collID, + PartitionID: partitionID, + CollectionName: collName, + PartitionName: partitionName, + DbName: "DbName", + DbID: UniqueID(0), + } + createPartitionString, err := proto.Marshal(&createPartitionReq) + assert.Nil(t, err) + + dropPartitionReq := internalpb.DropPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DropPartition, + MsgID: 4, + Timestamp: 4, + SourceID: 4, + }, + CollectionID: collID, + PartitionID: partitionID, + CollectionName: collName, + PartitionName: partitionName, + DbName: "DbName", + DbID: UniqueID(0), + } + dropPartitionString, err := proto.Marshal(&dropPartitionReq) + assert.Nil(t, err) + ddRequests := []string{ + string(createCollString[:]), + string(dropCollString[:]), + string(createPartitionString[:]), + string(dropPartitionString[:]), + } + eventTypeCodes := []EventTypeCode{ + CreateCollectionEventType, + DropCollectionEventType, + CreatePartitionEventType, + DropPartitionEventType, + } + blobs, err := dataDefinitionCodec.Serialize(ts, ddRequests, eventTypeCodes) + assert.Nil(t, err) + var binlogFiles []string + for index, blob := range blobs { + blob.Key = fmt.Sprintf("1/data_definition/3/4/5/%d", 99) + fileName := fmt.Sprintf("/tmp/ddblob_%d.db", index) + binlogFiles = append(binlogFiles, fileName) + fd, err := os.Create(fileName) + assert.Nil(t, err) + num, err := fd.Write(blob.GetValue()) + assert.Nil(t, err) + assert.Equal(t, num, len(blob.GetValue())) + err = fd.Close() + assert.Nil(t, err) + } + resultTs, resultRequests, err := dataDefinitionCodec.Deserialize(blobs) + assert.Nil(t, err) + assert.Equal(t, resultTs, ts) + assert.Equal(t, resultRequests, ddRequests) + assert.Nil(t, dataDefinitionCodec.Close()) + + PrintBinlogFiles(binlogFiles) } diff --git a/internal/storage/print_binlog.go b/internal/storage/print_binlog.go index cdc30476a..8828c0377 100644 --- a/internal/storage/print_binlog.go +++ b/internal/storage/print_binlog.go @@ -7,7 +7,7 @@ import ( "syscall" "github.com/golang/protobuf/proto" - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" ) @@ -304,31 +304,32 @@ func printDDLPayloadValues(eventType EventTypeCode, colType schemapb.DataType, r } for i := 0; i < rows; i++ { val, err := reader.GetOneStringFromPayload(i) + valBytes := []byte(val) if err != nil { return err } switch eventType { case CreateCollectionEventType: - var req milvuspb.CreateCollectionRequest - if err := proto.UnmarshalText(val, &req); err != nil { + var req internalpb.CreateCollectionRequest + if err := proto.Unmarshal(valBytes, &req); err != nil { return err } fmt.Printf("\t\t%d : create collection: %v\n", i, req) case DropCollectionEventType: - var req milvuspb.DropCollectionRequest - if err := proto.UnmarshalText(val, &req); err != nil { + var req internalpb.DropCollectionRequest + if err := proto.Unmarshal(valBytes, &req); err != nil { return err } fmt.Printf("\t\t%d : drop collection: %v\n", i, req) case CreatePartitionEventType: - var req milvuspb.CreatePartitionRequest - if err := proto.UnmarshalText(val, &req); err != nil { + var req internalpb.CreatePartitionRequest + if err := proto.Unmarshal(valBytes, &req); err != nil { return err } fmt.Printf("\t\t%d : create partition: %v\n", i, req) case DropPartitionEventType: - var req milvuspb.DropPartitionRequest - if err := proto.UnmarshalText(val, &req); err != nil { + var req internalpb.DropPartitionRequest + if err := proto.Unmarshal(valBytes, &req); err != nil { return err } fmt.Printf("\t\t%d : drop partition: %v\n", i, req) -- GitLab