提交 3a9045d7 编写于 作者: D Davies Liu 提交者: Davies Liu

add license header

cleanup
上级 7cd3dcfc
# osync
Sync object storage between clouds.
# Usage
```
osync [options] SRC DST
```
SRC and DST must be an URI of the following object storage:
- file: local files
- s3: Amazone S3
- gcs: Google Cloud Storage
- wasb: Windows Azure Blob Storage
- oss: Aliyun OSS
- cos: Tencent Cloud COS
- ks3: KSYun KS3
- ufile: UCloud UFile
- qingstor: Qingcloud QingStor
- bos: Baidu Cloud Object Storage
- jss: JCloud Object Storage
- qiniu: Qiniu
- b2: Backblaze B2
- space: Digital Ocean Space
Some examples:
- file://Users/me/code/
- s3://my-bucket.us-east1.amazonaws.com/
- s3://access-key:secret-key-id@my-bucket.us-west2.s3.amazonaws.com/prefix
- gcs://my-bucket.us-west1.googleapi.com/
- oss://test.oss-us-west-1.aliyuncs.com
- cos://test-1234.cos.ap-beijing.myqcloud.com
\ No newline at end of file
// Copyright (C) 2018-present Juicedata Inc.
package main
import (
"flag"
"net/url"
"os"
"osync/object"
"osync/utils"
"strings"
......@@ -11,11 +12,9 @@ import (
"github.com/sirupsen/logrus"
)
var srcURI = flag.String("src", "", "source")
var dstURI = flag.String("dst", "", "destination")
var start = flag.String("start", "", "the start of keys to sync")
var end = flag.String("end", "", "the last keys to sync")
var version = flag.Bool("V", false, "show version")
var debug = flag.Bool("v", false, "turn on debug log")
var quiet = flag.Bool("q", false, "change log level to ERROR")
......@@ -26,27 +25,31 @@ func createStorage(uri string) object.ObjectStorage {
if err != nil {
logger.Fatalf("Can't parse %s: %s", uri, err.Error())
}
secretKey, _ := u.User.Password()
objStorage := object.CreateStorage(strings.ToLower(u.Scheme), u.Host, u.User.Username(), secretKey)
user := u.User
var accessKey, secretKey string
if user != nil {
accessKey = user.Username()
secretKey, _ = user.Password()
}
endpoint := u.Host
if u.Scheme == "file" {
endpoint = u.Path
}
objStorage := object.CreateStorage(strings.ToLower(u.Scheme), endpoint, accessKey, secretKey)
if objStorage == nil {
logger.Fatalf("Invalid storage type: %s", u.Scheme)
}
return objStorage
}
var defaultEndpoint string
var defaultURI *url.URL
var defaultKey string
var defaultSecret string
var maxUploads int
func init() {
defaultEndpoint = os.Getenv("endpoint")
defaultURI, _ = url.ParseRequestURI(defaultEndpoint)
}
func main() {
flag.Parse()
args := flag.Args()
if len(args) != 2 {
println("osync [options] SRC DST")
return
}
if *debug {
utils.SetLogLevel(logrus.DebugLevel)
} else if *quiet {
......@@ -54,7 +57,7 @@ func main() {
}
utils.InitLoggers(false)
src := createStorage(*srcURI)
dst := createStorage(*dstURI)
SyncAll(src, dst, *start)
src := createStorage(args[0])
dst := createStorage(args[1])
Sync(src, dst, *start, *end)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/url"
"strings"
......@@ -13,51 +14,45 @@ import (
"github.com/Azure/azure-sdk-for-go/storage"
)
type abs struct {
type wasb struct {
defaultObjectStorage
container *storage.Container
marker string
}
func (b *abs) String() string {
func (b *wasb) String() string {
return fmt.Sprintf("wasb://%s", b.container.Name)
}
func (b *abs) Create() error {
func (b *wasb) Create() error {
_, err := b.container.CreateIfNotExists(&storage.CreateContainerOptions{})
return err
}
func (b *abs) Get(key string, off, limit int64) (io.ReadCloser, error) {
func (b *wasb) Get(key string, off, limit int64) (io.ReadCloser, error) {
blob := b.container.GetBlobReference(key)
var end int64
if limit > 0 {
return blob.GetRange(&storage.GetBlobRangeOptions{
Range: &storage.BlobRange{
Start: uint64(off),
End: uint64(off + limit - 1),
},
})
}
r, err := blob.Get(nil)
if err != nil {
return nil, err
end = off + limit - 1
}
if off > 0 {
io.CopyN(ioutil.Discard, r, off)
}
return r, nil
return blob.GetRange(&storage.GetBlobRangeOptions{
Range: &storage.BlobRange{
Start: uint64(off),
End: uint64(end),
},
})
}
func (b *abs) Put(key string, data io.Reader) error {
func (b *wasb) Put(key string, data io.Reader) error {
return b.container.GetBlobReference(key).CreateBlockBlobFromReader(data, nil)
}
func (b *abs) Copy(dst, src string) error {
func (b *wasb) Copy(dst, src string) error {
uri := b.container.GetBlobReference(src).GetURL()
return b.container.GetBlobReference(dst).Copy(uri, nil)
}
func (b *abs) Exists(key string) error {
func (b *wasb) Exists(key string) error {
ok, err := b.container.GetBlobReference(key).Exists()
if !ok {
err = errors.New("Not existed")
......@@ -65,7 +60,7 @@ func (b *abs) Exists(key string) error {
return err
}
func (b *abs) Delete(key string) error {
func (b *wasb) Delete(key string) error {
ok, err := b.container.GetBlobReference(key).DeleteIfExists(nil)
if !ok {
err = errors.New("Not existed")
......@@ -73,7 +68,7 @@ func (b *abs) Delete(key string) error {
return err
}
func (b *abs) List(prefix, marker string, limit int64) ([]*Object, error) {
func (b *wasb) List(prefix, marker string, limit int64) ([]*Object, error) {
if marker != "" {
if b.marker == "" {
// last page
......@@ -101,7 +96,9 @@ func (b *abs) List(prefix, marker string, limit int64) ([]*Object, error) {
return objs, nil
}
func newAbs(endpoint, account, key string) ObjectStorage {
// TODO: support multipart upload
func newWabs(endpoint, account, key string) ObjectStorage {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
log.Fatalf("Invalid endpoint: %v, error: %v", endpoint, err)
......@@ -114,9 +111,9 @@ func newAbs(endpoint, account, key string) ObjectStorage {
}
service := client.GetBlobService()
container := service.GetContainerReference(name)
return &abs{container: container}
return &wasb{container: container}
}
func init() {
RegisterStorage("wasb", newAbs)
register("wasb", newWabs)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -115,5 +117,5 @@ func newB2(endpoint, account, key string) ObjectStorage {
}
func init() {
RegisterStorage("b2", newB2)
register("b2", newB2)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -158,5 +160,5 @@ func newBOS(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("bos", newBOS)
register("bos", newBOS)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -256,5 +258,5 @@ func newCOS(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("cos", newCOS)
register("cos", newCOS)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -8,29 +10,30 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"jfs/mount/storage/utils"
)
type diskStore struct {
type filestore struct {
dir string
}
func (d *diskStore) String() string {
func (d *filestore) String() string {
return "file://" + d.dir
}
func (d *diskStore) Create() error {
func (d *filestore) Create() error {
return os.MkdirAll(d.dir, os.FileMode(0700))
}
func (d *diskStore) path(key string) string {
func (d *filestore) path(key string) string {
return filepath.Join(d.dir, key)
}
func (d *diskStore) Get(key string, off, limit int64) (io.ReadCloser, error) {
func (d *filestore) Get(key string, off, limit int64) (io.ReadCloser, error) {
p := d.path(key)
f, err := os.Open(p)
if err != nil {
......@@ -52,7 +55,7 @@ func (d *diskStore) Get(key string, off, limit int64) (io.ReadCloser, error) {
return f, err
}
func (d *diskStore) Put(key string, in io.Reader) error {
func (d *filestore) Put(key string, in io.Reader) error {
p := d.path(key)
if err := os.MkdirAll(filepath.Dir(p), os.FileMode(0700)); err != nil {
return err
......@@ -66,7 +69,7 @@ func (d *diskStore) Put(key string, in io.Reader) error {
return err
}
func (d *diskStore) Copy(dst, src string) error {
func (d *filestore) Copy(dst, src string) error {
r, err := d.Get(src, 0, -1)
if err != nil {
return err
......@@ -74,25 +77,109 @@ func (d *diskStore) Copy(dst, src string) error {
return d.Put(dst, r)
}
func (d *diskStore) Exists(key string) error {
func (d *filestore) Exists(key string) error {
if utils.Exists(d.path(key)) {
return nil
}
return errors.New("not exists")
}
func (d *diskStore) Delete(key string) error {
func (d *filestore) Delete(key string) error {
if d.Exists(key) != nil {
return errors.New("not exists")
}
return os.Remove(d.path(key))
}
func (d *diskStore) List(prefix, marker string, limit int64) ([]*Object, error) {
// walk recursively descends path, calling w.
func walk(path string, info os.FileInfo, walkFn filepath.WalkFunc) error {
err := walkFn(path, info, nil)
if err != nil {
if info.IsDir() && err == filepath.SkipDir {
return nil
}
return err
}
if !info.IsDir() {
return nil
}
names, err := readDirNames(path)
if err != nil {
return walkFn(path, info, err)
}
for _, name := range names {
filename := filepath.Join(path, name)
fileInfo, err := os.Lstat(filename)
if err != nil {
if err := walkFn(filename, fileInfo, err); err != nil && err != filepath.SkipDir {
return err
}
} else {
err = walk(filename, fileInfo, walkFn)
if err != nil {
if !fileInfo.IsDir() || err != filepath.SkipDir {
return err
}
}
}
}
return nil
}
// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root. All errors that arise visiting files
// and directories are filtered by walkFn. The files are walked in lexical
// order, which makes the output deterministic but means that for very
// large directories Walk can be inefficient.
// Walk does not follow symbolic links.
func Walk(root string, walkFn filepath.WalkFunc) error {
info, err := os.Lstat(root)
if err != nil {
err = walkFn(root, nil, err)
} else {
err = walk(root, info, walkFn)
}
if err == filepath.SkipDir {
return nil
}
return err
}
// readDirNames reads the directory named by dirname and returns
// a sorted list of directory entries.
func readDirNames(dirname string) ([]string, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
defer f.Close()
fi, err := f.Readdir(-1)
if err != nil {
return nil, err
}
names := make([]string, len(fi))
for i := range fi {
if fi[i].IsDir() {
names[i] = fi[i].Name() + "/"
} else {
names[i] = fi[i].Name()
}
}
sort.Strings(names)
return names, nil
}
func (d *filestore) List(prefix, marker string, limit int64) ([]*Object, error) {
var objs []*Object
filepath.Walk(d.dir, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() {
key := path[len(d.dir)+1:]
Walk(d.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() && info.Size() > 0 {
key := path[len(d.dir):]
if key > marker && strings.HasPrefix(key, prefix) {
t := int(info.ModTime().Unix())
objs = append(objs, &Object{key, info.Size(), t, t})
......@@ -106,17 +193,17 @@ func (d *diskStore) List(prefix, marker string, limit int64) ([]*Object, error)
return objs, nil
}
func (d *diskStore) CreateMultipartUpload(key string) (*MultipartUpload, error) {
func (d *filestore) CreateMultipartUpload(key string) (*MultipartUpload, error) {
dir, err := ioutil.TempDir("", "multipart")
return &MultipartUpload{UploadID: dir, MinPartSize: 1 << 20, MaxCount: 1000}, err
}
func (d *diskStore) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) {
func (d *filestore) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) {
path := filepath.Join(uploadID, strconv.Itoa(num))
return &Part{Num: num, ETag: path}, ioutil.WriteFile(path, body, os.FileMode(0700))
}
func (d *diskStore) AbortUpload(key string, uploadID string) {
func (d *filestore) AbortUpload(key string, uploadID string) {
fs, err := ioutil.ReadDir(uploadID)
if err == nil {
for _, f := range fs {
......@@ -125,7 +212,7 @@ func (d *diskStore) AbortUpload(key string, uploadID string) {
}
}
func (d *diskStore) CompleteUpload(key string, uploadID string, parts []*Part) error {
func (d *filestore) CompleteUpload(key string, uploadID string, parts []*Part) error {
p := d.path(key)
if err := os.MkdirAll(filepath.Dir(p), os.FileMode(0700)); err != nil {
return err
......@@ -155,15 +242,18 @@ func (d *diskStore) CompleteUpload(key string, uploadID string, parts []*Part) e
return nil
}
func (d *diskStore) ListUploads(marker string) ([]*PendingPart, string, error) {
func (d *filestore) ListUploads(marker string) ([]*PendingPart, string, error) {
return nil, "", nil
}
func newDisk(endpoint, accesskey, secretkey string) ObjectStorage {
store := &diskStore{dir: endpoint}
if !strings.HasSuffix(endpoint, "/") {
endpoint += "/"
}
store := &filestore{dir: endpoint}
return store
}
func init() {
RegisterStorage("file", newDisk)
register("file", newDisk)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -149,5 +151,5 @@ func newGS(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("gs", newGS)
register("gs", newGS)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -52,5 +54,5 @@ func newJSS(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("jss", newJSS)
register("jss", newJSS)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -237,5 +239,5 @@ func newKS3(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("ks3", newKS3)
register("ks3", newKS3)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -119,5 +121,5 @@ func newMem(endpoint, accesskey, secretkey string) ObjectStorage {
}
func init() {
RegisterStorage("mem", newMem)
register("mem", newMem)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -144,5 +146,5 @@ func newMSS(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("mss", newMSS)
register("mss", newMSS)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -160,5 +162,5 @@ func newNOS(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("nos", newNOS)
register("nos", newNOS)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -89,7 +91,7 @@ type Register func(endpoint, accessKey, secretKey string) ObjectStorage
var storages = make(map[string]Register)
func RegisterStorage(name string, register Register) {
func register(name string, register Register) {
storages[name] = register
}
......
......@@ -176,7 +176,7 @@ func TestMem(t *testing.T) {
}
func TestQingStor(t *testing.T) {
s := newQingStor("https://cfs-test-tmp1.pek3a.qingstor.com",
s := newQingStor("https://test.pek3a.qingstor.com",
os.Getenv("QY_ACCESS_KEY"), os.Getenv("QY_SECRET_KEY"))
testStorage(t, s)
}
......@@ -191,110 +191,70 @@ func TestS3(t *testing.T) {
}
func TestOSS(t *testing.T) {
s := newOSS("https://cfs-test-tmp1.oss-us-west-1.aliyuncs.com",
s := newOSS("https://test.oss-us-west-1.aliyuncs.com",
os.Getenv("OSS_ACCESS_KEY"), os.Getenv("OSS_SECRET_KEY"))
testStorage(t, s)
}
func TestUFile(t *testing.T) {
ufile := newUFile("https://cfs-test-tmp-2.us-ca.ufileos.com",
ufile := newUFile("https://test.us-ca.ufileos.com",
os.Getenv("UCLOUD_PUBLIC_KEY"), os.Getenv("UCLOUD_PRIVATE_KEY"))
testStorage(t, ufile)
}
func TestGS(t *testing.T) {
os.Setenv("GOOGLE_CLOUD_PROJECT", "davies-test")
gs := newGS("https://jfs-test-2.us-west1.googleapi.com", "", "")
gs := newGS("https://test.us-west1.googleapi.com", "", "")
testStorage(t, gs)
}
func TestQiniu(t *testing.T) {
qiniu := newQiniu("https://jfs-test2.cn-east-1-s3.qiniu.com",
qiniu := newQiniu("https://test.cn-east-1-s3.qiniu.com",
os.Getenv("QINIU_ACCESS_KEY"), os.Getenv("QINIU_SECRET_KEY"))
testStorage(t, qiniu)
qiniu = newQiniu("https://jfs-test-tmp.cn-north-1-s3.qiniu.com",
qiniu = newQiniu("https://test.cn-north-1-s3.qiniu.com",
os.Getenv("QINIU_ACCESS_KEY"), os.Getenv("QINIU_SECRET_KEY"))
testStorage(t, qiniu)
}
func TestReplicated(t *testing.T) {
s1 := newMem("", "", "")
s2 := newMem("", "", "")
rep := NewReplicated(s1, s2)
testStorage(t, rep)
// test healing
s2.Put("/a", bytes.NewBuffer([]byte("a")))
if r, e := rep.Get("/a", 0, 1); e != nil {
t.Fatalf("Fail to get /a")
} else if s, _ := ioutil.ReadAll(r); string(s) != "a" {
t.Fatalf("Fail to get /a")
}
if s1.Exists("/a") == nil {
t.Fatalf("a should not be in s1")
}
if r, e := rep.Get("/a", 0, -1); e != nil {
t.Fatalf("Fail to get /a")
} else if s, _ := ioutil.ReadAll(r); string(s) != "a" {
t.Fatalf("Fail to get /a")
}
if s1.Exists("/a") != nil {
t.Fatalf("a should be in s1")
}
// test sync
s1.Put("b", bytes.NewBuffer([]byte("a")))
s2.Put("c", bytes.NewBuffer(make([]byte, 15<<20)))
r := rep.(*Replicated)
r.Backfill("b")
if s2.Exists("b") == nil {
t.Fatalf("b should not be in s2")
}
r.Backfill("c")
if s1.Exists("c") != nil {
t.Fatalf("c should be in s1")
}
}
func TestKS3(t *testing.T) {
ks3 := newKS3("https://jfs-temp4.kss.ksyun.com",
ks3 := newKS3("https://test.kss.ksyun.com",
os.Getenv("KS3_ACCESS_KEY"), os.Getenv("KS3_SECRET_KEY"))
testStorage(t, ks3)
}
func TestCOS(t *testing.T) {
cos := newCOS("https://jfstest1-1252455339.cos.ap-beijing.myqcloud.com",
cos := newCOS("https://test-1234.cos.ap-beijing.myqcloud.com",
os.Getenv("COS_ACCESS_KEY"), os.Getenv("COS_SECRET_KEY"))
testStorage(t, cos)
}
func TestAzure(t *testing.T) {
abs := newAbs("https://test-chunk.core.chinacloudapi.cn",
abs := newWabs("https://test-chunk.core.chinacloudapi.cn",
os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_KEY"))
testStorage(t, abs)
}
func TestNOS(t *testing.T) {
nos := newNOS("https://jfs-test.nos-eastchina1.126.net",
nos := newNOS("https://test.nos-eastchina1.126.net",
os.Getenv("NOS_ACCESS_KEY"), os.Getenv("NOS_SECRET_KEY"))
testStorage(t, nos)
}
func TestMSS(t *testing.T) {
mss := newMSS("https://jfstest.mtmss.com",
mss := newMSS("https://test.mtmss.com",
os.Getenv("MSS_ACCESS_KEY"), os.Getenv("MSS_SECRET_KEY"))
testStorage(t, mss)
}
func TestJSS(t *testing.T) {
jss := newJSS("https://jfstest.s3.cn-north-1.jcloudcs.com",
jss := newJSS("https://test.s3.cn-north-1.jcloudcs.com",
os.Getenv("JSS_ACCESS_KEY"), os.Getenv("JSS_SECRET_KEY"))
testStorage(t, jss)
}
func TestSpeedy(t *testing.T) {
cos := newSpeedy("https://jfs-test.oss-cn-beijing.speedycloud.org",
cos := newSpeedy("https://test.oss-cn-beijing.speedycloud.org",
os.Getenv("SPEEDY_ACCESS_KEY"), os.Getenv("SPEEDY_SECRET_KEY"))
testStorage(t, cos)
}
......@@ -305,16 +265,16 @@ func TestDisk(t *testing.T) {
}
func TestB2(t *testing.T) {
b := newB2("https://jfs-test.backblaze.com", os.Getenv("B2_ACCOUNT_ID"), os.Getenv("B2_APP_KEY"))
b := newB2("https://test.backblaze.com", os.Getenv("B2_ACCOUNT_ID"), os.Getenv("B2_APP_KEY"))
testStorage(t, b)
}
func TestSpace(t *testing.T) {
b := newSpace("https://jfs-test.nyc3.digitaloceanspaces.com", os.Getenv("SPACE_ACCESS_KEY"), os.Getenv("SPACE_SECRET_KEY"))
b := newSpace("https://test.nyc3.digitaloceanspaces.com", os.Getenv("SPACE_ACCESS_KEY"), os.Getenv("SPACE_SECRET_KEY"))
testStorage(t, b)
}
func TestBOS(t *testing.T) {
b := newBOS("https://jfs-test.su.bcebos.com", os.Getenv("BOS_ACCESS_KEY"), os.Getenv("BOS_SECRET_KEY"))
b := newBOS("https://test.su.bcebos.com", os.Getenv("BOS_ACCESS_KEY"), os.Getenv("BOS_SECRET_KEY"))
testStorage(t, b)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -153,5 +155,5 @@ func newOSS(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("oss", newOSS)
register("oss", newOSS)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -14,7 +16,6 @@ import (
)
type qingstor struct {
defaultObjectStorage
bucket *qs.Bucket
}
......@@ -235,5 +236,5 @@ func newQingStor(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("qingstor", newQingStor)
register("qingstor", newQingStor)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -123,5 +125,5 @@ func newQiniu(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("qiniu", newQiniu)
register("qiniu", newQiniu)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -222,5 +224,5 @@ func newS3(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("s3", newS3)
register("s3", newS3)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -60,5 +62,5 @@ func newSpace(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("space", newSpace)
register("space", newSpace)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -20,7 +22,6 @@ func (s *speedy) String() string {
return fmt.Sprintf("speedy://%s", uri.Host)
}
// TODO: Create
func (s *speedy) Create() error {
uri, _ := url.ParseRequestURI(s.endpoint)
parts := strings.SplitN(uri.Host, ".", 2)
......@@ -102,5 +103,5 @@ func newSpeedy(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("speedy", newSpeedy)
register("speedy", newSpeedy)
}
// Copyright (C) 2018-present Juicedata Inc.
package object
import (
......@@ -283,5 +285,5 @@ func newUFile(endpoint, accessKey, secretKey string) ObjectStorage {
}
func init() {
RegisterStorage("ufile", newUFile)
register("ufile", newUFile)
}
// Copyright (C) 2018-present Juicedata Inc.
package main
import (
......@@ -8,32 +10,41 @@ import (
"osync/object"
"sync"
"sync/atomic"
"time"
)
// The max number of key per listing request
const MaxResults = 10240
const ReplicateThreads = 50
const maxBlock = 10 << 20
var copied uint64
var (
ReplicateThreads = 50
found uint64
missing uint64
copied uint64
failed uint64
)
// Iterate on all the keys that starts at marker from object storage.
func Iterate(store object.ObjectStorage, marker string) (<-chan *object.Object, error) {
func Iterate(store object.ObjectStorage, marker, end string) (<-chan *object.Object, error) {
objs, err := store.List("", marker, MaxResults)
if err != nil {
logger.Errorf("Can't list %s: %s", store, err.Error())
return nil, err
}
// Sending the keys into two channel, one used as source, another used as destination
out := make(chan *object.Object, MaxResults)
go func() {
lastkey := ""
END:
for len(objs) > 0 {
for _, obj := range objs {
key := obj.Key
if key != "" && key <= lastkey {
logger.Fatalf("The keys are out of order: %q >= %q", lastkey, key)
}
if end != "" && key >= end {
break END
}
lastkey = key
out <- obj
}
......@@ -51,26 +62,6 @@ func Iterate(store object.ObjectStorage, marker string) (<-chan *object.Object,
return out, nil
}
func Duplicate(in <-chan *object.Object) (<-chan string, <-chan string) {
out1 := make(chan string, MaxResults)
out2 := make(chan string, MaxResults)
go func() {
for s := range in {
if s != nil {
out1 <- s.Key
out2 <- s.Key
} else {
out1 <- ""
out2 <- ""
}
}
close(out1)
close(out2)
}()
return out1, out2
}
// Sync replicate the key from secondary to primary
func replicate(src, dst object.ObjectStorage, key string) error {
in, e := src.Get(key, 0, maxBlock)
if e != nil {
......@@ -113,20 +104,8 @@ func replicate(src, dst object.ObjectStorage, key string) error {
return dst.Put(key, f)
}
// Replicate a key from one object storage to another
func rep(src, dst object.ObjectStorage, key string) error {
logger.Debugf("replicating %s", key)
if err := replicate(src, dst, key); err != nil {
logger.Warningf("Failed to replicate %s from %s to %s: %s", key, src, dst, err.Error())
return err
}
atomic.AddUint64(&copied, 1)
return nil
}
// Sync comparing all the ordered keys from two object storage, replicate the missed ones.
func Sync(src, dst object.ObjectStorage, srckeys, dstkeys <-chan string, waiter *sync.WaitGroup) {
defer waiter.Done()
// sync comparing all the ordered keys from two object storage, replicate the missed ones.
func doSync(src, dst object.ObjectStorage, srckeys, dstkeys <-chan *object.Object) {
todo := make(chan string, 1024)
wg := sync.WaitGroup{}
for i := 0; i < ReplicateThreads; i++ {
......@@ -138,64 +117,70 @@ func Sync(src, dst object.ObjectStorage, srckeys, dstkeys <-chan string, waiter
if !ok {
break
}
if rep(src, dst, key) != nil {
if err := rep(src, dst, key); err != nil {
logger.Infof("Failed to replicate %s from %s to %s", key, src, dst)
}
logger.Debugf("replicating %s", key)
if err := replicate(src, dst, key); err != nil {
logger.Warningf("Failed to replicate %s from %s to %s: %s", key, src, dst, err.Error())
atomic.AddUint64(&failed, 1)
} else {
atomic.AddUint64(&copied, 1)
}
}
}()
}
dstkey := ""
hasMore := true
OUT:
for key := range srckeys {
if key == "" {
for obj := range srckeys {
if obj == nil {
logger.Errorf("Listing failed, stop replicating, waiting for pending ones")
break
}
for key > dstkey {
var ok bool
dstkey, ok = <-dstkeys
atomic.AddUint64(&found, 1)
for hasMore && obj.Key > dstkey {
dstobj, ok := <-dstkeys
if !ok {
// the maximum key for JuiceFS
dstkey = "zzzzzzzz"
} else if dstkey == "" {
hasMore = false
} else if dstobj == nil {
// Listing failed, stop
logger.Errorf("Listing failed, stop replicating, waiting for pending ones")
break OUT
} else {
dstkey = dstobj.Key
}
}
if key < dstkey {
todo <- key
if obj.Key < dstkey || !hasMore {
todo <- obj.Key
atomic.AddUint64(&missing, 1)
}
}
close(todo)
// consume all the keys to unblock the producer
for dstkey = range dstkeys {
}
wg.Wait()
}
// SyncAll syncs all the keys between to object storage
func SyncAll(a, b object.ObjectStorage, marker string) error {
logger.Infof("syncing between %s and %s (starting from %q)", a, b, marker)
cha, err := Iterate(a, marker)
func showProgress() {
for {
logger.Infof("Found: %d, missing: %d, copied: %d, failed: %d", atomic.LoadUint64(&found),
atomic.LoadUint64(&missing)-atomic.LoadUint64(&copied), atomic.LoadUint64(&copied), atomic.LoadUint64(&failed))
time.Sleep(time.Second)
}
}
// Sync syncs all the keys between to object storage
func Sync(src, dst object.ObjectStorage, marker, end string) error {
logger.Infof("syncing between %s and %s (starting from %q)", src, dst, marker)
cha, err := Iterate(src, marker, end)
if err != nil {
return err
}
srca, dsta := Duplicate(cha)
chb, err := Iterate(b, marker)
chb, err := Iterate(dst, marker, end)
if err != nil {
return err
}
srcb, dstb := Duplicate(chb)
var wg sync.WaitGroup
wg.Add(2)
go Sync(a, b, srca, dstb, &wg)
go Sync(b, a, srcb, dsta, &wg)
wg.Wait()
logger.Infof("all synchronized (copied %d blocks)", copied)
go showProgress()
doSync(src, dst, cha, chb)
logger.Infof("Finished: found: %d, missing: %d, copied: %d, failed: %d", atomic.LoadUint64(&found),
atomic.LoadUint64(&missing)-atomic.LoadUint64(&copied), atomic.LoadUint64(&copied), atomic.LoadUint64(&failed))
return nil
}
// Copyright (C) 2018-present Juicedata Inc.
package main
import (
......
// Copyright (C) 2018-present Juicedata Inc.
package utils
import (
......
......@@ -433,7 +433,7 @@ type StorageTransferServiceServer interface {
ResumeTransferOperation(context.Context, *ResumeTransferOperationRequest) (*google_protobuf1.Empty, error)
}
func RegisterStorageTransferServiceServer(s *grpc.Server, srv StorageTransferServiceServer) {
func registerTransferServiceServer(s *grpc.Server, srv StorageTransferServiceServer) {
s.RegisterService(&_StorageTransferService_serviceDesc, srv)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册