未验证 提交 67940a5a 编写于 作者: D Davies Liu 提交者: GitHub

cache attribute and chunks for open files (#528)

* cache attribute and chunks for open files

* add mising file

* fix build

* bugfix

* add options

* update docs in CN

* fix build

* fix build sdk

* fix opencache

* bugfix

* speedup fs.Open()

* set KeepCache for cgofuse

* cleanup

* address comments

* address comments
上级 0f1b4156
......@@ -143,9 +143,10 @@ func (g *GateWay) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, er
c := g.ctx
addr := c.Args().Get(0)
m := meta.NewClient(addr, &meta.Config{
Retries: 10,
Strict: true,
ReadOnly: c.Bool("read-only"),
Retries: 10,
Strict: true,
ReadOnly: c.Bool("read-only"),
OpenCache: time.Duration(c.Float64("open-cache") * 1e9),
})
format, err := m.Load()
if err != nil {
......@@ -392,6 +393,7 @@ func (n *jfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInf
if eno != 0 {
return nil, jfsToObjectErr(ctx, eno)
}
defer f.Close(mctx)
entries, eno := f.Readdir(mctx, 10000)
if eno != 0 {
return nil, jfsToObjectErr(ctx, eno)
......@@ -638,7 +640,7 @@ func (n *jfsObjects) GetObject(ctx context.Context, bucket, object string, start
if err = n.checkBucket(ctx, bucket); err != nil {
return
}
f, eno := n.fs.Open(mctx, n.path(bucket, object), 0)
f, eno := n.fs.Open(mctx, n.path(bucket, object), vfs.MODE_MASK_R)
if eno != 0 {
return jfsToObjectErr(ctx, eno, bucket, object)
}
......
......@@ -95,6 +95,7 @@ func mount(c *cli.Context) error {
Strict: true,
CaseInsensi: strings.HasSuffix(mp, ":") && runtime.GOOS == "windows",
ReadOnly: readOnly,
OpenCache: time.Duration(c.Float64("open-cache") * 1e9),
MountPoint: mp,
})
format, err := m.Load()
......@@ -301,6 +302,12 @@ func clientFlags() []cli.Flag {
Name: "cache-partial-only",
Usage: "cache only random/small read",
},
&cli.Float64Flag{
Name: "open-cache",
Value: 0.0,
Usage: "open files cache timeout in seconds",
},
}
}
......
......@@ -176,6 +176,9 @@ file entry cache timeout in seconds (default: 1)
`--dir-entry-cache value`\
dir entry cache timeout in seconds (default: 1)
`--open-cache value`\
open file cache timeout in seconds (default: 0)
`--enable-xattr`\
enable extended attributes (xattr) (default: false)
......
......@@ -176,6 +176,9 @@ juicefs mount [command options] REDIS-URL MOUNTPOINT
`--dir-entry-cache value`\
目录项缓存过期时间;单位为秒 (默认: 1)
`--open-cache value`\
打开的文件的缓存过期时间;单位为秒 (默认: 0)
`--enable-xattr`\
启用扩展属性 (xattr) 功能 (默认: false)
......
......@@ -130,6 +130,7 @@ type File struct {
fs *FileSystem
sync.Mutex
flags uint32
offset int64
rdata vfs.FileReader
wdata vfs.FileWriter
......@@ -258,21 +259,19 @@ func (fs *FileSystem) Open(ctx meta.Context, path string, flags uint32) (f *File
}
if flags != 0 && !fi.IsDir() {
if ctx.Uid() != 0 {
err = fs.m.Access(ctx, fi.inode, uint8(flags), nil)
if err != 0 {
return nil, err
}
var oflags uint32 = syscall.O_RDONLY
if flags == vfs.MODE_MASK_W {
oflags = syscall.O_WRONLY
} else if flags&vfs.MODE_MASK_W != 0 {
oflags = syscall.O_RDWR
}
err = fs.m.Open(ctx, fi.inode, oflags, nil)
if err != 0 {
return
}
err = fs.m.Access(ctx, fi.inode, uint8(flags), fi.attr)
if err != 0 {
return nil, err
}
var oflags uint32 = syscall.O_RDONLY
if flags == vfs.MODE_MASK_W {
oflags = syscall.O_WRONLY
} else if flags&vfs.MODE_MASK_W != 0 {
oflags = syscall.O_RDWR
}
err = fs.m.Open(ctx, fi.inode, oflags, fi.attr)
if err != 0 {
return
}
}
......@@ -281,6 +280,7 @@ func (fs *FileSystem) Open(ctx meta.Context, path string, flags uint32) (f *File
f.inode = fi.inode
f.info = fi
f.fs = fs
f.flags = flags
return
}
......@@ -629,6 +629,7 @@ func (fs *FileSystem) Create(ctx meta.Context, p string, mode uint16) (f *File,
fi = AttrToFileInfo(inode, attr)
fi.name = path.Base(p)
f = &File{}
f.flags = vfs.MODE_MASK_W
f.path = p
f.inode = fi.inode
f.info = fi
......@@ -875,17 +876,20 @@ func (f *File) Close(ctx meta.Context) (err syscall.Errno) {
defer func() { f.fs.log(l, "Close (%s): %s", f.path, errstr(err)) }()
f.Lock()
defer f.Unlock()
f.offset = 0
if f.rdata != nil {
rdata := f.rdata
f.rdata = nil
time.AfterFunc(time.Second, func() {
rdata.Close(meta.Background)
})
}
if f.wdata != nil {
err = f.wdata.Close(meta.Background)
f.wdata = nil
if f.flags != 0 && !f.info.IsDir() {
f.offset = 0
if f.rdata != nil {
rdata := f.rdata
f.rdata = nil
time.AfterFunc(time.Second, func() {
rdata.Close(meta.Background)
})
}
if f.wdata != nil {
err = f.wdata.Close(meta.Background)
f.wdata = nil
}
f.fs.m.Close(ctx, f.inode)
}
return
}
......
......@@ -235,13 +235,15 @@ func (fs *fileSystem) Create(cancel <-chan struct{}, in *fuse.CreateIn, name str
func (fs *fileSystem) Open(cancel <-chan struct{}, in *fuse.OpenIn, out *fuse.OpenOut) (status fuse.Status) {
ctx := newContext(cancel, &in.InHeader)
defer releaseContext(ctx)
_, fh, err := vfs.Open(ctx, Ino(in.NodeId), in.Flags)
entry, fh, err := vfs.Open(ctx, Ino(in.NodeId), in.Flags)
if err != 0 {
return fuse.Status(err)
}
out.Fh = fh
if vfs.IsSpecialNode(Ino(in.NodeId)) {
out.OpenFlags |= fuse.FOPEN_DIRECT_IO
} else if entry.Attr.KeepCache {
out.OpenFlags |= fuse.FOPEN_KEEP_CACHE
}
return 0
}
......
......@@ -15,12 +15,15 @@
package meta
import "time"
// Config for clients.
type Config struct {
Strict bool // update ctime
Retries int
CaseInsensi bool
ReadOnly bool
OpenCache time.Duration
MountPoint string
}
......
......@@ -82,8 +82,10 @@ type Attr struct {
Nlink uint32 // number of links (sub-directories or hardlinks)
Length uint64 // length of regular file
Rdev uint32 // device number
Parent Ino // inode of parent, only for Directory
Full bool // the attributes are completed or not
Parent Ino // inode of parent, only for Directory
Full bool // the attributes are completed or not
KeepCache bool // whether to keep the cached page or not
}
func typeToStatType(_type uint8) uint32 {
......
package meta
import (
"sync"
"time"
)
type openFile struct {
attr Attr
refs int
lastCheck time.Time
chunks map[uint32][]Slice
}
type openfiles struct {
sync.Mutex
expire time.Duration
files map[Ino]*openFile
}
func newOpenFiles(expire time.Duration) *openfiles {
of := &openfiles{
expire: expire,
files: make(map[Ino]*openFile),
}
go of.cleanup()
return of
}
func (o *openfiles) cleanup() {
for {
o.Lock()
cutoff := time.Now().Add(-time.Hour)
for ino, of := range o.files {
if of.refs <= 0 && of.lastCheck.Before(cutoff) {
delete(o.files, ino)
}
}
o.Unlock()
time.Sleep(time.Second)
}
}
func (o *openfiles) OpenCheck(ino Ino, attr *Attr) bool {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if ok && time.Since(of.lastCheck) < o.expire {
if attr != nil {
*attr = of.attr
attr.KeepCache = true
}
of.refs++
return true
}
return false
}
func (o *openfiles) Open(ino Ino, attr *Attr) {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if !ok {
of = &openFile{}
of.chunks = make(map[uint32][]Slice)
o.files[ino] = of
} else if attr != nil && attr.Mtime == of.attr.Mtime && attr.Mtimensec == of.attr.Mtimensec {
attr.KeepCache = true
} else {
of.chunks = make(map[uint32][]Slice)
}
if attr != nil {
of.attr = *attr
}
of.refs++
of.lastCheck = time.Now()
}
func (o *openfiles) Close(ino Ino) bool {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if ok {
of.refs--
return of.refs <= 0
}
return true
}
func (o *openfiles) Check(ino Ino, attr *Attr) bool {
if attr == nil {
panic("attr is nil")
}
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if ok && time.Since(of.lastCheck) < o.expire {
*attr = of.attr
return true
}
return false
}
func (o *openfiles) Update(ino Ino, attr *Attr) bool {
if attr == nil {
panic("attr is nil")
}
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if ok {
if attr.Mtime != of.attr.Mtime || attr.Mtimensec != of.attr.Mtimensec {
of.chunks = make(map[uint32][]Slice)
}
of.attr = *attr
of.lastCheck = time.Now()
return true
}
return false
}
func (o *openfiles) IsOpen(ino Ino) bool {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
return ok && of.refs > 0
}
func (o *openfiles) ReadChunk(ino Ino, indx uint32) ([]Slice, bool) {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if !ok {
return nil, false
}
cs, ok := of.chunks[indx]
return cs, ok
}
func (o *openfiles) CacheChunk(ino Ino, indx uint32, cs []Slice) {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if ok {
of.chunks[indx] = cs
}
}
func (o *openfiles) InvalidateChunk(ino Ino, indx uint32) {
o.Lock()
defer o.Unlock()
of, ok := o.files[ino]
if ok {
if indx == 0xFFFFFFFF {
of.chunks = make(map[uint32][]Slice)
} else {
delete(of.chunks, indx)
}
of.lastCheck = time.Unix(0, 0)
}
}
......@@ -81,7 +81,7 @@ type redisMeta struct {
sid int64
usedSpace uint64
usedInodes uint64
openFiles map[Ino]int
of *openfiles
removedFiles map[Ino]bool
compacting map[uint64]bool
deleting chan int
......@@ -153,7 +153,7 @@ func newRedisMeta(url string, conf *Config) (Meta, error) {
m := &redisMeta{
conf: conf,
rdb: rdb,
openFiles: make(map[Ino]int),
of: newOpenFiles(conf.OpenCache),
removedFiles: make(map[Ino]bool),
compacting: make(map[uint64]bool),
deleting: make(chan int, 2),
......@@ -775,9 +775,15 @@ func (r *redisMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno {
c, cancel = context.WithTimeout(ctx, time.Millisecond*300)
defer cancel()
}
if r.conf.OpenCache > 0 && r.of.Check(inode, attr) {
return 0
}
a, err := r.rdb.Get(c, r.inodeKey(inode)).Bytes()
if err == nil {
r.parseAttr(a, attr)
if r.conf.OpenCache > 0 {
r.of.Update(inode, attr)
}
}
if err != nil && inode == 1 {
err = nil
......@@ -881,6 +887,7 @@ func (r *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin
}
func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr) syscall.Errno {
defer func() { r.of.InvalidateChunk(inode, 0xFFFFFFFF) }()
return r.txn(ctx, func(tx *redis.Tx) error {
var t Attr
a, err := tx.Get(ctx, r.inodeKey(inode)).Bytes()
......@@ -987,6 +994,7 @@ func (r *redisMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, si
if size == 0 {
return syscall.EINVAL
}
defer func() { r.of.InvalidateChunk(inode, 0xFFFFFFFF) }()
return r.txn(ctx, func(tx *redis.Tx) error {
var t Attr
a, err := tx.Get(ctx, r.inodeKey(inode)).Bytes()
......@@ -1229,9 +1237,7 @@ func (r *redisMeta) Mkdir(ctx Context, parent Ino, name string, mode uint16, cum
func (r *redisMeta) Create(ctx Context, parent Ino, name string, mode uint16, cumask uint16, inode *Ino, attr *Attr) syscall.Errno {
err := r.Mknod(ctx, parent, name, TypeFile, mode, cumask, 0, inode, attr)
if err == 0 && inode != nil {
r.Lock()
r.openFiles[*inode] = 1
r.Unlock()
r.of.Open(*inode, attr)
}
return err
}
......@@ -1287,9 +1293,7 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno {
attr.Nlink--
var opened bool
if _type == TypeFile && attr.Nlink == 0 {
r.Lock()
opened = r.openFiles[inode] > 0
r.Unlock()
opened = r.of.IsOpen(inode)
}
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
......@@ -1569,9 +1573,7 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
tattr.Ctime = now.Unix()
tattr.Ctimensec = uint32(now.Nanosecond())
} else if dtyp == TypeFile {
r.Lock()
opened = r.openFiles[dino] > 0
r.Unlock()
opened = r.of.IsOpen(dino)
}
}
if ctx.Uid() != 0 && dattr.Mode&01000 != 0 && ctx.Uid() != dattr.Uid && ctx.Uid() != tattr.Uid {
......@@ -1909,27 +1911,27 @@ func (r *redisMeta) deleteInode(inode Ino) error {
}
func (r *redisMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) syscall.Errno {
var err syscall.Errno
if attr != nil {
err = r.GetAttr(ctx, inode, attr)
}
if r.conf.ReadOnly && flags&(syscall.O_WRONLY|syscall.O_RDWR|syscall.O_TRUNC|syscall.O_APPEND) != 0 {
return syscall.EROFS
}
if r.conf.OpenCache > 0 && r.of.OpenCheck(inode, attr) {
return 0
}
var err syscall.Errno
if attr != nil && !attr.Full {
err = r.GetAttr(ctx, inode, attr)
}
if err == 0 {
r.Lock()
r.openFiles[inode] = r.openFiles[inode] + 1
r.Unlock()
// TODO: tracking update in Redis
r.of.Open(inode, attr)
}
return 0
}
func (r *redisMeta) Close(ctx Context, inode Ino) syscall.Errno {
r.Lock()
defer r.Unlock()
refs := r.openFiles[inode]
if refs <= 1 {
delete(r.openFiles, inode)
if r.of.Close(inode) {
r.Lock()
defer r.Unlock()
if r.removedFiles[inode] {
delete(r.removedFiles, inode)
go func() {
......@@ -1938,8 +1940,6 @@ func (r *redisMeta) Close(ctx Context, inode Ino) syscall.Errno {
}
}()
}
} else {
r.openFiles[inode] = refs - 1
}
return 0
}
......@@ -1968,12 +1968,17 @@ func buildSlice(ss []*slice) []Slice {
}
func (r *redisMeta) Read(ctx Context, inode Ino, indx uint32, chunks *[]Slice) syscall.Errno {
if cs, ok := r.of.ReadChunk(inode, indx); ok {
*chunks = cs
return 0
}
vals, err := r.rdb.LRange(ctx, r.chunkKey(inode, indx), 0, 1000000).Result()
if err != nil {
return errno(err)
}
ss := readSlices(vals)
*chunks = buildSlice(ss)
r.of.CacheChunk(inode, indx, *chunks)
if !r.conf.ReadOnly && (len(vals) >= 5 || len(*chunks) >= 5) {
go r.compactChunk(inode, indx, false)
}
......@@ -1989,6 +1994,7 @@ func (r *redisMeta) NewChunk(ctx Context, inode Ino, indx uint32, offset uint32,
}
func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Slice) syscall.Errno {
defer func() { r.of.InvalidateChunk(inode, indx) }()
return r.txn(ctx, func(tx *redis.Tx) error {
var attr Attr
a, err := tx.Get(ctx, r.inodeKey(inode)).Bytes()
......@@ -2030,6 +2036,7 @@ func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
}
func (r *redisMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied *uint64) syscall.Errno {
defer func() { r.of.InvalidateChunk(fout, 0xFFFFFFFF) }()
return r.txn(ctx, func(tx *redis.Tx) error {
rs, err := tx.MGet(ctx, r.inodeKey(fin), r.inodeKey(fout)).Result()
if err != nil {
......@@ -2523,6 +2530,7 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
logger.Infof("compaction for %d:%d is wasted, delete slice %d (%d bytes)", inode, indx, chunkid, size)
r.deleteSlice(ctx, chunkid, size)
} else if errno == 0 {
r.of.InvalidateChunk(inode, indx)
// reset it to zero
r.rdb.HIncrBy(ctx, sliceRefs, r.sliceKey(chunkid, size), -1)
r.cleanupZeroRef(r.sliceKey(chunkid, size))
......
......@@ -137,7 +137,7 @@ type dbMeta struct {
engine *xorm.Engine
sid uint64
openFiles map[Ino]int
of *openfiles
removedFiles map[Ino]bool
compacting map[uint64]bool
deleting chan int
......@@ -173,7 +173,7 @@ func newSQLMeta(driver, dsn string, conf *Config) (*dbMeta, error) {
m := &dbMeta{
conf: conf,
engine: engine,
openFiles: make(map[Ino]int),
of: newOpenFiles(conf.OpenCache),
removedFiles: make(map[Ino]bool),
compacting: make(map[uint64]bool),
deleting: make(chan int, 2),
......@@ -666,6 +666,9 @@ func (m *dbMeta) Access(ctx Context, inode Ino, mmask uint8, attr *Attr) syscall
}
func (m *dbMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno {
if m.conf.OpenCache > 0 && m.of.Check(inode, attr) {
return 0
}
var n = node{Inode: inode}
ok, err := m.engine.Get(&n)
if err != nil && inode == 1 {
......@@ -682,6 +685,9 @@ func (m *dbMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno {
return syscall.ENOENT
}
m.parseAttr(&n, attr)
if m.conf.OpenCache > 0 {
m.of.Update(inode, attr)
}
return 0
}
......@@ -772,6 +778,7 @@ func (m *dbMeta) appendSlice(s *xorm.Session, inode Ino, indx uint32, buf []byte
}
func (m *dbMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr) syscall.Errno {
defer func() { m.of.InvalidateChunk(inode, 0xFFFFFFFF) }()
var newSpace int64
err := m.txn(func(s *xorm.Session) error {
var n = node{Inode: inode}
......@@ -861,6 +868,7 @@ func (m *dbMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size
if size == 0 {
return syscall.EINVAL
}
defer func() { m.of.InvalidateChunk(inode, 0xFFFFFFFF) }()
var newSpace int64
err := m.txn(func(s *xorm.Session) error {
var n = node{Inode: inode}
......@@ -1054,9 +1062,7 @@ func (m *dbMeta) Mkdir(ctx Context, parent Ino, name string, mode uint16, cumask
func (m *dbMeta) Create(ctx Context, parent Ino, name string, mode uint16, cumask uint16, inode *Ino, attr *Attr) syscall.Errno {
err := m.Mknod(ctx, parent, name, TypeFile, mode, cumask, 0, inode, attr)
if err == 0 && inode != nil {
m.Lock()
m.openFiles[*inode] = 1
m.Unlock()
m.of.Open(*inode, attr)
}
return err
}
......@@ -1113,9 +1119,7 @@ func (m *dbMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno {
n.Ctime = now
var opened bool
if e.Type == TypeFile && n.Nlink == 0 {
m.Lock()
opened = m.openFiles[Ino(e.Inode)] > 0
m.Unlock()
opened = m.of.IsOpen(e.Inode)
}
if _, err := s.Delete(&edge{Parent: parent, Name: name}); err != nil {
......@@ -1359,9 +1363,7 @@ func (m *dbMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst In
if dn.Nlink > 0 {
dn.Ctime = time.Now().UnixNano() / 1e3
} else if dn.Type == TypeFile {
m.Lock()
opened = m.openFiles[Ino(dn.Inode)] > 0
m.Unlock()
opened = m.of.IsOpen(dn.Inode)
}
}
if ctx.Uid() != 0 && dpn.Mode&01000 != 0 && ctx.Uid() != dpn.Uid && ctx.Uid() != dn.Uid {
......@@ -1676,27 +1678,26 @@ func (m *dbMeta) deleteInode(inode Ino) error {
}
func (m *dbMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) syscall.Errno {
if m.conf.ReadOnly && flags&(syscall.O_WRONLY|syscall.O_RDWR|syscall.O_TRUNC|syscall.O_APPEND) != 0 {
return syscall.EROFS
}
if m.conf.OpenCache > 0 && m.of.OpenCheck(inode, attr) {
return 0
}
var err syscall.Errno
if attr != nil {
err = m.GetAttr(ctx, inode, attr)
}
if m.conf.ReadOnly && flags&(syscall.O_WRONLY|syscall.O_RDWR|syscall.O_TRUNC|syscall.O_APPEND) != 0 {
return syscall.EROFS
}
if err == 0 {
m.Lock()
m.openFiles[inode] = m.openFiles[inode] + 1
m.Unlock()
m.of.Open(inode, attr)
}
return 0
}
func (m *dbMeta) Close(ctx Context, inode Ino) syscall.Errno {
m.Lock()
defer m.Unlock()
refs := m.openFiles[inode]
if refs <= 1 {
delete(m.openFiles, inode)
if m.of.Close(inode) {
m.Lock()
defer m.Unlock()
if m.removedFiles[inode] {
delete(m.removedFiles, inode)
go func() {
......@@ -1708,13 +1709,15 @@ func (m *dbMeta) Close(ctx Context, inode Ino) syscall.Errno {
}
}()
}
} else {
m.openFiles[inode] = refs - 1
}
return 0
}
func (m *dbMeta) Read(ctx Context, inode Ino, indx uint32, chunks *[]Slice) syscall.Errno {
if cs, ok := m.of.ReadChunk(inode, indx); ok {
*chunks = cs
return 0
}
var c chunk
_, err := m.engine.Where("inode=? and indx=?", inode, indx).Get(&c)
if err != nil {
......@@ -1722,6 +1725,7 @@ func (m *dbMeta) Read(ctx Context, inode Ino, indx uint32, chunks *[]Slice) sysc
}
ss := readSliceBuf(c.Slices)
*chunks = buildSlice(ss)
m.of.CacheChunk(inode, indx, *chunks)
if !m.conf.ReadOnly && (len(c.Slices)/sliceBytes >= 5 || len(*chunks) >= 5) {
go m.compactChunk(inode, indx, false)
}
......@@ -1746,6 +1750,7 @@ func (m *dbMeta) NewChunk(ctx Context, inode Ino, indx uint32, offset uint32, ch
}
func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Slice) syscall.Errno {
defer func() { m.of.InvalidateChunk(inode, indx) }()
var newSpace int64
err := m.txn(func(s *xorm.Session) error {
var n = node{Inode: inode}
......@@ -1803,6 +1808,7 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
func (m *dbMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied *uint64) syscall.Errno {
var newSpace int64
defer func() { m.of.InvalidateChunk(fout, 0xFFFFFFFF) }()
err := m.txn(func(s *xorm.Session) error {
var nin, nout = node{Inode: fin}, node{Inode: fout}
ok, err := s.Get(&nin)
......@@ -2176,6 +2182,7 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
logger.Infof("compaction for %d:%d is wasted, delete slice %d (%d bytes)", inode, indx, chunkid, size)
m.deleteSlice(chunkid, size)
} else if err == nil {
m.of.InvalidateChunk(inode, indx)
for _, s := range ss {
var ref = chunkRef{Chunkid: s.chunkid}
ok, err := m.engine.Get(&ref)
......
......@@ -263,11 +263,6 @@ func (j *juice) Open(path string, flags int) (e int, fh uint64) {
return
}
func cache_mode(mattr uint8) (bool, bool) {
var direct_io, keep_cache bool
return direct_io, keep_cache
}
// Open opens a file.
// The flags are a combination of the fuse.O_* constants.
func (j *juice) OpenEx(path string, fi *fuse.FileInfo_t) (e int) {
......@@ -281,11 +276,11 @@ func (j *juice) OpenEx(path string, fi *fuse.FileInfo_t) (e int) {
entry, fh, errno := vfs.Open(ctx, f.Inode(), uint32(fi.Flags))
if errno == 0 {
fi.Fh = fh
fi.DirectIo, fi.KeepCache = cache_mode(entry.Attr.Flags)
if vfs.IsSpecialNode(f.Inode()) {
fi.DirectIo = true
} else {
fi.KeepCache = entry.Attr.KeepCache
}
fi.NonSeekable = false
j.Lock()
j.handlers[fh] = f.Inode()
j.Unlock()
......
......@@ -213,29 +213,29 @@ func freeHandle(fd int) {
}
type javaConf struct {
MetaURL string `json:"meta"`
ReadOnly bool `json:"readOnly"`
CacheDir string `json:"cacheDir"`
CacheSize int64 `json:"cacheSize"`
FreeSpace string `json:"freeSpace"`
AutoCreate bool `json:"autoCreate"`
CacheFullBlock bool `json:"cacheFullBlock"`
Writeback bool `json:"writeback"`
OpenCache bool `json:"opencache"`
MemorySize int `json:"memorySize"`
Prefetch int `json:"prefetch"`
Readahead int `json:"readahead"`
UploadLimit int `json:"uploadLimit"`
MaxUploads int `json:"maxUploads"`
GetTimeout int `json:"getTimeout"`
PutTimeout int `json:"putTimeout"`
FastResolve bool `json:"fastResolve"`
Debug bool `json:"debug"`
NoUsageReport bool `json:"noUsageReport"`
AccessLog string `json:"accessLog"`
PushGateway string `json:"pushGateway"`
PushInterval int `json:"pushInterval"`
PushAuth string `json:"pushAuth"`
MetaURL string `json:"meta"`
ReadOnly bool `json:"readOnly"`
OpenCache float64 `json:"openCache"`
CacheDir string `json:"cacheDir"`
CacheSize int64 `json:"cacheSize"`
FreeSpace string `json:"freeSpace"`
AutoCreate bool `json:"autoCreate"`
CacheFullBlock bool `json:"cacheFullBlock"`
Writeback bool `json:"writeback"`
MemorySize int `json:"memorySize"`
Prefetch int `json:"prefetch"`
Readahead int `json:"readahead"`
UploadLimit int `json:"uploadLimit"`
MaxUploads int `json:"maxUploads"`
GetTimeout int `json:"getTimeout"`
PutTimeout int `json:"putTimeout"`
FastResolve bool `json:"fastResolve"`
Debug bool `json:"debug"`
NoUsageReport bool `json:"noUsageReport"`
AccessLog string `json:"accessLog"`
PushGateway string `json:"pushGateway"`
PushInterval int `json:"pushInterval"`
PushAuth string `json:"pushAuth"`
}
func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.FileSystem) uintptr {
......@@ -314,7 +314,12 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
utils.InitLoggers(false)
addr := jConf.MetaURL
m := meta.NewClient(addr, &meta.Config{Retries: 10, Strict: true, ReadOnly: jConf.ReadOnly})
m := meta.NewClient(addr, &meta.Config{
Retries: 10,
Strict: true,
ReadOnly: jConf.ReadOnly,
OpenCache: time.Duration(jConf.OpenCache * 1e9),
})
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
......@@ -726,12 +731,11 @@ func jfs_stat1(pid int, h uintptr, cpath *C.char, buf uintptr) int {
if w == nil {
return EINVAL
}
f, err := w.Open(w.withPid(pid), C.GoString(cpath), 0)
info, err := w.Stat(w.withPid(pid), C.GoString(cpath))
if err != 0 {
return errno(err)
}
info, _ := f.Stat()
return fill_stat(w, utils.NewNativeBuffer(toBuf(buf, 130)), info.(*fs.FileStat))
return fill_stat(w, utils.NewNativeBuffer(toBuf(buf, 130)), info)
}
//export jfs_lstat1
......@@ -758,6 +762,7 @@ func jfs_summary(pid int, h uintptr, cpath *C.char, buf uintptr) int {
if err != 0 {
return errno(err)
}
defer f.Close(ctx)
summary, err := f.Summary(ctx, 0, 1)
if err != 0 {
return errno(err)
......@@ -792,6 +797,7 @@ func jfs_chmod(pid int, h uintptr, cpath *C.char, mode C.mode_t) int {
if err != 0 {
return errno(err)
}
defer f.Close(w.withPid(pid))
return errno(f.Chmod(w.withPid(pid), uint16(mode)))
}
......@@ -805,6 +811,7 @@ func jfs_chown(pid int, h uintptr, cpath *C.char, uid uint32, gid uint32) int {
if err != 0 {
return errno(err)
}
defer f.Close(w.withPid(pid))
return errno(f.Chown(w.withPid(pid), uid, gid))
}
......@@ -818,6 +825,7 @@ func jfs_utime(pid int, h uintptr, cpath *C.char, mtime, atime int64) int {
if err != 0 {
return errno(err)
}
defer f.Close(w.withPid(pid))
return errno(f.Utime(w.withPid(pid), atime, mtime))
}
......@@ -831,6 +839,7 @@ func jfs_setOwner(pid int, h uintptr, cpath *C.char, owner *C.char, group *C.cha
if err != 0 {
return errno(err)
}
defer f.Close(w.withPid(pid))
st, _ := f.Stat()
uid := uint32(st.(*fs.FileStat).Uid())
gid := uint32(st.(*fs.FileStat).Gid())
......@@ -909,7 +918,7 @@ func jfs_concat(pid int, h uintptr, _dst *C.char, buf uintptr, bufsize int) int
}
dst := C.GoString(_dst)
ctx := w.withPid(pid)
df, err := w.Open(ctx, dst, 2)
df, err := w.Open(ctx, dst, vfs.MODE_MASK_W)
if err != 0 {
return errno(err)
}
......
......@@ -306,13 +306,14 @@ public class JuiceFileSystemImpl extends FileSystem {
for (String key : keys) {
obj.put(key, getConf(conf, key, ""));
}
String[] bkeys = new String[]{"debug", "writeback", "opencache"};
String[] bkeys = new String[]{"debug", "writeback"};
for (String key : bkeys) {
obj.put(key, Boolean.valueOf(getConf(conf, key, "false")));
}
obj.put("readOnly", Boolean.valueOf(getConf(conf, "read-only", "false")));
obj.put("cacheDir", getConf(conf, "cache-dir", "memory"));
obj.put("cacheSize", Integer.valueOf(getConf(conf, "cache-size", "100")));
obj.put("openCache", Float.valueOf(getConf(conf, "open-cache", "0.0")));
obj.put("cacheFullBlock", Boolean.valueOf(getConf(conf, "cache-full-block", "true")));
obj.put("metacache", Boolean.valueOf(getConf(conf, "metacache", "true")));
obj.put("autoCreate", Boolean.valueOf(getConf(conf, "auto-create-cache-dir", "true")));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册