未验证 提交 7bbae9ce 编写于 作者: D Davies Liu 提交者: GitHub

Use SQL database as meta store (#375)

* invalidate read buffer after truncate

* no accesslog for truncate (included in setattr)

* use sql database as meta store

* pass simple tests

* use bytes for chunks

* use bytes for chunks

* cleanup trasaction

* cleanup

* add unit test for SQL

* cleanup

* cleanup

* fix build

* use db as meta store

* fix contention

* fix truncate and compaction

* fix contension on counter

* generate id in batch

* fix build

* fix test

* fix flaky test

* support mysql

* fix ci

* fix flaky test

* test with sqlite3

* update column as notnull

* remove passwd

* update metrics

* fix nextInode and deleted files
上级 3a94e8f8
......@@ -291,7 +291,7 @@ func benchmark(c *cli.Context) error {
st(name+"_sum")/st(name+"_total")*1000)
}
show("FUSE operation", "fuse_ops_durations_histogram_seconds")
show("Update meta", "redis_tx_durations_histogram_seconds")
show("Update meta", "transaction_durations_histogram_seconds")
show("Put object", "object_request_durations_histogram_seconds_PUT")
show("Get object first byte", "object_request_durations_histogram_seconds_GET")
show("Delete object", "object_request_durations_histogram_seconds_DELETE")
......
......@@ -143,16 +143,7 @@ func format(c *cli.Context) error {
if c.Args().Len() < 1 {
logger.Fatalf("Redis URL and name are required")
}
addr := c.Args().Get(0)
if !strings.Contains(addr, "://") {
addr = "redis://" + addr
}
logger.Infof("Meta address: %s", addr)
var rc = meta.RedisConfig{Retries: 2}
m, err := meta.NewRedisMeta(addr, &rc)
if err != nil {
logger.Fatalf("Meta is not available: %s", err)
}
m := meta.NewClient(c.Args().Get(0), &meta.Config{Retries: 2})
if c.Args().Len() < 2 {
logger.Fatalf("Please give it a name")
......
......@@ -41,17 +41,7 @@ func fsck(ctx *cli.Context) error {
if ctx.Args().Len() < 1 {
return fmt.Errorf("REDIS-URL is needed")
}
addr := ctx.Args().Get(0)
if !strings.Contains(addr, "://") {
addr = "redis://" + addr
}
logger.Infof("Meta address: %s", addr)
var rc = meta.RedisConfig{Retries: 10, Strict: true}
m, err := meta.NewRedisMeta(addr, &rc)
if err != nil {
logger.Fatalf("Meta: %s", err)
}
m := meta.NewClient(ctx.Args().Get(0), &meta.Config{Retries: 10, Strict: true})
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
......
......@@ -141,16 +141,11 @@ func (g *GateWay) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, er
mctx = meta.NewContext(uint32(os.Getpid()), uint32(os.Getuid()), []uint32{uint32(os.Getgid())})
c := g.ctx
redisAddr := c.Args().Get(0)
if !strings.Contains(redisAddr, "://") {
redisAddr = "redis://" + redisAddr
}
logger.Infof("Meta address: %s", redisAddr)
var rc = meta.RedisConfig{Retries: 10, Strict: true}
m, err := meta.NewRedisMeta(redisAddr, &rc)
if err != nil {
logger.Fatalf("Meta: %s", err)
}
addr := c.Args().Get(0)
m := meta.NewClient(addr, &meta.Config{
Retries: 10,
Strict: true,
})
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
......@@ -206,7 +201,7 @@ func (g *GateWay) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, er
conf := &vfs.Config{
Meta: &meta.Config{
IORetries: 10,
Retries: 10,
},
Format: format,
Version: version.Version(),
......
......@@ -106,17 +106,7 @@ func gc(ctx *cli.Context) error {
if ctx.Args().Len() < 1 {
return fmt.Errorf("REDIS-URL is needed")
}
addr := ctx.Args().Get(0)
if !strings.Contains(addr, "://") {
addr = "redis://" + addr
}
logger.Infof("Meta address: %s", addr)
var rc = meta.RedisConfig{Retries: 10, Strict: true}
m, err := meta.NewRedisMeta(addr, &rc)
if err != nil {
logger.Fatalf("Meta: %s", err)
}
m := meta.NewClient(ctx.Args().Get(0), &meta.Config{Retries: 10, Strict: true})
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
......
......@@ -75,9 +75,6 @@ func mount(c *cli.Context) error {
logger.Fatalf("Redis URL and mountpoint are required")
}
addr := c.Args().Get(0)
if !strings.Contains(addr, "://") {
addr = "redis://" + addr
}
if c.Args().Len() < 2 {
logger.Fatalf("MOUNTPOINT is required")
}
......@@ -87,17 +84,11 @@ func mount(c *cli.Context) error {
logger.Fatalf("create %s: %s", mp, err)
}
}
logger.Infof("Meta address: %s", addr)
var rc = meta.RedisConfig{
m := meta.NewClient(addr, &meta.Config{
Retries: 10,
Strict: true,
CaseInsensi: strings.HasSuffix(mp, ":") && runtime.GOOS == "windows",
}
m, err := meta.NewRedisMeta(addr, &rc)
if err != nil {
logger.Fatalf("Meta: %s", err)
}
})
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
......@@ -160,7 +151,7 @@ func mount(c *cli.Context) error {
conf := &vfs.Config{
Meta: &meta.Config{
IORetries: 10,
Retries: 10,
},
Format: format,
Version: version.Version(),
......
......@@ -18,7 +18,6 @@ package main
import (
"encoding/json"
"fmt"
"strings"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/urfave/cli/v2"
......@@ -29,17 +28,7 @@ func status(ctx *cli.Context) error {
if ctx.Args().Len() < 1 {
return fmt.Errorf("REDIS-URL is needed")
}
addr := ctx.Args().Get(0)
if !strings.Contains(addr, "://") {
addr = "redis://" + addr
}
logger.Infof("Meta address: %s", addr)
var rc = meta.RedisConfig{Retries: 10, Strict: true}
m, err := meta.NewRedisMeta(addr, &rc)
if err != nil {
logger.Fatalf("Meta: %s", err)
}
m := meta.NewClient(ctx.Args().Get(0), &meta.Config{Retries: 10, Strict: true})
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
......
......@@ -685,7 +685,7 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(juicefs_redis_tx_durations_histogram_seconds_count{vol_name=\"$name\"}[1m])) by (node)",
"expr": "sum(rate(juicefs_transaction_durations_histogram_seconds_count{vol_name=\"$name\"}[1m])) by (node)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
......@@ -697,7 +697,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Redis Transcations",
"title": "Transcations",
"tooltip": {
"shared": true,
"sort": 0,
......@@ -783,7 +783,7 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(juicefs_redis_tx_durations_histogram_seconds_sum{vol_name=\"$name\"}[1m])) by (node,mp) * 1000000 / sum(rate(juicefs_redis_tx_durations_histogram_seconds_count{vol_name=\"$name\"}[1m])) by (node,mp)",
"expr": "sum(rate(juicefs_transaction_durations_histogram_seconds_sum{vol_name=\"$name\"}[1m])) by (node,mp) * 1000000 / sum(rate(juicefs_transaction_durations_histogram_seconds_count{vol_name=\"$name\"}[1m])) by (node,mp)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
......@@ -795,7 +795,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Redis Transcation Latency",
"title": "Transcation Latency",
"tooltip": {
"shared": true,
"sort": 0,
......@@ -869,7 +869,7 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(juicefs_redis_transaction_restart{vol_name=~\"$name\"}[1m])) by (node)",
"expr": "sum(rate(juicefs_transaction_restart{vol_name=~\"$name\"}[1m])) by (node)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "Restarts {{node}}",
......@@ -879,7 +879,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "Redis Transaction Restarts",
"title": "Transaction Restarts",
"tooltip": {
"shared": true,
"sort": 0,
......
......@@ -18,6 +18,7 @@ require (
github.com/colinmarc/hdfs/v2 v2.2.0
github.com/go-ini/ini v1.62.0 // indirect
github.com/go-redis/redis/v8 v8.4.0
github.com/go-sql-driver/mysql v1.5.0
github.com/gonutz/w32/v2 v2.2.0
github.com/google/gops v0.3.13
github.com/google/uuid v1.1.2
......@@ -31,6 +32,7 @@ require (
github.com/ks3sdklib/aws-sdk-go v0.0.0-20180820074416-dafab05ad142
github.com/kurin/blazer v0.2.1
github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-sqlite3 v1.14.0
github.com/minio/cli v1.22.0
github.com/minio/minio v0.0.0-20210206053228-97fe57bba92c
github.com/minio/minio-go v6.0.14+incompatible
......@@ -55,6 +57,7 @@ require (
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
google.golang.org/api v0.5.0
xorm.io/xorm v1.0.7
)
replace github.com/minio/minio v0.0.0-20210206053228-97fe57bba92c => github.com/juicedata/minio v0.0.0-20210222051636-e7cabdf948f4
......@@ -4,6 +4,7 @@ cloud.google.com/go v0.39.0 h1:UgQP9na6OTfp4dsAiz/eFpFA1C6tPdH5wiRdi19tuMw=
cloud.google.com/go v0.39.0/go.mod h1:rVLT6fkc8chs9sfPtFc1SBH6em7n+ZoXaG+87tDISts=
git.apache.org/thrift.git v0.13.0 h1:/3bz5WZ+sqYArk7MBBBbDufMxKKOA56/6JO6psDpUDY=
git.apache.org/thrift.git v0.13.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0pAQhH8yz+DNjUbjppKQzKFAn28TMYPB6IU=
github.com/Arvintian/scs-go-sdk v1.0.0 h1:2Hll7bcEc8co8v5/2Ibzo410fW4QnTnhkq1NOASd8vc=
github.com/Arvintian/scs-go-sdk v1.0.0/go.mod h1:DMIkwn27iuTIo9o7INj3L/bcA7bW6QwljWC3ZpxjkXw=
github.com/Arvintian/scs-go-sdk v1.1.0 h1:vqVOfoMD6XSr7eG1a2M9oSiQwhDZYKKdH2rrZRPx6So=
......@@ -50,6 +51,7 @@ github.com/IBM/ibm-cos-sdk-go v1.6.0/go.mod h1:Pa7XzoyngeWPnqGol8ZF+gwUeLEAyDHkk
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/NetEase-Object-Storage/nos-golang-sdk v0.0.0-20171031020902-cc8892cb2b05 h1:NEPjpPSOSDDmnix+VANw/CfUs1fAorLIaz/IFz2eQ2o=
github.com/NetEase-Object-Storage/nos-golang-sdk v0.0.0-20171031020902-cc8892cb2b05/go.mod h1:0N5CbwYI/8V1T6YOEwkgMvLmiGDNn661vLutBZQrC2c=
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II=
......@@ -67,6 +69,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aliyun/aliyun-oss-go-sdk v2.1.0+incompatible h1:90Z2Cp7EqcbaYfVwVjmQoK8kgoFPz+doQlujcwe1BRg=
github.com/aliyun/aliyun-oss-go-sdk v2.1.0+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
......@@ -128,6 +131,7 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.1 h1:4cLinnzVJDKxTCl9B01807Yiy+W7ZzVHj/KIroQRvT4=
github.com/dchest/siphash v1.2.1/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4=
github.com/denisenkom/go-mssqldb v0.0.0-20200428022330-06a60b6afbbc/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
......@@ -197,6 +201,7 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
......@@ -395,6 +400,7 @@ github.com/ks3sdklib/aws-sdk-go v0.0.0-20180820074416-dafab05ad142/go.mod h1:WKP
github.com/kurin/blazer v0.2.1 h1:lUhpcdTHl3foU5IcjgzM5Hbv9hQX7ce7PugSGIi+ztU=
github.com/kurin/blazer v0.2.1/go.mod h1:4FCXMUWo9DllR2Do4TtBd377ezyAJ51vB5uTBjt0pGU=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
......@@ -418,6 +424,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA=
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
......@@ -632,6 +640,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tencentyun/cos-go-sdk-v5 v0.7.8 h1:BeqN3uNCyYgoujWqZDbpQMhNmPf5xIypjzbT2AMMZUs=
github.com/tencentyun/cos-go-sdk-v5 v0.7.8/go.mod h1:wQBO5HdAkLjj2q6XQiIfDSP8DXDNrppDRw2Kp/1BODA=
github.com/tidwall/gjson v1.6.7 h1:Mb1M9HZCRWEcXQ8ieJo7auYyyiSux6w9XN3AdTpxJrE=
......@@ -673,6 +683,7 @@ github.com/xlab/treeprint v1.0.0/go.mod h1:IoImgRak9i3zJyuxOKUP1v4UZd1tMoKkq/Cim
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yunify/qingstor-sdk-go v2.2.15+incompatible h1:/Z0q3/eSMoPYAuRmhjWtuGSmVVciFC6hfm3yfCKuvz0=
github.com/yunify/qingstor-sdk-go v2.2.15+incompatible/go.mod h1:w6wqLDQ5bBTzxGJ55581UrSwLrsTAsdo9N6yX/8d9RY=
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
......@@ -700,6 +711,7 @@ golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff/go.mod h1:flIaEI6LNU6xOCD5P
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
......@@ -726,6 +738,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
......@@ -927,3 +940,7 @@ rsc.io/goversion v1.2.0/go.mod h1:Eih9y/uIBS3ulggl7KNJ09xGSLcuNaLgmvvqa07sgfo=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
xorm.io/builder v0.3.7 h1:2pETdKRK+2QG4mLX4oODHEhn5Z8j1m8sXa7jfu+/SZI=
xorm.io/builder v0.3.7/go.mod h1:aUW0S9eb9VCaPohFCH3j7czOx1PMW3i1HrSzbLYGBSE=
xorm.io/xorm v1.0.7 h1:26yBTDVI+CfQpVz2Y88fISh+aiJXIPP4eNoTJlwzsC4=
xorm.io/xorm v1.0.7/go.mod h1:uF9EtbhODq5kNWxMbnBEj8hRRZnlcNSz2t2N7HW/+A4=
......@@ -352,7 +352,7 @@ func (fs *FileSystem) Rmr(ctx meta.Context, p string) (err syscall.Errno) {
if err != 0 {
return
}
err = fs.m.Rmr(ctx, parent.inode, path.Base(p))
err = meta.Remove(fs.m, ctx, parent.inode, path.Base(p))
return
}
......@@ -929,6 +929,6 @@ func (f *File) Summary(ctx meta.Context, depth uint8, maxentries uint32) (s *met
f.fs.log(l, "Summary (%s): %s (%d,%d,%d,%d)", f.path, errstr(err), s.Length, s.Size, s.Files, s.Dirs)
}()
s = &meta.Summary{}
err = f.fs.m.Summary(ctx, f.inode, s)
err = meta.GetSummary(f.fs.m, ctx, f.inode, s)
return
}
......@@ -25,11 +25,7 @@ import (
// nolint:errcheck
func TestFileSystem(t *testing.T) {
m, err := meta.NewRedisMeta("redis://127.0.0.1:6379/10", &meta.RedisConfig{})
if err != nil {
t.Logf("redis is not available: %s", err)
t.Skip()
}
m := meta.NewClient("redis://127.0.0.1:6379/10", &meta.Config{})
format := meta.Format{
Name: "test",
BlockSize: 4096,
......
......@@ -15,10 +15,11 @@
package meta
// Config for clients.
type Config struct {
Addr string
Password string
IORetries int
Strict bool // update ctime
Retries int
CaseInsensi bool
}
type Format struct {
......
......@@ -16,6 +16,7 @@
package meta
import (
"strings"
"syscall"
)
......@@ -203,11 +204,6 @@ type Meta interface {
// Setlk sets a file range lock on given file.
Setlk(ctx Context, inode Ino, owner uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno
// Summary returns the summary for given file or directory.
Summary(ctx Context, inode Ino, summary *Summary) syscall.Errno
// Rmr remove all the files and directories recursively.
Rmr(ctx Context, inode Ino, name string) syscall.Errno
// Compact all the chunks by merge small slices together
CompactAll(ctx Context) syscall.Errno
// ListSlices returns all slices used by all files.
......@@ -216,3 +212,26 @@ type Meta interface {
// OnMsg add a callback for the given message type.
OnMsg(mtype uint32, cb MsgCallback)
}
// NewClient creates a Meta client for given uri.
func NewClient(uri string, conf *Config) Meta {
if !strings.Contains(uri, "://") {
uri = "redis://" + uri
}
logger.Infof("Meta address: %s", uri)
var m Meta
var err error
if strings.HasPrefix(uri, "redis") {
m, err = newRedisMeta(uri, conf)
} else {
p := strings.Index(uri, "://")
if p < 0 {
logger.Fatalf("invalid uri: %s", uri)
}
m, err = newSQLMeta(uri[:p], uri[p+3:], conf)
}
if err != nil {
logger.Fatalf("Meta is not available: %s", err)
}
return m
}
......@@ -79,16 +79,9 @@ local ino = struct.unpack(">I8", string.sub(buf, 2))
return {ino, redis.call('GET', "i" .. tostring(ino))}
`
// RedisConfig is config for Redis client.
type RedisConfig struct {
Strict bool // update ctime
Retries int
CaseInsensi bool
}
type redisMeta struct {
sync.Mutex
conf *RedisConfig
conf *Config
rdb *redis.Client
txlocks [1024]sync.Mutex // Pessimistic locks to reduce conflict on Redis
......@@ -110,8 +103,8 @@ type msgCallbacks struct {
callbacks map[uint32]MsgCallback
}
// NewRedisMeta return a meta store using Redis.
func NewRedisMeta(url string, conf *RedisConfig) (Meta, error) {
// newRedisMeta return a meta store using Redis.
func newRedisMeta(url string, conf *Config) (Meta, error) {
opt, err := redis.ParseURL(url)
if err != nil {
return nil, fmt.Errorf("parse %s: %s", url, err)
......@@ -213,6 +206,9 @@ func (r *redisMeta) Init(format Format, force bool) error {
if err != nil {
return err
}
if body != nil {
return nil
}
// root inode
var attr Attr
......@@ -416,7 +412,7 @@ func (r *redisMeta) StatFS(ctx Context, totalspace, availspace, iused, iavail *u
return 0
}
func (r *redisMeta) Summary(ctx Context, inode Ino, summary *Summary) syscall.Errno {
func GetSummary(r Meta, ctx Context, inode Ino, summary *Summary) syscall.Errno {
var attr Attr
if st := r.GetAttr(ctx, inode, &attr); st != 0 {
return st
......@@ -431,7 +427,7 @@ func (r *redisMeta) Summary(ctx Context, inode Ino, summary *Summary) syscall.Er
continue
}
if e.Attr.Typ == TypeDirectory {
if st := r.Summary(ctx, e.Inode, summary); st != 0 {
if st := GetSummary(r, ctx, e.Inode, summary); st != 0 {
return st
}
} else {
......@@ -533,7 +529,7 @@ func (r *redisMeta) Lookup(ctx Context, parent Ino, name string, inode *Ino, att
return errno(err)
}
func (r *redisMeta) accessMode(attr *Attr, uid uint32, gid uint32) uint8 {
func accessMode(attr *Attr, uid uint32, gid uint32) uint8 {
if uid == 0 {
return 0x7
}
......@@ -562,7 +558,7 @@ func (r *redisMeta) Access(ctx Context, inode Ino, mmask uint8, attr *Attr) sysc
}
}
mode := r.accessMode(attr, ctx.Uid(), ctx.Gid())
mode := accessMode(attr, ctx.Uid(), ctx.Gid())
if mode&mmask != mmask {
logger.Debugf("Access inode %d %o, mode %o, request mode %o", inode, attr.Mode, mode, mmask)
return syscall.EACCES
......@@ -611,16 +607,13 @@ func (r *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin
_, _ = khash.Write([]byte(keys[0]))
l := &r.txlocks[int(khash.Sum32())%len(r.txlocks)]
start := time.Now()
defer func() {
used := time.Since(start)
redisTxDist.Observe(used.Seconds())
}()
defer func() { txDist.Observe(time.Since(start).Seconds()) }()
l.Lock()
defer l.Unlock()
for i := 0; i < 50; i++ {
err = r.rdb.Watch(ctx, txf, keys...)
if err == redis.TxFailedErr {
redisTxRestart.Add(1)
txRestart.Add(1)
time.Sleep(time.Microsecond * 100 * time.Duration(rand.Int()%(i+1)))
continue
}
......@@ -1137,7 +1130,7 @@ func (r *redisMeta) Rmdir(ctx Context, parent Ino, name string) syscall.Errno {
}, r.inodeKey(parent), r.entryKey(parent), r.inodeKey(inode), r.entryKey(inode))
}
func (r *redisMeta) emptyDir(ctx Context, inode Ino, concurrent chan int) syscall.Errno {
func emptyDir(r Meta, ctx Context, inode Ino, concurrent chan int) syscall.Errno {
if st := r.Access(ctx, inode, 3, nil); st != 0 {
return st
}
......@@ -1157,14 +1150,14 @@ func (r *redisMeta) emptyDir(ctx Context, inode Ino, concurrent chan int) syscal
wg.Add(1)
go func(child Ino, name string) {
defer wg.Done()
e := r.emptyEntry(ctx, inode, name, child, concurrent)
e := emptyEntry(r, ctx, inode, name, child, concurrent)
if e != 0 {
status = e
}
<-concurrent
}(e.Inode, string(e.Name))
default:
if st := r.emptyEntry(ctx, inode, string(e.Name), e.Inode, concurrent); st != 0 {
if st := emptyEntry(r, ctx, inode, string(e.Name), e.Inode, concurrent); st != 0 {
return st
}
}
......@@ -1178,18 +1171,18 @@ func (r *redisMeta) emptyDir(ctx Context, inode Ino, concurrent chan int) syscal
return status
}
func (r *redisMeta) emptyEntry(ctx Context, parent Ino, name string, inode Ino, concurrent chan int) syscall.Errno {
st := r.emptyDir(ctx, inode, concurrent)
func emptyEntry(r Meta, ctx Context, parent Ino, name string, inode Ino, concurrent chan int) syscall.Errno {
st := emptyDir(r, ctx, inode, concurrent)
if st == 0 {
st = r.Rmdir(ctx, parent, name)
if st == syscall.ENOTEMPTY {
st = r.emptyEntry(ctx, parent, name, inode, concurrent)
st = emptyEntry(r, ctx, parent, name, inode, concurrent)
}
}
return st
}
func (r *redisMeta) Rmr(ctx Context, parent Ino, name string) syscall.Errno {
func Remove(r Meta, ctx Context, parent Ino, name string) syscall.Errno {
if st := r.Access(ctx, parent, 3, nil); st != 0 {
return st
}
......@@ -1202,7 +1195,7 @@ func (r *redisMeta) Rmr(ctx Context, parent Ino, name string) syscall.Errno {
return r.Unlink(ctx, parent, name)
}
concurrent := make(chan int, 50)
return r.emptyEntry(ctx, parent, name, inode, concurrent)
return emptyEntry(r, ctx, parent, name, inode, concurrent)
}
func (r *redisMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst Ino, nameDst string, inode *Ino, attr *Attr) syscall.Errno {
......@@ -1685,7 +1678,7 @@ func (r *redisMeta) Read(ctx Context, inode Ino, indx uint32, chunks *[]Slice) s
ss := readSlices(vals)
*chunks = buildSlice(ss)
if len(vals) >= 5 || len(*chunks) >= 5 {
go r.compactChunk(inode, indx)
go r.compactChunk(inode, indx, false)
}
return 0
}
......@@ -1730,7 +1723,7 @@ func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
return nil
})
if err == nil && rpush.Val()%20 == 0 {
go r.compactChunk(inode, indx)
go r.compactChunk(inode, indx, false)
}
return err
}, r.inodeKey(inode))
......@@ -1847,7 +1840,7 @@ func (r *redisMeta) cleanupDeletedFiles() {
for {
time.Sleep(time.Minute)
now := time.Now()
members, _ := r.rdb.ZRangeByScore(Background, delfiles, &redis.ZRangeBy{Min: strconv.Itoa(0), Max: strconv.Itoa(int(now.Add(time.Hour).Unix())), Count: 1000}).Result()
members, _ := r.rdb.ZRangeByScore(Background, delfiles, &redis.ZRangeBy{Min: strconv.Itoa(0), Max: strconv.Itoa(int(now.Add(-time.Hour).Unix())), Count: 1000}).Result()
for _, member := range members {
ps := strings.Split(member, ":")
inode, _ := strconv.ParseInt(ps[0], 10, 0)
......@@ -2119,21 +2112,23 @@ func (r *redisMeta) deleteFile(inode Ino, length uint64, tracking string) {
_ = r.rdb.ZRem(ctx, delfiles, tracking)
}
func (r *redisMeta) compactChunk(inode Ino, indx uint32) {
func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
// avoid too many or duplicated compaction
r.Lock()
k := uint64(inode) + (uint64(indx) << 32)
if len(r.compacting) > 10 || r.compacting[k] {
r.Unlock()
return
}
r.compacting[k] = true
r.Unlock()
defer func() {
if !force {
r.Lock()
delete(r.compacting, k)
k := uint64(inode) + (uint64(indx) << 32)
if len(r.compacting) > 10 || r.compacting[k] {
r.Unlock()
return
}
r.compacting[k] = true
r.Unlock()
}()
defer func() {
r.Lock()
delete(r.compacting, k)
r.Unlock()
}()
}
var ctx = Background
vals, err := r.rdb.LRange(ctx, r.chunkKey(inode, indx), 0, 200).Result()
......@@ -2244,7 +2239,7 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32) {
go func() {
// wait for the current compaction to finish
time.Sleep(time.Millisecond * 10)
r.compactChunk(inode, indx)
r.compactChunk(inode, indx, force)
}()
}
} else {
......@@ -2277,7 +2272,7 @@ func (r *redisMeta) CompactAll(ctx Context) syscall.Errno {
n, err := fmt.Sscanf(keys[i], "c%d_%d", &inode, &indx)
if err == nil && n == 2 {
logger.Debugf("compact chunk %d:%d (%d slices)", inode, indx, cnt)
r.compactChunk(Ino(inode), indx)
r.compactChunk(Ino(inode), indx, true)
}
}
}
......
......@@ -18,18 +18,18 @@ package meta
import "github.com/prometheus/client_golang/prometheus"
var (
redisTxDist = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "redis_tx_durations_histogram_seconds",
Help: "Redis transactions latency distributions.",
txDist = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "transaction_durations_histogram_seconds",
Help: "Transactions latency distributions.",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 30),
})
redisTxRestart = prometheus.NewCounter(prometheus.CounterOpts{
Name: "redis_transaction_restart",
Help: "The number of times a Redis transaction is restarted.",
txRestart = prometheus.NewCounter(prometheus.CounterOpts{
Name: "transaction_restart",
Help: "The number of times a transaction is restarted.",
})
)
func InitMetrics() {
prometheus.MustRegister(redisTxDist)
prometheus.MustRegister(redisTxRestart)
prometheus.MustRegister(txDist)
prometheus.MustRegister(txRestart)
}
......@@ -13,10 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//nolint:errcheck
package meta
import (
"bytes"
"context"
"fmt"
"sync"
"syscall"
......@@ -65,20 +67,23 @@ func BenchmarkReadSlices(b *testing.B) {
}
}
// nolint:errcheck
func TestRedisClient(t *testing.T) {
var conf RedisConfig
_, err := NewRedisMeta("http://127.0.0.1:6379/7", &conf)
var conf Config
_, err := newRedisMeta("http://127.0.0.1:6379/7", &conf)
if err == nil {
t.Fatal("meta created with invalid url")
}
m, err := NewRedisMeta("redis://127.0.0.1:6379/7", &conf)
m, err := newRedisMeta("redis://127.0.0.1:6379/7", &conf)
if err != nil {
t.Skipf("redis is not available: %s", err)
}
testMetaClient(t, m)
}
func testMetaClient(t *testing.T, m Meta) {
m.OnMsg(DeleteChunk, func(args ...interface{}) error { return nil })
_ = m.Init(Format{Name: "test"}, true)
err = m.Init(Format{Name: "test"}, false) // changes nothing
err := m.Init(Format{Name: "test"}, false) // changes nothing
if err != nil {
t.Fatalf("initialize failed: %s", err)
}
......@@ -89,8 +94,10 @@ func TestRedisClient(t *testing.T) {
if format.Name != "test" {
t.Fatalf("load got volume name %s, expected %s", format.Name, "test")
}
_ = m.NewSession()
go m.(*redisMeta).cleanStaleSessions()
switch r := m.(type) {
case *redisMeta:
go r.cleanStaleSessions()
}
ctx := Background
var parent, inode, dummyInode Ino
var attr = &Attr{}
......@@ -273,6 +280,47 @@ func TestRedisClient(t *testing.T) {
t.Fatalf("setxattr: %s", st)
}
var totalspace, availspace, iused, iavail uint64
if st := m.StatFS(ctx, &totalspace, &availspace, &iused, &iavail); st != 0 {
t.Fatalf("statfs: %s", st)
}
var summary Summary
if st := GetSummary(m, ctx, 1, &summary); st != 0 {
t.Fatalf("summary: %s", st)
}
expected := Summary{Length: 202, Size: 16384, Files: 3, Dirs: 2}
if summary != expected {
t.Fatalf("summary %+v not equal to expected: %+v", summary, expected)
}
if st := GetSummary(m, ctx, inode, &summary); st != 0 {
t.Fatalf("summary: %s", st)
}
expected = Summary{Length: 402, Size: 20480, Files: 4, Dirs: 2}
if summary != expected {
t.Fatalf("summary %+v not equal to expected: %+v", summary, expected)
}
if st := m.Unlink(ctx, 1, "f"); st != 0 {
t.Fatalf("unlink f: %s", st)
}
if st := m.Rmdir(ctx, 1, "d"); st != 0 {
t.Fatalf("rmdir d: %s", st)
}
}
func TestLocksRedis(t *testing.T) {
var conf Config
m, err := newRedisMeta("redis://127.0.0.1:6379/5", &conf)
if err != nil {
t.Skipf("redis is not available: %s", err)
}
_ = m.Init(Format{Name: "test"}, true)
ctx := Background
var inode Ino
var attr = &Attr{}
defer m.Unlink(ctx, 1, "f")
if st := m.Create(ctx, 1, "f", 0644, 0, &inode, attr); st != 0 {
t.Fatalf("create f: %s", st)
}
// flock
if st := m.Flock(ctx, inode, 1, syscall.F_RDLCK, false); st != 0 {
t.Fatalf("flock rlock: %s", st)
......@@ -358,40 +406,18 @@ func TestRedisClient(t *testing.T) {
}(i)
}
g.Wait()
var totalspace, availspace, iused, iavail uint64
if st := m.StatFS(ctx, &totalspace, &availspace, &iused, &iavail); st != 0 {
t.Fatalf("statfs: %s", st)
}
var summary Summary
if st := m.Summary(ctx, 1, &summary); st != 0 {
t.Fatalf("summary: %s", st)
}
expected := Summary{Length: 202, Size: 16384, Files: 3, Dirs: 2}
if summary != expected {
t.Fatalf("summary %+v not equal to expected: %+v", summary, expected)
}
if st := m.Summary(ctx, inode, &summary); st != 0 {
t.Fatalf("summary: %s", st)
}
expected = Summary{Length: 402, Size: 20480, Files: 4, Dirs: 2}
if summary != expected {
t.Fatalf("summary %+v not equal to expected: %+v", summary, expected)
}
if st := m.Unlink(ctx, 1, "f"); st != 0 {
t.Fatalf("unlink f: %s", st)
}
if st := m.Rmdir(ctx, 1, "d"); st != 0 {
t.Fatalf("rmdir d: %s", st)
}
}
func TestRmr(t *testing.T) {
var conf RedisConfig
m, err := NewRedisMeta("redis://127.0.0.1:6379/5", &conf)
func TestRemove(t *testing.T) {
var conf Config
m, err := newRedisMeta("redis://127.0.0.1:6379/5", &conf)
if err != nil {
t.Skipf("redis is not available: %s", err)
}
testRemove(t, m)
}
func testRemove(t *testing.T, m Meta) {
_ = m.Init(Format{Name: "test"}, true)
ctx := Background
var inode, parent Ino
......@@ -399,7 +425,7 @@ func TestRmr(t *testing.T) {
if st := m.Create(ctx, 1, "f", 0644, 0, &inode, attr); st != 0 {
t.Fatalf("create f: %s", st)
}
if st := m.Rmr(ctx, 1, "f"); st != 0 {
if st := Remove(m, ctx, 1, "f"); st != 0 {
t.Fatalf("rmr f: %s", st)
}
if st := m.Mkdir(ctx, 1, "d", 0755, 0, 0, &parent, attr); st != 0 {
......@@ -411,17 +437,21 @@ func TestRmr(t *testing.T) {
if st := m.Create(ctx, parent, "f", 0644, 0, &inode, attr); st != 0 {
t.Fatalf("create d/f: %s", st)
}
if st := m.Rmr(ctx, 1, "d"); st != 0 {
if st := Remove(m, ctx, 1, "d"); st != 0 {
t.Fatalf("rmr d: %s", st)
}
}
func TestCaseIncensi(t *testing.T) {
var conf = RedisConfig{CaseInsensi: true}
m, err := NewRedisMeta("redis://127.0.0.1:6379/6", &conf)
var conf = Config{CaseInsensi: true}
m, err := newRedisMeta("redis://127.0.0.1:6379/6", &conf)
if err != nil {
t.Skipf("redis is not available: %s", err)
}
testCaseIncensi(t, m)
}
func testCaseIncensi(t *testing.T, m Meta) {
_ = m.Init(Format{Name: "test"}, true)
ctx := Background
var inode Ino
......@@ -434,7 +464,10 @@ func TestCaseIncensi(t *testing.T) {
t.Fatalf("lookup Foo should be OK")
}
if st := m.Rename(ctx, 1, "Foo", 1, "bar", &inode, attr); st != 0 {
t.Fatalf("rename Foo to bar should be OK")
t.Fatalf("rename Foo to bar should be OK, but got %s", st)
}
if st := m.Lookup(ctx, 1, "Bar", &inode, attr); st != 0 {
t.Fatalf("lookup Bar should be OK")
}
if st := m.Unlink(ctx, 1, "Bar"); st != 0 {
t.Fatalf("unlink Bar should be OK")
......@@ -448,13 +481,16 @@ func TestCaseIncensi(t *testing.T) {
}
func TestCompaction(t *testing.T) {
var conf RedisConfig
m, err := NewRedisMeta("redis://127.0.0.1:6379/8", &conf)
var conf Config
m, err := newRedisMeta("redis://127.0.0.1:6379/8", &conf)
if err != nil {
t.Skipf("redis is not available: %s", err)
}
testCompaction(t, m)
}
func testCompaction(t *testing.T, m Meta) {
_ = m.Init(Format{Name: "test"}, true)
done := make(chan bool, 1)
var l sync.Mutex
deleted := make(map[uint64]int)
m.OnMsg(DeleteChunk, func(args ...interface{}) error {
......@@ -465,13 +501,8 @@ func TestCompaction(t *testing.T) {
return nil
})
m.OnMsg(CompactChunk, func(args ...interface{}) error {
select {
case done <- true:
default:
}
return nil
})
_ = m.NewSession()
ctx := Background
var inode Ino
var attr = &Attr{}
......@@ -484,15 +515,24 @@ func TestCompaction(t *testing.T) {
}()
// random write
_ = m.Write(ctx, inode, 1, uint32(0), Slice{Chunkid: uint64(1000), Size: 64 << 20, Len: 64 << 20})
_ = m.Write(ctx, inode, 1, uint32(30<<20), Slice{Chunkid: uint64(1001), Size: 8, Len: 8})
_ = m.Write(ctx, inode, 1, uint32(40<<20), Slice{Chunkid: uint64(1002), Size: 8, Len: 8})
var chunkid uint64
m.NewChunk(ctx, inode, 0, 0, &chunkid)
_ = m.Write(ctx, inode, 1, uint32(0), Slice{Chunkid: chunkid, Size: 64 << 20, Len: 64 << 20})
m.NewChunk(ctx, inode, 0, 0, &chunkid)
_ = m.Write(ctx, inode, 1, uint32(30<<20), Slice{Chunkid: chunkid, Size: 8, Len: 8})
m.NewChunk(ctx, inode, 0, 0, &chunkid)
_ = m.Write(ctx, inode, 1, uint32(40<<20), Slice{Chunkid: chunkid, Size: 8, Len: 8})
var cs1 []Slice
_ = m.Read(ctx, inode, 1, &cs1)
if len(cs1) != 5 {
t.Fatalf("expect 5 slices, but got %+v", cs1)
}
m.(*redisMeta).compactChunk(inode, 1)
switch r := m.(type) {
case *redisMeta:
r.compactChunk(inode, 1, true)
case *dbMeta:
r.compactChunk(inode, 1, true)
}
var cs []Slice
_ = m.Read(ctx, inode, 1, &cs)
if len(cs) != 1 {
......@@ -502,28 +542,26 @@ func TestCompaction(t *testing.T) {
// append
var size uint32 = 1000000
for i := 0; i < 50; i++ {
if st := m.Write(ctx, inode, 0, uint32(i)*size, Slice{Chunkid: uint64(i) + 1, Size: size, Len: size}); st != 0 {
var chunkid uint64
m.NewChunk(ctx, inode, 0, 0, &chunkid)
if st := m.Write(ctx, inode, 0, uint32(i)*size, Slice{Chunkid: chunkid, Size: size, Len: size}); st != 0 {
t.Fatalf("write %d: %s", i, st)
}
time.Sleep(time.Millisecond)
}
<-done
switch r := m.(type) {
case *redisMeta:
r.compactChunk(inode, 0, true)
case *dbMeta:
r.compactChunk(inode, 0, true)
}
var chunks []Slice
if st := m.Read(ctx, inode, 0, &chunks); st != 0 {
t.Fatalf("read 0: %s", st)
}
if len(chunks) > 20 {
if len(chunks) >= 10 {
t.Fatalf("inode %d should be compacted, but have %d slices", inode, len(chunks))
}
<-done
// wait for it to update chunks
time.Sleep(time.Millisecond * 5)
if st := m.Read(ctx, inode, 0, &chunks); st != 0 {
t.Fatalf("read 0: %s", st)
}
if len(chunks) > 3 {
t.Fatalf("inode %d should be compacted after read, but have %d slices", inode, len(chunks))
}
var total uint32
for _, s := range chunks {
total += s.Len
......@@ -550,11 +588,15 @@ func TestCompaction(t *testing.T) {
}
func TestConcurrentWrite(t *testing.T) {
var conf RedisConfig
m, err := NewRedisMeta("redis://127.0.0.1/9", &conf)
var conf Config
m, err := newRedisMeta("redis://127.0.0.1/9", &conf)
if err != nil {
t.Skipf("redis is not available: %s", err)
}
testConcurrentWrite(t, m)
}
func testConcurrentWrite(t *testing.T, m Meta) {
m.OnMsg(DeleteChunk, func(args ...interface{}) error {
return nil
})
......@@ -562,7 +604,6 @@ func TestConcurrentWrite(t *testing.T) {
return nil
})
_ = m.Init(Format{Name: "test"}, true)
_ = m.NewSession()
ctx := Background
var inode Ino
......@@ -571,17 +612,18 @@ func TestConcurrentWrite(t *testing.T) {
if st := m.Create(ctx, 1, "f", 0650, 022, &inode, attr); st != 0 {
t.Fatalf("create file %s", st)
}
// nolint:errcheck
defer m.Unlink(ctx, 1, "f")
var errno syscall.Errno
var g sync.WaitGroup
for i := 0; i <= 20; i++ {
for i := 0; i <= 10; i++ {
g.Add(1)
go func(indx uint32) {
defer g.Done()
for j := 0; j < 100; j++ {
var slice = Slice{Chunkid: 1, Size: 100, Len: 100}
var chunkid uint64
m.NewChunk(ctx, inode, indx, 0, &chunkid)
var slice = Slice{Chunkid: chunkid, Size: 100, Len: 100}
st := m.Write(ctx, inode, indx, 0, slice)
if st != 0 {
errno = st
......@@ -596,18 +638,21 @@ func TestConcurrentWrite(t *testing.T) {
}
}
// nolint:errcheck
func TestTruncateAndDelete(t *testing.T) {
var conf RedisConfig
m, err := NewRedisMeta("redis://127.0.0.1/10", &conf)
var conf Config
m, err := newRedisMeta("redis://127.0.0.1/10", &conf)
if err != nil {
t.Skipf("redis is not available: %s", err)
}
m.(*redisMeta).rdb.FlushDB(context.Background())
testTruncateAndDelete(t, m)
}
func testTruncateAndDelete(t *testing.T, m Meta) {
m.OnMsg(DeleteChunk, func(args ...interface{}) error {
return nil
})
_ = m.Init(Format{Name: "test"}, true)
_ = m.NewSession()
ctx := Background
var inode Ino
......@@ -632,24 +677,11 @@ func TestTruncateAndDelete(t *testing.T) {
if st := m.Truncate(ctx, inode, 0, (300<<20)+10, attr); st != 0 {
t.Fatalf("truncate file %s", st)
}
r := m.(*redisMeta)
listAll := func(pattern string) []string {
var keys, ks []string
var cursor uint64
for {
ks, cursor, err = r.rdb.Scan(ctx, cursor, pattern, 1000).Result()
keys = append(keys, ks...)
if err != nil || cursor == 0 {
break
}
}
return keys
}
keys := listAll(fmt.Sprintf("c%d_*", inode))
if len(keys) != 3 {
t.Fatalf("number of chunks: %d != 3, %+v", len(keys), keys)
var ss []Slice
m.ListSlices(ctx, &ss)
if len(ss) != 1 {
t.Fatalf("number of chunks: %d != 3, %+v", len(ss), ss)
}
m.Close(ctx, inode)
if st := m.Unlink(ctx, 1, "f"); st != 0 {
......@@ -657,25 +689,28 @@ func TestTruncateAndDelete(t *testing.T) {
}
time.Sleep(time.Millisecond * 100)
keys = listAll(fmt.Sprintf("c%d_*", inode))
m.ListSlices(ctx, &ss)
// the last chunk could be found and deleted
if len(keys) > 1 {
t.Fatalf("number of chunks: %d > 1, %+v", len(keys), keys)
if len(ss) > 1 {
t.Fatalf("number of chunks: %d > 1, %+v", len(ss), ss)
}
}
// nolint:errcheck
func TestCopyFileRange(t *testing.T) {
var conf RedisConfig
m, err := NewRedisMeta("redis://127.0.0.1/10", &conf)
var conf Config
m, err := newRedisMeta("redis://127.0.0.1/10", &conf)
if err != nil {
t.Skipf("redis is not available: %s", err)
}
m.(*redisMeta).rdb.FlushDB(context.Background())
testCopyFileRange(t, m)
}
func testCopyFileRange(t *testing.T, m Meta) {
m.OnMsg(DeleteChunk, func(args ...interface{}) error {
return nil
})
_ = m.Init(Format{Name: "test"}, true)
_ = m.NewSession()
ctx := Background
var iin, iout Ino
......@@ -725,20 +760,19 @@ func TestCopyFileRange(t *testing.T) {
}
func benchmarkReaddir(b *testing.B, n int) {
var conf RedisConfig
m, err := NewRedisMeta("redis://127.0.0.1/10", &conf)
var conf Config
m, err := newRedisMeta("redis://127.0.0.1/10", &conf)
if err != nil {
b.Skipf("redis is not available: %s", err)
}
_ = m.NewSession()
ctx := Background
var inode Ino
dname := fmt.Sprintf("largedir%d", n)
var es []*Entry
if m.Lookup(ctx, 1, dname, &inode, nil) == 0 && m.Readdir(ctx, inode, 0, &es) == 0 && len(es) == n+2 {
} else {
_ = m.Rmr(ctx, 1, dname)
_ = m.Mkdir(ctx, 1, dname, 0755, 0, 0, &inode, nil)
Remove(m, ctx, 1, dname)
m.Mkdir(ctx, 1, dname, 0755, 0, 0, &inode, nil)
for j := 0; j < n; j++ {
_ = m.Create(ctx, inode, fmt.Sprintf("d%d", j), 0755, 0, nil, nil)
}
......
......@@ -87,8 +87,10 @@ func (s *slice) visit(f func(*slice)) {
right.visit(f)
}
const sliceBytes = 24
func marshalSlice(pos uint32, chunkid uint64, size, off, len uint32) []byte {
w := utils.NewBuffer(24)
w := utils.NewBuffer(sliceBytes)
w.Put32(pos)
w.Put64(chunkid)
w.Put32(size)
......@@ -107,3 +109,13 @@ func readSlices(vals []string) []*slice {
}
return ss
}
func readSliceBuf(buf []byte) []*slice {
var ss []*slice
for i := 0; i < len(buf); i += sliceBytes {
s := new(slice)
s.read(buf[i:])
ss = append(ss, s)
}
return ss
}
此差异已折叠。
/*
* JuiceFS, Copyright (C) 2020 Juicedata, Inc.
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//nolint:errcheck
package meta
import (
"os"
"testing"
)
func TestSQLClient(t *testing.T) {
os.Remove("test.db")
m, err := newSQLMeta("sqlite3", "test.db", &Config{})
if err != nil {
t.Fatalf("create meta: %s", err)
}
testMetaClient(t, m)
}
func TestMySQLClient(t *testing.T) {
m, err := newSQLMeta("mysql", "root:@/dev", &Config{})
if err != nil {
t.Skipf("create meta: %s", err)
}
m.engine.DropTables(&setting{})
m.engine.DropTables(&counter{})
m.engine.DropTables(&node{})
m.engine.DropTables(&edge{})
m.engine.DropTables(&symlink{})
m.engine.DropTables(&chunk{})
m.engine.DropTables(&sliceRef{})
m.engine.DropTables(&session{})
m.engine.DropTables(&sustained{})
m.engine.DropTables(&xattr{})
m.engine.DropTables(&delfile{})
testMetaClient(t, m)
}
func TestConcurrentWriteSQL(t *testing.T) {
os.Remove("test1.db")
m, err := newSQLMeta("sqlite3", "test1.db", &Config{})
if err != nil {
t.Fatalf("create meta: %s", err)
}
testConcurrentWrite(t, m)
}
func TestCompactionSQL(t *testing.T) {
os.Remove("test2.db")
m, err := newSQLMeta("sqlite3", "test2.db", &Config{})
if err != nil {
t.Fatalf("create meta: %s", err)
}
testCompaction(t, m)
}
func TestTruncateAndDeleteSQL(t *testing.T) {
os.Remove("test.db")
m, err := newSQLMeta("sqlite3", "test.db", &Config{})
if err != nil {
t.Fatalf("create meta: %s", err)
}
testTruncateAndDelete(t, m)
}
func TestCopyFileRangeSQL(t *testing.T) {
os.Remove("test.db")
m, err := newSQLMeta("sqlite3", "test.db", &Config{})
if err != nil {
t.Fatalf("create meta: %s", err)
}
testCopyFileRange(t, m)
}
func TestCaseIncensiSQL(t *testing.T) {
os.Remove("test.db")
m, err := newSQLMeta("sqlite3", "test.db", &Config{CaseInsensi: true})
if err != nil {
t.Fatalf("create meta: %s", err)
}
testCaseIncensi(t, m)
}
......@@ -154,14 +154,14 @@ func handleInternalMsg(ctx Context, msg []byte) []byte {
case meta.Rmr:
inode := Ino(r.Get64())
name := string(r.Get(int(r.Get8())))
r := m.Rmr(ctx, inode, name)
r := meta.Remove(m, ctx, inode, name)
return []byte{uint8(r)}
case meta.Info:
var summary meta.Summary
inode := Ino(r.Get64())
wb := utils.NewBuffer(4)
r := m.Summary(ctx, inode, &summary)
r := meta.GetSummary(m, ctx, inode, &summary)
if r != 0 {
msg := r.Error()
wb.Put32(uint32(len(msg)))
......
......@@ -706,7 +706,7 @@ func NewDataReader(conf *Config, m meta.Meta, store chunk.ChunkStore) DataReader
readAheadTotal: uint64(readAheadTotal),
readAheadMax: uint64(readAheadMax),
maxRequests: readAheadMax/conf.Chunk.BlockSize*readSessions + 1,
maxRetries: uint32(conf.Meta.IORetries),
maxRetries: uint32(conf.Meta.Retries),
}
go r.checkReadBuffer()
return r
......
......@@ -402,7 +402,7 @@ func NewDataWriter(conf *Config, m meta.Meta, store chunk.ChunkStore) DataWriter
blockSize: conf.Chunk.BlockSize,
bufferSize: int64(conf.Chunk.BufferSize),
files: make(map[Ino]*fileWriter),
maxRetries: uint32(conf.Meta.IORetries),
maxRetries: uint32(conf.Meta.Retries),
}
go w.flushAll()
return w
......
......@@ -310,15 +310,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
utils.InitLoggers(false)
addr := jConf.MetaURL
if !strings.Contains(addr, "://") {
addr = "redis://" + addr
}
logger.Infof("Meta address: %s", addr)
var rc = meta.RedisConfig{Retries: 10, Strict: true}
m, err := meta.NewRedisMeta(addr, &rc)
if err != nil {
logger.Fatalf("Meta: %s", err)
}
m := meta.NewClient(addr, &meta.Config{Retries: 10, Strict: true})
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
......@@ -406,7 +398,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
conf := &vfs.Config{
Meta: &meta.Config{
IORetries: 10,
Retries: 10,
},
Format: format,
Chunk: &chunkConf,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册