diff --git a/main.go b/main.go index 87dcf68c57294d7578bce6fedea99de07cb29d1d..6a1f218cc9832a9989807c2cbeca6eac55737812 100644 --- a/main.go +++ b/main.go @@ -37,7 +37,7 @@ func supportHTTPS(name, endpoint string) bool { } } -func createStorage(uri string, conf *config.Config) object.ObjectStorage { +func createStorage(uri string, conf *config.Config) (object.ObjectStorage, error) { if !strings.Contains(uri, "://") { if strings.Contains(uri, ":") { var user string @@ -91,9 +91,9 @@ func createStorage(uri string, conf *config.Config) object.ObjectStorage { endpoint = "http://" + endpoint } - store := object.CreateStorage(name, endpoint, accessKey, secretKey) - if store == nil { - logger.Fatalf("Invalid storage type: %s", u.Scheme) + store, err := object.CreateStorage(name, endpoint, accessKey, secretKey) + if err != nil { + return nil, fmt.Errorf("create %s %s: %s", name, endpoint, err) } if conf.Perms { if _, ok := store.(object.FileSystem); !ok { @@ -102,9 +102,9 @@ func createStorage(uri string, conf *config.Config) object.ObjectStorage { } } if name != "file" && len(u.Path) > 1 { - store = object.WithPrefix(store, u.Path[1:]) + store, _ = object.WithPrefix(store, u.Path[1:]) } - return store + return store, nil } func run(c *cli.Context) error { @@ -121,8 +121,14 @@ func run(c *cli.Context) error { if strings.HasSuffix(c.Args().Get(0), "/") != strings.HasSuffix(c.Args().Get(1), "/") { logger.Fatalf("SRC and DST should both end with '/' or not!") } - src := createStorage(c.Args().Get(0), config) - dst := createStorage(c.Args().Get(1), config) + src, err := createStorage(c.Args().Get(0), config) + if err != nil { + return err + } + dst, err := createStorage(c.Args().Get(1), config) + if err != nil { + return err + } return sync.Sync(src, dst, config) } diff --git a/object/azure.go b/object/azure.go index e053b9a3244a53b9af48a4958cb945cd430ae54a..a4f521763aac3362b0548ef6df8304dfec8f6c33 100644 --- a/object/azure.go +++ b/object/azure.go @@ -16,7 +16,7 @@ import ( ) type wasb struct { - defaultObjectStorage + DefaultObjectStorage container *storage.Container marker string } @@ -137,10 +137,10 @@ func autoWasbEndpoint(containerName, accountName, accountKey string, useHTTPS bo return endpoint, nil } -func newWabs(endpoint, accountName, accountKey string) ObjectStorage { +func newWabs(endpoint, accountName, accountKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - log.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err) } hostParts := strings.SplitN(uri.Host, ".", 2) @@ -155,7 +155,7 @@ func newWabs(endpoint, accountName, accountKey string) ObjectStorage { } parts := strings.SplitN(item, "=", 2) if len(parts) != 2 { - logger.Fatalf("Invalid connection string item: %s", item) + return nil, fmt.Errorf("Invalid connection string item: %s", item) } // Arguments from command line take precedence if parts[0] == "DefaultEndpointsProtocol" && scheme == "" { @@ -178,7 +178,7 @@ func newWabs(endpoint, accountName, accountKey string) ObjectStorage { domain = hostParts[1] } else if domain == "" { if domain, err = autoWasbEndpoint(name, accountName, accountKey, scheme == "https"); err != nil { - logger.Fatalf("Unable to get endpoint of container %s: %s", name, err) + return nil, fmt.Errorf("Unable to get endpoint of container %s: %s", name, err) } } @@ -188,9 +188,9 @@ func newWabs(endpoint, accountName, accountKey string) ObjectStorage { } service := client.GetBlobService() container := service.GetContainerReference(name) - return &wasb{container: container} + return &wasb{container: container}, nil } func init() { - register("wasb", newWabs) + Register("wasb", newWabs) } diff --git a/object/b2.go b/object/b2.go index 549b972dbf5dbaa295e42b10ea17a05434ca2273..115b1a7e4bac7367d29bdb5b7d1815b1c2a2bccb 100644 --- a/object/b2.go +++ b/object/b2.go @@ -5,7 +5,6 @@ package object import ( "fmt" "io" - "log" "net/url" "strings" @@ -13,7 +12,7 @@ import ( ) type b2client struct { - defaultObjectStorage + DefaultObjectStorage client *b2.Client bucket *b2.Bucket cursor *b2.Cursor @@ -103,16 +102,16 @@ func (c *b2client) List(prefix, marker string, limit int64) ([]*Object, error) { return objs, nil } -func newB2(endpoint, account, key string) ObjectStorage { +func newB2(endpoint, account, key string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err) } hostParts := strings.Split(uri.Host, ".") bucketName := hostParts[0] client, err := b2.NewClient(ctx, account, key, b2.Transport(httpClient.Transport)) if err != nil { - log.Fatalf("Failed to create client: %v", err) + return nil, fmt.Errorf("Failed to create client: %v", err) } bucket, err := client.Bucket(ctx, bucketName) if err != nil { @@ -120,12 +119,12 @@ func newB2(endpoint, account, key string) ObjectStorage { Type: "allPrivate", }) if err != nil { - log.Fatalf("Failed to create bucket: %v", err) + return nil, fmt.Errorf("Failed to create bucket: %v", err) } } - return &b2client{client: client, bucket: bucket} + return &b2client{client: client, bucket: bucket}, nil } func init() { - register("b2", newB2) + Register("b2", newB2) } diff --git a/object/bos.go b/object/bos.go index be0efd6dabf8ea6794c7eade4b83b416a7626d67..f5ff483c9a169921dee091585ccbe673d2b781df 100644 --- a/object/bos.go +++ b/object/bos.go @@ -18,7 +18,7 @@ import ( const bosDefaultRegion = "bj" type bosclient struct { - defaultObjectStorage + DefaultObjectStorage bucket string c *bos.Client } @@ -177,10 +177,10 @@ func autoBOSEndpoint(bucketName, accessKey, secretKey string) (string, error) { } } -func newBOS(endpoint, accessKey, secretKey string) ObjectStorage { +func newBOS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err) } hostParts := strings.SplitN(uri.Host, ".", 2) bucketName := hostParts[0] @@ -195,7 +195,7 @@ func newBOS(endpoint, accessKey, secretKey string) ObjectStorage { if len(hostParts) == 1 { if endpoint, err = autoBOSEndpoint(bucketName, accessKey, secretKey); err != nil { - logger.Fatalf("Fail to get location of bucket %q: %s", bucketName, err) + return nil, fmt.Errorf("Fail to get location of bucket %q: %s", bucketName, err) } if !strings.HasPrefix(endpoint, "http") { endpoint = fmt.Sprintf("%s://%s", uri.Scheme, endpoint) @@ -204,9 +204,9 @@ func newBOS(endpoint, accessKey, secretKey string) ObjectStorage { } bosClient, err := bos.NewClient(accessKey, secretKey, endpoint) - return &bosclient{bucket: bucketName, c: bosClient} + return &bosclient{bucket: bucketName, c: bosClient}, nil } func init() { - register("bos", newBOS) + Register("bos", newBOS) } diff --git a/object/cos.go b/object/cos.go index b98a4702b7b88d31255f3e541de9596e7f97e0a9..d11681a489bb5dbd34ce88868e5652d64b9bbf58 100644 --- a/object/cos.go +++ b/object/cos.go @@ -189,10 +189,10 @@ func autoCOSEndpoint(bucketName, accessKey, secretKey string) (string, error) { return "", fmt.Errorf("bucket %q doesnot exist", bucketName) } -func newCOS(endpoint, accessKey, secretKey string) ObjectStorage { +func newCOS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint %s: %s", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err) } hostParts := strings.SplitN(uri.Host, ".", 2) @@ -203,10 +203,10 @@ func newCOS(endpoint, accessKey, secretKey string) ObjectStorage { if len(hostParts) == 1 { if endpoint, err = autoCOSEndpoint(hostParts[0], accessKey, secretKey); err != nil { - logger.Fatalf("Unable to get endpoint of bucket %s: %s", hostParts[0], err) + return nil, fmt.Errorf("Unable to get endpoint of bucket %s: %s", hostParts[0], err) } if uri, err = url.ParseRequestURI(endpoint); err != nil { - logger.Fatalf("Invalid endpoint %s: %s", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err) } logger.Debugf("Use endpoint %q", endpoint) } @@ -219,9 +219,9 @@ func newCOS(endpoint, accessKey, secretKey string) ObjectStorage { }, }) client.UserAgent = UserAgent - return &COS{client, uri.Host} + return &COS{client, uri.Host}, nil } func init() { - register("cos", newCOS) + Register("cos", newCOS) } diff --git a/object/file.go b/object/file.go index b18e0043e022627fb5665b308fe427c9bc58aaf9..b751a259becad5537bb3f4dbbdb0b66e260aa793 100644 --- a/object/file.go +++ b/object/file.go @@ -4,6 +4,7 @@ package object import ( "bytes" + "fmt" "io" "io/ioutil" "os" @@ -20,7 +21,7 @@ const ( ) type filestore struct { - defaultObjectStorage + DefaultObjectStorage root string } @@ -301,22 +302,22 @@ func (d *filestore) Chown(path string, owner, group string) error { return os.Chown(p, uid, gid) } -func newDisk(root, accesskey, secretkey string) ObjectStorage { +func newDisk(root, accesskey, secretkey string) (ObjectStorage, error) { if strings.HasSuffix(root, dirSuffix) { logger.Debugf("Ensure dicectory %s", root) if err := os.MkdirAll(root, 0755); err != nil { - logger.Fatalf("Creating directory %s failed: %q", root, err) + return nil, fmt.Errorf("Creating directory %s failed: %q", root, err) } } else { dir := path.Dir(root) logger.Debugf("Ensure dicectory %s", dir) if err := os.MkdirAll(dir, 0755); err != nil { - logger.Fatalf("Creating directory %s failed: %q", dir, err) + return nil, fmt.Errorf("Creating directory %s failed: %q", dir, err) } } - return &filestore{root: root} + return &filestore{root: root}, nil } func init() { - register("file", newDisk) + Register("file", newDisk) } diff --git a/object/filesystem_test.go b/object/filesystem_test.go index 0f9880be84f42d5a1027cd65ea7c5a73deee194d..0648232c2610f88883e2c80a809091164f3c40c2 100644 --- a/object/filesystem_test.go +++ b/object/filesystem_test.go @@ -37,7 +37,7 @@ func TestFsFile(t *testing.T) { "xyz/", "xyz/xyz.txt", } - s0 := newDisk("/tmp/abc/unit-test/", "", "") + s0, _ := newDisk("/tmp/abc/unit-test/", "", "") // initialize directory tree for _, key := range keys { if err := s0.Put(key, bytes.NewReader([]byte{})); err != nil { @@ -55,7 +55,7 @@ func TestFsFile(t *testing.T) { } }() - s := newDisk("/tmp/abc/unit-test/x/", "", "") + s, _ := newDisk("/tmp/abc/unit-test/x/", "", "") objs, err := listAll(s, "", "", 100) if err != nil { t.Fatalf("list failed: %s", err) @@ -65,7 +65,7 @@ func TestFsFile(t *testing.T) { t.Fatalf("testKeysEqual fail: %s", err) } - s = newDisk("/tmp/abc/unit-test/x", "", "") + s, _ = newDisk("/tmp/abc/unit-test/x", "", "") objs, err = listAll(s, "", "", 100) if err != nil { t.Fatalf("list failed: %s", err) @@ -75,7 +75,7 @@ func TestFsFile(t *testing.T) { t.Fatalf("testKeysEqual fail: %s", err) } - s = newDisk("/tmp/abc/unit-test/xy", "", "") + s, _ = newDisk("/tmp/abc/unit-test/xy", "", "") objs, err = listAll(s, "", "", 100) if err != nil { t.Fatalf("list failed: %s", err) @@ -93,7 +93,7 @@ func TestFsSftp(t *testing.T) { t.SkipNow() } sftpUser, sftpPass := os.Getenv("SFTP_USER"), os.Getenv("SFTP_PASS") - s0 := newSftp(sftpHost, sftpUser, sftpPass) + s0, _ := newSftp(sftpHost, sftpUser, sftpPass) keys := []string{ "x/", @@ -119,7 +119,7 @@ func TestFsSftp(t *testing.T) { } }() - s := newSftp(sftpHost+"x/", sftpUser, sftpPass) + s, _ := newSftp(sftpHost+"x/", sftpUser, sftpPass) objs, err := listAll(s, "", "", 100) if err != nil { t.Fatalf("list failed: %s", err) @@ -129,7 +129,7 @@ func TestFsSftp(t *testing.T) { t.Fatalf("testKeysEqual fail: %s", err) } - s = newSftp(sftpHost+"x", sftpUser, sftpPass) + s, _ = newSftp(sftpHost+"x", sftpUser, sftpPass) objs, err = listAll(s, "", "", 100) if err != nil { t.Fatalf("list failed: %s", err) @@ -139,7 +139,7 @@ func TestFsSftp(t *testing.T) { t.Fatalf("testKeysEqual fail: %s", err) } - s = newSftp(sftpHost+"xy", sftpUser, sftpPass) + s, _ = newSftp(sftpHost+"xy", sftpUser, sftpPass) objs, err = listAll(s, "", "", 100) if err != nil { t.Fatalf("list failed: %s", err) diff --git a/object/gs.go b/object/gs.go index 2fe2966db2885df6e1c99d318f32abf8d653d296..91f6a1cc0ad48f6293cb5773b7c944d2322c8ac0 100644 --- a/object/gs.go +++ b/object/gs.go @@ -20,7 +20,7 @@ import ( var ctx = context.Background() type gs struct { - defaultObjectStorage + DefaultObjectStorage service *storage.Service bucket string region string @@ -135,10 +135,10 @@ func (g *gs) List(prefix, marker string, limit int64) ([]*Object, error) { return objs, nil } -func newGS(endpoint, accessKey, secretKey string) ObjectStorage { +func newGS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err) } hostParts := strings.Split(uri.Host, ".") bucket := hostParts[0] @@ -151,9 +151,9 @@ func newGS(endpoint, accessKey, secretKey string) ObjectStorage { if err != nil { log.Fatalf("Failed to create service: %v", err) } - return &gs{service: service, bucket: bucket, region: region} + return &gs{service: service, bucket: bucket, region: region}, nil } func init() { - register("gs", newGS) + Register("gs", newGS) } diff --git a/object/hdfs.go b/object/hdfs.go index b7bd1530171b951d21d2a734cfb25776e5b54ec0..1c562e3e69a857eeb4f24e3bd6bd7483df1b7e85 100644 --- a/object/hdfs.go +++ b/object/hdfs.go @@ -24,7 +24,7 @@ var superuser = "hdfs" var supergroup = "supergroup" type hdfsclient struct { - defaultObjectStorage + DefaultObjectStorage addr string c *hdfs.Client } @@ -285,10 +285,10 @@ func (h *hdfsclient) Chown(key string, owner, group string) error { return h.c.Chown(h.path(key), owner, group) } -func newHDFS(addr, username, sk string) ObjectStorage { +func newHDFS(addr, username, sk string) (ObjectStorage, error) { conf, err := hadoopconf.LoadFromEnvironment() if err != nil { - logger.Fatalf("Problem loading configuration: %s", err) + return nil, fmt.Errorf("Problem loading configuration: %s", err) } options := hdfs.ClientOptionsFromConf(conf) @@ -299,7 +299,7 @@ func newHDFS(addr, username, sk string) ObjectStorage { if options.KerberosClient != nil { options.KerberosClient, err = getKerberosClient() if err != nil { - logger.Fatalf("Problem with kerberos authentication: %s", err) + return nil, fmt.Errorf("Problem with kerberos authentication: %s", err) } } else { if username == "" { @@ -308,7 +308,7 @@ func newHDFS(addr, username, sk string) ObjectStorage { if username == "" { current, err := user.Current() if err != nil { - logger.Fatalf("get current user: %s", err) + return nil, fmt.Errorf("get current user: %s", err) } username = current.Username } @@ -317,7 +317,7 @@ func newHDFS(addr, username, sk string) ObjectStorage { c, err := hdfs.NewClient(options) if err != nil { - logger.Fatalf("new HDFS client %s: %s", addr, err) + return nil, fmt.Errorf("new HDFS client %s: %s", addr, err) } if os.Getenv("HADOOP_SUPER_USER") != "" { superuser = os.Getenv("HADOOP_SUPER_USER") @@ -326,9 +326,9 @@ func newHDFS(addr, username, sk string) ObjectStorage { supergroup = os.Getenv("HADOOP_SUPER_GROUP") } - return &hdfsclient{addr: addr, c: c} + return &hdfsclient{addr: addr, c: c}, nil } func init() { - register("hdfs", newHDFS) + Register("hdfs", newHDFS) } diff --git a/object/ibmcos.go b/object/ibmcos.go index 5b03c6838056a64704042fb703fe3e42524d8fe5..862613e35ecf0719544ff34061f693f4f7b44f5b 100644 --- a/object/ibmcos.go +++ b/object/ibmcos.go @@ -215,7 +215,7 @@ func (s *ibmcos) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, nextMarker, nil } -func newIBMCOS(endpoint, apiKey, serviceInstanceID string) ObjectStorage { +func newIBMCOS(endpoint, apiKey, serviceInstanceID string) (ObjectStorage, error) { uri, _ := url.ParseRequestURI(endpoint) hostParts := strings.Split(uri.Host, ".") bucket := hostParts[0] @@ -230,9 +230,9 @@ func newIBMCOS(endpoint, apiKey, serviceInstanceID string) ObjectStorage { WithS3ForcePathStyle(true) sess := session.Must(session.NewSession()) client := s3.New(sess, conf) - return &ibmcos{bucket, client} + return &ibmcos{bucket, client}, nil } func init() { - register("ibmcos", newIBMCOS) + Register("ibmcos", newIBMCOS) } diff --git a/object/jss.go b/object/jss.go index 0811eddc39802800895cb3c1de6042545fe7d750..f66d712364c115d3aace76438ee8ebe02a30ce60 100644 --- a/object/jss.go +++ b/object/jss.go @@ -32,7 +32,7 @@ func (j *jss) Copy(dst, src string) error { return err } -func newJSS(endpoint, accessKey, secretKey string) ObjectStorage { +func newJSS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, _ := url.ParseRequestURI(endpoint) ssl := strings.ToLower(uri.Scheme) == "https" hostParts := strings.Split(uri.Host, ".") @@ -50,9 +50,9 @@ func newJSS(endpoint, accessKey, secretKey string) ObjectStorage { } ses := session.New(awsConfig) //.WithLogLevel(aws.LogDebugWithHTTPBody)) - return &jss{s3client{bucket, s3.New(ses), ses}} + return &jss{s3client{bucket, s3.New(ses), ses}}, nil } func init() { - register("jss", newJSS) + Register("jss", newJSS) } diff --git a/object/ks3.go b/object/ks3.go index 296fc1783014c698461e7ab7f1cac383fb05fadf..e769196e4eed993183d9b52080bb2802fad32b38 100644 --- a/object/ks3.go +++ b/object/ks3.go @@ -225,7 +225,7 @@ var ks3Regions = map[string]string{ "sgp": "SINGAPORE", } -func newKS3(endpoint, accessKey, secretKey string) ObjectStorage { +func newKS3(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, _ := url.ParseRequestURI(endpoint) ssl := strings.ToLower(uri.Scheme) == "https" hostParts := strings.Split(uri.Host, ".") @@ -247,9 +247,9 @@ func newKS3(endpoint, accessKey, secretKey string) ObjectStorage { Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), } - return &ks3{bucket, s3.New(awsConfig), nil} + return &ks3{bucket, s3.New(awsConfig), nil}, nil } func init() { - register("ks3", newKS3) + Register("ks3", newKS3) } diff --git a/object/mem.go b/object/mem.go index d761fa21c3124dc872bc39669e714d87b5a9f9ae..2e758119dab30e13956a09534ab99748c3c1671a 100644 --- a/object/mem.go +++ b/object/mem.go @@ -26,7 +26,7 @@ type obj struct { type memStore struct { sync.Mutex - defaultObjectStorage + DefaultObjectStorage name string objects map[string]*obj } @@ -185,12 +185,12 @@ func (m *memStore) List(prefix, marker string, limit int64) ([]*Object, error) { return objs, nil } -func newMem(endpoint, accesskey, secretkey string) ObjectStorage { +func newMem(endpoint, accesskey, secretkey string) (ObjectStorage, error) { store := &memStore{name: endpoint} store.objects = make(map[string]*obj) - return store + return store, nil } func init() { - register("mem", newMem) + Register("mem", newMem) } diff --git a/object/mss.go b/object/mss.go index a8d911b7d5c21daa7351e131d678afe50a4ee147..e037f3105815a7ba66106190e44c3cec535ff9f2 100644 --- a/object/mss.go +++ b/object/mss.go @@ -135,7 +135,7 @@ func (c *mss) List(prefix, marker string, limit int64) ([]*Object, error) { return objs, nil } -func newMSS(endpoint, accessKey, secretKey string) ObjectStorage { +func newMSS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { qs := &mss{RestfulStorage{ endpoint: endpoint, accessKey: accessKey, @@ -143,9 +143,9 @@ func newMSS(endpoint, accessKey, secretKey string) ObjectStorage { signName: "AWS", signer: mssSigner, }} - return qs + return qs, nil } func init() { - register("mss", newMSS) + Register("mss", newMSS) } diff --git a/object/nos.go b/object/nos.go index db639112d02e32b567ce0813878a36d6b206a8f4..31eb638b473897fd768772bf9b344e32ae89df7f 100644 --- a/object/nos.go +++ b/object/nos.go @@ -18,7 +18,7 @@ import ( ) type nos struct { - defaultObjectStorage + DefaultObjectStorage bucket string client *nosclient.NosClient } @@ -134,10 +134,10 @@ func (s *nos) List(prefix, marker string, limit int64) ([]*Object, error) { return objs, nil } -func newNOS(endpoint, accessKey, secretKey string) ObjectStorage { +func newNOS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err) } hostParts := strings.SplitN(uri.Host, ".", 2) bucket := hostParts[0] @@ -156,9 +156,9 @@ func newNOS(endpoint, accessKey, secretKey string) ObjectStorage { nosClient, _ := nosclient.New(conf) - return &nos{bucket: bucket, client: nosClient} + return &nos{bucket: bucket, client: nosClient}, nil } func init() { - register("nos", newNOS) + Register("nos", newNOS) } diff --git a/object/object_storage.go b/object/object_storage.go index e9a589c36d553ec83447d822cfa7f864c5dd97c4..8986135e766d151a5abe174e18971beabf1e0765 100644 --- a/object/object_storage.go +++ b/object/object_storage.go @@ -76,51 +76,51 @@ type FileSystem interface { var notSupported = errors.New("not supported") -type defaultObjectStorage struct{} +type DefaultObjectStorage struct{} -func (s defaultObjectStorage) Create() error { +func (s DefaultObjectStorage) Create() error { return nil } -func (s defaultObjectStorage) CreateMultipartUpload(key string) (*MultipartUpload, error) { +func (s DefaultObjectStorage) CreateMultipartUpload(key string) (*MultipartUpload, error) { return nil, notSupported } -func (s defaultObjectStorage) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) { +func (s DefaultObjectStorage) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) { return nil, notSupported } -func (s defaultObjectStorage) AbortUpload(key string, uploadID string) {} +func (s DefaultObjectStorage) AbortUpload(key string, uploadID string) {} -func (s defaultObjectStorage) CompleteUpload(key string, uploadID string, parts []*Part) error { +func (s DefaultObjectStorage) CompleteUpload(key string, uploadID string, parts []*Part) error { return notSupported } -func (s defaultObjectStorage) ListUploads(marker string) ([]*PendingPart, string, error) { +func (s DefaultObjectStorage) ListUploads(marker string) ([]*PendingPart, string, error) { return nil, "", nil } -func (s defaultObjectStorage) List(prefix, marker string, limit int64) ([]*Object, error) { +func (s DefaultObjectStorage) List(prefix, marker string, limit int64) ([]*Object, error) { return nil, notSupported } -func (s defaultObjectStorage) ListAll(prefix, marker string) (<-chan *Object, error) { +func (s DefaultObjectStorage) ListAll(prefix, marker string) (<-chan *Object, error) { return nil, notSupported } -type Register func(endpoint, accessKey, secretKey string) ObjectStorage +type Creator func(bucket, accessKey, secretKey string) (ObjectStorage, error) -var storages = make(map[string]Register) +var storages = make(map[string]Creator) -func register(name string, register Register) { +func Register(name string, register Creator) { storages[name] = register } -func CreateStorage(name, endpoint, accessKey, secretKey string) ObjectStorage { +func CreateStorage(name, endpoint, accessKey, secretKey string) (ObjectStorage, error) { f, ok := storages[name] if ok { logger.Debugf("Creating %s storage at endpoint %s", name, endpoint) return f(endpoint, accessKey, secretKey) } - panic(fmt.Sprintf("invalid storage: %s", name)) + return nil, fmt.Errorf("invalid storage: %s", name) } diff --git a/object/object_storage_test.go b/object/object_storage_test.go index d0f108c937bbca938d13d67c759ad414f86f67ec..5c87b6729a5632dd25b4d77f48394e2fcab8965f 100644 --- a/object/object_storage_test.go +++ b/object/object_storage_test.go @@ -47,7 +47,7 @@ func testStorage(t *testing.T, s ObjectStorage) { t.Fatalf("Can't create bucket %s: %s", s, err) } - s = WithPrefix(s, "unit-test") + s, _ = WithPrefix(s, "unit-test") defer s.Delete("/test") k := "/large" defer s.Delete(k) @@ -179,12 +179,12 @@ func testStorage(t *testing.T, s ObjectStorage) { } func TestMem(t *testing.T) { - m := newMem("", "", "") + m, _ := newMem("", "", "") testStorage(t, m) } func TestDisk(t *testing.T) { - s := newDisk("/tmp/abc/", "", "") + s, _ := newDisk("/tmp/abc/", "", "") testStorage(t, s) } @@ -192,7 +192,7 @@ func TestQingStor(t *testing.T) { if os.Getenv("QY_ACCESS_KEY") == "" { t.SkipNow() } - s := newQingStor("https://test.pek3a.qingstor.com", + s, _ := newQingStor("https://test.pek3a.qingstor.com", os.Getenv("QY_ACCESS_KEY"), os.Getenv("QY_SECRET_KEY")) testStorage(t, s) } @@ -201,7 +201,7 @@ func TestS3(t *testing.T) { if os.Getenv("AWS_ACCESS_KEY_ID") == "" { t.SkipNow() } - s := newS3(fmt.Sprintf("https://%s", os.Getenv("S3_TEST_BUCKET")), + s, _ := newS3(fmt.Sprintf("https://%s", os.Getenv("S3_TEST_BUCKET")), os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY")) testStorage(t, s) } @@ -214,7 +214,7 @@ func TestOSS(t *testing.T) { if b := os.Getenv("OSS_TEST_BUCKET"); b != "" { bucketName = b } - s := newOSS(fmt.Sprintf("https://%s", bucketName), + s, _ := newOSS(fmt.Sprintf("https://%s", bucketName), os.Getenv("ALICLOUD_ACCESS_KEY_ID"), os.Getenv("ALICLOUD_ACCESS_KEY_SECRET")) testStorage(t, s) } @@ -223,7 +223,7 @@ func TestUFile(t *testing.T) { if os.Getenv("UCLOUD_PUBLIC_KEY") == "" { t.SkipNow() } - ufile := newUFile("https://test.us-ca.ufileos.com", + ufile, _ := newUFile("https://test.us-ca.ufileos.com", os.Getenv("UCLOUD_PUBLIC_KEY"), os.Getenv("UCLOUD_PRIVATE_KEY")) testStorage(t, ufile) } @@ -233,7 +233,7 @@ func TestGS(t *testing.T) { t.SkipNow() } os.Setenv("GOOGLE_CLOUD_PROJECT", "davies-test") - gs := newGS("https://test.us-west1.googleapi.com", "", "") + gs, _ := newGS("https://test.us-west1.googleapi.com", "", "") testStorage(t, gs) } @@ -241,10 +241,10 @@ func TestQiniu(t *testing.T) { if os.Getenv("QINIU_ACCESS_KEY") == "" { t.SkipNow() } - qiniu := newQiniu("https://test.cn-east-1-s3.qiniu.com", + qiniu, _ := newQiniu("https://test.cn-east-1-s3.qiniu.com", os.Getenv("QINIU_ACCESS_KEY"), os.Getenv("QINIU_SECRET_KEY")) testStorage(t, qiniu) - qiniu = newQiniu("https://test.cn-north-1-s3.qiniu.com", + qiniu, _ = newQiniu("https://test.cn-north-1-s3.qiniu.com", os.Getenv("QINIU_ACCESS_KEY"), os.Getenv("QINIU_SECRET_KEY")) testStorage(t, qiniu) } @@ -253,7 +253,7 @@ func TestKS3(t *testing.T) { if os.Getenv("KS3_ACCESS_KEY") == "" { t.SkipNow() } - ks3 := newKS3("https://test.kss.ksyun.com", + ks3, _ := newKS3("https://test.kss.ksyun.com", os.Getenv("KS3_ACCESS_KEY"), os.Getenv("KS3_SECRET_KEY")) testStorage(t, ks3) } @@ -262,7 +262,7 @@ func TestCOS(t *testing.T) { if os.Getenv("COS_SECRETID") == "" { t.SkipNow() } - cos := newCOS( + cos, _ := newCOS( fmt.Sprintf("https://%s", os.Getenv("COS_TEST_BUCKET")), os.Getenv("COS_SECRETID"), os.Getenv("COS_SECRETKEY")) testStorage(t, cos) @@ -272,7 +272,7 @@ func TestAzure(t *testing.T) { if os.Getenv("AZURE_STORAGE_ACCOUNT") == "" { t.SkipNow() } - abs := newWabs("https://test-chunk.core.chinacloudapi.cn", + abs, _ := newWabs("https://test-chunk.core.chinacloudapi.cn", os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_KEY")) testStorage(t, abs) } @@ -281,7 +281,7 @@ func TestNOS(t *testing.T) { if os.Getenv("NOS_ACCESS_KEY") == "" { t.SkipNow() } - nos := newNOS("https://test.nos-eastchina1.126.net", + nos, _ := newNOS("https://test.nos-eastchina1.126.net", os.Getenv("NOS_ACCESS_KEY"), os.Getenv("NOS_SECRET_KEY")) testStorage(t, nos) } @@ -290,7 +290,7 @@ func TestMSS(t *testing.T) { if os.Getenv("MSS_ACCESS_KEY") == "" { t.SkipNow() } - mss := newMSS("https://test.mtmss.com", + mss, _ := newMSS("https://test.mtmss.com", os.Getenv("MSS_ACCESS_KEY"), os.Getenv("MSS_SECRET_KEY")) testStorage(t, mss) } @@ -299,7 +299,7 @@ func TestJSS(t *testing.T) { if os.Getenv("JSS_ACCESS_KEY") == "" { t.SkipNow() } - jss := newJSS("https://test.s3.cn-north-1.jcloudcs.com", + jss, _ := newJSS("https://test.s3.cn-north-1.jcloudcs.com", os.Getenv("JSS_ACCESS_KEY"), os.Getenv("JSS_SECRET_KEY")) testStorage(t, jss) } @@ -308,7 +308,7 @@ func TestSpeedy(t *testing.T) { if os.Getenv("SPEEDY_ACCESS_KEY") == "" { t.SkipNow() } - cos := newSpeedy("https://test.oss-cn-beijing.speedycloud.org", + cos, _ := newSpeedy("https://test.oss-cn-beijing.speedycloud.org", os.Getenv("SPEEDY_ACCESS_KEY"), os.Getenv("SPEEDY_SECRET_KEY")) testStorage(t, cos) } @@ -317,7 +317,7 @@ func TestB2(t *testing.T) { if os.Getenv("B2_ACCOUNT_ID") == "" { t.SkipNow() } - b := newB2("https://test.backblaze.com", os.Getenv("B2_ACCOUNT_ID"), os.Getenv("B2_APP_KEY")) + b, _ := newB2("https://test.backblaze.com", os.Getenv("B2_ACCOUNT_ID"), os.Getenv("B2_APP_KEY")) testStorage(t, b) } @@ -325,7 +325,7 @@ func TestSpace(t *testing.T) { if os.Getenv("SPACE_ACCESS_KEY") == "" { t.SkipNow() } - b := newSpace("https://test.nyc3.digitaloceanspaces.com", os.Getenv("SPACE_ACCESS_KEY"), os.Getenv("SPACE_SECRET_KEY")) + b, _ := newSpace("https://test.nyc3.digitaloceanspaces.com", os.Getenv("SPACE_ACCESS_KEY"), os.Getenv("SPACE_SECRET_KEY")) testStorage(t, b) } @@ -333,7 +333,7 @@ func TestBOS(t *testing.T) { if os.Getenv("BDCLOUD_ACCESS_KEY") == "" { t.SkipNow() } - b := newBOS(fmt.Sprintf("https://%s", os.Getenv("BOS_TEST_BUCKET")), + b, _ := newBOS(fmt.Sprintf("https://%s", os.Getenv("BOS_TEST_BUCKET")), os.Getenv("BDCLOUD_ACCESS_KEY"), os.Getenv("BDCLOUD_SECRET_KEY")) testStorage(t, b) } @@ -342,7 +342,7 @@ func TestSftp(t *testing.T) { if os.Getenv("SFTP_HOST") == "" { t.SkipNow() } - b := newSftp(os.Getenv("SFTP_HOST"), os.Getenv("SFTP_USER"), os.Getenv("SFTP_PASS")) + b, _ := newSftp(os.Getenv("SFTP_HOST"), os.Getenv("SFTP_USER"), os.Getenv("SFTP_PASS")) testStorage(t, b) } @@ -350,7 +350,7 @@ func TestOBS(t *testing.T) { if os.Getenv("HWCLOUD_ACCESS_KEY") == "" { t.SkipNow() } - b := newOBS(fmt.Sprintf("https://%s", os.Getenv("OBS_TEST_BUCKET")), + b, _ := newOBS(fmt.Sprintf("https://%s", os.Getenv("OBS_TEST_BUCKET")), os.Getenv("HWCLOUD_ACCESS_KEY"), os.Getenv("HWCLOUD_SECRET_KEY")) testStorage(t, b) } @@ -359,7 +359,7 @@ func TestHDFS(t *testing.T) { if os.Getenv("HDFS_ADDR") == "" { t.Skip() } - dfs := newHDFS(os.Getenv("HDFS_ADDR"), "", "") + dfs, _ := newHDFS(os.Getenv("HDFS_ADDR"), "", "") testStorage(t, dfs) } @@ -367,7 +367,7 @@ func TestOOS(t *testing.T) { if os.Getenv("OOS_ACCESS_KEY") == "" { t.SkipNow() } - b := newOOS(fmt.Sprintf("https://%s", os.Getenv("OOS_TEST_BUCKET")), + b, _ := newOOS(fmt.Sprintf("https://%s", os.Getenv("OOS_TEST_BUCKET")), os.Getenv("OOS_ACCESS_KEY"), os.Getenv("OOS_SECRET_KEY")) testStorage(t, b) } diff --git a/object/obs.go b/object/obs.go index 1fa3847f519abf7ee1301c80646e56c6d6d515db..a0a5849b6e5b4b6809cc4fac8ed12d596e1fa5af 100644 --- a/object/obs.go +++ b/object/obs.go @@ -228,10 +228,10 @@ func autoOBSEndpoint(bucketName, accessKey, secretKey string) (string, error) { return "", fmt.Errorf("bucket %q does not exist", bucketName) } -func newOBS(endpoint, accessKey, secretKey string) ObjectStorage { +func newOBS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint %s: %s", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err) } hostParts := strings.SplitN(uri.Host, ".", 2) bucketName := hostParts[0] @@ -247,7 +247,7 @@ func newOBS(endpoint, accessKey, secretKey string) ObjectStorage { var region string if len(hostParts) == 1 { if endpoint, err = autoOBSEndpoint(bucketName, accessKey, secretKey); err != nil { - logger.Fatalf("Cannot get location of bucket %q: %s", bucketName, err) + return nil, fmt.Errorf("Cannot get location of bucket %q: %s", bucketName, err) } if !strings.HasPrefix(endpoint, "http") { endpoint = fmt.Sprintf("%s://%s", uri.Scheme, endpoint) @@ -258,11 +258,11 @@ func newOBS(endpoint, accessKey, secretKey string) ObjectStorage { c, err := obs.New(accessKey, secretKey, endpoint) if err != nil { - logger.Fatalf("Fail to initialize OBS: %s", err) + return nil, fmt.Errorf("Fail to initialize OBS: %s", err) } - return &obsClient{bucketName, region, c} + return &obsClient{bucketName, region, c}, nil } func init() { - register("obs", newOBS) + Register("obs", newOBS) } diff --git a/object/oos.go b/object/oos.go index 4d52f35f74d94c740f607dc5d1102843542571ec..af58a2c3d4b288ebab3a928bb15de7796ca5072a 100644 --- a/object/oos.go +++ b/object/oos.go @@ -40,10 +40,10 @@ func (s *oos) List(prefix, marker string, limit int64) ([]*Object, error) { return objs, err } -func newOOS(endpoint, accessKey, secretKey string) ObjectStorage { +func newOOS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint %s: %s", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err) } ssl := strings.ToLower(uri.Scheme) == "https" hostParts := strings.Split(uri.Host, ".") @@ -57,13 +57,13 @@ func newOOS(endpoint, accessKey, secretKey string) ObjectStorage { DisableSSL: aws.Bool(!ssl), S3ForcePathStyle: aws.Bool(true), // HTTPClient: httpClient, - Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), + Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), } ses := session.New(awsConfig) - return &oos{s3client{bucket, s3.New(ses), ses}} + return &oos{s3client{bucket, s3.New(ses), ses}}, nil } func init() { - register("oos", newOOS) + Register("oos", newOOS) } diff --git a/object/oss.go b/object/oss.go index f9a857f5c4d8e2bd116e4de45f8a5580b9522e2e..68b6892e3b249716ffca5d7a5ffc88a8e761face 100644 --- a/object/oss.go +++ b/object/oss.go @@ -285,10 +285,10 @@ func autoOSSEndpoint(bucketName, accessKey, secretKey, securityToken string) (st return fmt.Sprintf("https://%s.aliyuncs.com", bucketLocation), nil } -func newOSS(endpoint, accessKey, secretKey string) ObjectStorage { +func newOSS(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err) } hostParts := strings.SplitN(uri.Host, ".", 2) bucketName := hostParts[0] @@ -307,7 +307,7 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage { if accessKey == "" { if cred, err := fetchStsToken(); err != nil { - logger.Fatalf("No credential provided for OSS") + return nil, fmt.Errorf("No credential provided for OSS") } else { accessKey = cred.AccessKeyId secretKey = cred.AccessKeySecret @@ -318,7 +318,7 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage { if domain == "" { if domain, err = autoOSSEndpoint(bucketName, accessKey, secretKey, securityToken); err != nil { - logger.Fatalf("Unable to get endpoint of bucket %s: %s", bucketName, err) + return nil, fmt.Errorf("Unable to get endpoint of bucket %s: %s", bucketName, err) } logger.Debugf("Use endpoint %q", domain) } @@ -345,7 +345,7 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage { }() } if err != nil { - logger.Fatalf("Cannot create OSS client with endpoint %s: %s", endpoint, err) + return nil, fmt.Errorf("Cannot create OSS client with endpoint %s: %s", endpoint, err) } client.Config.Timeout = 10 @@ -357,12 +357,12 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage { bucket, err := client.Bucket(bucketName) if err != nil { - logger.Fatalf("Cannot create bucket %s: %s", bucketName, err) + return nil, fmt.Errorf("Cannot create bucket %s: %s", bucketName, err) } - return &ossClient{client: client, bucket: bucket} + return &ossClient{client: client, bucket: bucket}, nil } func init() { - register("oss", newOSS) + Register("oss", newOSS) } diff --git a/object/prefix.go b/object/prefix.go index 23c6379c3000a94d51f12079d2746e25dc7d42d3..b418164d437772eaf541d40f0a636ffebbcc653e 100644 --- a/object/prefix.go +++ b/object/prefix.go @@ -14,8 +14,8 @@ type withPrefix struct { } // WithPrefix retuns a object storage that add a prefix to keys. -func WithPrefix(os ObjectStorage, prefix string) ObjectStorage { - return &withPrefix{os, prefix} +func WithPrefix(os ObjectStorage, prefix string) (ObjectStorage, error) { + return &withPrefix{os, prefix}, nil } func (p *withPrefix) String() string { diff --git a/object/qingstor.go b/object/qingstor.go index e2e12843e435c92049c3cf951b503fa8f5d54723..8140c199973a055f081032f59281d7561a53ed53 100644 --- a/object/qingstor.go +++ b/object/qingstor.go @@ -229,10 +229,10 @@ func (q *qingstor) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, *result.NextKeyMarker, nil } -func newQingStor(endpoint, accessKey, secretKey string) ObjectStorage { +func newQingStor(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err) } hostParts := strings.SplitN(uri.Host, ".", 3) bucketName := hostParts[0] @@ -240,7 +240,7 @@ func newQingStor(endpoint, accessKey, secretKey string) ObjectStorage { conf, err := config.New(accessKey, secretKey) if err != nil { - logger.Fatalf("Can't load config: %s", err.Error()) + return nil, fmt.Errorf("Can't load config: %s", err.Error()) } conf.Protocol = uri.Scheme if uri.Scheme == "http" { @@ -251,9 +251,9 @@ func newQingStor(endpoint, accessKey, secretKey string) ObjectStorage { conf.Connection = httpClient qsService, _ := qs.Init(conf) bucket, _ := qsService.Bucket(bucketName, zone) - return &qingstor{bucket: bucket} + return &qingstor{bucket: bucket}, nil } func init() { - register("qingstor", newQingStor) + Register("qingstor", newQingStor) } diff --git a/object/qiniu.go b/object/qiniu.go index 625dabb85f21eb91e29e61421f89b08ebf63ba79..2c4a2c226167c1c7fdb9e6b3465c3b030426fcf0 100644 --- a/object/qiniu.go +++ b/object/qiniu.go @@ -144,10 +144,10 @@ var publicRegions = map[string]*storage.Zone{ "ap-southeast-1": &storage.ZoneXinjiapo, } -func newQiniu(endpoint, accessKey, secretKey string) ObjectStorage { +func newQiniu(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err) } hostParts := strings.SplitN(uri.Host, ".", 2) bucket := hostParts[0] @@ -195,9 +195,9 @@ func newQiniu(endpoint, accessKey, secretKey string) ObjectStorage { cfg.Zone = zone mac := qbox.NewMac(accessKey, secretKey) bucketManager := storage.NewBucketManager(mac, &cfg) - return &qiniu{s3client, bucketManager, mac, &cfg, ""} + return &qiniu{s3client, bucketManager, mac, &cfg, ""}, nil } func init() { - register("qiniu", newQiniu) + Register("qiniu", newQiniu) } diff --git a/object/restful.go b/object/restful.go index ccf4cd3188e5f521213fc75fbd84f25e43ff28cd..9e6f5d2b832e0ab71359e8bf17b7545acf79881f 100644 --- a/object/restful.go +++ b/object/restful.go @@ -60,7 +60,7 @@ func cleanup(response *http.Response) { } type RestfulStorage struct { - defaultObjectStorage + DefaultObjectStorage endpoint string accessKey string secretKey string diff --git a/object/s3.go b/object/s3.go index 0974fabe28d32205c8bc8fb21d60fc747826d4f1..e48f4306cf0dc40b9cdf3c372a94e846ccfdc9f2 100644 --- a/object/s3.go +++ b/object/s3.go @@ -270,10 +270,10 @@ func autoS3Region(bucketName, accessKey, secretKey string) (string, error) { return "", err } -func newS3(endpoint, accessKey, secretKey string) ObjectStorage { +func newS3(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint %s: %s", endpoint, err.Error()) + return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err.Error()) } hostParts := strings.SplitN(uri.Host, ".", 2) @@ -286,7 +286,7 @@ func newS3(endpoint, accessKey, secretKey string) ObjectStorage { if len(hostParts) == 1 { // take endpoint as bucketname bucketName = hostParts[0] if region, err = autoS3Region(bucketName, accessKey, secretKey); err != nil { - logger.Fatalf("Can't guess your region for bucket %s: %s", bucketName, err) + return nil, fmt.Errorf("Can't guess your region for bucket %s: %s", bucketName, err) } } else { // get region in endpoint if strings.Contains(uri.Host, ".amazonaws.com") { @@ -335,11 +335,11 @@ func newS3(endpoint, accessKey, secretKey string) ObjectStorage { ses, err := session.NewSession(awsConfig) //.WithLogLevel(aws.LogDebugWithHTTPBody)) if err != nil { - logger.Fatalf("Fail to create aws session: %s", err) + return nil, fmt.Errorf("Fail to create aws session: %s", err) } - return &s3client{bucketName, s3.New(ses), ses} + return &s3client{bucketName, s3.New(ses), ses}, nil } func init() { - register("s3", newS3) + Register("s3", newS3) } diff --git a/object/sftp.go b/object/sftp.go index 5bf2207c1a9b7e47101115ef183decd41af20424..467526003982b925eb7fee5cad1afc3be4d2d9ab 100644 --- a/object/sftp.go +++ b/object/sftp.go @@ -54,7 +54,7 @@ func (c *conn) closed() error { } type sftpStore struct { - defaultObjectStorage + DefaultObjectStorage host string root string config *ssh.ClientConfig @@ -372,7 +372,7 @@ func (f *sftpStore) ListAll(prefix, marker string) (<-chan *Object, error) { return listed, nil } -func newSftp(endpoint, user, pass string) ObjectStorage { +func newSftp(endpoint, user, pass string) (ObjectStorage, error) { parts := strings.Split(endpoint, ":") root := filepath.Clean(parts[1]) // append suffix `/` removed by filepath.Clean() @@ -394,12 +394,12 @@ func newSftp(endpoint, user, pass string) ObjectStorage { if privateKeyPath := os.Getenv("SSH_PRIVATE_KEY_PATH"); privateKeyPath != "" { key, err := ioutil.ReadFile(privateKeyPath) if err != nil { - logger.Fatalf("unable to read private key, error: %v", err) + return nil, fmt.Errorf("unable to read private key, error: %v", err) } signer, err := ssh.ParsePrivateKey(key) if err != nil { - logger.Fatalf("unable to parse private key, error: %v", err) + return nil, fmt.Errorf("unable to parse private key, error: %v", err) } config.Auth = append(config.Auth, ssh.PublicKeys(signer)) @@ -414,26 +414,26 @@ func newSftp(endpoint, user, pass string) ObjectStorage { c, err := f.getSftpConnection() if err != nil { logger.Errorf("getSftpConnection failed: %s", err) - return nil + return nil, err } defer f.putSftpConnection(&c, err) if strings.HasSuffix(root, dirSuffix) { logger.Debugf("Ensure dicectory %s", root) if err := c.sftpClient.MkdirAll(root); err != nil { - logger.Fatalf("Creating directory %s failed: %q", root, err) + return nil, fmt.Errorf("Creating directory %s failed: %q", root, err) } } else { dir := filepath.Dir(root) logger.Debugf("Ensure dicectory %s", dir) if err := c.sftpClient.MkdirAll(dir); err != nil { - logger.Fatalf("Creating directory %s failed: %q", dir, err) + return nil, fmt.Errorf("Creating directory %s failed: %q", dir, err) } } - return f + return f, nil } func init() { - register("sftp", newSftp) + Register("sftp", newSftp) } diff --git a/object/space.go b/object/space.go index 8cf363f42c7437c030d5b98d6394760ea2ed0b9f..8c8f0a60ce3cf9e98d6b0a0961d3ab80341ccf0a 100644 --- a/object/space.go +++ b/object/space.go @@ -40,7 +40,7 @@ func (s *space) Create() error { return err } -func newSpace(endpoint, accessKey, secretKey string) ObjectStorage { +func newSpace(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, _ := url.ParseRequestURI(endpoint) ssl := strings.ToLower(uri.Scheme) == "https" hostParts := strings.Split(uri.Host, ".") @@ -58,9 +58,9 @@ func newSpace(endpoint, accessKey, secretKey string) ObjectStorage { } ses := session.New(awsConfig) //.WithLogLevel(aws.LogDebugWithHTTPBody)) - return &space{s3client{bucket, s3.New(ses), ses}} + return &space{s3client{bucket, s3.New(ses), ses}}, nil } func init() { - register("space", newSpace) + Register("space", newSpace) } diff --git a/object/speedy.go b/object/speedy.go index 9821faff835cbe9209d57621cd5e5ef6763bf7e3..a8c5f05a14695ae19be1fc690c237a09611f5ce1 100644 --- a/object/speedy.go +++ b/object/speedy.go @@ -71,16 +71,16 @@ func (s *speedy) List(prefix, marker string, limit int64) ([]*Object, error) { return objs, nil } -func newSpeedy(endpoint, accessKey, secretKey string) ObjectStorage { +func newSpeedy(endpoint, accessKey, secretKey string) (ObjectStorage, error) { return &speedy{RestfulStorage{ endpoint: endpoint, accessKey: accessKey, secretKey: secretKey, signName: "AWS", signer: sign, - }} + }}, nil } func init() { - register("speedy", newSpeedy) + Register("speedy", newSpeedy) } diff --git a/object/ufile.go b/object/ufile.go index 3c7b6b1c0d2d11dedbf13ce6d2a8a8c422a81992..b7dd565b262e0bb97d669e7c30ab7f70ad3d8541 100644 --- a/object/ufile.go +++ b/object/ufile.go @@ -252,10 +252,10 @@ func (u *ufile) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, out.NextMarker, nil } -func newUFile(endpoint, accessKey, secretKey string) ObjectStorage { - return &ufile{RestfulStorage{defaultObjectStorage{}, endpoint, accessKey, secretKey, "UCloud", ufileSigner}} +func newUFile(endpoint, accessKey, secretKey string) (ObjectStorage, error) { + return &ufile{RestfulStorage{DefaultObjectStorage{}, endpoint, accessKey, secretKey, "UCloud", ufileSigner}}, nil } func init() { - register("ufile", newUFile) + Register("ufile", newUFile) } diff --git a/object/wasabi.go b/object/wasabi.go index d4f131d44f237bebe57f292a837742951d5e4e8a..d2d42535d650484be30954f57a67abb037d40d9f 100644 --- a/object/wasabi.go +++ b/object/wasabi.go @@ -40,10 +40,10 @@ func (s *wasabi) Create() error { return err } -func newWasabi(endpoint, accessKey, secretKey string) ObjectStorage { +func newWasabi(endpoint, accessKey, secretKey string) (ObjectStorage, error) { uri, err := url.ParseRequestURI(endpoint) if err != nil { - logger.Fatalf("Invalid endpoint %s: %s", endpoint, err) + return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err) } ssl := strings.ToLower(uri.Scheme) == "https" hostParts := strings.Split(uri.Host, ".") @@ -61,9 +61,9 @@ func newWasabi(endpoint, accessKey, secretKey string) ObjectStorage { } ses := session.New(awsConfig) //.WithLogLevel(aws.LogDebugWithHTTPBody)) - return &wasabi{s3client{bucket, s3.New(ses), ses}} + return &wasabi{s3client{bucket, s3.New(ses), ses}}, nil } func init() { - register("wasabi", newWasabi) + Register("wasabi", newWasabi) } diff --git a/sync/cluster.go b/sync/cluster.go index 9b3bfebb76c5e231ed4e0b2b4e52a4e719685e69..26bdc6539f9b43e2f2dd6cb84440b33d77c05056 100644 --- a/sync/cluster.go +++ b/sync/cluster.go @@ -110,7 +110,7 @@ func findLocalIP() (string, error) { return "", errors.New("are you connected to the network?") } -func startManager(tasks chan *object.Object) string { +func startManager(tasks chan *object.Object) (string, error) { http.HandleFunc("/fetch", func(w http.ResponseWriter, req *http.Request) { var objs []*object.Object obj, ok := <-tasks @@ -164,38 +164,37 @@ func startManager(tasks chan *object.Object) string { }) l, err := net.Listen("tcp", "0.0.0.0:0") if err != nil { - logger.Fatalf("listen: %s", err) + return "", fmt.Errorf("listen: %s", err) } logger.Infof("Listen at %s", l.Addr()) go http.Serve(l, nil) ip, err := findLocalIP() if err != nil { - logger.Fatalf("find local ip: %s", err) + return "", fmt.Errorf("find local ip: %s", err) } ps := strings.Split(l.Addr().String(), ":") port := ps[len(ps)-1] - return fmt.Sprintf("%s:%s", ip, port) + return fmt.Sprintf("%s:%s", ip, port), nil } -func findSelfPath() string { +func findSelfPath() (string, error) { program := os.Args[0] if strings.Contains(program, "/") { path, err := filepath.Abs(program) if err != nil { - logger.Fatalf("resolve path %s: %s", program, err) + return "", fmt.Errorf("resolve path %s: %s", program, err) } - return path + return path, nil } for _, searchPath := range strings.Split(os.Getenv("PATH"), ":") { if searchPath != "" { p := filepath.Join(searchPath, program) if _, err := os.Stat(p); err == nil { - return p + return p, nil } } } - logger.Fatalf("can't find path for %s", program) - panic("") + return "", fmt.Errorf("can't find path for %s", program) } func launchWorker(address string, config *config.Config, wg *sync.WaitGroup) { @@ -205,10 +204,14 @@ func launchWorker(address string, config *config.Config, wg *sync.WaitGroup) { go func(host string) { defer wg.Done() // copy - path := findSelfPath() + path, err := findSelfPath() + if err != nil { + logger.Errorf("find self path: %s", err) + return + } rpath := "/tmp/juicesync" cmd := exec.Command("rsync", "-au", path, host+":"+rpath) - err := cmd.Run() + err = cmd.Run() if err != nil { // fallback to scp cmd = exec.Command("scp", path, host+":"+rpath) diff --git a/sync/cluster_test.go b/sync/cluster_test.go index 80dcb4e2b84cda64c8e080f1eda489c344cf2fe8..1053cbf81a7126276898cb630494d1e0f4d3255b 100644 --- a/sync/cluster_test.go +++ b/sync/cluster_test.go @@ -12,7 +12,10 @@ import ( func TestCluster(t *testing.T) { // manager todo := make(chan *object.Object, 100) - addr := startManager(todo) + addr, err := startManager(todo) + if err != nil { + t.Fatal(err) + } sendStats(addr) // worker var conf config.Config diff --git a/sync/sync.go b/sync/sync.go index da5668fdd4ad3a96494b929558d3ce72baf1c9a5..001fe32badb9e0bea327bf9beb87d0f71cfb3c8c 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -599,7 +599,10 @@ func Sync(src, dst object.ObjectStorage, config *config.Config) error { go showProgress() } if config.Workers != nil { - addr := startManager(todo) + addr, err := startManager(todo) + if err != nil { + return err + } launchWorker(addr, config, &wg) } } else { diff --git a/sync/sync_test.go b/sync/sync_test.go index 485b2c1366c5562727259b3a81c7cca751202699..e5513d3ba79cfa6cde42bbb087fb0eb88c9e1801 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -19,7 +19,7 @@ func collectAll(c <-chan *object.Object) []string { } func TestIterator(t *testing.T) { - m := object.CreateStorage("mem", "", "", "") + m, _ := object.CreateStorage("mem", "", "", "") m.Put("a", bytes.NewReader([]byte("a"))) m.Put("b", bytes.NewReader([]byte("a"))) m.Put("aa", bytes.NewReader([]byte("a"))) @@ -37,7 +37,7 @@ func TestIterator(t *testing.T) { } // Single object - s := object.CreateStorage("mem", "", "", "") + s, _ := object.CreateStorage("mem", "", "", "") s.Put("a", bytes.NewReader([]byte("a"))) ch, _ = iterate(s, "", "") keys = collectAll(ch) @@ -52,7 +52,7 @@ func TestIeratorSingleEmptyKey(t *testing.T) { // utils.SetLogLevel(logrus.DebugLevel) // Construct mem storage - s := object.CreateStorage("mem", "", "", "") + s, _ := object.CreateStorage("mem", "", "", "") err := s.Put("abc", bytes.NewReader([]byte("abc"))) if err != nil { t.Errorf("Put error: %q", err) @@ -60,7 +60,7 @@ func TestIeratorSingleEmptyKey(t *testing.T) { } // Simulate command line prefix in SRC or DST - s = object.WithPrefix(s, "abc") + s, _ = object.WithPrefix(s, "abc") ch, _ := iterate(s, "", "") keys := collectAll(ch) if !reflect.DeepEqual(keys, []string{""}) { @@ -87,12 +87,12 @@ func TestSync(t *testing.T) { Quiet: false, } - a := object.CreateStorage("mem", "a", "", "") + a, _ := object.CreateStorage("mem", "a", "", "") a.Put("a", bytes.NewReader([]byte("a"))) a.Put("ab", bytes.NewReader([]byte("ab"))) a.Put("abc", bytes.NewReader([]byte("abc"))) - b := object.CreateStorage("mem", "b", "", "") + b, _ := object.CreateStorage("mem", "b", "", "") b.Put("ba", bytes.NewReader([]byte("ba"))) // Copy "a" from mem://a to mem://b