提交 e5c3b3d6 编写于 作者: D Davies Liu

allow extention (#88)

上级 e1c5ba40
......@@ -37,7 +37,7 @@ func supportHTTPS(name, endpoint string) bool {
}
}
func createStorage(uri string, conf *config.Config) object.ObjectStorage {
func createStorage(uri string, conf *config.Config) (object.ObjectStorage, error) {
if !strings.Contains(uri, "://") {
if strings.Contains(uri, ":") {
var user string
......@@ -91,9 +91,9 @@ func createStorage(uri string, conf *config.Config) object.ObjectStorage {
endpoint = "http://" + endpoint
}
store := object.CreateStorage(name, endpoint, accessKey, secretKey)
if store == nil {
logger.Fatalf("Invalid storage type: %s", u.Scheme)
store, err := object.CreateStorage(name, endpoint, accessKey, secretKey)
if err != nil {
return nil, fmt.Errorf("create %s %s: %s", name, endpoint, err)
}
if conf.Perms {
if _, ok := store.(object.FileSystem); !ok {
......@@ -102,9 +102,9 @@ func createStorage(uri string, conf *config.Config) object.ObjectStorage {
}
}
if name != "file" && len(u.Path) > 1 {
store = object.WithPrefix(store, u.Path[1:])
store, _ = object.WithPrefix(store, u.Path[1:])
}
return store
return store, nil
}
func run(c *cli.Context) error {
......@@ -121,8 +121,14 @@ func run(c *cli.Context) error {
if strings.HasSuffix(c.Args().Get(0), "/") != strings.HasSuffix(c.Args().Get(1), "/") {
logger.Fatalf("SRC and DST should both end with '/' or not!")
}
src := createStorage(c.Args().Get(0), config)
dst := createStorage(c.Args().Get(1), config)
src, err := createStorage(c.Args().Get(0), config)
if err != nil {
return err
}
dst, err := createStorage(c.Args().Get(1), config)
if err != nil {
return err
}
return sync.Sync(src, dst, config)
}
......
......@@ -16,7 +16,7 @@ import (
)
type wasb struct {
defaultObjectStorage
DefaultObjectStorage
container *storage.Container
marker string
}
......@@ -137,10 +137,10 @@ func autoWasbEndpoint(containerName, accountName, accountKey string, useHTTPS bo
return endpoint, nil
}
func newWabs(endpoint, accountName, accountKey string) ObjectStorage {
func newWabs(endpoint, accountName, accountKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
log.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err)
}
hostParts := strings.SplitN(uri.Host, ".", 2)
......@@ -155,7 +155,7 @@ func newWabs(endpoint, accountName, accountKey string) ObjectStorage {
}
parts := strings.SplitN(item, "=", 2)
if len(parts) != 2 {
logger.Fatalf("Invalid connection string item: %s", item)
return nil, fmt.Errorf("Invalid connection string item: %s", item)
}
// Arguments from command line take precedence
if parts[0] == "DefaultEndpointsProtocol" && scheme == "" {
......@@ -178,7 +178,7 @@ func newWabs(endpoint, accountName, accountKey string) ObjectStorage {
domain = hostParts[1]
} else if domain == "" {
if domain, err = autoWasbEndpoint(name, accountName, accountKey, scheme == "https"); err != nil {
logger.Fatalf("Unable to get endpoint of container %s: %s", name, err)
return nil, fmt.Errorf("Unable to get endpoint of container %s: %s", name, err)
}
}
......@@ -188,9 +188,9 @@ func newWabs(endpoint, accountName, accountKey string) ObjectStorage {
}
service := client.GetBlobService()
container := service.GetContainerReference(name)
return &wasb{container: container}
return &wasb{container: container}, nil
}
func init() {
register("wasb", newWabs)
Register("wasb", newWabs)
}
......@@ -5,7 +5,6 @@ package object
import (
"fmt"
"io"
"log"
"net/url"
"strings"
......@@ -13,7 +12,7 @@ import (
)
type b2client struct {
defaultObjectStorage
DefaultObjectStorage
client *b2.Client
bucket *b2.Bucket
cursor *b2.Cursor
......@@ -103,16 +102,16 @@ func (c *b2client) List(prefix, marker string, limit int64) ([]*Object, error) {
return objs, nil
}
func newB2(endpoint, account, key string) ObjectStorage {
func newB2(endpoint, account, key string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err)
}
hostParts := strings.Split(uri.Host, ".")
bucketName := hostParts[0]
client, err := b2.NewClient(ctx, account, key, b2.Transport(httpClient.Transport))
if err != nil {
log.Fatalf("Failed to create client: %v", err)
return nil, fmt.Errorf("Failed to create client: %v", err)
}
bucket, err := client.Bucket(ctx, bucketName)
if err != nil {
......@@ -120,12 +119,12 @@ func newB2(endpoint, account, key string) ObjectStorage {
Type: "allPrivate",
})
if err != nil {
log.Fatalf("Failed to create bucket: %v", err)
return nil, fmt.Errorf("Failed to create bucket: %v", err)
}
}
return &b2client{client: client, bucket: bucket}
return &b2client{client: client, bucket: bucket}, nil
}
func init() {
register("b2", newB2)
Register("b2", newB2)
}
......@@ -18,7 +18,7 @@ import (
const bosDefaultRegion = "bj"
type bosclient struct {
defaultObjectStorage
DefaultObjectStorage
bucket string
c *bos.Client
}
......@@ -177,10 +177,10 @@ func autoBOSEndpoint(bucketName, accessKey, secretKey string) (string, error) {
}
}
func newBOS(endpoint, accessKey, secretKey string) ObjectStorage {
func newBOS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err)
}
hostParts := strings.SplitN(uri.Host, ".", 2)
bucketName := hostParts[0]
......@@ -195,7 +195,7 @@ func newBOS(endpoint, accessKey, secretKey string) ObjectStorage {
if len(hostParts) == 1 {
if endpoint, err = autoBOSEndpoint(bucketName, accessKey, secretKey); err != nil {
logger.Fatalf("Fail to get location of bucket %q: %s", bucketName, err)
return nil, fmt.Errorf("Fail to get location of bucket %q: %s", bucketName, err)
}
if !strings.HasPrefix(endpoint, "http") {
endpoint = fmt.Sprintf("%s://%s", uri.Scheme, endpoint)
......@@ -204,9 +204,9 @@ func newBOS(endpoint, accessKey, secretKey string) ObjectStorage {
}
bosClient, err := bos.NewClient(accessKey, secretKey, endpoint)
return &bosclient{bucket: bucketName, c: bosClient}
return &bosclient{bucket: bucketName, c: bosClient}, nil
}
func init() {
register("bos", newBOS)
Register("bos", newBOS)
}
......@@ -189,10 +189,10 @@ func autoCOSEndpoint(bucketName, accessKey, secretKey string) (string, error) {
return "", fmt.Errorf("bucket %q doesnot exist", bucketName)
}
func newCOS(endpoint, accessKey, secretKey string) ObjectStorage {
func newCOS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint %s: %s", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err)
}
hostParts := strings.SplitN(uri.Host, ".", 2)
......@@ -203,10 +203,10 @@ func newCOS(endpoint, accessKey, secretKey string) ObjectStorage {
if len(hostParts) == 1 {
if endpoint, err = autoCOSEndpoint(hostParts[0], accessKey, secretKey); err != nil {
logger.Fatalf("Unable to get endpoint of bucket %s: %s", hostParts[0], err)
return nil, fmt.Errorf("Unable to get endpoint of bucket %s: %s", hostParts[0], err)
}
if uri, err = url.ParseRequestURI(endpoint); err != nil {
logger.Fatalf("Invalid endpoint %s: %s", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err)
}
logger.Debugf("Use endpoint %q", endpoint)
}
......@@ -219,9 +219,9 @@ func newCOS(endpoint, accessKey, secretKey string) ObjectStorage {
},
})
client.UserAgent = UserAgent
return &COS{client, uri.Host}
return &COS{client, uri.Host}, nil
}
func init() {
register("cos", newCOS)
Register("cos", newCOS)
}
......@@ -4,6 +4,7 @@ package object
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
......@@ -20,7 +21,7 @@ const (
)
type filestore struct {
defaultObjectStorage
DefaultObjectStorage
root string
}
......@@ -301,22 +302,22 @@ func (d *filestore) Chown(path string, owner, group string) error {
return os.Chown(p, uid, gid)
}
func newDisk(root, accesskey, secretkey string) ObjectStorage {
func newDisk(root, accesskey, secretkey string) (ObjectStorage, error) {
if strings.HasSuffix(root, dirSuffix) {
logger.Debugf("Ensure dicectory %s", root)
if err := os.MkdirAll(root, 0755); err != nil {
logger.Fatalf("Creating directory %s failed: %q", root, err)
return nil, fmt.Errorf("Creating directory %s failed: %q", root, err)
}
} else {
dir := path.Dir(root)
logger.Debugf("Ensure dicectory %s", dir)
if err := os.MkdirAll(dir, 0755); err != nil {
logger.Fatalf("Creating directory %s failed: %q", dir, err)
return nil, fmt.Errorf("Creating directory %s failed: %q", dir, err)
}
}
return &filestore{root: root}
return &filestore{root: root}, nil
}
func init() {
register("file", newDisk)
Register("file", newDisk)
}
......@@ -37,7 +37,7 @@ func TestFsFile(t *testing.T) {
"xyz/",
"xyz/xyz.txt",
}
s0 := newDisk("/tmp/abc/unit-test/", "", "")
s0, _ := newDisk("/tmp/abc/unit-test/", "", "")
// initialize directory tree
for _, key := range keys {
if err := s0.Put(key, bytes.NewReader([]byte{})); err != nil {
......@@ -55,7 +55,7 @@ func TestFsFile(t *testing.T) {
}
}()
s := newDisk("/tmp/abc/unit-test/x/", "", "")
s, _ := newDisk("/tmp/abc/unit-test/x/", "", "")
objs, err := listAll(s, "", "", 100)
if err != nil {
t.Fatalf("list failed: %s", err)
......@@ -65,7 +65,7 @@ func TestFsFile(t *testing.T) {
t.Fatalf("testKeysEqual fail: %s", err)
}
s = newDisk("/tmp/abc/unit-test/x", "", "")
s, _ = newDisk("/tmp/abc/unit-test/x", "", "")
objs, err = listAll(s, "", "", 100)
if err != nil {
t.Fatalf("list failed: %s", err)
......@@ -75,7 +75,7 @@ func TestFsFile(t *testing.T) {
t.Fatalf("testKeysEqual fail: %s", err)
}
s = newDisk("/tmp/abc/unit-test/xy", "", "")
s, _ = newDisk("/tmp/abc/unit-test/xy", "", "")
objs, err = listAll(s, "", "", 100)
if err != nil {
t.Fatalf("list failed: %s", err)
......@@ -93,7 +93,7 @@ func TestFsSftp(t *testing.T) {
t.SkipNow()
}
sftpUser, sftpPass := os.Getenv("SFTP_USER"), os.Getenv("SFTP_PASS")
s0 := newSftp(sftpHost, sftpUser, sftpPass)
s0, _ := newSftp(sftpHost, sftpUser, sftpPass)
keys := []string{
"x/",
......@@ -119,7 +119,7 @@ func TestFsSftp(t *testing.T) {
}
}()
s := newSftp(sftpHost+"x/", sftpUser, sftpPass)
s, _ := newSftp(sftpHost+"x/", sftpUser, sftpPass)
objs, err := listAll(s, "", "", 100)
if err != nil {
t.Fatalf("list failed: %s", err)
......@@ -129,7 +129,7 @@ func TestFsSftp(t *testing.T) {
t.Fatalf("testKeysEqual fail: %s", err)
}
s = newSftp(sftpHost+"x", sftpUser, sftpPass)
s, _ = newSftp(sftpHost+"x", sftpUser, sftpPass)
objs, err = listAll(s, "", "", 100)
if err != nil {
t.Fatalf("list failed: %s", err)
......@@ -139,7 +139,7 @@ func TestFsSftp(t *testing.T) {
t.Fatalf("testKeysEqual fail: %s", err)
}
s = newSftp(sftpHost+"xy", sftpUser, sftpPass)
s, _ = newSftp(sftpHost+"xy", sftpUser, sftpPass)
objs, err = listAll(s, "", "", 100)
if err != nil {
t.Fatalf("list failed: %s", err)
......
......@@ -20,7 +20,7 @@ import (
var ctx = context.Background()
type gs struct {
defaultObjectStorage
DefaultObjectStorage
service *storage.Service
bucket string
region string
......@@ -135,10 +135,10 @@ func (g *gs) List(prefix, marker string, limit int64) ([]*Object, error) {
return objs, nil
}
func newGS(endpoint, accessKey, secretKey string) ObjectStorage {
func newGS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err)
}
hostParts := strings.Split(uri.Host, ".")
bucket := hostParts[0]
......@@ -151,9 +151,9 @@ func newGS(endpoint, accessKey, secretKey string) ObjectStorage {
if err != nil {
log.Fatalf("Failed to create service: %v", err)
}
return &gs{service: service, bucket: bucket, region: region}
return &gs{service: service, bucket: bucket, region: region}, nil
}
func init() {
register("gs", newGS)
Register("gs", newGS)
}
......@@ -24,7 +24,7 @@ var superuser = "hdfs"
var supergroup = "supergroup"
type hdfsclient struct {
defaultObjectStorage
DefaultObjectStorage
addr string
c *hdfs.Client
}
......@@ -285,10 +285,10 @@ func (h *hdfsclient) Chown(key string, owner, group string) error {
return h.c.Chown(h.path(key), owner, group)
}
func newHDFS(addr, username, sk string) ObjectStorage {
func newHDFS(addr, username, sk string) (ObjectStorage, error) {
conf, err := hadoopconf.LoadFromEnvironment()
if err != nil {
logger.Fatalf("Problem loading configuration: %s", err)
return nil, fmt.Errorf("Problem loading configuration: %s", err)
}
options := hdfs.ClientOptionsFromConf(conf)
......@@ -299,7 +299,7 @@ func newHDFS(addr, username, sk string) ObjectStorage {
if options.KerberosClient != nil {
options.KerberosClient, err = getKerberosClient()
if err != nil {
logger.Fatalf("Problem with kerberos authentication: %s", err)
return nil, fmt.Errorf("Problem with kerberos authentication: %s", err)
}
} else {
if username == "" {
......@@ -308,7 +308,7 @@ func newHDFS(addr, username, sk string) ObjectStorage {
if username == "" {
current, err := user.Current()
if err != nil {
logger.Fatalf("get current user: %s", err)
return nil, fmt.Errorf("get current user: %s", err)
}
username = current.Username
}
......@@ -317,7 +317,7 @@ func newHDFS(addr, username, sk string) ObjectStorage {
c, err := hdfs.NewClient(options)
if err != nil {
logger.Fatalf("new HDFS client %s: %s", addr, err)
return nil, fmt.Errorf("new HDFS client %s: %s", addr, err)
}
if os.Getenv("HADOOP_SUPER_USER") != "" {
superuser = os.Getenv("HADOOP_SUPER_USER")
......@@ -326,9 +326,9 @@ func newHDFS(addr, username, sk string) ObjectStorage {
supergroup = os.Getenv("HADOOP_SUPER_GROUP")
}
return &hdfsclient{addr: addr, c: c}
return &hdfsclient{addr: addr, c: c}, nil
}
func init() {
register("hdfs", newHDFS)
Register("hdfs", newHDFS)
}
......@@ -215,7 +215,7 @@ func (s *ibmcos) ListUploads(marker string) ([]*PendingPart, string, error) {
return parts, nextMarker, nil
}
func newIBMCOS(endpoint, apiKey, serviceInstanceID string) ObjectStorage {
func newIBMCOS(endpoint, apiKey, serviceInstanceID string) (ObjectStorage, error) {
uri, _ := url.ParseRequestURI(endpoint)
hostParts := strings.Split(uri.Host, ".")
bucket := hostParts[0]
......@@ -230,9 +230,9 @@ func newIBMCOS(endpoint, apiKey, serviceInstanceID string) ObjectStorage {
WithS3ForcePathStyle(true)
sess := session.Must(session.NewSession())
client := s3.New(sess, conf)
return &ibmcos{bucket, client}
return &ibmcos{bucket, client}, nil
}
func init() {
register("ibmcos", newIBMCOS)
Register("ibmcos", newIBMCOS)
}
......@@ -32,7 +32,7 @@ func (j *jss) Copy(dst, src string) error {
return err
}
func newJSS(endpoint, accessKey, secretKey string) ObjectStorage {
func newJSS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, _ := url.ParseRequestURI(endpoint)
ssl := strings.ToLower(uri.Scheme) == "https"
hostParts := strings.Split(uri.Host, ".")
......@@ -50,9 +50,9 @@ func newJSS(endpoint, accessKey, secretKey string) ObjectStorage {
}
ses := session.New(awsConfig) //.WithLogLevel(aws.LogDebugWithHTTPBody))
return &jss{s3client{bucket, s3.New(ses), ses}}
return &jss{s3client{bucket, s3.New(ses), ses}}, nil
}
func init() {
register("jss", newJSS)
Register("jss", newJSS)
}
......@@ -225,7 +225,7 @@ var ks3Regions = map[string]string{
"sgp": "SINGAPORE",
}
func newKS3(endpoint, accessKey, secretKey string) ObjectStorage {
func newKS3(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, _ := url.ParseRequestURI(endpoint)
ssl := strings.ToLower(uri.Scheme) == "https"
hostParts := strings.Split(uri.Host, ".")
......@@ -247,9 +247,9 @@ func newKS3(endpoint, accessKey, secretKey string) ObjectStorage {
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
}
return &ks3{bucket, s3.New(awsConfig), nil}
return &ks3{bucket, s3.New(awsConfig), nil}, nil
}
func init() {
register("ks3", newKS3)
Register("ks3", newKS3)
}
......@@ -26,7 +26,7 @@ type obj struct {
type memStore struct {
sync.Mutex
defaultObjectStorage
DefaultObjectStorage
name string
objects map[string]*obj
}
......@@ -185,12 +185,12 @@ func (m *memStore) List(prefix, marker string, limit int64) ([]*Object, error) {
return objs, nil
}
func newMem(endpoint, accesskey, secretkey string) ObjectStorage {
func newMem(endpoint, accesskey, secretkey string) (ObjectStorage, error) {
store := &memStore{name: endpoint}
store.objects = make(map[string]*obj)
return store
return store, nil
}
func init() {
register("mem", newMem)
Register("mem", newMem)
}
......@@ -135,7 +135,7 @@ func (c *mss) List(prefix, marker string, limit int64) ([]*Object, error) {
return objs, nil
}
func newMSS(endpoint, accessKey, secretKey string) ObjectStorage {
func newMSS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
qs := &mss{RestfulStorage{
endpoint: endpoint,
accessKey: accessKey,
......@@ -143,9 +143,9 @@ func newMSS(endpoint, accessKey, secretKey string) ObjectStorage {
signName: "AWS",
signer: mssSigner,
}}
return qs
return qs, nil
}
func init() {
register("mss", newMSS)
Register("mss", newMSS)
}
......@@ -18,7 +18,7 @@ import (
)
type nos struct {
defaultObjectStorage
DefaultObjectStorage
bucket string
client *nosclient.NosClient
}
......@@ -134,10 +134,10 @@ func (s *nos) List(prefix, marker string, limit int64) ([]*Object, error) {
return objs, nil
}
func newNOS(endpoint, accessKey, secretKey string) ObjectStorage {
func newNOS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err)
}
hostParts := strings.SplitN(uri.Host, ".", 2)
bucket := hostParts[0]
......@@ -156,9 +156,9 @@ func newNOS(endpoint, accessKey, secretKey string) ObjectStorage {
nosClient, _ := nosclient.New(conf)
return &nos{bucket: bucket, client: nosClient}
return &nos{bucket: bucket, client: nosClient}, nil
}
func init() {
register("nos", newNOS)
Register("nos", newNOS)
}
......@@ -76,51 +76,51 @@ type FileSystem interface {
var notSupported = errors.New("not supported")
type defaultObjectStorage struct{}
type DefaultObjectStorage struct{}
func (s defaultObjectStorage) Create() error {
func (s DefaultObjectStorage) Create() error {
return nil
}
func (s defaultObjectStorage) CreateMultipartUpload(key string) (*MultipartUpload, error) {
func (s DefaultObjectStorage) CreateMultipartUpload(key string) (*MultipartUpload, error) {
return nil, notSupported
}
func (s defaultObjectStorage) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) {
func (s DefaultObjectStorage) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) {
return nil, notSupported
}
func (s defaultObjectStorage) AbortUpload(key string, uploadID string) {}
func (s DefaultObjectStorage) AbortUpload(key string, uploadID string) {}
func (s defaultObjectStorage) CompleteUpload(key string, uploadID string, parts []*Part) error {
func (s DefaultObjectStorage) CompleteUpload(key string, uploadID string, parts []*Part) error {
return notSupported
}
func (s defaultObjectStorage) ListUploads(marker string) ([]*PendingPart, string, error) {
func (s DefaultObjectStorage) ListUploads(marker string) ([]*PendingPart, string, error) {
return nil, "", nil
}
func (s defaultObjectStorage) List(prefix, marker string, limit int64) ([]*Object, error) {
func (s DefaultObjectStorage) List(prefix, marker string, limit int64) ([]*Object, error) {
return nil, notSupported
}
func (s defaultObjectStorage) ListAll(prefix, marker string) (<-chan *Object, error) {
func (s DefaultObjectStorage) ListAll(prefix, marker string) (<-chan *Object, error) {
return nil, notSupported
}
type Register func(endpoint, accessKey, secretKey string) ObjectStorage
type Creator func(bucket, accessKey, secretKey string) (ObjectStorage, error)
var storages = make(map[string]Register)
var storages = make(map[string]Creator)
func register(name string, register Register) {
func Register(name string, register Creator) {
storages[name] = register
}
func CreateStorage(name, endpoint, accessKey, secretKey string) ObjectStorage {
func CreateStorage(name, endpoint, accessKey, secretKey string) (ObjectStorage, error) {
f, ok := storages[name]
if ok {
logger.Debugf("Creating %s storage at endpoint %s", name, endpoint)
return f(endpoint, accessKey, secretKey)
}
panic(fmt.Sprintf("invalid storage: %s", name))
return nil, fmt.Errorf("invalid storage: %s", name)
}
......@@ -47,7 +47,7 @@ func testStorage(t *testing.T, s ObjectStorage) {
t.Fatalf("Can't create bucket %s: %s", s, err)
}
s = WithPrefix(s, "unit-test")
s, _ = WithPrefix(s, "unit-test")
defer s.Delete("/test")
k := "/large"
defer s.Delete(k)
......@@ -179,12 +179,12 @@ func testStorage(t *testing.T, s ObjectStorage) {
}
func TestMem(t *testing.T) {
m := newMem("", "", "")
m, _ := newMem("", "", "")
testStorage(t, m)
}
func TestDisk(t *testing.T) {
s := newDisk("/tmp/abc/", "", "")
s, _ := newDisk("/tmp/abc/", "", "")
testStorage(t, s)
}
......@@ -192,7 +192,7 @@ func TestQingStor(t *testing.T) {
if os.Getenv("QY_ACCESS_KEY") == "" {
t.SkipNow()
}
s := newQingStor("https://test.pek3a.qingstor.com",
s, _ := newQingStor("https://test.pek3a.qingstor.com",
os.Getenv("QY_ACCESS_KEY"), os.Getenv("QY_SECRET_KEY"))
testStorage(t, s)
}
......@@ -201,7 +201,7 @@ func TestS3(t *testing.T) {
if os.Getenv("AWS_ACCESS_KEY_ID") == "" {
t.SkipNow()
}
s := newS3(fmt.Sprintf("https://%s", os.Getenv("S3_TEST_BUCKET")),
s, _ := newS3(fmt.Sprintf("https://%s", os.Getenv("S3_TEST_BUCKET")),
os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"))
testStorage(t, s)
}
......@@ -214,7 +214,7 @@ func TestOSS(t *testing.T) {
if b := os.Getenv("OSS_TEST_BUCKET"); b != "" {
bucketName = b
}
s := newOSS(fmt.Sprintf("https://%s", bucketName),
s, _ := newOSS(fmt.Sprintf("https://%s", bucketName),
os.Getenv("ALICLOUD_ACCESS_KEY_ID"), os.Getenv("ALICLOUD_ACCESS_KEY_SECRET"))
testStorage(t, s)
}
......@@ -223,7 +223,7 @@ func TestUFile(t *testing.T) {
if os.Getenv("UCLOUD_PUBLIC_KEY") == "" {
t.SkipNow()
}
ufile := newUFile("https://test.us-ca.ufileos.com",
ufile, _ := newUFile("https://test.us-ca.ufileos.com",
os.Getenv("UCLOUD_PUBLIC_KEY"), os.Getenv("UCLOUD_PRIVATE_KEY"))
testStorage(t, ufile)
}
......@@ -233,7 +233,7 @@ func TestGS(t *testing.T) {
t.SkipNow()
}
os.Setenv("GOOGLE_CLOUD_PROJECT", "davies-test")
gs := newGS("https://test.us-west1.googleapi.com", "", "")
gs, _ := newGS("https://test.us-west1.googleapi.com", "", "")
testStorage(t, gs)
}
......@@ -241,10 +241,10 @@ func TestQiniu(t *testing.T) {
if os.Getenv("QINIU_ACCESS_KEY") == "" {
t.SkipNow()
}
qiniu := newQiniu("https://test.cn-east-1-s3.qiniu.com",
qiniu, _ := newQiniu("https://test.cn-east-1-s3.qiniu.com",
os.Getenv("QINIU_ACCESS_KEY"), os.Getenv("QINIU_SECRET_KEY"))
testStorage(t, qiniu)
qiniu = newQiniu("https://test.cn-north-1-s3.qiniu.com",
qiniu, _ = newQiniu("https://test.cn-north-1-s3.qiniu.com",
os.Getenv("QINIU_ACCESS_KEY"), os.Getenv("QINIU_SECRET_KEY"))
testStorage(t, qiniu)
}
......@@ -253,7 +253,7 @@ func TestKS3(t *testing.T) {
if os.Getenv("KS3_ACCESS_KEY") == "" {
t.SkipNow()
}
ks3 := newKS3("https://test.kss.ksyun.com",
ks3, _ := newKS3("https://test.kss.ksyun.com",
os.Getenv("KS3_ACCESS_KEY"), os.Getenv("KS3_SECRET_KEY"))
testStorage(t, ks3)
}
......@@ -262,7 +262,7 @@ func TestCOS(t *testing.T) {
if os.Getenv("COS_SECRETID") == "" {
t.SkipNow()
}
cos := newCOS(
cos, _ := newCOS(
fmt.Sprintf("https://%s", os.Getenv("COS_TEST_BUCKET")),
os.Getenv("COS_SECRETID"), os.Getenv("COS_SECRETKEY"))
testStorage(t, cos)
......@@ -272,7 +272,7 @@ func TestAzure(t *testing.T) {
if os.Getenv("AZURE_STORAGE_ACCOUNT") == "" {
t.SkipNow()
}
abs := newWabs("https://test-chunk.core.chinacloudapi.cn",
abs, _ := newWabs("https://test-chunk.core.chinacloudapi.cn",
os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_KEY"))
testStorage(t, abs)
}
......@@ -281,7 +281,7 @@ func TestNOS(t *testing.T) {
if os.Getenv("NOS_ACCESS_KEY") == "" {
t.SkipNow()
}
nos := newNOS("https://test.nos-eastchina1.126.net",
nos, _ := newNOS("https://test.nos-eastchina1.126.net",
os.Getenv("NOS_ACCESS_KEY"), os.Getenv("NOS_SECRET_KEY"))
testStorage(t, nos)
}
......@@ -290,7 +290,7 @@ func TestMSS(t *testing.T) {
if os.Getenv("MSS_ACCESS_KEY") == "" {
t.SkipNow()
}
mss := newMSS("https://test.mtmss.com",
mss, _ := newMSS("https://test.mtmss.com",
os.Getenv("MSS_ACCESS_KEY"), os.Getenv("MSS_SECRET_KEY"))
testStorage(t, mss)
}
......@@ -299,7 +299,7 @@ func TestJSS(t *testing.T) {
if os.Getenv("JSS_ACCESS_KEY") == "" {
t.SkipNow()
}
jss := newJSS("https://test.s3.cn-north-1.jcloudcs.com",
jss, _ := newJSS("https://test.s3.cn-north-1.jcloudcs.com",
os.Getenv("JSS_ACCESS_KEY"), os.Getenv("JSS_SECRET_KEY"))
testStorage(t, jss)
}
......@@ -308,7 +308,7 @@ func TestSpeedy(t *testing.T) {
if os.Getenv("SPEEDY_ACCESS_KEY") == "" {
t.SkipNow()
}
cos := newSpeedy("https://test.oss-cn-beijing.speedycloud.org",
cos, _ := newSpeedy("https://test.oss-cn-beijing.speedycloud.org",
os.Getenv("SPEEDY_ACCESS_KEY"), os.Getenv("SPEEDY_SECRET_KEY"))
testStorage(t, cos)
}
......@@ -317,7 +317,7 @@ func TestB2(t *testing.T) {
if os.Getenv("B2_ACCOUNT_ID") == "" {
t.SkipNow()
}
b := newB2("https://test.backblaze.com", os.Getenv("B2_ACCOUNT_ID"), os.Getenv("B2_APP_KEY"))
b, _ := newB2("https://test.backblaze.com", os.Getenv("B2_ACCOUNT_ID"), os.Getenv("B2_APP_KEY"))
testStorage(t, b)
}
......@@ -325,7 +325,7 @@ func TestSpace(t *testing.T) {
if os.Getenv("SPACE_ACCESS_KEY") == "" {
t.SkipNow()
}
b := newSpace("https://test.nyc3.digitaloceanspaces.com", os.Getenv("SPACE_ACCESS_KEY"), os.Getenv("SPACE_SECRET_KEY"))
b, _ := newSpace("https://test.nyc3.digitaloceanspaces.com", os.Getenv("SPACE_ACCESS_KEY"), os.Getenv("SPACE_SECRET_KEY"))
testStorage(t, b)
}
......@@ -333,7 +333,7 @@ func TestBOS(t *testing.T) {
if os.Getenv("BDCLOUD_ACCESS_KEY") == "" {
t.SkipNow()
}
b := newBOS(fmt.Sprintf("https://%s", os.Getenv("BOS_TEST_BUCKET")),
b, _ := newBOS(fmt.Sprintf("https://%s", os.Getenv("BOS_TEST_BUCKET")),
os.Getenv("BDCLOUD_ACCESS_KEY"), os.Getenv("BDCLOUD_SECRET_KEY"))
testStorage(t, b)
}
......@@ -342,7 +342,7 @@ func TestSftp(t *testing.T) {
if os.Getenv("SFTP_HOST") == "" {
t.SkipNow()
}
b := newSftp(os.Getenv("SFTP_HOST"), os.Getenv("SFTP_USER"), os.Getenv("SFTP_PASS"))
b, _ := newSftp(os.Getenv("SFTP_HOST"), os.Getenv("SFTP_USER"), os.Getenv("SFTP_PASS"))
testStorage(t, b)
}
......@@ -350,7 +350,7 @@ func TestOBS(t *testing.T) {
if os.Getenv("HWCLOUD_ACCESS_KEY") == "" {
t.SkipNow()
}
b := newOBS(fmt.Sprintf("https://%s", os.Getenv("OBS_TEST_BUCKET")),
b, _ := newOBS(fmt.Sprintf("https://%s", os.Getenv("OBS_TEST_BUCKET")),
os.Getenv("HWCLOUD_ACCESS_KEY"), os.Getenv("HWCLOUD_SECRET_KEY"))
testStorage(t, b)
}
......@@ -359,7 +359,7 @@ func TestHDFS(t *testing.T) {
if os.Getenv("HDFS_ADDR") == "" {
t.Skip()
}
dfs := newHDFS(os.Getenv("HDFS_ADDR"), "", "")
dfs, _ := newHDFS(os.Getenv("HDFS_ADDR"), "", "")
testStorage(t, dfs)
}
......@@ -367,7 +367,7 @@ func TestOOS(t *testing.T) {
if os.Getenv("OOS_ACCESS_KEY") == "" {
t.SkipNow()
}
b := newOOS(fmt.Sprintf("https://%s", os.Getenv("OOS_TEST_BUCKET")),
b, _ := newOOS(fmt.Sprintf("https://%s", os.Getenv("OOS_TEST_BUCKET")),
os.Getenv("OOS_ACCESS_KEY"), os.Getenv("OOS_SECRET_KEY"))
testStorage(t, b)
}
......@@ -228,10 +228,10 @@ func autoOBSEndpoint(bucketName, accessKey, secretKey string) (string, error) {
return "", fmt.Errorf("bucket %q does not exist", bucketName)
}
func newOBS(endpoint, accessKey, secretKey string) ObjectStorage {
func newOBS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint %s: %s", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err)
}
hostParts := strings.SplitN(uri.Host, ".", 2)
bucketName := hostParts[0]
......@@ -247,7 +247,7 @@ func newOBS(endpoint, accessKey, secretKey string) ObjectStorage {
var region string
if len(hostParts) == 1 {
if endpoint, err = autoOBSEndpoint(bucketName, accessKey, secretKey); err != nil {
logger.Fatalf("Cannot get location of bucket %q: %s", bucketName, err)
return nil, fmt.Errorf("Cannot get location of bucket %q: %s", bucketName, err)
}
if !strings.HasPrefix(endpoint, "http") {
endpoint = fmt.Sprintf("%s://%s", uri.Scheme, endpoint)
......@@ -258,11 +258,11 @@ func newOBS(endpoint, accessKey, secretKey string) ObjectStorage {
c, err := obs.New(accessKey, secretKey, endpoint)
if err != nil {
logger.Fatalf("Fail to initialize OBS: %s", err)
return nil, fmt.Errorf("Fail to initialize OBS: %s", err)
}
return &obsClient{bucketName, region, c}
return &obsClient{bucketName, region, c}, nil
}
func init() {
register("obs", newOBS)
Register("obs", newOBS)
}
......@@ -40,10 +40,10 @@ func (s *oos) List(prefix, marker string, limit int64) ([]*Object, error) {
return objs, err
}
func newOOS(endpoint, accessKey, secretKey string) ObjectStorage {
func newOOS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint %s: %s", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err)
}
ssl := strings.ToLower(uri.Scheme) == "https"
hostParts := strings.Split(uri.Host, ".")
......@@ -57,13 +57,13 @@ func newOOS(endpoint, accessKey, secretKey string) ObjectStorage {
DisableSSL: aws.Bool(!ssl),
S3ForcePathStyle: aws.Bool(true),
// HTTPClient: httpClient,
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
}
ses := session.New(awsConfig)
return &oos{s3client{bucket, s3.New(ses), ses}}
return &oos{s3client{bucket, s3.New(ses), ses}}, nil
}
func init() {
register("oos", newOOS)
Register("oos", newOOS)
}
......@@ -285,10 +285,10 @@ func autoOSSEndpoint(bucketName, accessKey, secretKey, securityToken string) (st
return fmt.Sprintf("https://%s.aliyuncs.com", bucketLocation), nil
}
func newOSS(endpoint, accessKey, secretKey string) ObjectStorage {
func newOSS(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err)
}
hostParts := strings.SplitN(uri.Host, ".", 2)
bucketName := hostParts[0]
......@@ -307,7 +307,7 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage {
if accessKey == "" {
if cred, err := fetchStsToken(); err != nil {
logger.Fatalf("No credential provided for OSS")
return nil, fmt.Errorf("No credential provided for OSS")
} else {
accessKey = cred.AccessKeyId
secretKey = cred.AccessKeySecret
......@@ -318,7 +318,7 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage {
if domain == "" {
if domain, err = autoOSSEndpoint(bucketName, accessKey, secretKey, securityToken); err != nil {
logger.Fatalf("Unable to get endpoint of bucket %s: %s", bucketName, err)
return nil, fmt.Errorf("Unable to get endpoint of bucket %s: %s", bucketName, err)
}
logger.Debugf("Use endpoint %q", domain)
}
......@@ -345,7 +345,7 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage {
}()
}
if err != nil {
logger.Fatalf("Cannot create OSS client with endpoint %s: %s", endpoint, err)
return nil, fmt.Errorf("Cannot create OSS client with endpoint %s: %s", endpoint, err)
}
client.Config.Timeout = 10
......@@ -357,12 +357,12 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage {
bucket, err := client.Bucket(bucketName)
if err != nil {
logger.Fatalf("Cannot create bucket %s: %s", bucketName, err)
return nil, fmt.Errorf("Cannot create bucket %s: %s", bucketName, err)
}
return &ossClient{client: client, bucket: bucket}
return &ossClient{client: client, bucket: bucket}, nil
}
func init() {
register("oss", newOSS)
Register("oss", newOSS)
}
......@@ -14,8 +14,8 @@ type withPrefix struct {
}
// WithPrefix retuns a object storage that add a prefix to keys.
func WithPrefix(os ObjectStorage, prefix string) ObjectStorage {
return &withPrefix{os, prefix}
func WithPrefix(os ObjectStorage, prefix string) (ObjectStorage, error) {
return &withPrefix{os, prefix}, nil
}
func (p *withPrefix) String() string {
......
......@@ -229,10 +229,10 @@ func (q *qingstor) ListUploads(marker string) ([]*PendingPart, string, error) {
return parts, *result.NextKeyMarker, nil
}
func newQingStor(endpoint, accessKey, secretKey string) ObjectStorage {
func newQingStor(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err)
}
hostParts := strings.SplitN(uri.Host, ".", 3)
bucketName := hostParts[0]
......@@ -240,7 +240,7 @@ func newQingStor(endpoint, accessKey, secretKey string) ObjectStorage {
conf, err := config.New(accessKey, secretKey)
if err != nil {
logger.Fatalf("Can't load config: %s", err.Error())
return nil, fmt.Errorf("Can't load config: %s", err.Error())
}
conf.Protocol = uri.Scheme
if uri.Scheme == "http" {
......@@ -251,9 +251,9 @@ func newQingStor(endpoint, accessKey, secretKey string) ObjectStorage {
conf.Connection = httpClient
qsService, _ := qs.Init(conf)
bucket, _ := qsService.Bucket(bucketName, zone)
return &qingstor{bucket: bucket}
return &qingstor{bucket: bucket}, nil
}
func init() {
register("qingstor", newQingStor)
Register("qingstor", newQingStor)
}
......@@ -144,10 +144,10 @@ var publicRegions = map[string]*storage.Zone{
"ap-southeast-1": &storage.ZoneXinjiapo,
}
func newQiniu(endpoint, accessKey, secretKey string) ObjectStorage {
func newQiniu(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint: %v, error: %v", endpoint, err)
}
hostParts := strings.SplitN(uri.Host, ".", 2)
bucket := hostParts[0]
......@@ -195,9 +195,9 @@ func newQiniu(endpoint, accessKey, secretKey string) ObjectStorage {
cfg.Zone = zone
mac := qbox.NewMac(accessKey, secretKey)
bucketManager := storage.NewBucketManager(mac, &cfg)
return &qiniu{s3client, bucketManager, mac, &cfg, ""}
return &qiniu{s3client, bucketManager, mac, &cfg, ""}, nil
}
func init() {
register("qiniu", newQiniu)
Register("qiniu", newQiniu)
}
......@@ -60,7 +60,7 @@ func cleanup(response *http.Response) {
}
type RestfulStorage struct {
defaultObjectStorage
DefaultObjectStorage
endpoint string
accessKey string
secretKey string
......
......@@ -270,10 +270,10 @@ func autoS3Region(bucketName, accessKey, secretKey string) (string, error) {
return "", err
}
func newS3(endpoint, accessKey, secretKey string) ObjectStorage {
func newS3(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint %s: %s", endpoint, err.Error())
return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err.Error())
}
hostParts := strings.SplitN(uri.Host, ".", 2)
......@@ -286,7 +286,7 @@ func newS3(endpoint, accessKey, secretKey string) ObjectStorage {
if len(hostParts) == 1 { // take endpoint as bucketname
bucketName = hostParts[0]
if region, err = autoS3Region(bucketName, accessKey, secretKey); err != nil {
logger.Fatalf("Can't guess your region for bucket %s: %s", bucketName, err)
return nil, fmt.Errorf("Can't guess your region for bucket %s: %s", bucketName, err)
}
} else { // get region in endpoint
if strings.Contains(uri.Host, ".amazonaws.com") {
......@@ -335,11 +335,11 @@ func newS3(endpoint, accessKey, secretKey string) ObjectStorage {
ses, err := session.NewSession(awsConfig) //.WithLogLevel(aws.LogDebugWithHTTPBody))
if err != nil {
logger.Fatalf("Fail to create aws session: %s", err)
return nil, fmt.Errorf("Fail to create aws session: %s", err)
}
return &s3client{bucketName, s3.New(ses), ses}
return &s3client{bucketName, s3.New(ses), ses}, nil
}
func init() {
register("s3", newS3)
Register("s3", newS3)
}
......@@ -54,7 +54,7 @@ func (c *conn) closed() error {
}
type sftpStore struct {
defaultObjectStorage
DefaultObjectStorage
host string
root string
config *ssh.ClientConfig
......@@ -372,7 +372,7 @@ func (f *sftpStore) ListAll(prefix, marker string) (<-chan *Object, error) {
return listed, nil
}
func newSftp(endpoint, user, pass string) ObjectStorage {
func newSftp(endpoint, user, pass string) (ObjectStorage, error) {
parts := strings.Split(endpoint, ":")
root := filepath.Clean(parts[1])
// append suffix `/` removed by filepath.Clean()
......@@ -394,12 +394,12 @@ func newSftp(endpoint, user, pass string) ObjectStorage {
if privateKeyPath := os.Getenv("SSH_PRIVATE_KEY_PATH"); privateKeyPath != "" {
key, err := ioutil.ReadFile(privateKeyPath)
if err != nil {
logger.Fatalf("unable to read private key, error: %v", err)
return nil, fmt.Errorf("unable to read private key, error: %v", err)
}
signer, err := ssh.ParsePrivateKey(key)
if err != nil {
logger.Fatalf("unable to parse private key, error: %v", err)
return nil, fmt.Errorf("unable to parse private key, error: %v", err)
}
config.Auth = append(config.Auth, ssh.PublicKeys(signer))
......@@ -414,26 +414,26 @@ func newSftp(endpoint, user, pass string) ObjectStorage {
c, err := f.getSftpConnection()
if err != nil {
logger.Errorf("getSftpConnection failed: %s", err)
return nil
return nil, err
}
defer f.putSftpConnection(&c, err)
if strings.HasSuffix(root, dirSuffix) {
logger.Debugf("Ensure dicectory %s", root)
if err := c.sftpClient.MkdirAll(root); err != nil {
logger.Fatalf("Creating directory %s failed: %q", root, err)
return nil, fmt.Errorf("Creating directory %s failed: %q", root, err)
}
} else {
dir := filepath.Dir(root)
logger.Debugf("Ensure dicectory %s", dir)
if err := c.sftpClient.MkdirAll(dir); err != nil {
logger.Fatalf("Creating directory %s failed: %q", dir, err)
return nil, fmt.Errorf("Creating directory %s failed: %q", dir, err)
}
}
return f
return f, nil
}
func init() {
register("sftp", newSftp)
Register("sftp", newSftp)
}
......@@ -40,7 +40,7 @@ func (s *space) Create() error {
return err
}
func newSpace(endpoint, accessKey, secretKey string) ObjectStorage {
func newSpace(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, _ := url.ParseRequestURI(endpoint)
ssl := strings.ToLower(uri.Scheme) == "https"
hostParts := strings.Split(uri.Host, ".")
......@@ -58,9 +58,9 @@ func newSpace(endpoint, accessKey, secretKey string) ObjectStorage {
}
ses := session.New(awsConfig) //.WithLogLevel(aws.LogDebugWithHTTPBody))
return &space{s3client{bucket, s3.New(ses), ses}}
return &space{s3client{bucket, s3.New(ses), ses}}, nil
}
func init() {
register("space", newSpace)
Register("space", newSpace)
}
......@@ -71,16 +71,16 @@ func (s *speedy) List(prefix, marker string, limit int64) ([]*Object, error) {
return objs, nil
}
func newSpeedy(endpoint, accessKey, secretKey string) ObjectStorage {
func newSpeedy(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
return &speedy{RestfulStorage{
endpoint: endpoint,
accessKey: accessKey,
secretKey: secretKey,
signName: "AWS",
signer: sign,
}}
}}, nil
}
func init() {
register("speedy", newSpeedy)
Register("speedy", newSpeedy)
}
......@@ -252,10 +252,10 @@ func (u *ufile) ListUploads(marker string) ([]*PendingPart, string, error) {
return parts, out.NextMarker, nil
}
func newUFile(endpoint, accessKey, secretKey string) ObjectStorage {
return &ufile{RestfulStorage{defaultObjectStorage{}, endpoint, accessKey, secretKey, "UCloud", ufileSigner}}
func newUFile(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
return &ufile{RestfulStorage{DefaultObjectStorage{}, endpoint, accessKey, secretKey, "UCloud", ufileSigner}}, nil
}
func init() {
register("ufile", newUFile)
Register("ufile", newUFile)
}
......@@ -40,10 +40,10 @@ func (s *wasabi) Create() error {
return err
}
func newWasabi(endpoint, accessKey, secretKey string) ObjectStorage {
func newWasabi(endpoint, accessKey, secretKey string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
logger.Fatalf("Invalid endpoint %s: %s", endpoint, err)
return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err)
}
ssl := strings.ToLower(uri.Scheme) == "https"
hostParts := strings.Split(uri.Host, ".")
......@@ -61,9 +61,9 @@ func newWasabi(endpoint, accessKey, secretKey string) ObjectStorage {
}
ses := session.New(awsConfig) //.WithLogLevel(aws.LogDebugWithHTTPBody))
return &wasabi{s3client{bucket, s3.New(ses), ses}}
return &wasabi{s3client{bucket, s3.New(ses), ses}}, nil
}
func init() {
register("wasabi", newWasabi)
Register("wasabi", newWasabi)
}
......@@ -110,7 +110,7 @@ func findLocalIP() (string, error) {
return "", errors.New("are you connected to the network?")
}
func startManager(tasks chan *object.Object) string {
func startManager(tasks chan *object.Object) (string, error) {
http.HandleFunc("/fetch", func(w http.ResponseWriter, req *http.Request) {
var objs []*object.Object
obj, ok := <-tasks
......@@ -164,38 +164,37 @@ func startManager(tasks chan *object.Object) string {
})
l, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
logger.Fatalf("listen: %s", err)
return "", fmt.Errorf("listen: %s", err)
}
logger.Infof("Listen at %s", l.Addr())
go http.Serve(l, nil)
ip, err := findLocalIP()
if err != nil {
logger.Fatalf("find local ip: %s", err)
return "", fmt.Errorf("find local ip: %s", err)
}
ps := strings.Split(l.Addr().String(), ":")
port := ps[len(ps)-1]
return fmt.Sprintf("%s:%s", ip, port)
return fmt.Sprintf("%s:%s", ip, port), nil
}
func findSelfPath() string {
func findSelfPath() (string, error) {
program := os.Args[0]
if strings.Contains(program, "/") {
path, err := filepath.Abs(program)
if err != nil {
logger.Fatalf("resolve path %s: %s", program, err)
return "", fmt.Errorf("resolve path %s: %s", program, err)
}
return path
return path, nil
}
for _, searchPath := range strings.Split(os.Getenv("PATH"), ":") {
if searchPath != "" {
p := filepath.Join(searchPath, program)
if _, err := os.Stat(p); err == nil {
return p
return p, nil
}
}
}
logger.Fatalf("can't find path for %s", program)
panic("")
return "", fmt.Errorf("can't find path for %s", program)
}
func launchWorker(address string, config *config.Config, wg *sync.WaitGroup) {
......@@ -205,10 +204,14 @@ func launchWorker(address string, config *config.Config, wg *sync.WaitGroup) {
go func(host string) {
defer wg.Done()
// copy
path := findSelfPath()
path, err := findSelfPath()
if err != nil {
logger.Errorf("find self path: %s", err)
return
}
rpath := "/tmp/juicesync"
cmd := exec.Command("rsync", "-au", path, host+":"+rpath)
err := cmd.Run()
err = cmd.Run()
if err != nil {
// fallback to scp
cmd = exec.Command("scp", path, host+":"+rpath)
......
......@@ -12,7 +12,10 @@ import (
func TestCluster(t *testing.T) {
// manager
todo := make(chan *object.Object, 100)
addr := startManager(todo)
addr, err := startManager(todo)
if err != nil {
t.Fatal(err)
}
sendStats(addr)
// worker
var conf config.Config
......
......@@ -599,7 +599,10 @@ func Sync(src, dst object.ObjectStorage, config *config.Config) error {
go showProgress()
}
if config.Workers != nil {
addr := startManager(todo)
addr, err := startManager(todo)
if err != nil {
return err
}
launchWorker(addr, config, &wg)
}
} else {
......
......@@ -19,7 +19,7 @@ func collectAll(c <-chan *object.Object) []string {
}
func TestIterator(t *testing.T) {
m := object.CreateStorage("mem", "", "", "")
m, _ := object.CreateStorage("mem", "", "", "")
m.Put("a", bytes.NewReader([]byte("a")))
m.Put("b", bytes.NewReader([]byte("a")))
m.Put("aa", bytes.NewReader([]byte("a")))
......@@ -37,7 +37,7 @@ func TestIterator(t *testing.T) {
}
// Single object
s := object.CreateStorage("mem", "", "", "")
s, _ := object.CreateStorage("mem", "", "", "")
s.Put("a", bytes.NewReader([]byte("a")))
ch, _ = iterate(s, "", "")
keys = collectAll(ch)
......@@ -52,7 +52,7 @@ func TestIeratorSingleEmptyKey(t *testing.T) {
// utils.SetLogLevel(logrus.DebugLevel)
// Construct mem storage
s := object.CreateStorage("mem", "", "", "")
s, _ := object.CreateStorage("mem", "", "", "")
err := s.Put("abc", bytes.NewReader([]byte("abc")))
if err != nil {
t.Errorf("Put error: %q", err)
......@@ -60,7 +60,7 @@ func TestIeratorSingleEmptyKey(t *testing.T) {
}
// Simulate command line prefix in SRC or DST
s = object.WithPrefix(s, "abc")
s, _ = object.WithPrefix(s, "abc")
ch, _ := iterate(s, "", "")
keys := collectAll(ch)
if !reflect.DeepEqual(keys, []string{""}) {
......@@ -87,12 +87,12 @@ func TestSync(t *testing.T) {
Quiet: false,
}
a := object.CreateStorage("mem", "a", "", "")
a, _ := object.CreateStorage("mem", "a", "", "")
a.Put("a", bytes.NewReader([]byte("a")))
a.Put("ab", bytes.NewReader([]byte("ab")))
a.Put("abc", bytes.NewReader([]byte("abc")))
b := object.CreateStorage("mem", "b", "", "")
b, _ := object.CreateStorage("mem", "b", "", "")
b.Put("ba", bytes.NewReader([]byte("ba")))
// Copy "a" from mem://a to mem://b
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册