提交 129729bb 编写于 作者: D Davies Liu

preserve permissions on file system

上级 badecffe
......@@ -10,6 +10,7 @@ type Config struct {
Threads int
HTTPPort int
Update bool
Perms bool
Dry bool
DeleteSrc bool
DeleteDst bool
......@@ -26,6 +27,7 @@ func NewConfigFromCli(c *cli.Context) *Config {
Threads: c.Int("threads"),
HTTPPort: c.Int("http-port"),
Update: c.Bool("update"),
Perms: c.Bool("perms"),
Dry: c.Bool("dry"),
DeleteSrc: c.Bool("delete-src"),
DeleteDst: c.Bool("delete-dst"),
......
......@@ -37,7 +37,7 @@ func supportHTTPS(name, endpoint string) bool {
}
}
func createStorage(uri string) object.ObjectStorage {
func createStorage(uri string, conf *config.Config) object.ObjectStorage {
if !strings.Contains(uri, "://") {
if strings.Contains(uri, ":") {
var user string
......@@ -95,6 +95,12 @@ func createStorage(uri string) object.ObjectStorage {
if store == nil {
logger.Fatalf("Invalid storage type: %s", u.Scheme)
}
if conf.Perms {
if _, ok := store.(object.FileSystem); !ok {
logger.Warnf("%s is not a file system, can not preserve permissions", store)
conf.Perms = false
}
}
if name != "file" && len(u.Path) > 1 {
store = object.WithPrefix(store, u.Path[1:])
}
......@@ -112,8 +118,8 @@ func run(c *cli.Context) error {
}
utils.InitLoggers(false)
src := createStorage(c.Args().Get(0))
dst := createStorage(c.Args().Get(1))
src := createStorage(c.Args().Get(0), config)
dst := createStorage(c.Args().Get(1), config)
return sync.Sync(src, dst, config)
}
......@@ -159,6 +165,10 @@ func main() {
Aliases: []string{"u"},
Usage: "update existing file if the source is newer",
},
&cli.BoolFlag{
Name: "perms",
Usage: "preserve permissions",
},
&cli.BoolFlag{
Name: "dry",
Usage: "don't copy file",
......
......@@ -12,6 +12,7 @@ import (
"sort"
"strings"
"time"
"unsafe"
"github.com/juicedata/juicesync/utils"
)
......@@ -197,7 +198,9 @@ func (d *filestore) List(prefix, marker string, limit int64) ([]*Object, error)
}
key := path[len(d.root):]
if key >= marker && strings.HasPrefix(key, prefix) && !info.IsDir() {
listed <- &Object{key, info.Size(), info.ModTime()}
owner, group := getOwnerGroup(info)
f := &File{Object{key, info.Size(), info.ModTime()}, owner, group, info.Mode()}
listed <- (*Object)(unsafe.Pointer(f))
}
return nil
})
......@@ -229,6 +232,16 @@ func (d *filestore) Chtimes(path string, mtime time.Time) error {
return os.Chtimes(filepath.Join(d.root, path), mtime, mtime)
}
func (d *filestore) Chmod(path string, mode os.FileMode) error {
return os.Chmod(filepath.Join(d.root, path), mode)
}
func (d *filestore) Chown(path string, owner, group string) error {
uid := lookupUser(owner)
gid := lookupGroup(group)
return os.Chown(filepath.Join(d.root, path), uid, gid)
}
func newDisk(root, accesskey, secretkey string) ObjectStorage {
os.MkdirAll(root, 0755)
return &filestore{root: root}
......
package object
import (
"os"
"os/user"
"strconv"
"sync"
"syscall"
)
var uids = make(map[int]string)
var gids = make(map[int]string)
var users = make(map[string]int)
var groups = make(map[string]int)
var mutex sync.Mutex
func getOwnerGroup(info os.FileInfo) (string, string) {
mutex.Lock()
defer mutex.Unlock()
var owner, group string
if st, ok := info.Sys().(*syscall.Stat_t); ok {
if u, ok := uids[int(st.Uid)]; ok {
owner = u
} else if u, err := user.LookupId(strconv.Itoa(int(st.Uid))); err == nil {
owner = u.Username
uids[int(st.Uid)] = owner
}
if g, ok := gids[int(st.Gid)]; ok {
group = g
} else if g, err := user.LookupGroupId(strconv.Itoa(int(st.Gid))); err == nil {
group = g.Name
gids[int(st.Gid)] = group
}
}
return owner, group
}
func lookupUser(name string) int {
mutex.Lock()
defer mutex.Unlock()
if u, ok := users[name]; ok {
return u
}
var uid = -1
if u, err := user.Lookup(name); err == nil {
uid, _ = strconv.Atoi(u.Uid)
}
users[name] = uid
return uid
}
func lookupGroup(name string) int {
mutex.Lock()
defer mutex.Unlock()
if u, ok := groups[name]; ok {
return u
}
var gid = -1
if u, err := user.LookupGroup(name); err == nil {
gid, _ = strconv.Atoi(u.Gid)
}
groups[name] = gid
return gid
}
......@@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"time"
"unsafe"
"github.com/colinmarc/hdfs"
)
......@@ -116,7 +117,9 @@ func (h *hdfsclient) List(prefix, marker string, limit int64) ([]*Object, error)
return nil
}
if !info.IsDir() {
listed <- &Object{key, info.Size(), info.ModTime()}
hinfo := info.(*hdfs.FileInfo)
f := &File{Object{key, info.Size(), info.ModTime()}, hinfo.Owner(), hinfo.OwnerGroup(), info.Mode()}
listed <- (*Object)(unsafe.Pointer(f))
}
return nil
})
......@@ -148,6 +151,14 @@ func (h *hdfsclient) Chtimes(path string, mtime time.Time) error {
return h.c.Chtimes(path, mtime, mtime)
}
func (h *hdfsclient) Chmod(path string, mode os.FileMode) error {
return h.c.Chmod(path, mode)
}
func (h *hdfsclient) Chown(path string, owner, group string) error {
return h.c.Chown(path, owner, group)
}
// TODO: multipart upload
func newHDFS(addr, user, sk string) ObjectStorage {
......
......@@ -5,27 +5,34 @@ package object
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"strings"
"sync"
"time"
"unsafe"
)
type Obj struct {
Data []byte
Updated time.Time
type obj struct {
data []byte
mtime time.Time
mode os.FileMode
owner string
group string
}
type memStore struct {
sync.Mutex
defaultObjectStorage
objects map[string]*Obj
name string
objects map[string]*obj
}
func (m *memStore) String() string {
return "memstore"
return fmt.Sprintf("mem://%s", m.name)
}
func (m *memStore) Get(key string, off, limit int64) (io.ReadCloser, error) {
......@@ -35,7 +42,7 @@ func (m *memStore) Get(key string, off, limit int64) (io.ReadCloser, error) {
if !ok {
return nil, errors.New("not exists")
}
data := d.Data[off:]
data := d.data[off:]
if limit > 0 && limit < int64(len(data)) {
data = data[:limit]
}
......@@ -53,7 +60,30 @@ func (m *memStore) Put(key string, in io.Reader) error {
if err != nil {
return err
}
m.objects[key] = &Obj{data, time.Now()}
m.objects[key] = &obj{data: data, mtime: time.Now()}
return nil
}
func (m *memStore) Chmod(key string, mode os.FileMode) error {
m.Lock()
defer m.Unlock()
obj, ok := m.objects[key]
if !ok {
return errors.New("not found")
}
obj.mode = mode
return nil
}
func (m *memStore) Chown(key string, owner, group string) error {
m.Lock()
defer m.Unlock()
obj, ok := m.objects[key]
if !ok {
return errors.New("not found")
}
obj.owner = owner
obj.group = group
return nil
}
......@@ -64,7 +94,7 @@ func (m *memStore) Chtimes(key string, mtime time.Time) error {
if !ok {
return errors.New("not found")
}
obj.Updated = mtime
obj.mtime = mtime
return nil
}
......@@ -111,7 +141,8 @@ func (m *memStore) List(prefix, marker string, limit int64) ([]*Object, error) {
for k := range m.objects {
if strings.HasPrefix(k, prefix) && k > marker {
obj := m.objects[k]
objs = append(objs, &Object{k, int64(len(obj.Data)), obj.Updated})
f := &File{Object{k, int64(len(obj.data)), obj.mtime}, obj.owner, obj.group, obj.mode}
objs = append(objs, (*Object)(unsafe.Pointer(f)))
}
}
sort.Sort(sortObject(objs))
......@@ -123,7 +154,7 @@ func (m *memStore) List(prefix, marker string, limit int64) ([]*Object, error) {
func newMem(endpoint, accesskey, secretkey string) ObjectStorage {
store := &memStore{}
store.objects = make(map[string]*Obj)
store.objects = make(map[string]*obj)
return store
}
......
......@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"os"
"time"
"github.com/juicedata/juicesync/utils"
......@@ -54,6 +55,18 @@ type ObjectStorage interface {
type MtimeChanger interface {
Chtimes(path string, mtime time.Time) error
}
type File struct {
Object
Owner string
Group string
Mode os.FileMode
}
type FileSystem interface {
MtimeChanger
Chmod(path string, mode os.FileMode) error
Chown(path string, owner, group string) error
}
var notSupported = errors.New("not supported")
......
......@@ -13,6 +13,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/juicedata/juicesync/config"
"github.com/juicedata/juicesync/object"
......@@ -26,6 +27,7 @@ const (
defaultPartSize = 5 << 20
maxBlock = defaultPartSize * 2
markDelete = -1
markCopyPerms = -2
)
var (
......@@ -290,10 +292,23 @@ func doSync(src, dst object.ObjectStorage, srckeys, dstkeys <-chan *object.Objec
}
continue
}
if config.Dry {
logger.Debugf("Will copy %s (%d bytes)", obj.Key, obj.Size)
continue
}
if config.Perms && obj.Size == markCopyPerms {
fi := (*object.File)(unsafe.Pointer(obj))
if err := dst.(object.FileSystem).Chmod(obj.Key, fi.Mode); err != nil {
logger.Warnf("Chmod %s to %d: %s", obj.Key, err)
}
if err := dst.(object.FileSystem).Chown(obj.Key, fi.Owner, fi.Group); err != nil {
logger.Warnf("Chown %s to (%s,%s): %s", obj.Key, fi.Owner, fi.Group, err)
}
atomic.AddUint64(&copied, 1)
logger.Debugf("Copied permissions for %s in %s", obj.Key, time.Now().Sub(start))
continue
}
err = copyInParallel(src, dst, obj)
if err != nil {
atomic.AddUint64(&failed, 1)
......@@ -305,6 +320,15 @@ func doSync(src, dst object.ObjectStorage, srckeys, dstkeys <-chan *object.Objec
logger.Warnf("Update mtime of %s: %s", obj.Key, err)
}
}
if config.Perms {
fi := (*object.File)(unsafe.Pointer(obj))
if err := dst.(object.FileSystem).Chmod(obj.Key, fi.Mode); err != nil {
logger.Warnf("Chmod %s to %o: %s", obj.Key, fi.Mode, err)
}
if err := dst.(object.FileSystem).Chown(obj.Key, fi.Owner, fi.Group); err != nil {
logger.Warnf("Chown %s to (%s,%s): %s", obj.Key, fi.Owner, fi.Group, err)
}
}
atomic.AddUint64(&copied, 1)
atomic.AddUint64(&copiedBytes, uint64(obj.Size))
logger.Debugf("Copied %s (%d bytes) in %s", obj.Key, obj.Size, time.Now().Sub(start))
......@@ -347,6 +371,13 @@ OUT:
} else if config.DeleteSrc && dstobj != nil && obj.Key == dstobj.Key && obj.Size == dstobj.Size {
obj.Size = markDelete
todo <- obj
} else if config.Perms {
f1 := (*object.File)(unsafe.Pointer(obj))
f2 := (*object.File)(unsafe.Pointer(dstobj))
if f2.Mode != f1.Mode || f2.Owner != f1.Owner || f2.Group != f1.Group {
obj.Size = markCopyPerms
todo <- obj
}
}
if dstobj != nil && dstobj.Key == obj.Key {
dstobj = nil
......
......@@ -52,7 +52,8 @@ func TestSync(t *testing.T) {
Start: "",
End: "",
Threads: 50,
Update: false,
Update: true,
Perms: true,
Dry: false,
DeleteSrc: false,
DeleteDst: false,
......@@ -62,12 +63,12 @@ func TestSync(t *testing.T) {
Quiet: false,
}
a := object.CreateStorage("mem", "", "", "")
a := object.CreateStorage("mem", "a", "", "")
a.Put("a", bytes.NewReader([]byte("a")))
a.Put("ab", bytes.NewReader([]byte("ab")))
a.Put("abc", bytes.NewReader([]byte("abc")))
b := object.CreateStorage("mem", "", "", "")
b := object.CreateStorage("mem", "b", "", "")
b.Put("ba", bytes.NewReader([]byte("ba")))
if err := Sync(a, b, config); err != nil {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册