未验证 提交 c1a75be3 编写于 作者: D Davies Liu 提交者: GitHub

add quota for volume (#495)

* add quota for volume

* address comments

* update docs
上级 81014193
......@@ -172,6 +172,8 @@ func format(c *cli.Context) error {
AccessKey: c.String("access-key"),
SecretKey: c.String("secret-key"),
Shards: c.Int("shards"),
Capacity: c.Uint64("capacity") << 30,
Inodes: c.Uint64("inodes"),
BlockSize: fixObjectSize(c.Int("block-size")),
Compression: c.String("compress"),
}
......@@ -244,6 +246,16 @@ func formatFlags() *cli.Command {
Value: 4096,
Usage: "size of block in KiB",
},
&cli.Uint64Flag{
Name: "capacity",
Value: 0,
Usage: "the limit for space in GiB",
},
&cli.Uint64Flag{
Name: "inodes",
Value: 0,
Usage: "the limit for number of inodes",
},
&cli.StringFlag{
Name: "compress",
Value: "none",
......
......@@ -72,6 +72,12 @@ size of block in KiB (default: 4096)
`--compress value`\
compression algorithm (lz4, zstd, none) (default: "none")
`--capacity value`\
the limit for space in GiB (default: unlimited)
`--inodes value`\
the limit for number of inodes (default: unlimited)
`--shards value`\
store the blocks into N buckets by hash of key (default: 0)
......
......@@ -71,6 +71,12 @@ juicefs format [command options] REDIS-URL NAME
`--block-size value`\
size of block in KiB (default: 4096)
`--capacity value`\
the limit for space in GiB (default: unlimited)
`--inodes value`\
the limit for number of inodes (default: unlimited)
`--compress value`\
compression algorithm (lz4, zstd, none) (default: "none")
......
......@@ -34,5 +34,7 @@ type Format struct {
Compression string
Shards int
Partitions int
Capacity uint64
Inodes uint64
EncryptKey string `json:",omitempty"`
}
......@@ -29,6 +29,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
......@@ -72,10 +73,13 @@ const sliceRefs = "sliceRef"
type redisMeta struct {
sync.Mutex
conf *Config
fmt Format
rdb *redis.Client
txlocks [1024]sync.Mutex // Pessimistic locks to reduce conflict on Redis
sid int64
usedSpace uint64
usedInodes uint64
openFiles map[Ino]int
removedFiles map[Ino]bool
compacting map[uint64]bool
......@@ -177,10 +181,12 @@ func (r *redisMeta) Init(format Format, force bool) error {
old.SecretKey = "removed"
logger.Warnf("Existing volume will be overwrited: %+v", old)
} else {
// only AccessKey and SecretKey can be safely updated.
format.UUID = old.UUID
// these can be safely updated.
old.AccessKey = format.AccessKey
old.SecretKey = format.SecretKey
old.Capacity = format.Capacity
old.Inodes = format.Inodes
if format != old {
old.SecretKey = ""
format.SecretKey = ""
......@@ -197,6 +203,7 @@ func (r *redisMeta) Init(format Format, force bool) error {
if err != nil {
return err
}
r.fmt = format
if body != nil {
return nil
}
......@@ -223,12 +230,11 @@ func (r *redisMeta) Load() (*Format, error) {
if err != nil {
return nil, err
}
var format Format
err = json.Unmarshal(body, &format)
err = json.Unmarshal(body, &r.fmt)
if err != nil {
return nil, fmt.Errorf("json: %s", err)
}
return &format, nil
return &r.fmt, nil
}
func (r *redisMeta) NewSession() error {
......@@ -260,12 +266,30 @@ func (r *redisMeta) NewSession() error {
r.shaResolve = ""
}
go r.refreshUsage()
go r.refreshSession()
go r.cleanupDeletedFiles()
go r.cleanupSlices()
return nil
}
// refreshUsage keeps the locally cached usedSpace and usedInodes counters in
// step with the values stored in Redis, so that checkQuota can decide without
// a round trip to the server.
//
// It never returns: each round refreshes both counters and then sleeps for
// ten seconds.
func (r *redisMeta) refreshUsage() {
	for {
		// IncrBy with a zero delta reports the counter's current value.
		// On a failed read the previously cached value is kept: storing the
		// zero that accompanies an error would make the volume look empty and
		// disable quota enforcement until the next successful refresh.
		if used, err := r.rdb.IncrBy(Background, usedSpace, 0).Result(); err == nil {
			if used < 0 {
				// A negative counter would wrap to a huge uint64 and make
				// every quota check fail.
				used = 0
			}
			atomic.StoreUint64(&r.usedSpace, uint64(used))
		}
		if inodes, err := r.rdb.IncrBy(Background, totalInodes, 0).Result(); err == nil {
			if inodes < 0 {
				inodes = 0
			}
			atomic.StoreUint64(&r.usedInodes, uint64(inodes))
		}
		time.Sleep(time.Second * 10)
	}
}
// checkQuota reports whether allocating size more bytes or inodes more inodes
// would push the cached usage past the limits configured in r.fmt.
//
// A limit of zero means "unlimited", and a non-positive request for a
// resource is never checked against that resource's limit.
func (r *redisMeta) checkQuota(size, inodes int64) bool {
	// Space quota: only consulted for a growing request on a capped volume.
	if size > 0 {
		if limit := r.fmt.Capacity; limit > 0 {
			if atomic.LoadUint64(&r.usedSpace)+uint64(size) > limit {
				return true
			}
		}
	}

	// Inode quota: same rules, applied to the inode counter.
	if inodes <= 0 {
		return false
	}
	limit := r.fmt.Inodes
	if limit == 0 {
		return false
	}
	return atomic.LoadUint64(&r.usedInodes)+uint64(inodes) > limit
}
func (r *redisMeta) getSession(sid string, detail bool) (*Session, error) {
ctx := Background
info, err := r.rdb.HGet(ctx, sessionInfos, sid).Bytes()
......@@ -489,7 +513,11 @@ func align4K(length uint64) int64 {
}
func (r *redisMeta) StatFS(ctx Context, totalspace, availspace, iused, iavail *uint64) syscall.Errno {
*totalspace = 1 << 50
if r.fmt.Capacity > 0 {
*totalspace = r.fmt.Capacity
} else {
*totalspace = 1 << 50
}
c, cancel := context.WithTimeout(ctx, time.Millisecond*300)
defer cancel()
used, _ := r.rdb.IncrBy(c, usedSpace, 0).Result()
......@@ -497,8 +525,14 @@ func (r *redisMeta) StatFS(ctx Context, totalspace, availspace, iused, iavail *u
used = 0
}
used = ((used >> 16) + 1) << 16 // aligned to 64K
for used*10 > int64(*totalspace)*8 {
*totalspace *= 2
if r.fmt.Capacity > 0 {
if used > int64(*totalspace) {
*totalspace = uint64(used)
}
} else {
for used*10 > int64(*totalspace)*8 {
*totalspace *= 2
}
}
*availspace = *totalspace - uint64(used)
inodes, _ := r.rdb.IncrBy(c, totalInodes, 0).Result()
......@@ -506,7 +540,15 @@ func (r *redisMeta) StatFS(ctx Context, totalspace, availspace, iused, iavail *u
inodes = 0
}
*iused = uint64(inodes)
*iavail = 10 << 20
if r.fmt.Inodes > 0 {
if *iused > r.fmt.Inodes {
*iavail = 0
} else {
*iavail = r.fmt.Inodes - *iused
}
} else {
*iavail = 10 << 20
}
return 0
}
......@@ -841,6 +883,9 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64,
old := t.Length
var zeroChunks []uint32
if length > old {
if r.checkQuota(align4K(length)-align4K(old), 0) {
return syscall.ENOSPC
}
if (length-old)/ChunkSize >= 100 {
// super large
var cursor uint64
......@@ -952,6 +997,9 @@ func (r *redisMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, si
}
old := t.Length
if length > old && r.checkQuota(align4K(length)-align4K(old), 0) {
return syscall.ENOSPC
}
t.Length = length
now := time.Now()
t.Ctime = now.Unix()
......@@ -1079,6 +1127,9 @@ func (r *redisMeta) Mknod(ctx Context, parent Ino, name string, _type uint8, mod
}
func (r *redisMeta) mknod(ctx Context, parent Ino, name string, _type uint8, mode, cumask uint16, rdev uint32, path string, inode *Ino, attr *Attr) syscall.Errno {
if r.checkQuota(4<<10, 1) {
return syscall.ENOSPC
}
ino, err := r.nextInode()
if err != nil {
return errno(err)
......@@ -1934,6 +1985,9 @@ func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
added = align4K(newleng) - align4K(attr.Length)
attr.Length = newleng
}
if r.checkQuota(added, 0) {
return syscall.ENOSPC
}
now := time.Now()
attr.Mtime = now.Unix()
attr.Mtimensec = uint32(now.Nanosecond())
......@@ -1991,6 +2045,9 @@ func (r *redisMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino,
added = align4K(newleng) - align4K(attr.Length)
attr.Length = newleng
}
if r.checkQuota(added, 0) {
return syscall.ENOSPC
}
now := time.Now()
attr.Mtime = now.Unix()
attr.Mtimensec = uint32(now.Nanosecond())
......
......@@ -23,6 +23,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
......@@ -130,6 +131,7 @@ type freeID struct {
type dbMeta struct {
sync.Mutex
conf *Config
fmt Format
engine *xorm.Engine
sid uint64
......@@ -141,6 +143,8 @@ type dbMeta struct {
msgCallbacks *msgCallbacks
newSpace int64
newInodes int64
usedSpace int64
usedInodes int64
freeMu sync.Mutex
freeInodes freeID
......@@ -205,10 +209,12 @@ func (m *dbMeta) Init(format Format, force bool) error {
old.SecretKey = "removed"
logger.Warnf("Existing volume will be overwrited: %+v", old)
} else {
// only AccessKey and SecretKey can be safely updated.
format.UUID = old.UUID
// these can be safely updated.
old.AccessKey = format.AccessKey
old.SecretKey = format.SecretKey
old.Capacity = format.Capacity
old.Inodes = format.Inodes
if format != *old {
old.SecretKey = ""
format.SecretKey = ""
......@@ -228,6 +234,7 @@ func (m *dbMeta) Init(format Format, force bool) error {
logger.Fatalf("json: %s", err)
}
m.fmt = format
return m.txn(func(s *xorm.Session) error {
var set = &setting{"format", string(data)}
now := time.Now()
......@@ -261,12 +268,11 @@ func (m *dbMeta) Load() (*Format, error) {
return nil, err
}
var format Format
err = json.Unmarshal([]byte(s.Value), &format)
err = json.Unmarshal([]byte(s.Value), &m.fmt)
if err != nil {
return nil, fmt.Errorf("json: %s", err)
}
return &format, nil
return &m.fmt, nil
}
func (m *dbMeta) NewSession() error {
......@@ -295,6 +301,7 @@ func (m *dbMeta) NewSession() error {
m.sid = v
logger.Debugf("session is %d", m.sid)
go m.refreshUsage()
go m.refreshSession()
go m.cleanupDeletedFiles()
go m.cleanupSlices()
......@@ -302,6 +309,29 @@ func (m *dbMeta) NewSession() error {
return nil
}
// refreshUsage periodically copies the "usedSpace" and "totalInodes" counter
// rows into the in-memory usedSpace and usedInodes fields.
//
// It never returns. A counter whose read fails keeps its previously cached
// value for that round; rounds are ten seconds apart.
func (m *dbMeta) refreshUsage() {
	for {
		space := counter{Name: "usedSpace"}
		if _, err := m.engine.Get(&space); err == nil {
			atomic.StoreInt64(&m.usedSpace, space.Value)
		}

		inodes := counter{Name: "totalInodes"}
		if _, err := m.engine.Get(&inodes); err == nil {
			atomic.StoreInt64(&m.usedInodes, inodes.Value)
		}

		time.Sleep(10 * time.Second)
	}
}
// checkQuota reports whether allocating size more bytes or inodes more inodes
// would exceed the limits configured in m.fmt.
//
// Usage is the last value cached by refreshUsage plus the local deltas that
// have not been flushed yet (newSpace / newInodes). A limit of zero means
// "unlimited", and a non-positive request for a resource is never checked
// against that resource's limit.
func (m *dbMeta) checkQuota(size, inodes int64) bool {
	if size > 0 && m.fmt.Capacity > 0 {
		used := atomic.LoadInt64(&m.usedSpace) + atomic.LoadInt64(&m.newSpace) + size
		// Compare as uint64: converting a limit above MaxInt64 to int64 would
		// turn it negative and make every request look over quota.
		if used > 0 && uint64(used) > m.fmt.Capacity {
			return true
		}
	}
	if inodes > 0 && m.fmt.Inodes > 0 {
		used := atomic.LoadInt64(&m.usedInodes) + atomic.LoadInt64(&m.newInodes) + inodes
		return used > 0 && uint64(used) > m.fmt.Inodes
	}
	return false
}
func (m *dbMeta) getSession(row *session, detail bool) (*Session, error) {
var s Session
if row.Info == nil { // legacy client has no info
......@@ -490,18 +520,14 @@ func (m *dbMeta) parseAttr(n *node, attr *Attr) {
}
func (m *dbMeta) updateStats(space int64, inodes int64) {
m.Lock()
m.newSpace += space
m.newInodes += inodes
m.Unlock()
atomic.AddInt64(&m.newSpace, space)
atomic.AddInt64(&m.newInodes, inodes)
}
func (m *dbMeta) flushStats() {
for {
m.Lock()
newSpace, newInodes := m.newSpace, m.newInodes
m.newSpace, m.newInodes = 0, 0
m.Unlock()
newSpace := atomic.SwapInt64(&m.newSpace, 0)
newInodes := atomic.SwapInt64(&m.newInodes, 0)
if newSpace != 0 || newInodes != 0 {
err := m.txn(func(s *xorm.Session) error {
_, err := s.Exec("UPDATE jfs_counter SET value=value+ (CASE name WHEN 'usedSpace' THEN ? ELSE ? END) WHERE name='usedSpace' OR name='totalInodes' ", newSpace, newInodes)
......@@ -517,10 +543,8 @@ func (m *dbMeta) flushStats() {
}
func (m *dbMeta) StatFS(ctx Context, totalspace, availspace, iused, iavail *uint64) syscall.Errno {
m.Lock()
usedSpace := m.newSpace
inodes := m.newInodes
m.Unlock()
usedSpace := atomic.LoadInt64(&m.newSpace)
inodes := atomic.LoadInt64(&m.newInodes)
var c = counter{Name: "usedSpace"}
_, err := m.engine.Get(&c)
if err != nil {
......@@ -532,10 +556,18 @@ func (m *dbMeta) StatFS(ctx Context, totalspace, availspace, iused, iavail *uint
usedSpace = 0
}
usedSpace = ((usedSpace >> 16) + 1) << 16 // aligned to 64K
*totalspace = 1 << 50
for *totalspace < uint64(usedSpace) {
*totalspace *= 2
if m.fmt.Capacity > 0 {
*totalspace = m.fmt.Capacity
if *totalspace < uint64(usedSpace) {
*totalspace = uint64(usedSpace)
}
} else {
*totalspace = 1 << 50
for *totalspace*8 < uint64(usedSpace)*10 {
*totalspace *= 2
}
}
*availspace = *totalspace - uint64(usedSpace)
c = counter{Name: "totalInodes"}
_, err = m.engine.Get(&c)
......@@ -548,7 +580,15 @@ func (m *dbMeta) StatFS(ctx Context, totalspace, availspace, iused, iavail *uint
inodes = 0
}
*iused = uint64(inodes)
*iavail = 10 << 20
if m.fmt.Inodes > 0 {
if *iused > m.fmt.Inodes {
*iavail = 0
} else {
*iavail = m.fmt.Inodes - *iused
}
} else {
*iavail = 10 << 20
}
return 0
}
......@@ -769,6 +809,9 @@ func (m *dbMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, at
}
}
newSpace = align4K(length) - align4K(n.Length)
if m.checkQuota(newSpace, 0) {
return syscall.ENOSPC
}
now := time.Now().UnixNano() / 1e3
n.Length = length
n.Mtime = now
......@@ -827,6 +870,9 @@ func (m *dbMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size
old := n.Length
newSpace = align4K(length) - align4K(n.Length)
if m.checkQuota(newSpace, 0) {
return syscall.ENOSPC
}
now := time.Now().UnixNano() / 1e3
n.Length = length
n.Mtime = now
......@@ -901,6 +947,9 @@ func (m *dbMeta) resolveCase(ctx Context, parent Ino, name string) *Entry {
}
func (m *dbMeta) mknod(ctx Context, parent Ino, name string, _type uint8, mode, cumask uint16, rdev uint32, path string, inode *Ino, attr *Attr) syscall.Errno {
if m.checkQuota(4<<10, 1) {
return syscall.ENOSPC
}
ino, err := m.nextInode()
if err != nil {
return errno(err)
......@@ -1693,6 +1742,9 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
newSpace = align4K(newleng) - align4K(n.Length)
n.Length = newleng
}
if m.checkQuota(newSpace, 0) {
return syscall.ENOSPC
}
now := time.Now().UnixNano() / 1e3
n.Mtime = now
n.Ctime = now
......@@ -1761,6 +1813,9 @@ func (m *dbMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off
newSpace = align4K(newleng) - align4K(nout.Length)
nout.Length = newleng
}
if m.checkQuota(newSpace, 0) {
return syscall.ENOSPC
}
now := time.Now().UnixNano() / 1e3
nout.Mtime = now
nout.Ctime = now
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册