From 9fac1476f3e4b061b932ef5328eda178926d175f Mon Sep 17 00:00:00 2001 From: shaoyue Date: Tue, 1 Nov 2022 11:07:35 +0800 Subject: [PATCH] Add support for GCS(GoogleCloudStorage) with IAM (#20164) go mod tidy Signed-off-by: shaoyue.chen Signed-off-by: shaoyue.chen --- configs/milvus.yaml | 16 +++- go.mod | 6 +- internal/storage/factory.go | 1 + internal/storage/gcp/gcp.go | 78 ++++++++++++++++ internal/storage/gcp/gcp_test.go | 92 +++++++++++++++++++ internal/storage/minio_chunk_manager.go | 25 +++-- internal/storage/minio_chunk_manager_test.go | 1 + internal/storage/options.go | 7 ++ internal/util/indexcgowrapper/index.go | 3 +- internal/util/paramtable/base_table.go | 1 + internal/util/paramtable/service_param.go | 29 +++++- .../util/paramtable/service_param_test.go | 2 + 12 files changed, 244 insertions(+), 17 deletions(-) create mode 100644 internal/storage/gcp/gcp.go create mode 100644 internal/storage/gcp/gcp_test.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 6b4abec0a..e43df797d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -67,7 +67,8 @@ mysql: localStorage: path: /var/lib/milvus/data/ -# Related configuration of minio, which is responsible for data persistence for Milvus. +# Related configuration of MinIO/S3/GCS or any other service supports S3 API, which is responsible for data persistence for Milvus. +# We refer to the storage service as MinIO/S3 in the following description for simplicity. minio: address: localhost # Address of MinIO/S3 port: 9000 # Port of MinIO/S3 @@ -76,10 +77,17 @@ minio: useSSL: false # Access to MinIO/S3 with SSL bucketName: "a-bucket" # Bucket name in MinIO/S3 rootPath: files # The root path where the message is stored in MinIO/S3 - # Whether to use AWS IAM role to access S3 instead of access/secret keys - # For more infomation, refer to https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use.html + # Whether to use IAM role to access S3/GCS instead of access/secret keys + # For more infomation, refer to + # aws: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use.html + # gcp: https://cloud.google.com/storage/docs/access-control/iam useIAM: false - # Custom endpoint for fetch IAM role credentials. + # Cloud Provider of S3. Supports: "aws", "gcp". + # You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio + # You can use "gcp" for other cloud provider supports S3 API with signature v2 + # When `useIAM` enabled, only "aws" & "gcp" is supported for now + cloudProvider: "aws" + # Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws". # Leave it empty if you want to use AWS default endpoint iamEndpoint: "" diff --git a/go.mod b/go.mod index 3555ba685..ad332718c 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/antonmedv/expr v1.8.9 github.com/apache/arrow/go/v8 v8.0.0-20220322092137-778b1772fd20 github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7 - github.com/apache/thrift v0.15.0 + github.com/apache/thrift v0.15.0 // indirect github.com/bits-and-blooms/bloom/v3 v3.0.1 github.com/casbin/casbin/v2 v2.44.2 github.com/casbin/json-adapter/v2 v2.0.0 @@ -20,7 +20,6 @@ require ( github.com/gin-gonic/gin v1.7.7 github.com/go-basic/ipv4 v1.0.0 github.com/gofrs/flock v0.8.1 - github.com/golang/mock v1.5.0 github.com/golang/protobuf v1.5.2 github.com/google/btree v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 @@ -65,6 +64,7 @@ require ( ) require ( + cloud.google.com/go v0.81.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/AthenZ/athenz v1.10.15 // indirect github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4 // indirect @@ -178,7 +178,7 @@ require ( go.uber.org/multierr v1.6.0 // indirect golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect - golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 // indirect + golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/internal/storage/factory.go b/internal/storage/factory.go index 81369118d..ceedeed10 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -24,6 +24,7 @@ func NewChunkManagerFactoryWithParam(params *paramtable.ComponentParam) *ChunkMa UseSSL(params.MinioCfg.UseSSL), BucketName(params.MinioCfg.BucketName), UseIAM(params.MinioCfg.UseIAM), + CloudProvider(params.MinioCfg.CloudProvider), IAMEndpoint(params.MinioCfg.IAMEndpoint), CreateBucket(true)) } diff --git a/internal/storage/gcp/gcp.go b/internal/storage/gcp/gcp.go new file mode 100644 index 000000000..e849adcef --- /dev/null +++ b/internal/storage/gcp/gcp.go @@ -0,0 +1,78 @@ +package gcp + +import ( + "net/http" + "strings" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/pkg/errors" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" +) + +// WrapHTTPTransport wraps http.Transport, add an auth header to support GCP native auth +type WrapHTTPTransport struct { + tokenSrc oauth2.TokenSource + backend transport +} + +// transport abstracts http.Transport to simplify test +type transport interface { + RoundTrip(req *http.Request) (*http.Response, error) +} + +// NewWrapHTTPTransport constructs a new WrapHTTPTransport +func NewWrapHTTPTransport(secure bool) (*WrapHTTPTransport, error) { + tokenSrc := google.ComputeTokenSource("") + // in fact never return err + backend, err := minio.DefaultTransport(secure) + if err != nil { + return nil, errors.Wrap(err, "failed to create default transport") + } + return &WrapHTTPTransport{ + tokenSrc: tokenSrc, + backend: backend, + }, nil +} + +// RoundTrip implements http.RoundTripper +func (t *WrapHTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) { + token, err := t.tokenSrc.Token() + if err != nil { + return nil, errors.Wrap(err, "failed to acquire token") + } + req.Header.Set("Authorization", "Bearer "+token.AccessToken) + return t.backend.RoundTrip(req) +} + +const GcsDefaultAddress = "storage.googleapis.com" + +// NewMinioClient returns a minio.Client which is compatible for GCS +func NewMinioClient(address string, opts *minio.Options) (*minio.Client, error) { + if opts == nil { + opts = &minio.Options{} + } + if address == "" { + address = GcsDefaultAddress + opts.Secure = true + } + + // adhoc to remove port of gcs address to let minio-go know it's gcs + if strings.Contains(address, GcsDefaultAddress) { + address = GcsDefaultAddress + } + + if opts.Creds != nil { + // if creds is set, use it directly + return minio.New(address, opts) + } + // opts.Creds == nil, assume using IAM + transport, err := NewWrapHTTPTransport(opts.Secure) + if err != nil { + return nil, errors.Wrap(err, "failed to create default transport") + } + opts.Transport = transport + opts.Creds = credentials.NewStaticV2("", "", "") + return minio.New(address, opts) +} diff --git a/internal/storage/gcp/gcp_test.go b/internal/storage/gcp/gcp_test.go new file mode 100644 index 000000000..e22ab72d4 --- /dev/null +++ b/internal/storage/gcp/gcp_test.go @@ -0,0 +1,92 @@ +package gcp + +import ( + "errors" + "net/http" + "testing" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/stretchr/testify/assert" + "golang.org/x/oauth2" +) + +func TestNewMinioClient(t *testing.T) { + t.Run("iam ok", func(t *testing.T) { + minioCli, err := NewMinioClient("", nil) + assert.NoError(t, err) + assert.Equal(t, GcsDefaultAddress, minioCli.EndpointURL().Host) + assert.Equal(t, "https", minioCli.EndpointURL().Scheme) + }) + + t.Run("ak sk ok", func(t *testing.T) { + minioCli, err := NewMinioClient(GcsDefaultAddress+":443", &minio.Options{ + Creds: credentials.NewStaticV2("ak", "sk", ""), + Secure: true, + }) + assert.NoError(t, err) + assert.Equal(t, GcsDefaultAddress, minioCli.EndpointURL().Host) + assert.Equal(t, "https", minioCli.EndpointURL().Scheme) + }) + + t.Run("create failed", func(t *testing.T) { + defaultTransBak := minio.DefaultTransport + defer func() { + minio.DefaultTransport = defaultTransBak + }() + minio.DefaultTransport = func(secure bool) (*http.Transport, error) { + return nil, errors.New("mock error") + } + _, err := NewMinioClient("", nil) + assert.Error(t, err) + }) +} + +type mockTransport struct { + err error +} + +func (m *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return nil, m.err +} + +type mockTokenSource struct { + token string + err error +} + +func (m *mockTokenSource) Token() (*oauth2.Token, error) { + return &oauth2.Token{AccessToken: m.token}, m.err +} + +func TestGCPWrappedHTTPTransport_RoundTrip(t *testing.T) { + ts, err := NewWrapHTTPTransport(true) + assert.NoError(t, err) + ts.backend = &mockTransport{} + ts.tokenSrc = &mockTokenSource{token: "mocktoken"} + + t.Run("ok", func(t *testing.T) { + req, err := http.NewRequest("GET", "http://example.com", nil) + assert.NoError(t, err) + _, err = ts.RoundTrip(req) + assert.NoError(t, err) + assert.Equal(t, "Bearer mocktoken", req.Header.Get("Authorization")) + }) + + t.Run("get token failed", func(t *testing.T) { + ts.tokenSrc = &mockTokenSource{err: errors.New("mock error")} + req, err := http.NewRequest("GET", "http://example.com", nil) + assert.NoError(t, err) + _, err = ts.RoundTrip(req) + assert.Error(t, err) + }) + + t.Run("call failed", func(t *testing.T) { + ts.backend = &mockTransport{err: errors.New("mock error")} + req, err := http.NewRequest("GET", "http://example.com", nil) + assert.NoError(t, err) + _, err = ts.RoundTrip(req) + assert.Error(t, err) + }) + +} diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 633d98ada..c38c677dd 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -28,7 +28,9 @@ import ( "time" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/storage/gcp" "github.com/milvus-io/milvus/internal/util/errorutil" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" @@ -70,15 +72,26 @@ func NewMinioChunkManager(ctx context.Context, opts ...Option) (*MinioChunkManag func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunkManager, error) { var creds *credentials.Credentials - if c.useIAM { - creds = credentials.NewIAM(c.iamEndpoint) - } else { - creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "") + var newMinioFn = minio.New + + switch c.cloudProvider { + case paramtable.CloudProviderGCP: + newMinioFn = gcp.NewMinioClient + if !c.useIAM { + creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "") + } + default: // aws, minio + if c.useIAM { + creds = credentials.NewIAM("") + } else { + creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "") + } } - minIOClient, err := minio.New(c.address, &minio.Options{ + minioOpts := &minio.Options{ Creds: creds, Secure: c.useSSL, - }) + } + minIOClient, err := newMinioFn(c.address, minioOpts) // options nil or invalid formatted endpoint, don't need to retry if err != nil { return nil, err diff --git a/internal/storage/minio_chunk_manager_test.go b/internal/storage/minio_chunk_manager_test.go index 549243c9f..f16e5705d 100644 --- a/internal/storage/minio_chunk_manager_test.go +++ b/internal/storage/minio_chunk_manager_test.go @@ -44,6 +44,7 @@ func newMinIOChunkManager(ctx context.Context, bucketName string, rootPath strin UseSSL(useSSL), BucketName(bucketName), UseIAM(false), + CloudProvider("aws"), IAMEndpoint(""), CreateBucket(true), ) diff --git a/internal/storage/options.go b/internal/storage/options.go index 432c7b688..8852dd2a9 100644 --- a/internal/storage/options.go +++ b/internal/storage/options.go @@ -10,6 +10,7 @@ type config struct { createBucket bool rootPath string useIAM bool + cloudProvider string iamEndpoint string } @@ -67,6 +68,12 @@ func UseIAM(useIAM bool) Option { } } +func CloudProvider(cloudProvider string) Option { + return func(c *config) { + c.cloudProvider = cloudProvider + } +} + func IAMEndpoint(iamEndpoint string) Option { return func(c *config) { c.iamEndpoint = iamEndpoint diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go index 1f672f651..28d348bd4 100644 --- a/internal/util/indexcgowrapper/index.go +++ b/internal/util/indexcgowrapper/index.go @@ -9,11 +9,12 @@ package indexcgowrapper import "C" import ( "fmt" - "github.com/milvus-io/milvus/internal/proto/indexpb" "path/filepath" "runtime" "unsafe" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/commonpb" diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index b24357ab4..66e542c42 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -41,6 +41,7 @@ const ( DefaultMinioUseSSL = "false" DefaultMinioBucketName = "a-bucket" DefaultMinioUseIAM = "false" + DefaultMinioCloudProvider = "aws" DefaultMinioIAMEndpoint = "" DefaultEtcdEndpoints = "localhost:2379" DefaultInsertBufferSize = "16777216" diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index 62c4982dc..aad73e4e0 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -446,6 +446,7 @@ type MinioConfig struct { BucketName string RootPath string UseIAM bool + CloudProvider string IAMEndpoint string } @@ -459,6 +460,7 @@ func (p *MinioConfig) init(base *BaseTable) { p.initBucketName() p.initRootPath() p.initUseIAM() + p.initCloudProvider() p.initIAMEndpoint() } @@ -518,10 +520,31 @@ func (p *MinioConfig) initRootPath() { func (p *MinioConfig) initUseIAM() { useIAM := p.Base.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM) - p.UseIAM, _ = strconv.ParseBool(useIAM) + var err error + p.UseIAM, err = strconv.ParseBool(useIAM) + if err != nil { + panic("parse bool useIAM:" + err.Error()) + } +} + +// CloudProvider supported +const ( + CloudProviderAWS = "aws" + CloudProviderGCP = "gcp" +) + +var supportedCloudProvider = map[string]bool{ + CloudProviderAWS: true, + CloudProviderGCP: true, +} + +func (p *MinioConfig) initCloudProvider() { + p.CloudProvider = p.Base.LoadWithDefault("minio.cloudProvider", DefaultMinioCloudProvider) + if !supportedCloudProvider[p.CloudProvider] { + panic("unsupported cloudProvider:" + p.CloudProvider) + } } func (p *MinioConfig) initIAMEndpoint() { - iamEndpoint := p.Base.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint) - p.IAMEndpoint = iamEndpoint + p.IAMEndpoint = p.Base.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint) } diff --git a/internal/util/paramtable/service_param_test.go b/internal/util/paramtable/service_param_test.go index e95634e1f..dd8695187 100644 --- a/internal/util/paramtable/service_param_test.go +++ b/internal/util/paramtable/service_param_test.go @@ -128,6 +128,8 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, Params.UseIAM, false) + assert.Equal(t, Params.CloudProvider, "aws") + assert.Equal(t, Params.IAMEndpoint, "") t.Logf("Minio BucketName = %s", Params.BucketName) -- GitLab