未验证 提交 ef77c846 编写于 作者: D Davies Liu 提交者: GitHub

compact slices into bigger one (#2)

* compact slices into bigger one

* cleanup
上级 774cde59
......@@ -20,8 +20,9 @@ import (
)
const (
ChunkSize = 1 << 26 // 64M
DeleteChunk = 1000
ChunkSize = 1 << 26 // 64M
DeleteChunk = 1000
CompactChunk = 1001
)
const (
......@@ -127,7 +128,7 @@ type Meta interface {
Create(ctx Context, parent Ino, name string, mode uint16, cumask uint16, inode *Ino, attr *Attr) syscall.Errno
Open(ctx Context, inode Ino, flags uint8, attr *Attr) syscall.Errno
Close(ctx Context, inode Ino) syscall.Errno
Read(inode Ino, indx uint32, chunks *[]Slice) syscall.Errno
Read(ctx Context, inode Ino, indx uint32, chunks *[]Slice) syscall.Errno
NewChunk(ctx Context, inode Ino, indx uint32, offset uint32, chunkid *uint64) syscall.Errno
Write(ctx Context, inode Ino, indx uint32, off uint32, slice Slice) syscall.Errno
......
......@@ -65,6 +65,7 @@ type redisMeta struct {
sid int64
openFiles map[Ino]int
removedFiles map[Ino]bool
compacting map[uint64]bool
msgCallbacks *msgCallbacks
}
......@@ -86,6 +87,7 @@ func NewRedisMeta(url string, conf *RedisConfig) (Meta, error) {
rdb: redis.NewClient(opt),
openFiles: make(map[Ino]int),
removedFiles: make(map[Ino]bool),
compacting: make(map[uint64]bool),
msgCallbacks: &msgCallbacks{
callbacks: make(map[uint32]MsgCallback),
},
......@@ -112,7 +114,27 @@ func (r *redisMeta) Init(format Format) error {
if err != nil {
logger.Fatalf("json: %s", err)
}
return r.rdb.Set(c, "setting", data, 0).Err()
err = r.rdb.Set(c, "setting", data, 0).Err()
if err != nil {
return err
}
// root inode
var attr Attr
attr.Flags = 0
attr.Typ = TypeDirectory
attr.Mode = 0777
attr.Uid = 0
attr.Uid = 0
ts := time.Now().Unix()
attr.Atime = ts
attr.Mtime = ts
attr.Ctime = ts
attr.Nlink = 2
attr.Length = 4 << 10
attr.Rdev = 0
r.rdb.Set(c, r.inodeKey(1), r.marshal(&attr), 0)
return nil
}
func (r *redisMeta) Load() (*Format, error) {
......@@ -144,7 +166,7 @@ func (r *redisMeta) newMsg(mid uint32, args ...interface{}) error {
if ok {
return cb(args...)
}
panic("not callback for " + strconv.Itoa(int(mid)))
return fmt.Errorf("message %d is not supported", mid)
}
var c = context.TODO()
......@@ -1152,31 +1174,42 @@ func (r *redisMeta) Close(ctx Context, inode Ino) syscall.Errno {
return 0
}
func (r *redisMeta) Read(inode Ino, indx uint32, chunks *[]Slice) syscall.Errno {
vals, err := r.rdb.LRange(c, r.chunkKey(inode, indx), 0, 1000000).Result()
if err != nil {
return errno(err)
}
func (r *redisMeta) buildSlice(ss []*slice) []Slice {
var root *slice
for _, val := range vals {
rb := utils.ReadBuffer([]byte(val))
pos := rb.Get32()
chunkid := rb.Get64()
cleng := rb.Get32()
soff := rb.Get32()
slen := rb.Get32()
s := newSlice(pos, chunkid, cleng, soff, slen)
for _, s := range ss {
if root != nil {
var right *slice
s.left, right = root.cut(pos)
_, s.right = right.cut(pos + slen)
s.left, right = root.cut(s.pos)
_, s.right = right.cut(s.pos + s.len)
}
root = s
}
var pos uint32
var chunks []Slice
root.visit(func(s *slice) {
*chunks = append(*chunks, Slice{s.chunkid, s.cleng, s.off, s.len})
if s.pos > pos {
chunks = append(chunks, Slice{0, s.pos - pos, 0, s.pos - pos})
pos = s.pos
}
chunks = append(chunks, Slice{s.chunkid, s.size, s.off, s.len})
pos += s.len
})
// TODO: compact
return chunks
}
func (r *redisMeta) Read(ctx Context, inode Ino, indx uint32, chunks *[]Slice) syscall.Errno {
vals, err := r.rdb.LRange(c, r.chunkKey(inode, indx), 0, 1000000).Result()
if err != nil {
return errno(err)
}
ss := make([]*slice, len(vals))
for i, val := range vals {
ss[i] = r.parseSlice([]byte(val))
}
*chunks = r.buildSlice(ss)
if len(vals) >= 5 {
go r.compact(inode, indx)
}
return 0
}
......@@ -1216,14 +1249,18 @@ func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
w.Put32(slice.Off)
w.Put32(slice.Len)
var rpush *redis.IntCmd
_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
pipe.RPush(c, r.chunkKey(inode, indx), w.Bytes())
rpush = pipe.RPush(c, r.chunkKey(inode, indx), w.Bytes())
pipe.Set(c, r.inodeKey(inode), r.marshal(&attr), 0)
if added > 0 {
pipe.IncrBy(c, usedSpace, added)
}
return nil
})
if err == nil && rpush.Val() == 20 {
go r.compact(inode, indx)
}
return err
}, r.inodeKey(inode), r.chunkKey(inode, indx))
}
......@@ -1317,6 +1354,94 @@ func (r *redisMeta) deleteChunks(inode Ino, start, end, maxchunk uint64) {
r.rdb.ZRem(c, delchunks, r.delChunks(inode, start, end, maxchunk))
}
// parseSlice decodes one serialized slice record, as stored in a chunk's
// Redis list, into an in-memory *slice.
//
// The record is consumed sequentially in this order: position (32 bit),
// chunk id (64 bit), chunk size (32 bit), offset (32 bit), length (32 bit).
func (r *redisMeta) parseSlice(buf []byte) *slice {
	reader := utils.ReadBuffer(buf)
	// The declarations below run top to bottom, which is what keeps the
	// field order in sync with the serialized layout.
	var (
		position = reader.Get32()
		id       = reader.Get64()
		total    = reader.Get32()
		offset   = reader.Get32()
		length   = reader.Get32()
	)
	return newSlice(position, id, total, offset, length)
}
// compact merges the leading slices of chunk indx of inode into one new slice
// and swaps that slice into the chunk's Redis list.
//
// The work is best effort: it is started with `go` by Read and Write, returns
// nothing, and on any failure leaves the stored slice list untouched. The
// steps are:
//
//  1. Register (inode, indx) in r.compacting so that at most one compaction
//     per chunk runs at a time, and bail out when more than 10 are in flight.
//  2. Snapshot the head of the slice list and allocate a new chunk id.
//  3. Ask the CompactChunk message handler to produce the merged chunk.
//  4. In a transaction watching the list key, verify that the snapshot is
//     still the head of the list, then replace it with a single slice that
//     points at the new chunk.
//  5. If step 4 fails, ask the DeleteChunk handler to drop the new chunk.
func (r *redisMeta) compact(inode Ino, indx uint32) {
	// avoid too many or duplicated compaction
	k := uint64(inode) + (uint64(indx) << 32)
	r.Lock()
	if len(r.compacting) > 10 || r.compacting[k] {
		r.Unlock()
		return
	}
	r.compacting[k] = true
	r.Unlock()
	defer func() {
		r.Lock()
		delete(r.compacting, k)
		r.Unlock()
	}()

	key := r.chunkKey(inode, indx)
	// LRANGE bounds are inclusive, so this snapshots at most 101 slices.
	vals, err := r.rdb.LRange(c, key, 0, 100).Result()
	if err != nil {
		return
	}
	if len(vals) == 0 {
		// Nothing to merge: do not consume a chunk id or push an empty slice.
		return
	}
	chunkid, err := r.rdb.Incr(c, "nextchunk").Uint64()
	if err != nil {
		return
	}

	ss := make([]*slice, len(vals))
	for i, val := range vals {
		ss[i] = r.parseSlice([]byte(val))
	}
	chunks := r.buildSlice(ss)
	// The merged slice covers everything buildSlice returned.
	var size uint32
	for _, s := range chunks {
		size += s.Len
	}
	// TODO: skip first few large slices
	logger.Debugf("compact %d %d %d %d", inode, indx, len(vals), len(chunks))
	err = r.newMsg(CompactChunk, chunks, chunkid)
	if err != nil {
		logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(vals), err)
		return
	}

	st := r.txn(func(tx *redis.Tx) error {
		// Re-read exactly the len(vals) entries that were compacted. The stop
		// index is inclusive, hence the -1; slices appended by concurrent
		// writers after the snapshot must not abort the compaction.
		vals2, err := tx.LRange(c, key, 0, int64(len(vals)-1)).Result()
		if err != nil {
			return err
		}
		if len(vals2) != len(vals) {
			return fmt.Errorf("chunks changed")
		}
		for i, val := range vals2 {
			if val != vals[i] {
				return fmt.Errorf("slice %d changed", i)
			}
		}

		// Same record layout that parseSlice reads back:
		// pos, chunkid, size, off, len.
		w := utils.NewBuffer(24)
		w.Put32(0)
		w.Put64(chunkid)
		w.Put32(size)
		w.Put32(0)
		w.Put32(size)
		// TxPipelined wraps the commands in MULTI/EXEC, so the trim and the
		// push are applied atomically and only if the watched key is
		// unchanged.
		_, err = tx.TxPipelined(c, func(pipe redis.Pipeliner) error {
			pipe.LTrim(c, key, int64(len(vals)), -1)
			pipe.LPush(c, key, w.Bytes())
			return nil
		})
		return err
	}, key)
	if st != 0 {
		// TODO: tracking deletion of chunks
		logger.Warnf("update compacted chunk: %s", st)
		if err := r.newMsg(DeleteChunk, chunkid, size); err != nil {
			logger.Warnf("delete not used chunk %d (%d bytes): %s", chunkid, size, err)
		}
	}
}
func (r *redisMeta) GetXattr(ctx Context, inode Ino, name string, vbuff *[]byte) syscall.Errno {
var err error
*vbuff, err = r.rdb.HGet(c, r.xattrKey(inode), name).Bytes()
......
......@@ -32,10 +32,10 @@ func TestRedisClient(t *testing.T) {
t.Skip()
}
m.OnMsg(meta.DeleteChunk, func(args ...interface{}) error { return nil })
m.Init(meta.Format{Name: "test"})
ctx := meta.Background
var parent, inode meta.Ino
var attr = &meta.Attr{}
m.GetAttr(ctx, 1, attr) // init
if st := m.Mkdir(ctx, 1, "d", 0640, 022, 0, &parent, attr); st != 0 {
t.Fatalf("mkdir %s", st)
}
......@@ -88,16 +88,16 @@ func TestRedisClient(t *testing.T) {
t.Fatalf("write end: %s", st)
}
var chunks []meta.Slice
if st := m.Read(inode, 0, &chunks); st != 0 {
if st := m.Read(ctx, inode, 0, &chunks); st != 0 {
t.Fatalf("read chunk: %s", st)
}
if len(chunks) != 1 || chunks[0].Chunkid != chunkid || chunks[0].Size != 100 {
if len(chunks) != 2 || chunks[0].Chunkid != 0 || chunks[0].Size != 100 || chunks[1].Chunkid != chunkid || chunks[1].Size != 100 {
t.Fatalf("chunks: %v", chunks)
}
if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocKeepSize, 100, 50); st != 0 {
t.Fatalf("fallocate: %s", st)
}
if st := m.Read(inode, 0, &chunks); st != 0 {
if st := m.Read(ctx, inode, 0, &chunks); st != 0 {
t.Fatalf("read chunk: %s", st)
}
if len(chunks) != 3 || chunks[1].Chunkid != 0 || chunks[1].Len != 50 || chunks[2].Chunkid != chunkid || chunks[2].Len != 50 {
......@@ -212,3 +212,54 @@ func TestRedisClient(t *testing.T) {
t.Fatalf("rmdir: %s", st)
}
}
// TestCompaction writes 50 adjacent 100-byte slices into chunk 0 of a fresh
// file and checks that background compaction shrinks the slice list, ending
// with a single 5000-byte slice.
//
// The test is skipped when no Redis server can be reached.
func TestCompaction(t *testing.T) {
	var conf RedisConfig
	m, err := NewRedisMeta("redis://127.0.0.1:6379/8", &conf)
	if err != nil {
		t.Logf("redis is not available: %s", err)
		t.Skip()
	}
	if err := m.Init(meta.Format{Name: "test"}); err != nil {
		t.Fatalf("init: %s", err)
	}

	// done receives a token whenever the CompactChunk handler runs. The send
	// is non-blocking so that the handler never stalls a compaction.
	done := make(chan bool, 1)
	m.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
		select {
		case done <- true:
		default:
		}
		return nil
	})
	// waitCompaction fails the test instead of hanging until the global test
	// timeout when no compaction is reported.
	waitCompaction := func(what string) {
		select {
		case <-done:
		case <-time.After(10 * time.Second):
			t.Fatalf("timed out waiting for %s", what)
		}
	}

	ctx := meta.Background
	var inode meta.Ino
	var attr = &meta.Attr{}
	if st := m.Create(ctx, 1, "f", 0650, 022, &inode, attr); st != 0 {
		t.Fatalf("create file %s", st)
	}
	defer m.Unlink(ctx, 1, "f")

	for i := 0; i < 50; i++ {
		s := meta.Slice{Chunkid: uint64(i) + 1, Size: 100, Off: 0, Len: 100}
		if st := m.Write(ctx, inode, 0, uint32(i*100), s); st != 0 {
			t.Fatalf("write %d: %s", i, st)
		}
		time.Sleep(time.Millisecond)
	}

	waitCompaction("compaction triggered by write")
	var chunks []meta.Slice
	if st := m.Read(ctx, inode, 0, &chunks); st != 0 {
		t.Fatalf("read 0: %s", st)
	}
	if len(chunks) > 20 {
		t.Fatalf("inode %d should be compacted, but have %d slices", inode, len(chunks))
	}

	waitCompaction("compaction triggered by read")
	// wait for it to update chunks
	time.Sleep(time.Millisecond * 5)
	if st := m.Read(ctx, inode, 0, &chunks); st != 0 {
		t.Fatalf("read 0: %s", st)
	}
	// Require exactly one slice so that an empty result is reported as a
	// failure rather than panicking on chunks[0] below.
	if len(chunks) != 1 {
		t.Fatalf("inode %d should be compacted after read, but have %d slices", inode, len(chunks))
	}
	if chunks[0].Size != 5000 {
		t.Fatalf("size of slice should be 5000, but got %d", chunks[0].Size)
	}
}
......@@ -17,7 +17,7 @@ package redis
type slice struct {
chunkid uint64
cleng uint32
size uint32
off uint32
len uint32
pos uint32
......@@ -32,7 +32,7 @@ func newSlice(pos uint32, chunkid uint64, cleng, off, len uint32) *slice {
s := &slice{}
s.pos = pos
s.chunkid = chunkid
s.cleng = cleng
s.size = cleng
s.off = off
s.len = len
s.left = nil
......@@ -52,7 +52,7 @@ func (s *slice) cut(pos uint32) (left, right *slice) {
return left, s
} else if pos < s.pos+s.len {
l := pos - s.pos
right = newSlice(pos, s.chunkid, s.cleng, s.off+l, s.len-l)
right = newSlice(pos, s.chunkid, s.size, s.off+l, s.len-l)
right.right = s.right
s.len = l
s.right = nil
......
......@@ -171,7 +171,7 @@ func (s *sliceReader) run() {
length := f.length
f.Unlock()
var chunks []meta.Slice
err := f.r.m.Read(inode, indx, &chunks)
err := f.r.m.Read(meta.Background, inode, indx, &chunks)
f.Lock()
if s.state != BUSY || f.err != 0 || f.closing {
s.done(0, 0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册