Unverified commit ddd4fdaf authored by Davies Liu, committed by GitHub

Collect garbage in object store (#248)

* garbage collection for objects

* delete in parallel

* fix tests

* add progress bar

* cleanup
Parent 4e79aef5
/*
* JuiceFS, Copyright (C) 2018 Juicedata, Inc.
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/object"
osync "github.com/juicedata/juicefs/pkg/sync"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/urfave/cli/v2"
)
func gcFlags() *cli.Command {
return &cli.Command{
Name: "gc",
Usage: "collect any leaked objects",
ArgsUsage: "REDIS-URL",
Action: gc,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "delete",
Usage: "delete leaked objects",
},
&cli.IntFlag{
Name: "threads",
Value: 50,
Usage: "number of threads to delete leaked objects",
},
},
}
}
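Given these flags, a typical invocation would look like the following (the binary name and Redis address are illustrative only):

juicefs gc --delete --threads 100 redis://127.0.0.1:6379/1

Without --delete, the command only scans and reports leaked objects.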
type gcProgress struct {
total int
found int // valid slices
leaked int
leakedBytes int64
}
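// showProgress renders a one-line progress bar. The gcProgress fields are
// written by the scan loop in gc() and read here without synchronization,
// which is acceptable for a best-effort display.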
func showProgress(p *gcProgress) {
var lastDone []int
var lastTime []time.Time
for {
if p.total == 0 {
time.Sleep(time.Millisecond * 10)
continue
}
var width int = 55
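// split the bar into cells for valid slices ('=') and leaked objects ('-')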
a := width * p.found / (p.total + p.leaked)
b := width * p.leaked / (p.total + p.leaked)
var bar [80]byte
for i := 0; i < width; i++ {
if i < a {
bar[i] = '='
} else if i < a+b {
bar[i] = '-'
} else {
bar[i] = ' '
}
}
now := time.Now()
lastDone = append(lastDone, p.found+p.leaked)
lastTime = append(lastTime, now)
for len(lastTime) > 18 { // keep a ~5-second window (18 samples at 300ms)
lastDone = lastDone[1:]
lastTime = lastTime[1:]
}
if len(lastTime) > 1 {
n := len(lastTime) - 1
d := lastTime[n].Sub(lastTime[0]).Seconds()
fps := float64(lastDone[n]-lastDone[0]) / d
fmt.Printf("[%s] % 8d % 2d%% % 4.0f/s \r", string(bar[:width]), p.total+p.leaked, (p.found+p.leaked)*100/(p.total+p.leaked), fps)
}
time.Sleep(time.Millisecond * 300)
}
}
func gc(ctx *cli.Context) error {
setLoggerLevel(ctx)
if ctx.Args().Len() < 1 {
return fmt.Errorf("REDIS-URL is needed")
}
addr := ctx.Args().Get(0)
if !strings.Contains(addr, "://") {
addr = "redis://" + addr
}
logger.Infof("Meta address: %s", addr)
var rc = meta.RedisConfig{Retries: 10, Strict: true}
m, err := meta.NewRedisMeta(addr, &rc)
if err != nil {
logger.Fatalf("Meta: %s", err)
}
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
}
chunkConf := chunk.Config{
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
GetTimeout: time.Second * 60,
PutTimeout: time.Second * 60,
MaxUpload: 20,
Prefetch: 0,
BufferSize: 300 << 20, // 300 MiB
CacheDir: "memory",
CacheSize: 0,
}
blob, err := createStorage(format)
if err != nil {
logger.Fatalf("object storage: %s", err)
}
logger.Infof("Data use %s", blob)
store := chunk.NewCachedStore(blob, chunkConf)
m.OnMsg(meta.DeleteChunk, meta.MsgCallback(func(args ...interface{}) error {
chunkid := args[0].(uint64)
length := args[1].(uint32)
return store.Remove(chunkid, int(length))
}))
m.OnMsg(meta.CompactChunk, meta.MsgCallback(func(args ...interface{}) error {
slices := args[0].([]meta.Slice)
chunkid := args[1].(uint64)
return vfs.Compact(chunkConf, store, slices, chunkid)
}))
blob = object.WithPrefix(blob, "chunks/")
objs, err := osync.ListAll(blob, "", "")
if err != nil {
logger.Fatalf("list all blocks: %s", err)
}
var c = meta.NewContext(0, 0, []uint32{0})
var slices []meta.Slice
r := m.ListSlices(c, &slices)
if r != 0 {
logger.Fatalf("list all slices: %s", r)
}
keys := make(map[uint64]uint32)
var totalBytes uint64
for _, s := range slices {
keys[s.Chunkid] = s.Size
totalBytes += uint64(s.Size)
}
logger.Infof("using %d slices (%d bytes)", len(keys), totalBytes)
var p = gcProgress{total: len(keys)}
go showProgress(&p)
var skipped, skippedBytes int64
maxMtime := time.Now().Add(time.Hour * -1)
var leakedObj = make(chan string, 10240)
var wg sync.WaitGroup
for i := 0; i < ctx.Int("threads"); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for key := range leakedObj {
if err := blob.Delete(key); err != nil {
logger.Warnf("delete %s: %s", key, err)
}
}
}()
}
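// foundLeaked records a leaked object and, when --delete is given,
// queues its key for the deletion workers above.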
foundLeaked := func(obj *object.Object) {
p.leakedBytes += obj.Size
p.leaked++
if ctx.Bool("delete") {
leakedObj <- obj.Key
}
}
for obj := range objs {
if obj == nil {
break // failed listing
}
if obj.IsDir {
continue
}
if obj.Mtime.After(maxMtime) || obj.Mtime.Unix() == 0 {
logger.Debugf("ignore new block: %s %s", obj.Key, obj.Mtime)
skippedBytes += obj.Size
skipped++
continue
}
logger.Debugf("found block %s", obj.Key)
parts := strings.Split(obj.Key, "/")
if len(parts) != 3 {
continue
}
name := parts[2]
parts = strings.Split(name, "_")
if len(parts) != 3 {
continue
}
cid, _ := strconv.Atoi(parts[0])
size := keys[uint64(cid)]
if size == 0 {
logger.Debugf("find leaked object: %s, size: %d", obj.Key, obj.Size)
foundLeaked(obj)
continue
}
indx, _ := strconv.Atoi(parts[1])
csize, _ := strconv.Atoi(parts[2])
if csize == chunkConf.BlockSize {
if (indx+1)*csize > int(size) {
logger.Warnf("size of slice %d is larger than expected: %d > %d", cid, indx*chunkConf.BlockSize+csize, size)
foundLeaked(obj)
} else if (indx+1)*csize == int(size) {
p.found++
}
} else {
if indx*chunkConf.BlockSize+csize != int(size) {
logger.Warnf("size of slice %d is %d, but expect %d", cid, indx*chunkConf.BlockSize+csize, size)
foundLeaked(obj)
} else {
p.found++
}
}
}
close(leakedObj)
wg.Wait()
logger.Infof("found %d leaked objects (%d bytes), skipped %d (%d bytes)", p.leaked, p.leakedBytes, skipped, skippedBytes)
return nil
}
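For reference, the scan loop above assumes block keys of the form <dir1>/<dir2>/<chunkid>_<indx>_<size> relative to the chunks/ prefix. A minimal standalone sketch of that decoding (the example key is hypothetical, and parseBlockKey is a name introduced here for illustration, not part of the codebase):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseBlockKey decodes a block key the same way the gc loop does:
// relative to the "chunks/" prefix, a key looks like
// "<dir1>/<dir2>/<chunkid>_<indx>_<size>".
func parseBlockKey(key string) (cid uint64, indx, size int, ok bool) {
	parts := strings.Split(key, "/")
	if len(parts) != 3 {
		return // not a block key
	}
	fields := strings.Split(parts[2], "_")
	if len(fields) != 3 {
		return
	}
	c, err1 := strconv.ParseUint(fields[0], 10, 64)
	i, err2 := strconv.Atoi(fields[1])
	s, err3 := strconv.Atoi(fields[2])
	if err1 != nil || err2 != nil || err3 != nil {
		return
	}
	return c, i, s, true
}

func main() {
	// hypothetical key: chunk 456, block index 1, block size 4 MiB
	cid, indx, size, ok := parseBlockKey("0/456/456_1_4194304")
	fmt.Println(cid, indx, size, ok) // 456 1 4194304 true
}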
......@@ -65,6 +65,7 @@ func main() {
syncFlags(),
rmrFlags(),
benchmarkFlags(),
gcFlags(),
},
}
......
......@@ -204,6 +204,9 @@ type Meta interface {
// Rmr removes all the files and directories recursively.
Rmr(ctx Context, inode Ino, name string) syscall.Errno
// ListSlices returns all slices used by all files.
ListSlices(ctx Context, slices *[]Slice) syscall.Errno
// OnMsg adds a callback for the given message type.
OnMsg(mtype uint32, cb MsgCallback)
}
......@@ -2067,6 +2067,41 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32) {
}
}
func (r *redisMeta) ListSlices(ctx Context, slices *[]Slice) syscall.Errno {
*slices = nil
var cursor uint64
p := r.rdb.Pipeline()
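// iterate over all chunk keys (c<inode>_<indx>) with SCAN; for each batch,
// queue one LRANGE per key and execute them in a single pipelined round-trip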
for {
keys, c, err := r.rdb.Scan(ctx, cursor, "c*_*", 10000).Result()
if err != nil {
logger.Warnf("scan chunks: %s", err)
return errno(err)
}
for _, key := range keys {
_ = p.LRange(ctx, key, 0, 100000000)
}
cmds, err := p.Exec(ctx)
if err != nil {
logger.Warnf("list slices: %s", err)
return errno(err)
}
for _, cmd := range cmds {
vals := cmd.(*redis.StringSliceCmd).Val()
ss := readSlices(vals)
for _, s := range ss {
if s.chunkid > 0 {
*slices = append(*slices, Slice{Chunkid: s.chunkid, Size: s.size})
}
}
}
if c == 0 {
break
}
cursor = c
}
return 0
}
func (r *redisMeta) GetXattr(ctx Context, inode Ino, name string, vbuff *[]byte) syscall.Errno {
var err error
*vbuff, err = r.rdb.HGet(ctx, r.xattrKey(inode), name).Bytes()
......
......@@ -59,12 +59,21 @@ type ObjectStorage interface {
// Delete an object.
Delete(key string) error
// Head returns some information about the object or an error if not found.
Head(key string) (*Object, error)
// List returns a list of objects.
List(prefix, marker string, limit int64) ([]*Object, error)
// ListAll returns all the objects as a channel.
ListAll(prefix, marker string) (<-chan *Object, error)
// CreateMultipartUpload starts to upload a large object part by part.
CreateMultipartUpload(key string) (*MultipartUpload, error)
// UploadPart uploads a part of an object.
UploadPart(key string, uploadID string, num int, body []byte) (*Part, error)
// AbortUpload aborts a multipart upload.
AbortUpload(key string, uploadID string)
// CompleteUpload finishes a multipart upload.
CompleteUpload(key string, uploadID string, parts []*Part) error
// ListUploads lists existing multipart uploads.
ListUploads(marker string) ([]*PendingPart, string, error)
}
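As context for the new ListAll method: callers range over the returned channel until it closes, and a nil object signals that the listing failed mid-stream (the convention the gc loop above relies on). A minimal sketch, assuming a store of type ObjectStorage and the package's logger; dumpKeys is a name introduced here for illustration:

func dumpKeys(store ObjectStorage) {
	objs, err := store.ListAll("", "")
	if err != nil {
		logger.Fatalf("list: %s", err)
	}
	for obj := range objs {
		if obj == nil {
			break // nil means the listing failed mid-stream
		}
		fmt.Println(obj.Key, obj.Size)
	}
}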
......@@ -71,8 +71,8 @@ func formatSize(bytes uint64) string {
return fmt.Sprintf("%.3f %siB", v, units[z])
}
// iterate on all the keys that starts at marker from object storage.
func iterate(store object.ObjectStorage, start, end string) (<-chan *object.Object, error) {
// ListAll iterates over all the keys that start at marker from the object storage.
func ListAll(store object.ObjectStorage, start, end string) (<-chan *object.Object, error) {
startTime := time.Now()
logger.Debugf("Iterating objects from %s start %q", store, start)
......@@ -434,12 +434,12 @@ func producer(tasks chan *object.Object, src, dst object.ObjectStorage, config *
}
logger.Debugf("maxResults: %d, defaultPartSize: %d, maxBlock: %d", maxResults, defaultPartSize, maxBlock)
srckeys, err := iterate(src, start, end)
srckeys, err := ListAll(src, start, end)
if err != nil {
logger.Fatal(err)
}
dstkeys, err := iterate(dst, start, end)
dstkeys, err := ListAll(dst, start, end)
if err != nil {
logger.Fatal(err)
}
......
......@@ -38,7 +38,7 @@ func TestIterator(t *testing.T) {
m.Put("aa", bytes.NewReader([]byte("a")))
m.Put("c", bytes.NewReader([]byte("a")))
ch, _ := iterate(m, "a", "b")
ch, _ := ListAll(m, "a", "b")
keys := collectAll(ch)
if len(keys) != 3 {
t.Errorf("length should be 3, but got %d", len(keys))
......@@ -52,7 +52,7 @@ func TestIterator(t *testing.T) {
// Single object
s, _ := object.CreateStorage("mem", "", "", "")
s.Put("a", bytes.NewReader([]byte("a")))
ch, _ = iterate(s, "", "")
ch, _ = ListAll(s, "", "")
keys = collectAll(ch)
if !reflect.DeepEqual(keys, []string{"a"}) {
t.Errorf("result wrong: %s", keys)
......@@ -73,7 +73,7 @@ func TestIeratorSingleEmptyKey(t *testing.T) {
// Simulate command line prefix in SRC or DST
s = object.WithPrefix(s, "abc")
ch, _ := iterate(s, "", "")
ch, _ := ListAll(s, "", "")
keys := collectAll(ch)
if !reflect.DeepEqual(keys, []string{""}) {
t.Errorf("result wrong: %s", keys)
......