提交 ab9618aa 编写于 作者: D Davies Liu

add Create() to create new bucket

上级 664da7c2
module github.com/juicedata/juicesync
require (
cloud.google.com/go v0.39.0 // indirect
cloud.google.com/go v0.39.0
github.com/Azure/azure-sdk-for-go v11.1.1-beta+incompatible
github.com/Azure/go-autorest v8.4.0+incompatible // indirect
github.com/NetEase-Object-Storage/nos-golang-sdk v0.0.0-20171031020902-cc8892cb2b05
......
......@@ -25,6 +25,11 @@ func (b *wasb) String() string {
return fmt.Sprintf("wasb://%s", b.container.Name)
}
func (b *wasb) Create() error {
_, err := b.container.CreateIfNotExists(&storage.CreateContainerOptions{})
return err
}
func (b *wasb) Head(key string) (*Object, error) {
blob := b.container.GetBlobReference(key)
err := blob.GetProperties(nil)
......
......@@ -27,6 +27,14 @@ func (q *bosclient) String() string {
return fmt.Sprintf("bos://%s", q.bucket)
}
func (q *bosclient) Create() error {
_, err := q.c.PutBucket(q.bucket)
if err != nil && strings.Contains(err.Error(), "BucketAlreadyExists") {
err = nil
}
return err
}
func (q *bosclient) Head(key string) (*Object, error) {
r, err := q.c.GetObjectMeta(q.bucket, key)
if err != nil {
......
......@@ -25,6 +25,16 @@ func (c *COS) String() string {
return fmt.Sprintf("cos://%s", strings.Split(c.endpoint, ".")[0])
}
func (c *COS) Create() error {
_, err := c.c.Bucket.Put(ctx, nil)
if err != nil {
if e, ok := err.(*cos.ErrorResponse); ok && e.Code == "BucketAlreadyOwnedByYou" {
err = nil
}
}
return err
}
func (c *COS) Head(key string) (*Object, error) {
resp, err := c.c.Object.Head(ctx, key, nil)
if err != nil {
......
......@@ -8,19 +8,20 @@ import (
"io"
"log"
"net/url"
"os"
"strings"
"time"
"cloud.google.com/go/compute/metadata"
"golang.org/x/oauth2/google"
storage "google.golang.org/api/storage/v1"
"google.golang.org/api/storage/v1"
)
var ctx = context.Background()
type gs struct {
defaultObjectStorage
service *storage.ObjectsService
service *storage.Service
bucket string
region string
pageToken string
......@@ -30,8 +31,39 @@ func (g *gs) String() string {
return fmt.Sprintf("gs://%s", g.bucket)
}
func (g *gs) Create() error {
// check if the bucket is already exists
if objs, err := g.List("", "", 1); err == nil && len(objs) > 0 {
return nil
}
projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
if projectID == "" {
projectID, _ = metadata.ProjectID()
}
if projectID == "" {
cred, err := google.FindDefaultCredentials(context.Background())
if err == nil {
projectID = cred.ProjectID
}
}
if projectID == "" {
log.Fatalf("GOOGLE_CLOUD_PROJECT environment variable must be set")
}
_, err := g.service.Buckets.Insert(projectID, &storage.Bucket{
Id: g.bucket,
StorageClass: "regional",
Location: g.region,
}).Do()
if err != nil && strings.Contains(err.Error(), "You already own this bucket") {
return nil
}
return err
}
func (g *gs) Head(key string) (*Object, error) {
req := g.service.Get(g.bucket, key)
req := g.service.Objects.Get(g.bucket, key)
obj, err := req.Do()
if err != nil {
return nil, err
......@@ -47,7 +79,7 @@ func (g *gs) Head(key string) (*Object, error) {
}
func (g *gs) Get(key string, off, limit int64) (io.ReadCloser, error) {
req := g.service.Get(g.bucket, key)
req := g.service.Objects.Get(g.bucket, key)
header := req.Header()
if off > 0 || limit > 0 {
if limit > 0 {
......@@ -65,24 +97,21 @@ func (g *gs) Get(key string, off, limit int64) (io.ReadCloser, error) {
func (g *gs) Put(key string, data io.Reader) error {
obj := &storage.Object{Name: key}
_, err := g.service.Insert(g.bucket, obj).Media(data).Do()
_, err := g.service.Objects.Insert(g.bucket, obj).Media(data).Do()
return err
}
func (g *gs) Copy(dst, src string) error {
_, err := g.service.Copy(g.bucket, src, g.bucket, dst, nil).Do()
_, err := g.service.Objects.Copy(g.bucket, src, g.bucket, dst, nil).Do()
return err
}
func (g *gs) Delete(key string) error {
if _, err := g.Head(key); err != nil {
return err
}
return g.service.Delete(g.bucket, key).Do()
return g.service.Objects.Delete(g.bucket, key).Do()
}
func (g *gs) List(prefix, marker string, limit int64) ([]*Object, error) {
call := g.service.List(g.bucket).Prefix(prefix).MaxResults(limit)
call := g.service.Objects.List(g.bucket).Prefix(prefix).MaxResults(limit)
if marker != "" {
if g.pageToken == "" {
// last page
......@@ -122,7 +151,7 @@ func newGS(endpoint, accessKey, secretKey string) ObjectStorage {
if err != nil {
log.Fatalf("Failed to create service: %v", err)
}
return &gs{service: service.Objects, bucket: bucket, region: region}
return &gs{service: service, bucket: bucket, region: region}
}
func init() {
......
......@@ -10,7 +10,9 @@ import (
"net/url"
"strings"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
awss3 "github.com/aws/aws-sdk-go/service/s3"
"github.com/ks3sdklib/aws-sdk-go/aws"
"github.com/ks3sdklib/aws-sdk-go/aws/credentials"
"github.com/ks3sdklib/aws-sdk-go/service/s3"
......@@ -25,6 +27,20 @@ type ks3 struct {
func (s *ks3) String() string {
return fmt.Sprintf("ks3://%s", s.bucket)
}
func (s *ks3) Create() error {
_, err := s.s3.CreateBucket(&s3.CreateBucketInput{Bucket: &s.bucket})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case awss3.ErrCodeBucketAlreadyExists:
err = nil
case awss3.ErrCodeBucketAlreadyOwnedByYou:
err = nil
}
}
}
return err
}
func (s *ks3) Head(key string) (*Object, error) {
param := s3.HeadObjectInput{
......
......@@ -44,6 +44,7 @@ type PendingPart struct {
type ObjectStorage interface {
String() string
Create() error
Head(key string) (*Object, error)
Get(key string, off, limit int64) (io.ReadCloser, error)
Put(key string, in io.Reader) error
......
......@@ -43,6 +43,10 @@ func listAll(s ObjectStorage, prefix, marker string, limit int64) ([]*Object, er
}
func testStorage(t *testing.T, s ObjectStorage) {
if err := s.Create(); err != nil {
t.Fatalf("Can't create bucket %s: %s", s, err)
}
s = WithPrefix(s, "unit-test")
defer s.Delete("/test")
k := "/large"
......
......@@ -18,6 +18,7 @@ const obsDefaultRegion = "cn-north-1"
type obsClient struct {
bucket string
region string
c *obs.ObsClient
}
......@@ -25,6 +26,19 @@ func (s *obsClient) String() string {
return fmt.Sprintf("obs://%s", s.bucket)
}
func (s *obsClient) Create() error {
params := &obs.CreateBucketInput{}
params.Bucket = s.bucket
params.Location = s.region
_, err := s.c.CreateBucket(params)
if err != nil {
if obsError, ok := err.(obs.ObsError); ok && obsError.Code == "BucketAlreadyOwnedByYou" {
err = nil
}
}
return err
}
func (s *obsClient) Head(key string) (*Object, error) {
params := &obs.GetObjectMetadataInput{
Bucket: s.bucket,
......@@ -230,6 +244,7 @@ func newOBS(endpoint, accessKey, secretKey string) ObjectStorage {
secretKey = os.Getenv("HWCLOUD_SECRET_KEY")
}
var region string
if len(hostParts) == 1 {
if endpoint, err = autoOBSEndpoint(bucketName, accessKey, secretKey); err != nil {
logger.Fatalf("Cannot get location of bucket %q: %s", bucketName, err)
......@@ -237,13 +252,15 @@ func newOBS(endpoint, accessKey, secretKey string) ObjectStorage {
if !strings.HasPrefix(endpoint, "http") {
endpoint = fmt.Sprintf("%s://%s", uri.Scheme, endpoint)
}
} else {
region = strings.Split(hostParts[1], ".")[1]
}
c, err := obs.New(accessKey, secretKey, endpoint)
if err != nil {
logger.Fatalf("Fail to initialize OBS: %s", err)
}
return &obsClient{bucketName, c}
return &obsClient{bucketName, region, c}
}
func init() {
......
......@@ -30,6 +30,15 @@ func (o *ossClient) String() string {
return fmt.Sprintf("oss://%s", o.bucket.BucketName)
}
func (o *ossClient) Create() error {
err := o.bucket.Client.CreateBucket(o.bucket.BucketName)
// ignore error if bucket is already created
if err != nil && strings.Contains(err.Error(), "BucketAlreadyExists") {
err = nil
}
return err
}
func (o *ossClient) Head(key string) (*Object, error) {
r, err := o.bucket.GetObjectMeta(key)
if err != nil {
......
......@@ -22,6 +22,10 @@ func (p *withPrefix) String() string {
return fmt.Sprintf("%s/%s", p.os, p.prefix)
}
func (p *withPrefix) Create() error {
return p.os.Create()
}
func (p *withPrefix) Head(key string) (*Object, error) {
obj, err := p.os.Head(p.prefix + key)
if err != nil {
......
......@@ -24,6 +24,14 @@ func (q *qingstor) String() string {
return fmt.Sprintf("qingstor://%s", *q.bucket.Properties.BucketName)
}
func (q *qingstor) Create() error {
_, err := q.bucket.Put()
if err != nil && strings.Contains(err.Error(), "bucket_already_exists") {
err = nil
}
return err
}
func (q *qingstor) Head(key string) (*Object, error) {
r, err := q.bucket.HeadObject(key, nil)
if err != nil {
......
......@@ -30,6 +30,21 @@ func (s *s3client) String() string {
return fmt.Sprintf("s3://%s", s.bucket)
}
func (s *s3client) Create() error {
_, err := s.s3.CreateBucket(&s3.CreateBucketInput{Bucket: &s.bucket})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyExists:
err = nil
case s3.ErrCodeBucketAlreadyOwnedByYou:
err = nil
}
}
}
return err
}
func (s *s3client) Head(key string) (*Object, error) {
param := s3.HeadObjectInput{
Bucket: &s.bucket,
......@@ -264,8 +279,8 @@ func newS3(endpoint, accessKey, secretKey string) ObjectStorage {
var (
bucketName string
region string
ep string
region string
ep string
)
if len(hostParts) == 1 { // take endpoint as bucketname
......
......@@ -8,6 +8,7 @@ import (
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
......@@ -21,6 +22,24 @@ func (s *space) String() string {
return fmt.Sprintf("space://%s", s.s3client.bucket)
}
func (s *space) Create() error {
if _, err := s.List("", "", 1); err == nil {
return nil
}
_, err := s.s3.CreateBucket(&s3.CreateBucketInput{Bucket: &s.bucket})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyExists:
err = nil
case s3.ErrCodeBucketAlreadyOwnedByYou:
err = nil
}
}
}
return err
}
func newSpace(endpoint, accessKey, secretKey string) ObjectStorage {
uri, _ := url.ParseRequestURI(endpoint)
ssl := strings.ToLower(uri.Scheme) == "https"
......
......@@ -7,6 +7,7 @@ import (
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
......@@ -52,6 +53,53 @@ func ufileSigner(req *http.Request, accessKey, secretKey, signName string) {
req.Header.Add("Authorization", token)
}
func (u *ufile) Create() error {
uri, _ := url.ParseRequestURI(u.endpoint)
parts := strings.Split(uri.Host, ".")
name := parts[0]
region := parts[1] // www.cn-bj.ufileos.com
if region == "ufile" {
region = parts[2] // www.ufile.cn-north-02.ucloud.cn
}
if strings.HasPrefix(region, "internal") {
// www.internal-hk-01.ufileos.cn
// www.internal-cn-gd-02.ufileos.cn
ps := strings.Split(region, "-")
region = strings.Join(ps[1:len(ps)-1], "-")
}
query := url.Values{}
query.Add("Action", "CreateBucket")
query.Add("BucketName", name)
query.Add("PublicKey", u.accessKey)
query.Add("Region", region)
// generate signature
toSign := fmt.Sprintf("ActionCreateBucketBucketName%sPublicKey%sRegion%s",
name, u.accessKey, region)
h := sha1.New()
h.Write([]byte(toSign))
h.Write([]byte(u.secretKey))
sig := hex.EncodeToString(h.Sum(nil))
query.Add("Signature", sig)
req, err := http.NewRequest("GET", "https://api.ucloud.cn/?"+query.Encode(), nil)
if err != nil {
return err
}
resp, err := httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
err = parseError(resp)
if strings.Contains(err.Error(), "duplicate bucket name") ||
strings.Contains(err.Error(), "CreateBucketResponse") {
err = nil
}
return err
}
func (u *ufile) parseResp(resp *http.Response, out interface{}) error {
defer resp.Body.Close()
if resp.ContentLength <= 0 || resp.ContentLength > (1<<31) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册