未验证 提交 f85271cf 编写于 作者: D dragondriver 提交者: GitHub

Estimate memory size by descriptor event (#9688)

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 0717d4eb
......@@ -13,10 +13,6 @@ package storage
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
"errors"
)
......@@ -47,14 +43,9 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
}
func (reader *BinlogReader) readMagicNumber() (int32, error) {
if err := binary.Read(reader.buffer, binary.LittleEndian, &reader.magicNumber); err != nil {
return -1, err
}
if reader.magicNumber != MagicNumber {
return -1, fmt.Errorf("parse magic number failed, expected: %s, actual: %s", strconv.Itoa(int(MagicNumber)), strconv.Itoa(int(reader.magicNumber)))
}
return reader.magicNumber, nil
var err error
reader.magicNumber, err = readMagicNumber(reader.buffer)
return reader.magicNumber, err
}
func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) {
......
......@@ -15,7 +15,9 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"strconv"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
......@@ -77,6 +79,18 @@ func (event *descriptorEvent) Write(buffer io.Writer) error {
return nil
}
func readMagicNumber(buffer io.Reader) (int32, error) {
var magicNumber int32
if err := binary.Read(buffer, binary.LittleEndian, &magicNumber); err != nil {
return -1, err
}
if magicNumber != MagicNumber {
return -1, fmt.Errorf("parse magic number failed, expected: %s, actual: %s", strconv.Itoa(int(MagicNumber)), strconv.Itoa(int(magicNumber)))
}
return magicNumber, nil
}
func ReadDescriptorEvent(buffer io.Reader) (*descriptorEvent, error) {
header, err := readDescriptorEventHeader(buffer)
if err != nil {
......
......@@ -85,3 +85,23 @@ func TestEventWriter(t *testing.T) {
err = insertEvent.Close()
assert.Nil(t, err)
}
func TestReadMagicNumber(t *testing.T) {
var err error
buf := bytes.Buffer{}
// eof
_, err = readMagicNumber(&buf)
assert.Error(t, err)
// not a magic number
_ = binary.Write(&buf, binary.LittleEndian, MagicNumber+1)
_, err = readMagicNumber(&buf)
assert.Error(t, err)
// normal case
_ = binary.Write(&buf, binary.LittleEndian, MagicNumber)
num, err := readMagicNumber(&buf)
assert.NoError(t, err)
assert.Equal(t, MagicNumber, num)
}
......@@ -15,6 +15,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
"github.com/milvus-io/milvus/internal/kv"
)
......@@ -64,3 +65,66 @@ func GetBinlogSize(kv kv.DataKV, key string) (int64, error) {
endPos = startPos + headerSize
}
}
// EstimateMemorySize get approximate memory size of a binlog file.
// 1, key not exist, size = 0, error != nil;
// 2, failed to read event header, size = 0, error != nil;
// 3, invalid event length, size = 0, error != nil;
// 4, failed to read descriptor event, size = 0, error != nil;
// 5, original_size not in extra, size = 0, error != nil;
// 6, original_size not in int format, size = 0, error != nil;
// 7, normal binlog with original_size, return original_size, error = nil;
func EstimateMemorySize(kv kv.DataKV, key string) (int64, error) {
total := int64(0)
header := &eventHeader{}
headerSize := binary.Size(header)
startPos := binary.Size(MagicNumber)
endPos := startPos + headerSize
// get header
headerContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos))
if err != nil {
return total, err
}
buffer := bytes.NewBuffer(headerContent)
header, err = readEventHeader(buffer)
if err != nil {
return total, err
}
if header.EventLength <= 0 {
return total, fmt.Errorf("key %v not in binlog format", key)
}
desc := &descriptorEvent{}
endPos = startPos + int(header.EventLength)
descContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos))
if err != nil {
return total, err
}
buffer = bytes.NewBuffer(descContent)
desc, err = ReadDescriptorEvent(buffer)
if err != nil {
return total, err
}
sizeStr, ok := desc.Extras[originalSizeKey]
if !ok {
return total, fmt.Errorf("key %v not in extra information", originalSizeKey)
}
size, err := strconv.Atoi(fmt.Sprintf("%v", sizeStr))
if err != nil {
return total, fmt.Errorf("%v not in valid format, value: %v", originalSizeKey, sizeStr)
}
total = int64(size)
return total, nil
}
......@@ -15,6 +15,9 @@ import (
"bytes"
"crypto/rand"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/util/funcutil"
......@@ -210,3 +213,321 @@ func TestGetBinlogSize_not_in_binlog_format(t *testing.T) {
_, err := GetBinlogSize(mockKV, key)
assert.Error(t, err)
}
func TestEstimateMemorySize(t *testing.T) {
memoryKV := memkv.NewMemoryKV()
defer memoryKV.Close()
key := "TestEstimateMemorySize"
var size int64
var err error
// key not in memoryKV
_, err = EstimateMemorySize(memoryKV, key)
assert.Error(t, err)
// normal binlog key, for example, index binlog
indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
collectionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
partitionID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
segmentID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
fieldID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
indexName := funcutil.GenRandomStr()
indexID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
indexParams := make(map[string]string)
indexParams["index_type"] = "IVF_FLAT"
datas := []*Blob{
{
Key: "ivf1",
Value: []byte{1, 2, 3},
},
{
Key: "ivf2",
Value: []byte{4, 5, 6},
},
{
Key: "large",
Value: []byte(funcutil.RandomString(maxLengthPerRowOfIndexFile + 1)),
},
}
codec := NewIndexFileBinlogCodec()
defer codec.Close()
serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas)
assert.Nil(t, err)
for _, blob := range serializedBlobs {
err = memoryKV.Save(blob.Key, string(blob.Value))
assert.Nil(t, err)
buf := bytes.NewBuffer(blob.Value)
desc := &descriptorEvent{}
_, _ = readMagicNumber(buf)
desc, _ = ReadDescriptorEvent(buf)
size, err = EstimateMemorySize(memoryKV, blob.Key)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("%v", desc.Extras[originalSizeKey]), fmt.Sprintf("%v", size))
}
}
// cover case that failed to read event header
func TestEstimateMemorySize_less_header(t *testing.T) {
mockKV := newMockLessHeaderDataKV()
key := "TestEstimateMemorySize_less_header"
_, err := EstimateMemorySize(mockKV, key)
assert.Error(t, err)
}
// cover case that file not in binlog format
func TestEstimateMemorySize_not_in_binlog_format(t *testing.T) {
mockKV := newMockWrongHeaderDataKV()
key := "TestEstimateMemorySize_not_in_binlog_format"
_, err := EstimateMemorySize(mockKV, key)
assert.Error(t, err)
}
type mockFailedToGetDescDataKV struct {
}
func (kv *mockFailedToGetDescDataKV) Load(key string) (string, error) {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) MultiLoad(keys []string) ([]string, error) {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) LoadWithPrefix(key string) ([]string, []string, error) {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) Save(key, value string) error {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) MultiSave(kvs map[string]string) error {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) Remove(key string) error {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) MultiRemove(keys []string) error {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) RemoveWithPrefix(key string) error {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) Close() {
panic("implement me")
}
func (kv *mockFailedToGetDescDataKV) LoadPartial(key string, start, end int64) ([]byte, error) {
header := &eventHeader{}
header.EventLength = 20
headerSize := binary.Size(header)
if end-start > int64(headerSize) {
return nil, errors.New("mock failed to get desc data")
}
buf := bytes.Buffer{}
_ = binary.Write(&buf, binary.LittleEndian, header)
return buf.Bytes(), nil
}
func newMockFailedToGetDescDataKV() *mockFailedToGetDescDataKV {
return &mockFailedToGetDescDataKV{}
}
// cover case that failed to get descriptor event content
func TestEstimateMemorySize_failed_to_load_desc(t *testing.T) {
mockKV := newMockFailedToGetDescDataKV()
key := "TestEstimateMemorySize_failed_to_load_desc"
_, err := EstimateMemorySize(mockKV, key)
assert.Error(t, err)
}
type mockLessDescDataKV struct {
}
func (kv *mockLessDescDataKV) Load(key string) (string, error) {
panic("implement me")
}
func (kv *mockLessDescDataKV) MultiLoad(keys []string) ([]string, error) {
panic("implement me")
}
func (kv *mockLessDescDataKV) LoadWithPrefix(key string) ([]string, []string, error) {
panic("implement me")
}
func (kv *mockLessDescDataKV) Save(key, value string) error {
panic("implement me")
}
func (kv *mockLessDescDataKV) MultiSave(kvs map[string]string) error {
panic("implement me")
}
func (kv *mockLessDescDataKV) Remove(key string) error {
panic("implement me")
}
func (kv *mockLessDescDataKV) MultiRemove(keys []string) error {
panic("implement me")
}
func (kv *mockLessDescDataKV) RemoveWithPrefix(key string) error {
panic("implement me")
}
func (kv *mockLessDescDataKV) Close() {
panic("implement me")
}
func (kv *mockLessDescDataKV) LoadPartial(key string, start, end int64) ([]byte, error) {
header := &baseEventHeader{}
header.EventLength = 20
buffer := bytes.Buffer{}
_ = binary.Write(&buffer, binary.LittleEndian, header)
// no event data
return buffer.Bytes(), nil
/*
desc := &descriptorEvent{}
desc.ExtraLength = 2
desc.ExtraBytes = []byte{1, 2}
buffer := bytes.Buffer{}
_ = binary.Write(&buffer, binary.LittleEndian, desc)
// extra not in json format
return buffer.Bytes(), nil
*/
}
func newMockLessDescDataKV() *mockLessDescDataKV {
return &mockLessDescDataKV{}
}
func TestEstimateMemorySize_less_desc_data(t *testing.T) {
mockKV := newMockLessDescDataKV()
key := "TestEstimateMemorySize_less_desc_data"
_, err := EstimateMemorySize(mockKV, key)
assert.Error(t, err)
}
type mockOriginalSizeDataKV struct {
impl func(key string, start, end int64) ([]byte, error)
}
func (kv *mockOriginalSizeDataKV) Load(key string) (string, error) {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) MultiLoad(keys []string) ([]string, error) {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) LoadWithPrefix(key string) ([]string, []string, error) {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) Save(key, value string) error {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) MultiSave(kvs map[string]string) error {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) Remove(key string) error {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) MultiRemove(keys []string) error {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) RemoveWithPrefix(key string) error {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) Close() {
panic("implement me")
}
func (kv *mockOriginalSizeDataKV) LoadPartial(key string, start, end int64) ([]byte, error) {
if kv.impl != nil {
return kv.impl(key, start, end)
}
return nil, nil
}
func newMockOriginalSizeDataKV() *mockOriginalSizeDataKV {
return &mockOriginalSizeDataKV{}
}
func TestEstimateMemorySize_no_original_size(t *testing.T) {
mockKV := newMockOriginalSizeDataKV()
mockKV.impl = func(key string, start, end int64) ([]byte, error) {
desc := &descriptorEvent{}
desc.descriptorEventHeader.EventLength = 20
desc.descriptorEventData = *newDescriptorEventData()
extra := make(map[string]interface{})
extra["key"] = "value"
extraBytes, _ := json.Marshal(extra)
desc.ExtraBytes = extraBytes
desc.ExtraLength = int32(len(extraBytes))
buf := bytes.Buffer{}
_ = desc.descriptorEventHeader.Write(&buf)
_ = desc.descriptorEventData.Write(&buf)
return buf.Bytes(), nil
}
key := "TestEstimateMemorySize_no_original_size"
_, err := EstimateMemorySize(mockKV, key)
assert.Error(t, err)
}
func TestEstimateMemorySize_cannot_convert_original_size_to_int(t *testing.T) {
mockKV := newMockOriginalSizeDataKV()
mockKV.impl = func(key string, start, end int64) ([]byte, error) {
desc := &descriptorEvent{}
desc.descriptorEventHeader.EventLength = 20
desc.descriptorEventData = *newDescriptorEventData()
extra := make(map[string]interface{})
extra[originalSizeKey] = "value"
extraBytes, _ := json.Marshal(extra)
desc.ExtraBytes = extraBytes
desc.ExtraLength = int32(len(extraBytes))
buf := bytes.Buffer{}
_ = desc.descriptorEventHeader.Write(&buf)
_ = desc.descriptorEventData.Write(&buf)
return buf.Bytes(), nil
}
key := "TestEstimateMemorySize_cannot_convert_original_size_to_int"
_, err := EstimateMemorySize(mockKV, key)
assert.Error(t, err)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册