// Copyright (C) 2018-present Juicedata Inc. package object import ( "bytes" "fmt" "io" "io/ioutil" "net/url" "strings" "github.com/IBM/ibm-cos-sdk-go/aws" "github.com/IBM/ibm-cos-sdk-go/aws/awserr" "github.com/IBM/ibm-cos-sdk-go/aws/credentials/ibmiam" "github.com/IBM/ibm-cos-sdk-go/aws/session" "github.com/IBM/ibm-cos-sdk-go/service/s3" ) type ibmcos struct { bucket string s3 *s3.S3 } func (s *ibmcos) String() string { return fmt.Sprintf("ibmcos://%s", s.bucket) } func (s *ibmcos) Create() error { _, err := s.s3.CreateBucket(&s3.CreateBucketInput{Bucket: &s.bucket}) if err != nil { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { case s3.ErrCodeBucketAlreadyExists: err = nil case s3.ErrCodeBucketAlreadyOwnedByYou: err = nil } } } return err } func (s *ibmcos) Get(key string, off, limit int64) (io.ReadCloser, error) { params := &s3.GetObjectInput{Bucket: &s.bucket, Key: &key} if off > 0 || limit > 0 { var r string if limit > 0 { r = fmt.Sprintf("bytes=%d-%d", off, off+limit-1) } else { r = fmt.Sprintf("bytes=%d-", off) } params.Range = &r } resp, err := s.s3.GetObject(params) if err != nil { return nil, err } return resp.Body, nil } func (s *ibmcos) Put(key string, in io.Reader) error { var body io.ReadSeeker if b, ok := in.(io.ReadSeeker); ok { body = b } else { data, err := ioutil.ReadAll(in) if err != nil { return err } body = bytes.NewReader(data) } params := &s3.PutObjectInput{ Bucket: &s.bucket, Key: &key, Body: body, } _, err := s.s3.PutObject(params) return err } func (s *ibmcos) Copy(dst, src string) error { src = s.bucket + "/" + src params := &s3.CopyObjectInput{ Bucket: &s.bucket, Key: &dst, CopySource: &src, } _, err := s.s3.CopyObject(params) return err } func (s *ibmcos) Head(key string) (*Object, error) { param := s3.HeadObjectInput{ Bucket: &s.bucket, Key: &key, } r, err := s.s3.HeadObject(¶m) if err != nil { return nil, err } return &Object{ key, *r.ContentLength, *r.LastModified, strings.HasSuffix(key, "/"), }, nil } func (s *ibmcos) Delete(key string) error { param := s3.DeleteObjectInput{ Bucket: &s.bucket, Key: &key, } _, err := s.s3.DeleteObject(¶m) return err } func (s *ibmcos) List(prefix, marker string, limit int64) ([]*Object, error) { param := s3.ListObjectsInput{ Bucket: &s.bucket, Prefix: &prefix, Marker: &marker, MaxKeys: &limit, } resp, err := s.s3.ListObjects(¶m) if err != nil { return nil, err } n := len(resp.Contents) objs := make([]*Object, n) for i := 0; i < n; i++ { o := resp.Contents[i] objs[i] = &Object{*o.Key, *o.Size, *o.LastModified, strings.HasSuffix(*o.Key, "/")} } return objs, nil } func (s *ibmcos) ListAll(prefix, marker string) (<-chan *Object, error) { return nil, notSupported } func (s *ibmcos) CreateMultipartUpload(key string) (*MultipartUpload, error) { params := &s3.CreateMultipartUploadInput{ Bucket: &s.bucket, Key: &key, } resp, err := s.s3.CreateMultipartUpload(params) if err != nil { return nil, err } return &MultipartUpload{UploadID: *resp.UploadId, MinPartSize: 5 << 20, MaxCount: 10000}, nil } func (s *ibmcos) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) { n := int64(num) params := &s3.UploadPartInput{ Bucket: &s.bucket, Key: &key, UploadId: &uploadID, Body: bytes.NewReader(body), PartNumber: &n, } resp, err := s.s3.UploadPart(params) if err != nil { return nil, err } return &Part{Num: num, ETag: *resp.ETag}, nil } func (s *ibmcos) AbortUpload(key string, uploadID string) { params := &s3.AbortMultipartUploadInput{ Bucket: &s.bucket, Key: &key, UploadId: &uploadID, } s.s3.AbortMultipartUpload(params) } func (s *ibmcos) CompleteUpload(key string, uploadID string, parts []*Part) error { var s3Parts []*s3.CompletedPart for i := range parts { n := new(int64) *n = int64(parts[i].Num) s3Parts = append(s3Parts, &s3.CompletedPart{ETag: &parts[i].ETag, PartNumber: n}) } params := &s3.CompleteMultipartUploadInput{ Bucket: &s.bucket, Key: &key, UploadId: &uploadID, MultipartUpload: &s3.CompletedMultipartUpload{Parts: s3Parts}, } _, err := s.s3.CompleteMultipartUpload(params) return err } func (s *ibmcos) ListUploads(marker string) ([]*PendingPart, string, error) { input := &s3.ListMultipartUploadsInput{ Bucket: aws.String(s.bucket), KeyMarker: aws.String(marker), } // FIXME: parsing time "2018-08-23T12:23:26.046+08:00" as "2006-01-02T15:04:05Z" result, err := s.s3.ListMultipartUploads(input) if err != nil { return nil, "", err } parts := make([]*PendingPart, len(result.Uploads)) for i, u := range result.Uploads { parts[i] = &PendingPart{*u.Key, *u.UploadId, *u.Initiated} } var nextMarker string if result.NextKeyMarker != nil { nextMarker = *result.NextKeyMarker } return parts, nextMarker, nil } func newIBMCOS(endpoint, apiKey, serviceInstanceID string) (ObjectStorage, error) { uri, _ := url.ParseRequestURI(endpoint) hostParts := strings.Split(uri.Host, ".") bucket := hostParts[0] region := hostParts[2] authEndpoint := "https://iam.cloud.ibm.com/identity/token" serviceEndpoint := "https://" + strings.SplitN(uri.Host, ".", 2)[1] conf := aws.NewConfig(). WithRegion(region). WithEndpoint(serviceEndpoint). WithCredentials(ibmiam.NewStaticCredentials(aws.NewConfig(), authEndpoint, apiKey, serviceInstanceID)). WithS3ForcePathStyle(true) sess := session.Must(session.NewSession()) client := s3.New(sess, conf) return &ibmcos{bucket, client}, nil } func init() { Register("ibmcos", newIBMCOS) }