// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package miniokv

import (
	"context"
N
neza2017 已提交
21
	"fmt"
22
	"io/ioutil"
G
godchen 已提交
23
	"sync"
N
neza2017 已提交
24

Z
zhenshan.cao 已提交
25 26 27
	"io"
	"strings"

X
Xiangyu Wang 已提交
28 29
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/util/retry"
Z
zhenshan.cao 已提交
30
	"github.com/minio/minio-go/v7"
N
neza2017 已提交
31
	"github.com/minio/minio-go/v7/pkg/credentials"
紫晴 已提交
32
	"go.uber.org/zap"
Z
zhenshan.cao 已提交
33 34
)

35 36
// MinIOKV implements DataKV interface and relies on underling MinIO service.
// MinIOKV object contains a client which can be used to access the MinIO service.
Z
zhenshan.cao 已提交
37 38 39 40 41 42
type MinIOKV struct {
	ctx         context.Context
	minioClient *minio.Client
	bucketName  string
}

43
// Option carries the settings NewMinIOKV uses to build a MinIOKV.
type Option struct {
	// Address is the MinIO endpoint passed to minio.New.
	Address string

	// AccessKeyID is the access key used for the static V4 credentials.
	AccessKeyID string

	// BucketName is the bucket that NewMinIOKV checks and the KV operates on.
	BucketName string

	// SecretAccessKeyID is the secret key paired with AccessKeyID.
	SecretAccessKeyID string

	// UseSSL is copied into minio.Options.Secure when the client is created.
	UseSSL bool

	// CreateBucket makes NewMinIOKV create BucketName when it does not exist.
	CreateBucket bool
}

// NewMinIOKV creates MinIOKV to save and load object to MinIOKV.
N
neza2017 已提交
54
func NewMinIOKV(ctx context.Context, option *Option) (*MinIOKV, error) {
55
	var minIOClient *minio.Client
C
congqixia 已提交
56 57 58 59 60 61
	var err error
	minIOClient, err = minio.New(option.Address, &minio.Options{
		Creds:  credentials.NewStaticV4(option.AccessKeyID, option.SecretAccessKeyID, ""),
		Secure: option.UseSSL,
	})
	// options nil or invalid formatted endpoint, don't need retry
N
neza2017 已提交
62 63 64
	if err != nil {
		return nil, err
	}
C
congqixia 已提交
65 66 67 68
	var bucketExists bool
	// check valid in first query
	checkBucketFn := func() error {
		bucketExists, err = minIOClient.BucketExists(ctx, option.BucketName)
69 70 71 72 73 74 75 76 77 78 79 80
		if err != nil {
			return err
		}
		if !bucketExists {
			log.Debug("MinioKV NewMinioKV", zap.Any("Check bucket", "bucket not exist"))
			if option.CreateBucket {
				log.Debug("MinioKV NewMinioKV create bucket.")
				return minIOClient.MakeBucket(ctx, option.BucketName, minio.MakeBucketOptions{})
			}
			return fmt.Errorf("bucket %s not Existed", option.BucketName)
		}
		return nil
C
congqixia 已提交
81
	}
G
godchen 已提交
82
	err = retry.Do(ctx, checkBucketFn, retry.Attempts(300))
Z
zhenshan.cao 已提交
83 84 85
	if err != nil {
		return nil, err
	}
N
neza2017 已提交
86

紫晴 已提交
87
	kv := &MinIOKV{
Z
zhenshan.cao 已提交
88
		ctx:         ctx,
N
neza2017 已提交
89 90
		minioClient: minIOClient,
		bucketName:  option.BucketName,
紫晴 已提交
91
	}
92
	log.Debug("MinioKV new MinioKV success.")
紫晴 已提交
93 94

	return kv, nil
Z
zhenshan.cao 已提交
95 96
}

97
// Exist checks whether a key exists in MinIO.
G
godchen 已提交
98 99 100 101 102
func (kv *MinIOKV) Exist(key string) bool {
	_, err := kv.minioClient.StatObject(kv.ctx, kv.bucketName, key, minio.StatObjectOptions{})
	return err == nil
}

103
// LoadWithPrefix loads objects with the same prefix @key from minio .
Z
zhenshan.cao 已提交
104 105 106 107 108 109 110 111 112 113 114
func (kv *MinIOKV) LoadWithPrefix(key string) ([]string, []string, error) {
	objects := kv.minioClient.ListObjects(kv.ctx, kv.bucketName, minio.ListObjectsOptions{Prefix: key})

	var objectsKeys []string
	var objectsValues []string

	for object := range objects {
		objectsKeys = append(objectsKeys, object.Key)
	}
	objectsValues, err := kv.MultiLoad(objectsKeys)
	if err != nil {
G
godchen 已提交
115 116
		log.Error(fmt.Sprintf("MinIO load with prefix error. path = %s", key), zap.Error(err))
		return nil, nil, err
Z
zhenshan.cao 已提交
117 118 119 120 121
	}

	return objectsKeys, objectsValues, nil
}

122
// Load loads an object with @key.
Z
zhenshan.cao 已提交
123 124
func (kv *MinIOKV) Load(key string) (string, error) {
	object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, minio.GetObjectOptions{})
125 126 127
	if object != nil {
		defer object.Close()
	}
Z
zhenshan.cao 已提交
128 129 130
	if err != nil {
		return "", err
	}
131 132 133 134
	info, err := object.Stat()
	if err != nil {
		return "", err
	}
Z
zhenshan.cao 已提交
135
	buf := new(strings.Builder)
136
	buf.Grow(int(info.Size))
Z
zhenshan.cao 已提交
137 138 139 140 141 142 143
	_, err = io.Copy(buf, object)
	if err != nil && err != io.EOF {
		return "", err
	}
	return buf.String(), nil
}

144
// FGetObject downloads file from minio to local storage system.
G
godchen 已提交
145
func (kv *MinIOKV) FGetObject(key, localPath string) error {
146
	return kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{})
G
godchen 已提交
147 148
}

149
// FGetObjects downloads files from minio to local storage system.
G
godchen 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
// For parallell downloads file, n goroutines will be started to download n keys.
func (kv *MinIOKV) FGetObjects(keys []string, localPath string) error {
	var wg sync.WaitGroup
	el := make(errorList, len(keys))
	for i, key := range keys {
		wg.Add(1)
		go func(i int, key string) {
			err := kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{})
			if err != nil {
				el[i] = err
			}
			wg.Done()
		}(i, key)
	}
	wg.Wait()
	for _, err := range el {
		if err != nil {
			return el
		}
	}
	return nil
}

G
godchen 已提交
173
// MultiLoad loads objects with multi @keys.
Z
zhenshan.cao 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
func (kv *MinIOKV) MultiLoad(keys []string) ([]string, error) {
	var resultErr error
	var objectsValues []string
	for _, key := range keys {
		objectValue, err := kv.Load(key)
		if err != nil {
			if resultErr == nil {
				resultErr = err
			}
		}
		objectsValues = append(objectsValues, objectValue)
	}

	return objectsValues, resultErr
}

G
godchen 已提交
190
// Save object with @key to Minio. Object value is @value.
Z
zhenshan.cao 已提交
191 192 193 194 195 196 197 198 199 200 201
func (kv *MinIOKV) Save(key, value string) error {
	reader := strings.NewReader(value)
	_, err := kv.minioClient.PutObject(kv.ctx, kv.bucketName, key, reader, int64(len(value)), minio.PutObjectOptions{})

	if err != nil {
		return err
	}

	return err
}

202
// MultiSave saves multiple objects, the path is the key of @kvs.
203
// The object value is the value of @kvs.
Z
zhenshan.cao 已提交
204 205 206 207 208 209 210 211 212 213 214 215 216
func (kv *MinIOKV) MultiSave(kvs map[string]string) error {
	var resultErr error
	for key, value := range kvs {
		err := kv.Save(key, value)
		if err != nil {
			if resultErr == nil {
				resultErr = err
			}
		}
	}
	return resultErr
}

217
// RemoveWithPrefix removes all objects with the same prefix @prefix from minio.
Z
zhenshan.cao 已提交
218 219 220 221 222 223
func (kv *MinIOKV) RemoveWithPrefix(prefix string) error {
	objectsCh := make(chan minio.ObjectInfo)

	go func() {
		defer close(objectsCh)

224
		for object := range kv.minioClient.ListObjects(kv.ctx, kv.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
Z
zhenshan.cao 已提交
225 226 227 228 229 230 231 232 233 234 235 236
			objectsCh <- object
		}
	}()

	for rErr := range kv.minioClient.RemoveObjects(kv.ctx, kv.bucketName, objectsCh, minio.RemoveObjectsOptions{GovernanceBypass: true}) {
		if rErr.Err != nil {
			return rErr.Err
		}
	}
	return nil
}

237
// Remove deletes an object with @key.
Z
zhenshan.cao 已提交
238 239 240 241 242
func (kv *MinIOKV) Remove(key string) error {
	err := kv.minioClient.RemoveObject(kv.ctx, kv.bucketName, string(key), minio.RemoveObjectOptions{})
	return err
}

243
// MultiRemove deletes a objects with @keys.
Z
zhenshan.cao 已提交
244 245 246 247 248 249 250 251 252 253 254 255 256
func (kv *MinIOKV) MultiRemove(keys []string) error {
	var resultErr error
	for _, key := range keys {
		err := kv.Remove(key)
		if err != nil {
			if resultErr == nil {
				resultErr = err
			}
		}
	}
	return resultErr
}

257
// LoadPartial loads partial data ranged in [start, end) with @key.
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
func (kv *MinIOKV) LoadPartial(key string, start, end int64) ([]byte, error) {
	switch {
	case start < 0 || end < 0:
		return nil, fmt.Errorf("invalid range specified: start=%d end=%d",
			start, end)
	case start >= end:
		return nil, fmt.Errorf("invalid range specified: start=%d end=%d",
			start, end)
	}

	opts := minio.GetObjectOptions{}
	err := opts.SetRange(start, end-1)
	if err != nil {
		return nil, err
	}

	object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, opts)
	if err != nil {
		return nil, err
	}
	defer object.Close()

	return ioutil.ReadAll(object)
}

283
// GetSize obtains the data size of the object with @key.
284 285 286 287 288 289 290 291 292
func (kv *MinIOKV) GetSize(key string) (int64, error) {
	objectInfo, err := kv.minioClient.StatObject(kv.ctx, kv.bucketName, key, minio.StatObjectOptions{})
	if err != nil {
		return 0, err
	}

	return objectInfo.Size, nil
}

293
// Close close the MinIOKV.
Z
zhenshan.cao 已提交
294 295 296
func (kv *MinIOKV) Close() {

}
紫晴 已提交
297

G
godchen 已提交
298
// errorList collects per-item errors from a batch operation such as
// FGetObjects. Entry i holds the error for item i, or nil if that item
// succeeded.
type errorList []error

// Error lists every failed entry on its own line, numbered from 1 by its
// position in the list. Nil entries (successful items) are skipped, because
// calling Error on a nil interface would panic.
func (el errorList) Error() string {
	var builder strings.Builder
	builder.WriteString("All downloads results:\n")
	for index, err := range el {
		if err == nil {
			continue
		}
		fmt.Fprintf(&builder, "downloads #%d:%s\n", index+1, err.Error())
	}
	return builder.String()
}