Unverified commit cf860007 authored by dragondriver, committed by GitHub

Refactor the index file format (#8514)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
Parent bada3935
@@ -296,7 +296,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
storageBlobs := getStorageBlobs(blobs)
var insertCodec storage.InsertCodec
defer insertCodec.Close()
partitionID, segmentID, insertData, err2 := insertCodec.Deserialize(storageBlobs)
collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(storageBlobs)
if err2 != nil {
return err2
}
@@ -305,7 +305,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
}
tr.Record("deserialize storage blobs done")
for _, value := range insertData.Data {
for fieldID, value := range insertData.Data {
// TODO: BinaryVectorFieldData
floatVectorFieldData, fOk := value.(*storage.FloatVectorFieldData)
if fOk {
@@ -338,11 +338,23 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
}
tr.Record("serialize index done")
var indexCodec storage.IndexCodec
serializedIndexBlobs, err := indexCodec.Serialize(getStorageBlobs(indexBlobs), indexParams, it.req.IndexName, it.req.IndexID)
codec := storage.NewIndexFileBinlogCodec()
serializedIndexBlobs, err := codec.Serialize(
it.req.IndexBuildID,
it.req.Version,
collectionID,
partitionID,
segmentID,
fieldID,
indexParams,
it.req.IndexName,
it.req.IndexID,
getStorageBlobs(indexBlobs),
)
if err != nil {
return err
}
_ = codec.Close()
tr.Record("serialize index codec done")
getSavePathByKey := func(key string) string {
......
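Note: the build task now recovers `collectionID` and `fieldID` from the insert binlogs and threads the full identifier tuple into `IndexFileBinlogCodec.Serialize`, so each index file records where it came from. A minimal sketch of the new call shape — the helper name and parameter list are illustrative, only the `storage` APIs come from this diff:

```go
// Sketch only: the Serialize signature introduced by this commit. The
// parameter values are placeholders for what IndexBuildTask.Execute has
// in scope after DeserializeAll.
func serializeIndex(
	buildID storage.UniqueID, version int64,
	collID, partID, segID, fieldID storage.UniqueID,
	indexParams map[string]string, indexName string, indexID storage.UniqueID,
	indexBlobs []*storage.Blob,
) ([]*storage.Blob, error) {
	codec := storage.NewIndexFileBinlogCodec()
	defer codec.Close()
	// Each produced blob is a self-describing binlog: the identifier tuple
	// is recorded in its descriptor event rather than implied by the path.
	return codec.Serialize(buildID, version, collID, partID, segID, fieldID,
		indexParams, indexName, indexID, indexBlobs)
}
```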
@@ -107,6 +107,8 @@ func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexPa
var indexParams indexParam
var indexName string
indexCodec := storage.NewIndexFileBinlogCodec()
defer indexCodec.Close()
for _, p := range indexPath {
log.Debug("", zap.String("load path", fmt.Sprintln(indexPath)))
indexPiece, err := loader.kv.Load(p)
@@ -115,7 +117,6 @@
}
// get index params when detecting indexParamPrefix
if path.Base(p) == storage.IndexParamsFile {
indexCodec := storage.NewIndexCodec()
_, indexParams, indexName, _, err = indexCodec.Deserialize([]*storage.Blob{
{
Key: storage.IndexParamsFile,
@@ -126,7 +127,16 @@
return nil, nil, "", err
}
} else {
index = append(index, []byte(indexPiece))
data, _, _, _, err := indexCodec.Deserialize([]*storage.Blob{
{
Key: path.Base(p), // the key is not used when deserializing
Value: []byte(indexPiece),
},
})
if err != nil {
return nil, nil, "", err
}
index = append(index, data[0].Value)
}
}
......
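Note: on the query side, every index file is now itself a binlog, so the loader decodes each blob through the codec instead of appending raw KV bytes. A sketch of that flow, assuming the `storage` and `path` packages from this diff are imported; `blobLoader` is an illustrative stand-in for the loader's kv client:

```go
// Illustrative stand-in for the real kv client used by indexLoader.
type blobLoader interface {
	Load(key string) (string, error)
}

// Sketch: decode index files under the new format.
func getIndexBinlog(kv blobLoader, indexPaths []string) ([][]byte, map[string]string, string, error) {
	var (
		index       [][]byte
		indexParams map[string]string
		indexName   string
	)
	codec := storage.NewIndexFileBinlogCodec()
	defer codec.Close()
	for _, p := range indexPaths {
		piece, err := kv.Load(p)
		if err != nil {
			return nil, nil, "", err
		}
		blob := []*storage.Blob{{Key: path.Base(p), Value: []byte(piece)}}
		if path.Base(p) == storage.IndexParamsFile {
			// The params file carries the serialized indexParams map.
			_, indexParams, indexName, _, err = codec.Deserialize(blob)
		} else {
			// Data files decode back to the original index bytes.
			var datas []*storage.Blob
			datas, _, _, _, err = codec.Deserialize(blob)
			if err == nil {
				index = append(index, datas[0].Value)
			}
		}
		if err != nil {
			return nil, nil, "", err
		}
	}
	return index, indexParams, indexName, nil
}
```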
@@ -247,8 +247,19 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
}
// serialize index params
var indexCodec storage.IndexCodec
serializedIndexBlobs, err := indexCodec.Serialize(binarySet, indexParams, indexName, indexID)
indexCodec := storage.NewIndexFileBinlogCodec()
serializedIndexBlobs, err := indexCodec.Serialize(
0,
0,
0,
0,
0,
0,
indexParams,
indexName,
indexID,
binarySet,
)
if err != nil {
return nil, err
}
......
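Note: in this test helper the identifier tuple carries no meaning, so zeros are passed positionally. Naming them locally, as in the hedged sketch below, documents which zero is which; the local names are illustrative only and behavior is identical:

```go
// Sketch: naming the zero-valued arguments; not part of the API.
var (
	indexBuildID UniqueID = 0 // unused by the test
	idxVersion   int64    = 0
	collectionID UniqueID = 0
	partitionID  UniqueID = 0
	segID        UniqueID = 0
	fieldID      UniqueID = 0
)
serializedIndexBlobs, err := indexCodec.Serialize(
	indexBuildID, idxVersion, collectionID, partitionID, segID, fieldID,
	indexParams, indexName, indexID, binarySet,
)
```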
@@ -14,11 +14,16 @@ package storage
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"testing"
"time"
"unsafe"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/stretchr/testify/assert"
@@ -972,6 +977,129 @@ func TestDDLBinlog2(t *testing.T) {
assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
}
/* #nosec G103 */
func TestIndexFileBinlog(t *testing.T) {
indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
collectionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
partitionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
segmentID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
fieldID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
indexName := funcutil.GenRandomStr()
indexID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
key := funcutil.GenRandomStr()
timestamp := Timestamp(time.Now().UnixNano())
payload := funcutil.GenRandomStr()
w := NewIndexFileBinlogWriter(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexName, indexID, key)
e, err := w.NextIndexFileEventWriter()
assert.Nil(t, err)
err = e.AddOneStringToPayload(payload)
assert.Nil(t, err)
e.SetEventTimestamp(timestamp, timestamp)
w.SetEventTimeStamp(timestamp, timestamp)
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
//magic number
magicNum := UnsafeReadInt32(buf, 0)
assert.Equal(t, magicNum, MagicNumber)
pos := int(unsafe.Sizeof(MagicNumber))
//descriptor header, timestamp
ts := UnsafeReadInt64(buf, pos)
assert.Greater(t, ts, int64(0))
pos += int(unsafe.Sizeof(ts))
//descriptor header, type code
tc := UnsafeReadInt8(buf, pos)
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
pos += int(unsafe.Sizeof(tc))
//descriptor header, event length
descEventLen := UnsafeReadInt32(buf, pos)
pos += int(unsafe.Sizeof(descEventLen))
//descriptor header, next position
descNxtPos := UnsafeReadInt32(buf, pos)
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
pos += int(unsafe.Sizeof(descNxtPos))
//descriptor data fix, collection id
collID := UnsafeReadInt64(buf, pos)
assert.Equal(t, collID, collectionID)
pos += int(unsafe.Sizeof(collID))
//descriptor data fix, partition id
partID := UnsafeReadInt64(buf, pos)
assert.Equal(t, partID, partitionID)
pos += int(unsafe.Sizeof(partID))
//descriptor data fix, segment id
segID := UnsafeReadInt64(buf, pos)
assert.Equal(t, segID, segmentID)
pos += int(unsafe.Sizeof(segID))
//descriptor data fix, field id
fID := UnsafeReadInt64(buf, pos)
assert.Equal(t, fID, fieldID)
pos += int(unsafe.Sizeof(fID))
//descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(timestamp))
pos += int(unsafe.Sizeof(startts))
//descriptor data fix, end time stamp
endts := UnsafeReadInt64(buf, pos)
assert.Equal(t, endts, int64(timestamp))
pos += int(unsafe.Sizeof(endts))
//descriptor data fix, payload type
colType := UnsafeReadInt32(buf, pos)
assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_String)
pos += int(unsafe.Sizeof(colType))
//descriptor data, post header lengths
for i := DescriptorEventType; i < EventTypeEnd; i++ {
size := getEventFixPartSize(i)
assert.Equal(t, uint8(size), buf[pos])
pos++
}
//descriptor data, extra length
extraLength := UnsafeReadInt32(buf, pos)
assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength)
pos += int(unsafe.Sizeof(extraLength))
multiBytes := make([]byte, extraLength)
for i := 0; i < int(extraLength); i++ {
singleByte := UnsafeReadByte(buf, pos)
multiBytes[i] = singleByte
pos++
}
j := make(map[string]interface{})
err = json.Unmarshal(multiBytes, &j)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("%v", indexBuildID), fmt.Sprintf("%v", j["indexBuildID"]))
assert.Equal(t, fmt.Sprintf("%v", version), fmt.Sprintf("%v", j["version"]))
assert.Equal(t, fmt.Sprintf("%v", indexName), fmt.Sprintf("%v", j["indexName"]))
assert.Equal(t, fmt.Sprintf("%v", indexID), fmt.Sprintf("%v", j["indexID"]))
assert.Equal(t, fmt.Sprintf("%v", key), fmt.Sprintf("%v", j["key"]))
// NextIndexFileBinlogWriter after close
_, err = w.NextIndexFileEventWriter()
assert.NotNil(t, err)
}
func TestNewBinlogReaderError(t *testing.T) {
data := []byte{}
reader, err := NewBinlogReader(data)
......
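Note: the test above walks the serialized layout field by field — magic number, descriptor header, fixed descriptor data, post-header lengths, then the JSON extras. The decode-at-offset pattern it relies on is plain `encoding/binary`; a self-contained illustration (the layout here is simplified, not the real binlog):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Fixed-width little-endian fields read sequentially, advancing pos by
// the size of each field, exactly as the test does with UnsafeReadInt32/64.
func main() {
	var buf bytes.Buffer
	binary.Write(&buf, binary.LittleEndian, int32(0xfffabc)) // "magic number"
	binary.Write(&buf, binary.LittleEndian, int64(42))       // "timestamp"

	b := buf.Bytes()
	pos := 0
	magic := int32(binary.LittleEndian.Uint32(b[pos:]))
	pos += 4
	ts := int64(binary.LittleEndian.Uint64(b[pos:]))
	pos += 8
	fmt.Printf("magic=%#x ts=%d next=%d\n", magic, ts, pos)
}
```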
@@ -26,6 +26,7 @@ const (
InsertBinlog BinlogType = iota
DeleteBinlog
DDLBinlog
IndexFileBinlog
)
const (
MagicNumber int32 = 0xfffabc
@@ -219,6 +220,22 @@ func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEve
return event, nil
}
type IndexFileBinlogWriter struct {
baseBinlogWriter
}
func (writer *IndexFileBinlogWriter) NextIndexFileEventWriter() (*indexFileEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newIndexFileEventWriter()
if err != nil {
return nil, err
}
writer.eventWriters = append(writer.eventWriters, event)
return event, nil
}
// NewInsertBinlogWriter creates InsertBinlogWriter to write binlog file.
func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID, FieldID int64) *InsertBinlogWriter {
descriptorEvent := newDescriptorEvent()
@@ -271,3 +288,36 @@ func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinl
},
}
}
func NewIndexFileBinlogWriter(
indexBuildID UniqueID,
version int64,
collectionID UniqueID,
partitionID UniqueID,
segmentID UniqueID,
fieldID UniqueID,
indexName string,
indexID UniqueID,
key string,
) *IndexFileBinlogWriter {
descriptorEvent := newDescriptorEvent()
descriptorEvent.CollectionID = collectionID
descriptorEvent.PartitionID = partitionID
descriptorEvent.SegmentID = segmentID
descriptorEvent.FieldID = fieldID
descriptorEvent.PayloadDataType = schemapb.DataType_String
descriptorEvent.AddExtra("indexBuildID", fmt.Sprintf("%d", indexBuildID))
descriptorEvent.AddExtra("version", fmt.Sprintf("%d", version))
descriptorEvent.AddExtra("indexName", indexName)
descriptorEvent.AddExtra("indexID", fmt.Sprintf("%d", indexID))
descriptorEvent.AddExtra("key", key)
return &IndexFileBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
binlogType: IndexFileBinlog,
eventWriters: make([]EventWriter, 0),
buffer: nil,
},
}
}
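Note: `NewIndexFileBinlogWriter` stores the index metadata in the descriptor event's extras rather than in dedicated fields, which keeps the descriptor layout shared with the other binlog types. A sketch of the writer-side call sequence in the `storage` package context — all values are placeholders, and the sequence mirrors `IndexFileBinlogCodec.Serialize` further below:

```go
// Sketch: writing one index file as a binlog.
func writeOneIndexFile(fileKey string, fileBytes []byte) ([]byte, error) {
	w := NewIndexFileBinlogWriter(
		1 /*indexBuildID*/, 1 /*version*/,
		10 /*collectionID*/, 20 /*partitionID*/, 30 /*segmentID*/, 100 /*fieldID*/,
		"my_index", 999 /*indexID*/, fileKey,
	)
	e, err := w.NextIndexFileEventWriter()
	if err != nil {
		return nil, err
	}
	// The file content travels as a single string payload.
	if err = e.AddOneStringToPayload(string(fileBytes)); err != nil {
		return nil, err
	}
	ts := Timestamp(time.Now().UnixNano())
	e.SetEventTimestamp(ts, ts)
	w.SetEventTimeStamp(ts, ts)
	// GetBuffer fails before Close; Close flushes events into the buffer.
	if err = w.Close(); err != nil {
		return nil, err
	}
	return w.GetBuffer()
}
```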
@@ -13,11 +13,16 @@ package storage
import (
"encoding/json"
"errors"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
@@ -248,13 +253,15 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
return blobs, statsBlobs, nil
}
// Deserialize transfers blobs back to insert data.
// From the schema, it gets all fields.
// For each field, it creates a binlog reader and reads all events into the buffer.
// It returns the original @InsertData in the end.
func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) {
func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
collectionID UniqueID,
partitionID UniqueID,
segmentID UniqueID,
data *InsertData,
err error,
) {
if len(blobs) == 0 {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
}
readerClose := func(reader *BinlogReader) func() error {
return func() error { return reader.Close() }
@@ -263,6 +270,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
var blobList BlobList = blobs
sort.Sort(blobList)
var cID UniqueID
var pID UniqueID
var sID UniqueID
resultData := &InsertData{}
@@ -270,11 +278,11 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
for _, blob := range blobList {
binlogReader, err := NewBinlogReader(blob.Value)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
// read partitionID and SegmentID
pID, sID = binlogReader.PartitionID, binlogReader.SegmentID
cID, pID, sID = binlogReader.CollectionID, binlogReader.PartitionID, binlogReader.SegmentID
dataType := binlogReader.PayloadDataType
fieldID := binlogReader.FieldID
@@ -282,7 +290,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
for {
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
if eventReader == nil {
break
@@ -295,12 +303,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
boolFieldData := resultData.Data[fieldID].(*BoolFieldData)
singleData, err := eventReader.GetBoolFromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
boolFieldData.Data = append(boolFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
boolFieldData.NumRows = append(boolFieldData.NumRows, int64(length))
@@ -312,12 +320,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
int8FieldData := resultData.Data[fieldID].(*Int8FieldData)
singleData, err := eventReader.GetInt8FromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
int8FieldData.Data = append(int8FieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
int8FieldData.NumRows = append(int8FieldData.NumRows, int64(length))
@@ -329,12 +337,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
int16FieldData := resultData.Data[fieldID].(*Int16FieldData)
singleData, err := eventReader.GetInt16FromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
int16FieldData.Data = append(int16FieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
int16FieldData.NumRows = append(int16FieldData.NumRows, int64(length))
@@ -346,12 +354,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
int32FieldData := resultData.Data[fieldID].(*Int32FieldData)
singleData, err := eventReader.GetInt32FromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
int32FieldData.Data = append(int32FieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
int32FieldData.NumRows = append(int32FieldData.NumRows, int64(length))
@@ -363,12 +371,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
int64FieldData := resultData.Data[fieldID].(*Int64FieldData)
singleData, err := eventReader.GetInt64FromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
int64FieldData.Data = append(int64FieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
int64FieldData.NumRows = append(int64FieldData.NumRows, int64(length))
@@ -380,12 +388,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
floatFieldData := resultData.Data[fieldID].(*FloatFieldData)
singleData, err := eventReader.GetFloatFromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
floatFieldData.Data = append(floatFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
floatFieldData.NumRows = append(floatFieldData.NumRows, int64(length))
@@ -397,12 +405,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
doubleFieldData := resultData.Data[fieldID].(*DoubleFieldData)
singleData, err := eventReader.GetDoubleFromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
doubleFieldData.Data = append(doubleFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
doubleFieldData.NumRows = append(doubleFieldData.NumRows, int64(length))
@@ -414,14 +422,14 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
stringFieldData := resultData.Data[fieldID].(*StringFieldData)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
stringFieldData.NumRows = append(stringFieldData.NumRows, int64(length))
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
stringFieldData.Data = append(stringFieldData.Data, singleString)
}
@@ -434,12 +442,12 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
var singleData []byte
singleData, binaryVectorFieldData.Dim, err = eventReader.GetBinaryVectorFromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
binaryVectorFieldData.NumRows = append(binaryVectorFieldData.NumRows, int64(length))
@@ -452,18 +460,18 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
var singleData []float32
singleData, floatVectorFieldData.Dim, err = eventReader.GetFloatVectorFromPayload()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
}
totalLength += length
floatVectorFieldData.NumRows = append(floatVectorFieldData.NumRows, int64(length))
resultData.Data[fieldID] = floatVectorFieldData
default:
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("undefined data type %d", dataType)
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("undefined data type %d", dataType)
}
}
if fieldID == rootcoord.TimeStampField {
@@ -475,7 +483,16 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
}
return pID, sID, resultData, nil
return cID, pID, sID, resultData, nil
}
// Deserialize transfers blobs back to insert data.
// From the schema, it gets all fields.
// For each field, it creates a binlog reader and reads all events into the buffer.
// It returns the original @InsertData in the end.
func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) {
_, partitionID, segmentID, data, err = insertCodec.DeserializeAll(blobs)
return partitionID, segmentID, data, err
}
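Note: keeping `Deserialize` as a thin wrapper over `DeserializeAll` preserves every existing three-value call site, while new callers opt into the collection ID. A usage sketch with illustrative identifiers:

```go
// Sketch: the wrapper keeps existing call sites source-compatible while
// DeserializeAll exposes the collection ID recorded in each binlog's
// descriptor event.
func example(insertCodec *InsertCodec, blobs []*Blob) error {
	// Legacy shape, unchanged for existing callers:
	_, _, _, err := insertCodec.Deserialize(blobs)
	if err != nil {
		return err
	}
	// New shape, used by IndexBuildTask to thread collectionID into the
	// index file descriptor:
	collectionID, partitionID, segmentID, data, err := insertCodec.DeserializeAll(blobs)
	if err != nil {
		return err
	}
	_, _, _, _ = collectionID, partitionID, segmentID, data
	return nil
}
```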
func (insertCodec *InsertCodec) Close() error {
@@ -766,16 +783,232 @@ func (dataDefinitionCodec *DataDefinitionCodec) Close() error {
return nil
}
//type IndexCodec struct {
// Base
// readerCloseFunc []func() error
//}
//
////func (builder *IndexBuilder) Build(fieldData FieldData, typeParams map[string]string, indexParams map[string]string) ([]*Blob, error) {}
//func (indexCodec *IndexCodec) Serialize(indexSlices []*Blob) ([]*Blob, error) {}
//
//// TODO: describe inputs and return
//func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, error) {}
type IndexFileBinlogCodec struct {
readerCloseFuncs []func() error
}
func NewIndexFileBinlogCodec() *IndexFileBinlogCodec {
return &IndexFileBinlogCodec{
readerCloseFuncs: make([]func() error, 0),
}
}
func (codec *IndexFileBinlogCodec) Serialize(
indexBuildID UniqueID,
version int64,
collectionID UniqueID,
partitionID UniqueID,
segmentID UniqueID,
fieldID UniqueID,
indexParams map[string]string,
indexName string,
indexID UniqueID,
datas []*Blob,
) ([]*Blob, error) {
var err error
var blobs []*Blob
ts := Timestamp(time.Now().UnixNano())
for pos := range datas {
writer := NewIndexFileBinlogWriter(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexName, indexID, datas[pos].Key)
eventWriter, err := writer.NextIndexFileEventWriter()
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(string(datas[pos].Value))
if err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(ts, ts)
writer.SetEventTimeStamp(ts, ts)
err = writer.Close()
if err != nil {
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: datas[pos].Key,
//Key: strconv.Itoa(pos),
Value: buffer,
})
}
// save index params
writer := NewIndexFileBinlogWriter(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexName, indexID, "indexParams")
eventWriter, err := writer.NextIndexFileEventWriter()
if err != nil {
return nil, err
}
params, _ := json.Marshal(indexParams)
err = eventWriter.AddOneStringToPayload(string(params))
if err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(ts, ts)
writer.SetEventTimeStamp(ts, ts)
err = writer.Close()
if err != nil {
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: "indexParams",
//Key: strconv.Itoa(len(datas)),
Value: buffer,
})
return blobs, nil
}
func (codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) (
indexBuildID UniqueID,
version int64,
collectionID UniqueID,
partitionID UniqueID,
segmentID UniqueID,
fieldID UniqueID,
indexParams map[string]string,
indexName string,
indexID UniqueID,
datas []*Blob,
err error,
) {
if len(blobs) == 0 {
return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, errors.New("blobs is empty")
}
readerClose := func(reader *BinlogReader) func() error {
return func() error { return reader.Close() }
}
indexParams = make(map[string]string)
datas = make([]*Blob, 0)
for _, blob := range blobs {
binlogReader, err := NewBinlogReader(blob.Value)
if err != nil {
log.Warn("failed to read binlog",
zap.Error(err))
return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err
}
dataType := binlogReader.PayloadDataType
//desc, err := binlogReader.readDescriptorEvent()
//if err != nil {
// log.Warn("failed to read descriptor event",
// zap.Error(err))
// return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err
//}
desc := binlogReader.descriptorEvent
extraBytes := desc.ExtraBytes
extra := make(map[string]interface{})
_ = json.Unmarshal(extraBytes, &extra)
var value int
value, _ = strconv.Atoi(extra["indexBuildID"].(string))
indexBuildID = UniqueID(value)
value, _ = strconv.Atoi(extra["version"].(string))
version = int64(value)
collectionID = desc.CollectionID
partitionID = desc.PartitionID
segmentID = desc.SegmentID
fieldID = desc.FieldID
indexName = extra["indexName"].(string)
value, _ = strconv.Atoi(extra["indexID"].(string))
indexID = UniqueID(value)
key := extra["key"].(string)
for {
eventReader, err := binlogReader.NextEventReader()
if err != nil {
log.Warn("failed to get next event reader",
zap.Error(err))
return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err
}
if eventReader == nil {
break
}
switch dataType {
case schemapb.DataType_String:
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
log.Warn("failed to get payload length",
zap.Error(err))
return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err
}
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
if err != nil {
log.Warn("failed to get string from payload",
zap.Error(err))
return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, err
}
if key == "indexParams" {
_ = json.Unmarshal([]byte(singleString), &indexParams)
} else {
datas = append(datas, &Blob{
Key: key,
Value: []byte(singleString),
})
}
}
}
}
codec.readerCloseFuncs = append(codec.readerCloseFuncs, readerClose(binlogReader))
}
return indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas, nil
}
func (codec *IndexFileBinlogCodec) Deserialize(blobs []*Blob) (
datas []*Blob,
indexParams map[string]string,
indexName string,
indexID UniqueID,
err error,
) {
_, _, _, _, _, _, indexParams, indexName, indexID, datas, err = codec.DeserializeImpl(blobs)
return datas, indexParams, indexName, indexID, err
}
func (codec *IndexFileBinlogCodec) Close() error {
for _, closeFunc := range codec.readerCloseFuncs {
err := closeFunc()
if err != nil {
return err
}
}
return nil
}
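Note: end to end, `Serialize` appends one extra "indexParams" blob after the data files, and deserialization folds it back into the params map so callers only see real index files. A round-trip sketch in the `storage` package context; all values are placeholders:

```go
// Sketch: round trip through the new codec.
func indexFileRoundTrip() error {
	codec := NewIndexFileBinlogCodec()
	blobs, err := codec.Serialize(
		1 /*buildID*/, 1 /*version*/, 10, 20, 30, 100, /*coll/part/seg/field*/
		map[string]string{"index_type": "IVF_FLAT"},
		"my_index", 999,
		[]*Blob{{Key: "ivf1", Value: []byte{1, 2, 3}}},
	)
	if err != nil {
		return err
	}
	// Deserialize strips the binlog envelope and folds the trailing
	// "indexParams" blob back into the params map.
	datas, params, name, id, err := codec.Deserialize(blobs)
	if err != nil {
		return err
	}
	fmt.Println(len(datas), params["index_type"], name, id) // 1 IVF_FLAT my_index 999
	return codec.Close()
}
```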
type IndexCodec struct {
}
......
@@ -15,6 +15,9 @@ import (
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
@@ -261,8 +264,9 @@ func TestInsertCodec(t *testing.T) {
assert.Equal(t, blob.GetKey(), blob.Key)
}
resultBlobs := append(Blobs1, Blobs2...)
partID, segID, resultData, err := insertCodec.Deserialize(resultBlobs)
collID, partID, segID, resultData, err := insertCodec.DeserializeAll(resultBlobs)
assert.Nil(t, err)
assert.Equal(t, UniqueID(CollectionID), collID)
assert.Equal(t, UniqueID(PartitionID), partID)
assert.Equal(t, UniqueID(SegmentID), segID)
assert.Equal(t, []int64{2, 2}, resultData.Data[RowIDField].(*Int64FieldData).NumRows)
@@ -296,6 +300,8 @@
blobs := []*Blob{}
_, _, _, err = insertCodec.Deserialize(blobs)
assert.NotNil(t, err)
_, _, _, _, err = insertCodec.DeserializeAll(blobs)
assert.NotNil(t, err)
}
func TestDeleteCodec(t *testing.T) {
@@ -347,6 +353,95 @@ func TestDDCodec(t *testing.T) {
assert.NotNil(t, err)
}
func TestIndexFileBinlogCodec(t *testing.T) {
indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
collectionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
partitionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
segmentID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
fieldID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
indexName := funcutil.GenRandomStr()
indexID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
indexParams := make(map[string]string)
indexParams["index_type"] = "IVF_FLAT"
datas := []*Blob{
{
Key: "ivf1",
Value: []byte{1, 2, 3},
},
{
Key: "ivf2",
Value: []byte{4, 5, 6},
},
}
codec := NewIndexFileBinlogCodec()
serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas)
assert.Nil(t, err)
idxBuildID, v, collID, parID, segID, fID, params, idxName, idxID, blobs, err := codec.DeserializeImpl(serializedBlobs)
assert.Nil(t, err)
assert.Equal(t, indexBuildID, idxBuildID)
assert.Equal(t, version, v)
assert.Equal(t, collectionID, collID)
assert.Equal(t, partitionID, parID)
assert.Equal(t, segmentID, segID)
assert.Equal(t, fieldID, fID)
assert.Equal(t, len(indexParams), len(params))
for key, value := range indexParams {
assert.Equal(t, value, params[key])
}
assert.Equal(t, indexName, idxName)
assert.Equal(t, indexID, idxID)
assert.ElementsMatch(t, datas, blobs)
blobs, indexParams, indexName, indexID, err = codec.Deserialize(serializedBlobs)
assert.Nil(t, err)
assert.ElementsMatch(t, datas, blobs)
for key, value := range indexParams {
assert.Equal(t, value, params[key])
}
assert.Equal(t, indexName, idxName)
assert.Equal(t, indexID, idxID)
err = codec.Close()
assert.Nil(t, err)
// empty
_, _, _, _, _, _, _, _, _, _, err = codec.DeserializeImpl(nil)
assert.NotNil(t, err)
}
func TestIndexFileBinlogCodecError(t *testing.T) {
var err error
// failed to read binlog
codec := NewIndexFileBinlogCodec()
_, _, _, _, err = codec.Deserialize([]*Blob{{Key: "key", Value: []byte("not in binlog format")}})
assert.NotNil(t, err)
indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
collectionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
partitionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
segmentID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
fieldID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
indexName := funcutil.GenRandomStr()
indexID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
indexParams := make(map[string]string)
indexParams["index_type"] = "IVF_FLAT"
datas := []*Blob{
{
Key: "ivf1",
Value: []byte{1, 2, 3},
},
}
_, err = codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas)
assert.Nil(t, err)
}
func TestIndexCodec(t *testing.T) {
indexCodec := NewIndexCodec()
blobs := []*Blob{
......
@@ -266,6 +266,30 @@ func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error {
return binary.Write(buffer, binary.LittleEndian, data)
}
type indexFileEventData struct {
StartTimestamp typeutil.Timestamp
EndTimestamp typeutil.Timestamp
}
func (data *indexFileEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
data.StartTimestamp = start
data.EndTimestamp = end
}
func (data *indexFileEventData) GetEventDataFixPartSize() int32 {
return int32(binary.Size(data))
}
func (data *indexFileEventData) WriteEventData(buffer io.Writer) error {
if data.StartTimestamp == 0 {
return errors.New("hasn't set start time stamp")
}
if data.EndTimestamp == 0 {
return errors.New("hasn't set end time stamp")
}
return binary.Write(buffer, binary.LittleEndian, data)
}
func getEventFixPartSize(code EventTypeCode) int32 {
switch code {
case DescriptorEventType:
@@ -282,6 +306,8 @@ func getEventFixPartSize(code EventTypeCode) int32 {
return (&createPartitionEventData{}).GetEventDataFixPartSize()
case DropPartitionEventType:
return (&dropPartitionEventData{}).GetEventDataFixPartSize()
case IndexFileEventType:
return (&indexFileEventData{}).GetEventDataFixPartSize()
default:
return -1
}
@@ -344,6 +370,12 @@ func newDropPartitionEventData() *dropPartitionEventData {
EndTimestamp: 0,
}
}
func newIndexFileEventData() *indexFileEventData {
return &indexFileEventData{
StartTimestamp: 0,
EndTimestamp: 0,
}
}
func readInsertEventDataFixPart(buffer io.Reader) (*insertEventData, error) {
data := &insertEventData{}
@@ -392,3 +424,11 @@ func readDropPartitionEventDataFixPart(buffer io.Reader) (*dropPartitionEventDat
}
return data, nil
}
func readIndexFileEventDataFixPart(buffer io.Reader) (*indexFileEventData, error) {
data := &indexFileEventData{}
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
return nil, err
}
return data, nil
}
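Note: `readIndexFileEventDataFixPart` can decode the whole struct with a single `binary.Read` because `indexFileEventData` contains only fixed-width fields. A self-contained illustration of that property; the struct below is a stand-in, not the real type:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Illustrative stand-in for indexFileEventData: two fixed-width timestamps,
// so binary.Write/Read can (de)serialize the whole struct at once.
type fixPart struct {
	StartTimestamp uint64
	EndTimestamp   uint64
}

func main() {
	var buf bytes.Buffer
	in := fixPart{StartTimestamp: 1, EndTimestamp: 2}
	if err := binary.Write(&buf, binary.LittleEndian, in); err != nil {
		panic(err)
	}
	var out fixPart
	if err := binary.Read(&buf, binary.LittleEndian, &out); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out) // {StartTimestamp:1 EndTimestamp:2}
}
```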
@@ -57,6 +57,8 @@ func (reader *EventReader) readData() error {
data, err = readCreatePartitionEventDataFixPart(reader.buffer)
case DropPartitionEventType:
data, err = readDropPartitionEventDataFixPart(reader.buffer)
case IndexFileEventType:
data, err = readIndexFileEventDataFixPart(reader.buffer)
default:
return fmt.Errorf("unknown header type code: %d", reader.TypeCode)
}
......
@@ -1096,6 +1096,44 @@ func TestDropPartitionEvent(t *testing.T) {
}
/* #nosec G103 */
func TestIndexFileEvent(t *testing.T) {
t.Run("index_file_timestamp", func(t *testing.T) {
w, err := newIndexFileEventWriter()
assert.Nil(t, err)
w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
payload := "payload"
err = w.AddOneStringToPayload(payload)
assert.Nil(t, err)
err = w.Finish()
assert.Nil(t, err)
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
assert.Equal(t, Timestamp(et), tsoutil.ComposeTS(100, 0))
payloadOffset := binary.Size(eventHeader{}) + binary.Size(indexFileEventData{})
pBuf := wBuf[payloadOffset:]
pR, err := NewPayloadReader(schemapb.DataType_String, pBuf)
assert.Nil(t, err)
value, err := pR.GetOneStringFromPayload(0)
assert.Nil(t, err)
assert.Equal(t, payload, value)
err = pR.Close()
assert.Nil(t, err)
})
}
func TestDescriptorEventTsError(t *testing.T) {
insertData := &insertEventData{
StartTimestamp: 0,
@@ -1266,3 +1304,28 @@ func TestEventClose(t *testing.T) {
err = r.readData()
assert.NotNil(t, err)
}
func TestIndexFileEventDataError(t *testing.T) {
var err error
var buffer bytes.Buffer
event := newIndexFileEventData()
event.SetEventTimestamp(0, 1)
// start timestamp not set
err = event.WriteEventData(&buffer)
assert.NotNil(t, err)
event.SetEventTimestamp(1, 0)
// end timestamp not set
err = event.WriteEventData(&buffer)
assert.NotNil(t, err)
}
func TestReadIndexFileEventDataFixPart(t *testing.T) {
var err error
var buffer bytes.Buffer
// buffer is empty
_, err = readIndexFileEventDataFixPart(&buffer)
assert.NotNil(t, err)
}
@@ -30,6 +30,7 @@ const (
DropCollectionEventType
CreatePartitionEventType
DropPartitionEventType
IndexFileEventType
EventTypeEnd
)
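Note: because `IndexFileEventType` is inserted just before the `EventTypeEnd` sentinel, loops such as `for i := DescriptorEventType; i < EventTypeEnd; i++` (used when writing the post-header-lengths section) pick it up with no further changes. A standalone sketch of the sentinel pattern; the names mirror the real ones but the program is illustrative:

```go
package main

import "fmt"

type EventTypeCode int8

const (
	DescriptorEventType EventTypeCode = iota
	InsertEventType
	DeleteEventType
	IndexFileEventType // new members are added here, before the sentinel
	EventTypeEnd       // sentinel: never serialized, only bounds loops
)

func main() {
	// Iterating up to the sentinel covers every real event type, so code
	// like the post-header-lengths writer needs no edits when types grow.
	for c := DescriptorEventType; c < EventTypeEnd; c++ {
		fmt.Printf("%d ", c)
	}
	fmt.Println()
}
```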
@@ -42,6 +43,7 @@ func (code EventTypeCode) String() string {
DropCollectionEventType: "DropCollectionEventType",
CreatePartitionEventType: "CreatePartitionEventType",
DropPartitionEventType: "DropPartitionEventType",
IndexFileEventType: "IndexFileEventType",
}
if eventTypeStr, ok := codes[code]; ok {
return eventTypeStr
@@ -199,6 +201,11 @@ type dropPartitionEventWriter struct {
dropPartitionEventData
}
type indexFileEventWriter struct {
baseEventWriter
indexFileEventData
}
func newDescriptorEvent() *descriptorEvent {
header := newDescriptorEventHeader()
data := newDescriptorEventData()
@@ -355,3 +362,26 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData
return writer, nil
}
func newIndexFileEventWriter() (*indexFileEventWriter, error) {
payloadWriter, err := NewPayloadWriter(schemapb.DataType_String)
if err != nil {
return nil, err
}
header := newEventHeader(IndexFileEventType)
data := newIndexFileEventData()
writer := &indexFileEventWriter{
baseEventWriter: baseEventWriter{
eventHeader: *header,
PayloadWriterInterface: payloadWriter,
isClosed: false,
isFinish: false,
},
indexFileEventData: *data,
}
writer.baseEventWriter.getEventDataSize = writer.indexFileEventData.GetEventDataFixPartSize
writer.baseEventWriter.writeEventData = writer.indexFileEventData.WriteEventData
return writer, nil
}
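Note: like the other event writers, `indexFileEventWriter` wires its concrete size and write methods into function-typed fields on `baseEventWriter`; assigning method values replaces interface dispatch. A standalone sketch of that pattern, with all names illustrative:

```go
package main

import "fmt"

// The base writer holds behavior as function fields, assigned by each
// concrete writer at construction time.
type baseWriter struct {
	getEventDataSize func() int32
}

type exampleEventData struct {
	payload []byte
}

func (d *exampleEventData) size() int32 { return int32(len(d.payload)) }

type exampleWriter struct {
	baseWriter
	exampleEventData
}

func newExampleWriter(payload []byte) *exampleWriter {
	w := &exampleWriter{exampleEventData: exampleEventData{payload: payload}}
	// Same shape as: writer.baseEventWriter.getEventDataSize =
	//   writer.indexFileEventData.GetEventDataFixPartSize
	w.baseWriter.getEventDataSize = w.exampleEventData.size
	return w
}

func main() {
	w := newExampleWriter([]byte("abc"))
	fmt.Println(w.getEventDataSize()) // 3
}
```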