From 5b42a3223c7db9e08e9c1b26691373962bb5b9ce Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 26 Oct 2021 15:34:21 +0800 Subject: [PATCH] Increase compatibility for EstimateMemorySize interface (#10603) Signed-off-by: cai.zhang --- internal/kv/kv.go | 1 + internal/kv/mem/mem_kv.go | 9 +++++++ internal/kv/mem/mem_kv_test.go | 20 +++++++++++++++ internal/kv/minio/minio_kv.go | 9 +++++++ internal/kv/minio/minio_kv_test.go | 28 ++++++++++++++++++++ internal/querynode/index_loader.go | 5 +++- internal/querynode/segment_loader.go | 5 +++- internal/storage/utils.go | 38 +--------------------------- internal/storage/utils_test.go | 22 +++++++++++++++- 9 files changed, 97 insertions(+), 40 deletions(-) diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 9386ee44d..075dc7b08 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -34,6 +34,7 @@ type BaseKV interface { type DataKV interface { BaseKV LoadPartial(key string, start, end int64) ([]byte, error) + GetSize(key string) (int64, error) } // TxnKV contains extra txn operations of kv. The extra operations is transactional. diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 85314e9da..7188e1508 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -212,3 +212,12 @@ func (kv *MemoryKV) LoadPartial(key string, start, end int64) ([]byte, error) { start, end) } } + +func (kv *MemoryKV) GetSize(key string) (int64, error) { + value, err := kv.Load(key) + if err != nil { + return 0, err + } + + return int64(len(value)), nil +} diff --git a/internal/kv/mem/mem_kv_test.go b/internal/kv/mem/mem_kv_test.go index b824d52ea..95882aeb3 100644 --- a/internal/kv/mem/mem_kv_test.go +++ b/internal/kv/mem/mem_kv_test.go @@ -56,3 +56,23 @@ func TestMemoryKV_LoadPartial(t *testing.T) { _, err = memKV.LoadPartial(key, start, end) assert.Error(t, err) } + +func TestMemoryKV_GetSize(t *testing.T) { + memKV := NewMemoryKV() + + key := "TestMemoryKV_GetSize_key" + value := "TestMemoryKV_GetSize_value" + + err := memKV.Save(key, value) + assert.NoError(t, err) + + size, err := memKV.GetSize(key) + assert.NoError(t, err) + assert.Equal(t, size, int64(len(value))) + + key2 := "TestMemoryKV_GetSize_key2" + + size, err = memKV.GetSize(key2) + assert.NoError(t, err) + assert.Equal(t, int64(0), size) +} diff --git a/internal/kv/minio/minio_kv.go b/internal/kv/minio/minio_kv.go index ea92b5423..983a5b0f0 100644 --- a/internal/kv/minio/minio_kv.go +++ b/internal/kv/minio/minio_kv.go @@ -272,6 +272,15 @@ func (kv *MinIOKV) LoadPartial(key string, start, end int64) ([]byte, error) { return ioutil.ReadAll(object) } +func (kv *MinIOKV) GetSize(key string) (int64, error) { + objectInfo, err := kv.minioClient.StatObject(kv.ctx, kv.bucketName, key, minio.StatObjectOptions{}) + if err != nil { + return 0, err + } + + return objectInfo.Size, nil +} + func (kv *MinIOKV) Close() { } diff --git a/internal/kv/minio/minio_kv_test.go b/internal/kv/minio/minio_kv_test.go index f4df16e2a..22b50ac51 100644 --- a/internal/kv/minio/minio_kv_test.go +++ b/internal/kv/minio/minio_kv_test.go @@ -334,3 +334,31 @@ func TestMinIOKV_FGetObjects(t *testing.T) { defer file1.Close() defer os.Remove(path + name2) } + +func TestMinIOKV_GetSize(t *testing.T) { + Params.Init() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bucketName := "fantastic-tech-test" + minIOKV, err := newMinIOKVClient(ctx, bucketName) + assert.Nil(t, err) + defer minIOKV.RemoveWithPrefix("") + + key := "TestMinIOKV_GetSize_key" + value := "TestMinIOKV_GetSize_value" + + err = minIOKV.Save(key, value) + assert.NoError(t, err) + + size, err := minIOKV.GetSize(key) + assert.NoError(t, err) + assert.Equal(t, size, int64(len(value))) + + key2 := "TestMemoryKV_GetSize_key2" + + size, err = minIOKV.GetSize(key2) + assert.Error(t, err) + assert.Equal(t, int64(0), size) +} diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go index 47702f5ea..43c1ea3f3 100644 --- a/internal/querynode/index_loader.go +++ b/internal/querynode/index_loader.go @@ -152,7 +152,10 @@ func (loader *indexLoader) estimateIndexBinlogSize(segment *Segment, fieldID Fie for _, p := range indexPaths { logSize, err := storage.EstimateMemorySize(loader.kv, p) if err != nil { - return 0, err + logSize, err = storage.GetBinlogSize(loader.kv, p) + if err != nil { + return 0, err + } } indexSize += logSize } diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 7c3f12215..0ace3b4cc 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -407,7 +407,10 @@ func (loader *segmentLoader) estimateSegmentSize(segment *Segment, for _, binlogPath := range fb.Binlogs { logSize, err := storage.EstimateMemorySize(loader.minioKV, binlogPath) if err != nil { - return 0, err + logSize, err = storage.GetBinlogSize(loader.minioKV, binlogPath) + if err != nil { + return 0, err + } } segmentSize += logSize } diff --git a/internal/storage/utils.go b/internal/storage/utils.go index f0ed4d178..b7dd0c04f 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -26,44 +26,8 @@ import ( // key not in binlog format, size = (a not accurate number), error != nil; // failed to read event reader, size = (a not accurate number), error != nil; func GetBinlogSize(kv kv.DataKV, key string) (int64, error) { - total := int64(0) - - header := &baseEventHeader{} - headerSize := binary.Size(header) - startPos := binary.Size(MagicNumber) - endPos := startPos + headerSize - - for { - headerContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos)) - if err != nil { - // case 1: key not exist, total = 0; - // case 2: all events have been read, total = (length of all events); - // whatever the case is, the return value is reasonable. - return total, nil - } - - buffer := bytes.NewBuffer(headerContent) - - header, err := readEventHeader(buffer) - if err != nil { - // FIXME(dragondriver): should we return 0 here? - return total, fmt.Errorf("failed to read event reader: %v", err) - } - - if header.EventLength <= 0 || header.NextPosition < int32(endPos) { - // key not in binlog format - // FIXME(dragondriver): should we return 0 here? - return total, fmt.Errorf("key not in binlog format") - } - - total += int64(header.EventLength) - // startPos = startPos + int(header.EventLength) - // || - // \/ - startPos = int(header.NextPosition) - endPos = startPos + headerSize - } + return kv.GetSize(key) } // EstimateMemorySize get approximate memory size of a binlog file. diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go index ddc26fb25..a62b7e356 100644 --- a/internal/storage/utils_test.go +++ b/internal/storage/utils_test.go @@ -45,6 +45,10 @@ func (kv *mockLessHeaderDataKV) LoadPartial(key string, start, end int64) ([]byt return ret, nil } +func (kv *mockLessHeaderDataKV) GetSize(key string) (int64, error) { + return 0, errors.New("less header") +} + func newMockLessHeaderDataKV() *mockLessHeaderDataKV { return &mockLessHeaderDataKV{} } @@ -65,6 +69,10 @@ func (kv *mockWrongHeaderDataKV) LoadPartial(key string, start, end int64) ([]by return buffer.Bytes(), nil } +func (kv *mockWrongHeaderDataKV) GetSize(key string) (int64, error) { + return 0, errors.New("wrong header") +} + func newMockWrongHeaderDataKV() kv.DataKV { return &mockWrongHeaderDataKV{} } @@ -121,7 +129,7 @@ func TestGetBinlogSize(t *testing.T) { size, err = GetBinlogSize(memoryKV, blob.Key) assert.Nil(t, err) - assert.Equal(t, size+int64(binary.Size(MagicNumber)), int64(len(blob.Value))) + assert.Equal(t, size, int64(len(blob.Value))) } } @@ -243,6 +251,10 @@ func (kv *mockFailedToGetDescDataKV) LoadPartial(key string, start, end int64) ( return buf.Bytes(), nil } +func (kv *mockFailedToGetDescDataKV) GetSize(key string) (int64, error) { + return 0, nil +} + func newMockFailedToGetDescDataKV() *mockFailedToGetDescDataKV { return &mockFailedToGetDescDataKV{} } @@ -282,6 +294,10 @@ func (kv *mockLessDescDataKV) LoadPartial(key string, start, end int64) ([]byte, */ } +func (kv *mockLessDescDataKV) GetSize(key string) (int64, error) { + return 0, nil +} + func newMockLessDescDataKV() *mockLessDescDataKV { return &mockLessDescDataKV{} } @@ -307,6 +323,10 @@ func (kv *mockOriginalSizeDataKV) LoadPartial(key string, start, end int64) ([]b return nil, nil } +func (kv *mockOriginalSizeDataKV) GetSize(key string) (int64, error) { + return 0, nil +} + func newMockOriginalSizeDataKV() *mockOriginalSizeDataKV { return &mockOriginalSizeDataKV{} } -- GitLab