diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 937c4c1a1812232bc986b2b9d356a7c916db4c37..b29a1d437f0f6ff5dea000b6e2e294a215744046 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -66,6 +66,9 @@ dataCoord: dataNode: port: 21124 +storage: + path: /var/lib/milvus/data/ + log: level: debug # info, warn, error, panic, fatal file: diff --git a/internal/kv/minio/minio_kv.go b/internal/kv/minio/minio_kv.go index b135950b5447bb5113c45229ee405d8d65f01f05..7d1b8e5589481910946a8a91a09286edda61bafb 100644 --- a/internal/kv/minio/minio_kv.go +++ b/internal/kv/minio/minio_kv.go @@ -13,17 +13,13 @@ package miniokv import ( "context" - "encoding/json" "fmt" - "io/ioutil" - "math/rand" - "time" + "sync" "io" "strings" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/performance" "github.com/milvus-io/milvus/internal/util/retry" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" @@ -121,6 +117,39 @@ func (kv *MinIOKV) Load(key string) (string, error) { return buf.String(), nil } +// FGetObject download file from minio to local storage system. +func (kv *MinIOKV) FGetObject(key, localPath string) error { + err := kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{}) + if err != nil { + return err + } + return nil +} + +// FGetObjects download file from minio to local storage system. +// For parallell downloads file, n goroutines will be started to download n keys. +func (kv *MinIOKV) FGetObjects(keys []string, localPath string) error { + var wg sync.WaitGroup + el := make(errorList, len(keys)) + for i, key := range keys { + wg.Add(1) + go func(i int, key string) { + err := kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{}) + if err != nil { + el[i] = err + } + wg.Done() + }(i, key) + } + wg.Wait() + for _, err := range el { + if err != nil { + return el + } + } + return nil +} + func (kv *MinIOKV) MultiLoad(keys []string) ([]string, error) { var resultErr error var objectsValues []string @@ -202,46 +231,13 @@ func (kv *MinIOKV) Close() { } -type Case struct { - Name string - BlockSize int // unit: byte - Speed float64 // unit: MB/s -} - -type Test struct { - Name string - Cases []Case -} - -func (kv *MinIOKV) performanceTest(toFile bool, totalBytes int) { - r := rand.Int() - results := Test{Name: "MinIO performance"} - for i := 0; i < 10; i += 2 { - data := performance.GenerateData(2*1024, float64(9-i)) - startT := time.Now() - for j := 0; j < totalBytes/(len(data)); j++ { - kv.Save(fmt.Sprintf("performance-rand%d-test-%d-%d", r, i, j), data) - } - tc := time.Since(startT) - results.Cases = append(results.Cases, Case{Name: "write", BlockSize: len(data), Speed: 16.0 / tc.Seconds()}) +type errorList []error - startT = time.Now() - for j := 0; j < totalBytes/(len(data)); j++ { - kv.Load(fmt.Sprintf("performance-rand%d-test-%d-%d", r, i, j)) - } - tc = time.Since(startT) - results.Cases = append(results.Cases, Case{Name: "read", BlockSize: len(data), Speed: 16.0 / tc.Seconds()}) - } - kv.RemoveWithPrefix(fmt.Sprintf("performance-rand%d", r)) - mb, err := json.Marshal(results) - if err != nil { - return - } - log.Debug(string(mb)) - if toFile { - err = ioutil.WriteFile(fmt.Sprintf("./%d", r), mb, 0644) - if err != nil { - return - } +func (el errorList) Error() string { + var builder strings.Builder + builder.WriteString("All downloads results:\n") + for index, err := range el { + builder.WriteString(fmt.Sprintf("downloads #%d:%s\n", index+1, err.Error())) } + return builder.String() } diff --git a/internal/kv/minio/minio_kv_test.go b/internal/kv/minio/minio_kv_test.go index d2b6d017ed1d03514552eaecc29745156f226c01..4cb93d976b3253f3501802461b45238dccd9b7a0 100644 --- a/internal/kv/minio/minio_kv_test.go +++ b/internal/kv/minio/minio_kv_test.go @@ -13,6 +13,8 @@ package miniokv_test import ( "context" + "io/ioutil" + "os" "strconv" "testing" @@ -186,3 +188,94 @@ func TestMinIOKV_Remove(t *testing.T) { assert.Error(t, err) assert.Empty(t, val) } + +func TestMinIOKV_FGetObject(t *testing.T) { + Params.Init() + path := "/tmp/milvus/data" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bucketName := "fantastic-tech-test" + MinIOKV, err := newMinIOKVClient(ctx, bucketName) + assert.Nil(t, err) + defer MinIOKV.RemoveWithPrefix("") + + name1 := "31280791048324/4325023534/53443534/key_1" + value1 := "123" + err = MinIOKV.Save(name1, value1) + assert.Nil(t, err) + name2 := "312895849354/31205934503459/18948129301/key_2" + value2 := "333" + err = MinIOKV.Save(name2, value2) + assert.Nil(t, err) + + err = MinIOKV.FGetObject(name1, path) + assert.Nil(t, err) + + err = MinIOKV.FGetObject(name2, path) + assert.Nil(t, err) + + err = MinIOKV.FGetObject("fail", path) + assert.NotNil(t, err) + + file1, err := os.Open(path + name1) + assert.Nil(t, err) + content1, err := ioutil.ReadAll(file1) + assert.Nil(t, err) + assert.Equal(t, value1, string(content1)) + defer file1.Close() + defer os.Remove(path + name1) + + file2, err := os.Open(path + name2) + assert.Nil(t, err) + content2, err := ioutil.ReadAll(file2) + assert.Nil(t, err) + assert.Equal(t, value2, string(content2)) + defer file1.Close() + defer os.Remove(path + name2) +} + +func TestMinIOKV_FGetObjects(t *testing.T) { + Params.Init() + path := "/tmp/milvus/data" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bucketName := "fantastic-tech-test" + MinIOKV, err := newMinIOKVClient(ctx, bucketName) + assert.Nil(t, err) + defer MinIOKV.RemoveWithPrefix("") + + name1 := "31280791048324/4325023534/53443534/key_1" + value1 := "123" + err = MinIOKV.Save(name1, value1) + assert.Nil(t, err) + name2 := "312895849354/31205934503459/18948129301/key_2" + value2 := "333" + err = MinIOKV.Save(name2, value2) + assert.Nil(t, err) + + err = MinIOKV.FGetObjects([]string{name1, name2}, path) + assert.Nil(t, err) + + err = MinIOKV.FGetObjects([]string{"fail1", "fail2"}, path) + assert.NotNil(t, err) + + file1, err := os.Open(path + name1) + assert.Nil(t, err) + content1, err := ioutil.ReadAll(file1) + assert.Nil(t, err) + assert.Equal(t, value1, string(content1)) + defer file1.Close() + defer os.Remove(path + name1) + + file2, err := os.Open(path + name2) + assert.Nil(t, err) + content2, err := ioutil.ReadAll(file2) + assert.Nil(t, err) + assert.Equal(t, value2, string(content2)) + defer file1.Close() + defer os.Remove(path + name2) +}