Unverified commit 18c9746a, authored by Davies Liu, committed via GitHub

delete all chunks together (#84)

* delete all chunks together

* update comment

* cleanup

* fix lint error

* fix rename
Parent commit: 8a474e5e
......@@ -43,7 +43,7 @@ import (
Flock: lockf$inode -> { $sid_$owner -> ltype }
POSIX lock: lockp$inode -> { $sid_$owner -> Plock(pid,ltype,start,end) }
Sessions: sessions -> [ $sid -> heartbeat ]
Removed chunks: delchunks -> [($inode,$start,$end,$maxchunk) -> seconds]
Removed chunks: delchunks -> [$inode -> seconds]
*/
var logger = utils.GetLogger("juicefs")
......@@ -216,7 +216,7 @@ func (r *redisMeta) entryKey(parent Ino) string {
}
// chunkKey returns the Redis key that stores the slice list for chunk
// index indx of the given inode, e.g. "c123_0". It must stay consistent
// with the "c%d_*" scan pattern used elsewhere in this file, so
// Ino.String() is assumed to render the inode in decimal — TODO confirm.
// NOTE(review): the diff rendering showed both the old fmt.Sprintf form
// and this strconv form back to back; the fmt.Sprintf line was
// unreachable dead code after the first return, so only the strconv
// form (the commit's new version, cheaper than fmt on this hot path)
// is kept.
func (r *redisMeta) chunkKey(inode Ino, indx uint32) string {
	return "c" + inode.String() + "_" + strconv.FormatInt(int64(indx), 10)
}
func (r *redisMeta) xattrKey(inode Ino) string {
......@@ -389,10 +389,6 @@ func (r *redisMeta) txn(txf func(tx *redis.Tx) error, keys ...string) syscall.Er
}
func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr) syscall.Errno {
maxchunk, err := r.rdb.IncrBy(c, "nextchunk", 0).Uint64()
if err != nil {
return errno(err)
}
return r.txn(func(tx *redis.Tx) error {
var t Attr
a, err := tx.Get(c, r.inodeKey(inode)).Bytes()
......@@ -404,6 +400,37 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64,
return syscall.EPERM
}
old := t.Length
var zeroChunks []uint32
if length > old {
if (length-old)/ChunkSize >= 100 {
// super large
var cursor uint64
var keys []string
for {
keys, cursor, err = tx.Scan(c, cursor, fmt.Sprintf("c%d_*", inode), 10000).Result()
if err != nil {
return err
}
for _, key := range keys {
indx, err := strconv.Atoi(strings.Split(key, "_")[1])
if err != nil {
logger.Errorf("parse %s: %s", key, err)
continue
}
if uint64(indx) > old/ChunkSize && uint64(indx) < length/ChunkSize {
zeroChunks = append(zeroChunks, uint32(indx))
}
}
if len(keys) < 10000 {
break
}
}
} else {
for i := old/ChunkSize + 1; i < length/ChunkSize; i++ {
zeroChunks = append(zeroChunks, uint32(i))
}
}
}
t.Length = length
now := time.Now()
t.Mtime = now.Unix()
......@@ -412,17 +439,37 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64,
t.Ctimensec = uint32(now.Nanosecond())
_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.Set(c, r.inodeKey(inode), r.marshal(&t), 0)
if old > length {
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(inode, length, old, maxchunk)})
} else if length > (old/ChunkSize+1)*ChunkSize {
// zero out last chunks
if length > old {
// zero out from old to length
w := utils.NewBuffer(24)
w.Put32(uint32(old % ChunkSize))
w.Put64(0)
w.Put32(0)
w.Put32(0)
w.Put32(ChunkSize - uint32(old%ChunkSize))
if length > (old/ChunkSize+1)*ChunkSize {
w.Put32(ChunkSize - uint32(old%ChunkSize))
} else {
w.Put32(uint32(length - old))
}
pipe.RPush(c, r.chunkKey(inode, uint32(old/ChunkSize)), w.Bytes())
w = utils.NewBuffer(24)
w.Put32(0)
w.Put64(0)
w.Put32(0)
w.Put32(0)
w.Put32(ChunkSize)
for _, indx := range zeroChunks {
pipe.RPush(c, r.chunkKey(inode, indx), w.Bytes())
}
if length > (old/ChunkSize+1)*ChunkSize && length%ChunkSize > 0 {
w := utils.NewBuffer(24)
w.Put32(0)
w.Put64(0)
w.Put32(0)
w.Put32(0)
w.Put32(uint32(length % ChunkSize))
pipe.RPush(c, r.chunkKey(inode, uint32(length/ChunkSize)), w.Bytes())
}
}
pipe.IncrBy(c, usedSpace, align4K(length)-align4K(old))
return nil
......@@ -431,7 +478,6 @@ func (r *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64,
if attr != nil {
*attr = t
}
go r.deleteChunks(inode, length, old, maxchunk)
}
return err
}, r.inodeKey(inode))
......@@ -721,17 +767,10 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno {
attr.Nlink--
var opened bool
var maxchunk uint64
if _type == TypeFile && attr.Nlink == 0 {
r.Lock()
opened = r.openFiles[inode] > 0
r.Unlock()
if !opened {
maxchunk, err = tx.IncrBy(c, "nextchunk", 0).Uint64()
if err != nil {
return err
}
}
}
_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
......@@ -750,7 +789,7 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno {
pipe.Set(c, r.inodeKey(inode), r.marshal(&attr), 0)
pipe.SAdd(c, r.sessionKey(r.sid), strconv.Itoa(int(inode)))
} else {
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(inode, 0, attr.Length, maxchunk)})
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: inode.String()})
pipe.Del(c, r.inodeKey(inode))
pipe.IncrBy(c, usedSpace, -align4K(attr.Length))
}
......@@ -765,7 +804,7 @@ func (r *redisMeta) Unlink(ctx Context, parent Ino, name string) syscall.Errno {
r.removedFiles[inode] = true
r.Unlock()
} else {
go r.deleteChunks(inode, 0, attr.Length, maxchunk)
go r.deleteChunks(inode, inode.String())
}
}
return err
......@@ -869,7 +908,6 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
}
var tattr Attr
var opened bool
var maxchunk uint64
if err == nil {
typ1, dino1 := r.parseEntry(buf)
if dino1 != dino || typ1 != dtyp {
......@@ -898,12 +936,6 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
r.Lock()
opened = r.openFiles[dino] > 0
r.Unlock()
if !opened {
maxchunk, err = tx.IncrBy(c, "nextchunk", 0).Uint64()
if err != nil {
return err
}
}
}
}
} else {
......@@ -971,7 +1003,7 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
pipe.Set(c, r.inodeKey(dino), r.marshal(&tattr), 0)
pipe.SAdd(c, r.sessionKey(r.sid), strconv.Itoa(int(dino)))
} else {
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: r.delChunks(dino, 0, tattr.Length, maxchunk)})
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: dino.String()})
pipe.Del(c, r.inodeKey(dino))
pipe.IncrBy(c, usedSpace, -align4K(tattr.Length))
}
......@@ -994,7 +1026,7 @@ func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
r.removedFiles[dino] = true
r.Unlock()
} else {
go r.deleteChunks(dino, 0, tattr.Length, maxchunk)
go r.deleteChunks(dino, dino.String())
}
}
return err
......@@ -1166,19 +1198,15 @@ func (r *redisMeta) deleteInode(inode Ino) error {
if err != nil {
return err
}
maxchunk, err := r.rdb.IncrBy(c, "nextchunk", 0).Uint64()
if err != nil {
return err
}
r.parseAttr(a, &attr)
_, err = r.rdb.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: r.delChunks(inode, 0, attr.Length, maxchunk)})
pipe.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: inode.String()})
pipe.Del(c, r.inodeKey(inode))
pipe.IncrBy(c, usedSpace, -align4K(attr.Length))
return nil
})
if err == nil {
go r.deleteChunks(inode, 0, attr.Length, maxchunk)
go r.deleteChunks(inode, inode.String())
}
return err
}
......@@ -1307,46 +1335,36 @@ func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
}, r.inodeKey(inode))
}
// delChunks encodes (inode, start, end, maxchunkid) as the
// colon-separated member string used to track pending chunk deletions
// in the delchunks sorted set, e.g. "7:0:4194304:42".
func (r *redisMeta) delChunks(inode Ino, start, end, maxchunkid uint64) string {
	fields := []string{
		strconv.FormatUint(uint64(inode), 10),
		strconv.FormatUint(start, 10),
		strconv.FormatUint(end, 10),
		strconv.FormatUint(maxchunkid, 10),
	}
	return strings.Join(fields, ":")
}
// cleanupChunks is a background loop that drains the delchunks sorted
// set: once a minute it fetches up to 1000 members whose score (a Unix
// timestamp) falls within the next hour and hands each one to
// deleteChunks for actual removal of the chunk data.
//
// NOTE(review): this span comes from a rendered diff with the +/-
// markers stripped, so it contains BOTH the old 4-field member format
// ("inode:start:end:maxchunk") handling AND the new single-argument
// deleteChunks call; the two consecutive deleteChunks calls below
// cannot both be correct in one revision — reconcile against the real
// commit before treating this as buildable source.
func (r *redisMeta) cleanupChunks() {
for {
now := time.Now()
// Score range [0, now+1h]: members are scheduled by timestamp, and the
// one-hour look-ahead picks up entries due soon as well as overdue ones.
members, _ := r.rdb.ZRangeByScore(c, delchunks, &redis.ZRangeBy{Min: strconv.Itoa(0), Max: strconv.Itoa(int(now.Add(time.Hour).Unix())), Count: 1000}).Result()
for _, member := range members {
// Old format: "inode:start:end:maxchunk" (four colon-separated fields).
ps := strings.Split(member, ":")
if len(ps) != 4 {
logger.Errorf("invalid del chunks: %s", member)
continue
}
// Parse errors are deliberately ignored; a zero value is used instead.
inode, _ := strconv.Atoi(ps[0])
start, _ := strconv.Atoi(ps[1])
end, _ := strconv.Atoi(ps[2])
maxchunk, _ := strconv.Atoi(ps[3])
r.deleteChunks(Ino(inode), uint64(start), uint64(end), uint64(maxchunk))
r.deleteChunks(Ino(inode), member)
}
time.Sleep(time.Minute)
}
}
func (r *redisMeta) deleteChunks(inode Ino, start, end, maxchunk uint64) {
var i uint32
if start > 0 {
i = uint32((start-1)/ChunkSize) + 1
}
func (r *redisMeta) deleteChunks(inode Ino, tracking string) {
var rs []*redis.StringSliceCmd
for uint64(i)*ChunkSize <= end {
for {
keys, _, err := r.rdb.Scan(c, 0, fmt.Sprintf("c%d_*", inode), 1000).Result()
if err != nil {
return
}
if len(keys) == 0 {
break
}
p := r.rdb.Pipeline()
var indx = i
for j := 0; uint64(i)*ChunkSize <= end && j < 1000; j++ {
rs = append(rs, p.LRange(c, r.chunkKey(inode, i), 0, 1000))
i++
for _, k := range keys {
rs = append(rs, p.LRange(c, k, 0, 1000))
}
vals, err := p.Exec(c)
if err != nil {
logger.Errorf("LRange %d[%d-%d]: %s", inode, start, end, err)
logger.Errorf("delete chunk of %d: %s", inode, err)
return
}
for j := range vals {
......@@ -1354,46 +1372,40 @@ func (r *redisMeta) deleteChunks(inode Ino, start, end, maxchunk uint64) {
if err == redis.Nil {
continue
}
indx, _ := strconv.Atoi(strings.Split(keys[j], "_")[1])
for _, cs := range val {
rb := utils.ReadBuffer([]byte(cs))
_ = rb.Get32() // pos
chunkid := rb.Get64()
if chunkid == 0 {
continue
cleng := rb.Get32()
var err error
if chunkid > 0 {
err = r.newMsg(DeleteChunk, chunkid, cleng)
}
// there could be new data written after the chunk is marked for deletion,
// so we should not delete any chunk with id > maxchunk
if chunkid > maxchunk {
// mark this chunk is deleted
break
if err == nil {
err = r.txn(func(tx *redis.Tx) error {
val, err := tx.LRange(c, r.chunkKey(inode, uint32(indx)), 0, 0).Result()
if err != nil {
return err
}
if len(val) == 1 && val[0] == cs {
_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.LPop(c, r.chunkKey(inode, uint32(indx)))
return nil
})
return err
}
return fmt.Errorf("chunk %d %d changed", inode, uint32(indx))
}, r.chunkKey(inode, uint32(indx)))
}
cleng := rb.Get32()
err := r.newMsg(DeleteChunk, chunkid, cleng)
if err != nil {
if err != nil && err != syscall.Errno(0) {
logger.Warnf("delete chunk %d fail: %s, retry later", inode, err)
now := time.Now()
key := r.delChunks(inode, uint64((indx+uint32(j)))*ChunkSize, uint64((indx+uint32(j)+1))*ChunkSize, maxchunk)
r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(now.Unix()), Member: key})
return
}
_ = r.txn(func(tx *redis.Tx) error {
val, err := tx.LRange(c, r.chunkKey(inode, indx+uint32(j)), 0, 1).Result()
if err != nil {
return err
}
if len(val) == 1 && val[0] == cs {
_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.LPop(c, r.chunkKey(inode, indx+uint32(j)))
return nil
})
return err
}
return nil
}, r.chunkKey(inode, indx+uint32(j)))
}
}
}
r.rdb.ZRem(c, delchunks, r.delChunks(inode, start, end, maxchunk))
r.rdb.ZRem(c, delchunks, tracking)
}
func (r *redisMeta) parseSlice(buf []byte) *slice {
......@@ -1488,8 +1500,7 @@ func (r *redisMeta) compact(inode Ino, indx uint32) {
w.Put32(size)
_, err = r.rdb.Pipelined(c, func(pipe redis.Pipeliner) error {
pipe.RPush(c, r.chunkKey(0, 0), w.Bytes())
key := r.delChunks(0, 0, uint64(size), chunkid)
r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: key})
r.rdb.ZAdd(c, delchunks, &redis.Z{Score: float64(time.Now().Unix()), Member: "0"})
return nil
})
if err != nil {
......
......@@ -16,6 +16,7 @@
package meta
import (
"fmt"
"sync"
"syscall"
"testing"
......@@ -314,3 +315,51 @@ func TestConcurrentWrite(t *testing.T) {
t.Fatal()
}
}
// nolint:errcheck
// TestTruncateAndDelete verifies that truncating a file writes only a
// bounded number of zero-marker chunk keys (rather than one key per
// chunk of a huge file) and that unlinking the file removes all of its
// chunk keys. Requires a local Redis at 127.0.0.1 (DB 10); the test is
// skipped when Redis is unavailable.
func TestTruncateAndDelete(t *testing.T) {
var conf RedisConfig
m, err := NewRedisMeta("redis://127.0.0.1/10", &conf)
if err != nil {
t.Logf("redis is not available: %s", err)
t.Skip()
}
// Swallow DeleteChunk messages so chunk deletion succeeds without a
// real object store behind it.
m.OnMsg(DeleteChunk, func(args ...interface{}) error {
return nil
})
_ = m.Init(Format{Name: "test"}, true)
ctx := Background
var inode Ino
var attr = &Attr{}
// Best-effort cleanup of a leftover file from a previous run.
m.Unlink(ctx, 1, "f")
if st := m.Create(ctx, 1, "f", 0650, 022, &inode, attr); st != 0 {
t.Fatalf("create file %s", st)
}
defer m.Unlink(ctx, 1, "f")
if st := m.Write(ctx, inode, 0, 100, Slice{1, 100, 0, 100}); st != 0 {
t.Fatalf("write file %s", st)
}
// Grow to 200 MiB, then to 10 TiB + 10 bytes: the second truncate is
// large enough to exercise the sparse (scan-based) zeroing path.
if st := m.Truncate(ctx, inode, 0, 200<<20, attr); st != 0 {
t.Fatalf("truncate file %s", st)
}
if st := m.Truncate(ctx, inode, 0, (10<<40)+10, attr); st != 0 {
t.Fatalf("truncate file %s", st)
}
r := m.(*redisMeta)
// Only boundary chunks get marker entries, so exactly 5 chunk keys are
// expected despite the multi-terabyte logical size.
keys, _, _ := r.rdb.Scan(c, 0, fmt.Sprintf("c%d_*", inode), 1000).Result()
if len(keys) != 5 {
for _, k := range keys {
println("key", k)
}
t.Fatalf("number of chunks: %d != 5", len(keys))
}
m.Close(ctx, inode)
if st := m.Unlink(ctx, 1, "f"); st != 0 {
t.Fatalf("unlink file %s", st)
}
// Chunk removal runs in a background goroutine; give it a moment.
time.Sleep(time.Millisecond * 10)
keys, _, _ = r.rdb.Scan(c, 0, fmt.Sprintf("c%d_*", inode), 1000).Result()
if len(keys) != 0 {
t.Fatalf("number of chunks: %d != 0", len(keys))
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register.