未验证 提交 888f1cdb 编写于 作者: D Davies Liu 提交者: GitHub

Cleanup zero refs for slices (#295)

* cleanup zero ref for slices

* cleanup old refs for slices

* allow delete chunks in gc

* fix dead loop

* remove keys
上级 9a83a1bb
......@@ -141,13 +141,13 @@ func gc(ctx *cli.Context) error {
}
logger.Infof("Data use %s", blob)
store := chunk.NewCachedStore(blob, chunkConf)
m.OnMsg(meta.DeleteChunk, meta.MsgCallback(func(args ...interface{}) error {
chunkid := args[0].(uint64)
length := args[1].(uint32)
return store.Remove(chunkid, int(length))
}))
if ctx.Bool("compact") {
store := chunk.NewCachedStore(blob, chunkConf)
m.OnMsg(meta.DeleteChunk, meta.MsgCallback(func(args ...interface{}) error {
chunkid := args[0].(uint64)
length := args[1].(uint32)
return store.Remove(chunkid, int(length))
}))
var nc, ns, nb int
var lastLog time.Time
m.OnMsg(meta.CompactChunk, meta.MsgCallback(func(args ...interface{}) error {
......
......@@ -1844,8 +1844,9 @@ func (r *redisMeta) cleanupSlices() {
r.deleteSlice(ctx, uint64(chunkid), uint32(size))
}
}
} else if v == "0" {
r.cleanupZeroRef(ckeys[i])
}
// TODO: delete the values "0"
}
}
if cursor == 0 {
......@@ -1855,6 +1856,24 @@ func (r *redisMeta) cleanupSlices() {
}
}
func (r *redisMeta) cleanupZeroRef(key string) {
var ctx = Background
_ = r.txn(ctx, func(tx *redis.Tx) error {
v, err := tx.HGet(ctx, sliceRefs, key).Int()
if err != nil {
return err
}
if v != 0 {
return syscall.EINVAL
}
_, err = tx.Pipelined(ctx, func(p redis.Pipeliner) error {
p.HDel(ctx, sliceRefs, key)
return nil
})
return err
}, sliceRefs)
}
func (r *redisMeta) cleanupLeakedChunks() {
var ctx = Background
var ckeys []string
......@@ -1901,6 +1920,53 @@ func (r *redisMeta) cleanupLeakedChunks() {
}
}
func (r *redisMeta) cleanupOldSliceRefs() {
var ctx = Background
var ckeys []string
var cursor uint64
var err error
for {
ckeys, cursor, err = r.rdb.Scan(ctx, cursor, "k*", 1000).Result()
if err != nil {
logger.Errorf("scan slices: %s", err)
break
}
if len(ckeys) > 0 {
values, err := r.rdb.MGet(ctx, ckeys...).Result()
if err != nil {
logger.Warnf("mget slices: %s", err)
break
}
for i, v := range values {
if v == nil {
continue
}
if strings.HasPrefix(v.(string), "-") { // < 0
ps := strings.Split(ckeys[i], "_")
if len(ps) == 2 {
chunkid, _ := strconv.Atoi(ps[0][1:])
size, _ := strconv.Atoi(ps[1])
if chunkid > 0 && size > 0 {
r.deleteSlice(ctx, uint64(chunkid), uint32(size))
}
}
r.rdb.Del(ctx, ckeys[i])
} else if v == "0" {
r.rdb.Del(ctx, ckeys[i])
} else {
vv, _ := strconv.Atoi(v.(string))
r.rdb.HIncrBy(ctx, sliceRefs, ckeys[i], int64(vv))
r.rdb.DecrBy(ctx, ckeys[i], int64(vv))
logger.Infof("move refs %d for slice %s", vv, ckeys[i])
}
}
}
if cursor == 0 {
break
}
}
}
func (r *redisMeta) toDelete(inode Ino, length uint64) string {
return inode.String() + ":" + strconv.Itoa(int(length))
}
......@@ -2108,6 +2174,7 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32) {
} else if errno == 0 {
// reset it to zero
r.rdb.HIncrBy(ctx, sliceRefs, r.sliceKey(chunkid, size), -1)
r.cleanupZeroRef(r.sliceKey(chunkid, size))
for i, s := range ss {
if rs[i].Err() == nil && rs[i].Val() < 0 {
r.deleteSlice(ctx, s.chunkid, s.size)
......@@ -2165,6 +2232,7 @@ func (r *redisMeta) CompactAll(ctx Context) syscall.Errno {
func (r *redisMeta) ListSlices(ctx Context, slices *[]Slice) syscall.Errno {
// try to find leaked chunks cause by 0.10-, remove it in 0.13
r.cleanupLeakedChunks()
r.cleanupOldSliceRefs()
*slices = nil
var cursor uint64
p := r.rdb.Pipeline()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册