未验证 提交 75227abc 编写于 作者: D Davies Liu 提交者: GitHub

Check consistency of file system (#253)

* check existence of blocks

* fix size of last block

* add gc and fsck into travis

* update tests

* update license header

* use file in tests

* umount force

* disable check for PEM

* fix travis

* fix travis
上级 ddd4fdaf
......@@ -12,6 +12,7 @@ addons:
- libacl1-dev
- redis-server
- attr
- lsof
- gcc-mingw-w64 # for windows
install: true
before_script:
......@@ -29,6 +30,10 @@ script:
- sudo chmod 777 /jfs/tmp
- sudo `which mvn` test -B
- sudo `which mvn` package -B -Dmaven.test.skip=true --quiet || sudo `which mvn` package -B -Dmaven.test.skip=true --quiet
- cd ../../
- sudo ./juicefs umount /jfs || sudo lsof /jfs && sudo ./juicefs umount --force /jfs || true
- sudo ./juicefs gc localhost
- sudo ./juicefs fsck localhost
after_failure:
- sudo tail -n 100 /var/log/syslog
notifications:
......
/*
* JuiceFS, Copyright (C) 2021 Juicedata, Inc.
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"fmt"
"strings"
"time"
"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/object"
osync "github.com/juicedata/juicefs/pkg/sync"
"github.com/urfave/cli/v2"
)
func checkFlags() *cli.Command {
return &cli.Command{
Name: "fsck",
Usage: "Check consistency of file system",
ArgsUsage: "REDIS-URL",
Action: fsck,
}
}
func fsck(ctx *cli.Context) error {
setLoggerLevel(ctx)
if ctx.Args().Len() < 1 {
return fmt.Errorf("REDIS-URL is needed")
}
addr := ctx.Args().Get(0)
if !strings.Contains(addr, "://") {
addr = "redis://" + addr
}
logger.Infof("Meta address: %s", addr)
var rc = meta.RedisConfig{Retries: 10, Strict: true}
m, err := meta.NewRedisMeta(addr, &rc)
if err != nil {
logger.Fatalf("Meta: %s", err)
}
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
}
chunkConf := chunk.Config{
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
GetTimeout: time.Second * 60,
PutTimeout: time.Second * 60,
MaxUpload: 20,
Prefetch: 0,
BufferSize: 300,
CacheDir: "memory",
CacheSize: 0,
}
blob, err := createStorage(format)
if err != nil {
logger.Fatalf("object storage: %s", err)
}
logger.Infof("Data use %s", blob)
logger.Infof("Listing all blocks ...")
blob = object.WithPrefix(blob, "chunks/")
objs, err := osync.ListAll(blob, "", "")
if err != nil {
logger.Fatalf("list all blocks: %s", err)
}
var blocks = make(map[string]int64)
var totalBlockBytes int64
for obj := range objs {
if obj == nil {
break // failed listing
}
if obj.IsDir {
continue
}
logger.Debugf("found block %s", obj.Key)
parts := strings.Split(obj.Key, "/")
if len(parts) != 3 {
continue
}
name := parts[2]
blocks[name] = obj.Size
totalBlockBytes += obj.Size
}
logger.Infof("Found %d blocks (%d bytes)", len(blocks), totalBlockBytes)
logger.Infof("Listing all slices ...")
var c = meta.NewContext(0, 0, []uint32{0})
var slices []meta.Slice
r := m.ListSlices(c, &slices)
if r != 0 {
logger.Fatalf("list all slices: %s", r)
}
keys := make(map[uint64]uint32)
var totalBytes uint64
var lost, lostBytes int
for _, s := range slices {
keys[s.Chunkid] = s.Size
totalBytes += uint64(s.Size)
n := (s.Size - 1) / uint32(chunkConf.BlockSize)
for i := uint32(0); i <= n; i++ {
sz := chunkConf.BlockSize
if i == n {
sz = int(s.Size) - int(i)*chunkConf.BlockSize
}
key := fmt.Sprintf("%d_%d_%d", s.Chunkid, i, sz)
if _, ok := blocks[key]; !ok {
if _, err := blob.Head(key); err != nil {
logger.Errorf("can't find block %s: %s", key, err)
lost++
lostBytes += sz
}
}
}
}
logger.Infof("Used by %d slices (%d bytes)", len(keys), totalBytes)
if lost > 0 {
logger.Fatalf("%d object is lost (%d bytes)", lost, lostBytes)
}
return nil
}
......@@ -202,6 +202,10 @@ func (g *GateWay) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, er
chunkid := args[1].(uint64)
return vfs.Compact(chunkConf, store, slices, chunkid)
}))
err = m.NewSession()
if err != nil {
logger.Fatalf("new session: %s", err)
}
conf := &vfs.Config{
Meta: &meta.Config{
......
/*
* JuiceFS, Copyright (C) 2018 Juicedata, Inc.
* JuiceFS, Copyright (C) 2021 Juicedata, Inc.
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
......@@ -17,6 +17,7 @@ package main
import (
"fmt"
"os"
"strconv"
"strings"
"sync"
......@@ -26,7 +27,7 @@ import (
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/object"
osync "github.com/juicedata/juicefs/pkg/sync"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/mattn/go-isatty"
"github.com/urfave/cli/v2"
)
......@@ -134,17 +135,6 @@ func gc(ctx *cli.Context) error {
logger.Fatalf("object storage: %s", err)
}
logger.Infof("Data use %s", blob)
store := chunk.NewCachedStore(blob, chunkConf)
m.OnMsg(meta.DeleteChunk, meta.MsgCallback(func(args ...interface{}) error {
chunkid := args[0].(uint64)
length := args[1].(uint32)
return store.Remove(chunkid, int(length))
}))
m.OnMsg(meta.CompactChunk, meta.MsgCallback(func(args ...interface{}) error {
slices := args[0].([]meta.Slice)
chunkid := args[1].(uint64)
return vfs.Compact(chunkConf, store, slices, chunkid)
}))
blob = object.WithPrefix(blob, "chunks/")
objs, err := osync.ListAll(blob, "", "")
......@@ -167,7 +157,9 @@ func gc(ctx *cli.Context) error {
logger.Infof("using %d slices (%d bytes)", len(keys), totalBytes)
var p = gcProgress{total: len(keys)}
go showProgress(&p)
if isatty.IsTerminal(os.Stdout.Fd()) {
go showProgress(&p)
}
var skipped, skippedBytes int64
maxMtime := time.Now().Add(time.Hour * -1)
......@@ -244,6 +236,15 @@ func gc(ctx *cli.Context) error {
}
close(leakedObj)
wg.Wait()
logger.Infof("found %d leaked objects (%d bytes), skipped %d (%d bytes)", p.leaked, p.leakedBytes, skipped, skippedBytes)
if p.leaked > 0 {
logger.Infof("found %d leaked objects (%d bytes), skipped %d (%d bytes)", p.leaked, p.leakedBytes, skipped, skippedBytes)
if !ctx.Bool("delete") {
logger.Infof("Please add `--delete` to clean them")
}
} else {
logger.Infof("scan %d objects, no leaked found", p.found)
}
return nil
}
......@@ -66,6 +66,7 @@ func main() {
rmrFlags(),
benchmarkFlags(),
gcFlags(),
checkFlags(),
},
}
......
......@@ -197,6 +197,10 @@ func mount(c *cli.Context) error {
chunkid := args[1].(uint64)
return vfs.Compact(chunkConf, store, slices, chunkid)
}))
err = m.NewSession()
if err != nil {
logger.Fatalf("new session: %s", err)
}
conf := &vfs.Config{
Meta: &meta.Config{
......
......@@ -18,7 +18,7 @@ fsx: healthcheck secfs.test/tools/bin/fsx
setup:
redis-server &
mkdir -p /jfs
../juicefs format -storage mem localhost unittest
../juicefs format localhost unittest
../juicefs mount -d --no-usage-report --enable-xattr localhost /jfs
healthcheck:
......
......@@ -133,6 +133,8 @@ type Meta interface {
Init(format Format, force bool) error
// Load loads the existing setting of a formatted volume from meta service.
Load() (*Format, error)
// NewSession create a new client session.
NewSession() error
// StatFS returns summary statistics of a volume.
StatFS(ctx Context, totalspace, availspace, iused, iavail *uint64) syscall.Errno
......
......@@ -171,22 +171,7 @@ func NewRedisMeta(url string, conf *RedisConfig) (Meta, error) {
},
}
m.shaLookup, err = m.rdb.ScriptLoad(Background, scriptLookup).Result()
if err != nil {
logger.Warnf("load scriptLookup: %v", err)
m.shaLookup = ""
}
m.checkServerConfig()
m.sid, err = m.rdb.Incr(Background, "nextsession").Result()
if err != nil {
return nil, fmt.Errorf("create session: %s", err)
}
logger.Debugf("session is is %d", m.sid)
go m.refreshSession()
go m.cleanupDeletedFiles()
go m.cleanupSlices()
go m.cleanupLeakedChunks()
return m, nil
}
......@@ -256,6 +241,27 @@ func (r *redisMeta) Load() (*Format, error) {
return &format, nil
}
func (r *redisMeta) NewSession() error {
var err error
r.sid, err = r.rdb.Incr(Background, "nextsession").Result()
if err != nil {
return fmt.Errorf("create session: %s", err)
}
logger.Debugf("session is is %d", r.sid)
r.shaLookup, err = r.rdb.ScriptLoad(Background, scriptLookup).Result()
if err != nil {
logger.Warnf("load scriptLookup: %v", err)
r.shaLookup = ""
}
go r.refreshSession()
go r.cleanupDeletedFiles()
go r.cleanupSlices()
go r.cleanupLeakedChunks()
return nil
}
func (r *redisMeta) OnMsg(mtype uint32, cb MsgCallback) {
r.msgCallbacks.Lock()
defer r.msgCallbacks.Unlock()
......
......@@ -74,6 +74,7 @@ func TestRedisClient(t *testing.T) {
}
m.OnMsg(DeleteChunk, func(args ...interface{}) error { return nil })
_ = m.Init(Format{Name: "test"}, true)
_ = m.NewSession()
ctx := Background
var parent, inode Ino
var attr = &Attr{}
......@@ -286,6 +287,7 @@ func TestCompaction(t *testing.T) {
}
return nil
})
_ = m.NewSession()
ctx := Background
var inode Ino
var attr = &Attr{}
......@@ -343,6 +345,8 @@ func TestConcurrentWrite(t *testing.T) {
return nil
})
_ = m.Init(Format{Name: "test"}, true)
_ = m.NewSession()
ctx := Background
var inode Ino
var attr = &Attr{}
......@@ -387,6 +391,8 @@ func TestTruncateAndDelete(t *testing.T) {
return nil
})
_ = m.Init(Format{Name: "test"}, true)
_ = m.NewSession()
ctx := Background
var inode Ino
var attr = &Attr{}
......@@ -451,6 +457,8 @@ func TestCopyFileRange(t *testing.T) {
return nil
})
_ = m.Init(Format{Name: "test"}, true)
_ = m.NewSession()
ctx := Background
var iin, iout Ino
var attr = &Attr{}
......@@ -505,6 +513,7 @@ func benchmarkReaddir(b *testing.B, n int) {
b.Logf("redis is not available: %s", err)
b.Skip()
}
_ = m.NewSession()
ctx := Background
var inode Ino
dname := fmt.Sprintf("largedir%d", n)
......
......@@ -49,6 +49,7 @@ func ExportRsaPrivateKeyToPem(key *rsa.PrivateKey, passphrase string) string {
}
if passphrase != "" {
var err error
// nolint:staticcheck
block, _ = x509.EncryptPEMBlock(rand.Reader, block.Type, buf, []byte(passphrase), x509.PEMCipherAES256)
if err != nil {
panic(err)
......@@ -65,12 +66,14 @@ func ParseRsaPrivateKeyFromPem(privPEM string, passphrase string) (*rsa.PrivateK
}
buf := block.Bytes
// nolint:staticcheck
if strings.Contains(block.Headers["Proc-Type"], "ENCRYPTED") &&
x509.IsEncryptedPEMBlock(block) {
if passphrase == "" {
return nil, fmt.Errorf("passphrase is required to private key")
}
var err error
// nolint:staticcheck
buf, err = x509.DecryptPEMBlock(block, []byte(passphrase))
if err != nil {
if err == x509.IncorrectPasswordError {
......
......@@ -303,6 +303,10 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
chunkid := args[1].(uint64)
return vfs.Compact(chunkConf, store, slices, chunkid)
}))
err = m.NewSession()
if err != nil {
logger.Fatalf("new session: %s", err)
}
conf := &vfs.Config{
Meta: &meta.Config{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册