未验证 提交 4dda37e9 编写于 作者: D Davies Liu 提交者: GitHub

add a option to limit the concurrent deletes (#917)

* add a option to limit the concurrent deletes

* fix tests
上级 7a42e87a
...@@ -162,10 +162,11 @@ func (g *GateWay) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, er ...@@ -162,10 +162,11 @@ func (g *GateWay) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, er
c := g.ctx c := g.ctx
addr := c.Args().Get(0) addr := c.Args().Get(0)
m := meta.NewClient(addr, &meta.Config{ m := meta.NewClient(addr, &meta.Config{
Retries: 10, Retries: 10,
Strict: true, Strict: true,
ReadOnly: c.Bool("read-only"), ReadOnly: c.Bool("read-only"),
OpenCache: time.Duration(c.Float64("open-cache") * 1e9), OpenCache: time.Duration(c.Float64("open-cache") * 1e9),
MaxDeletes: c.Int("max-deletes"),
}) })
format, err := m.Load() format, err := m.Load()
if err != nil { if err != nil {
......
...@@ -80,7 +80,11 @@ func gc(ctx *cli.Context) error { ...@@ -80,7 +80,11 @@ func gc(ctx *cli.Context) error {
if ctx.Args().Len() < 1 { if ctx.Args().Len() < 1 {
return fmt.Errorf("META-URL is needed") return fmt.Errorf("META-URL is needed")
} }
m := meta.NewClient(ctx.Args().Get(0), &meta.Config{Retries: 10, Strict: true}) m := meta.NewClient(ctx.Args().Get(0), &meta.Config{
Retries: 10,
Strict: true,
MaxDeletes: ctx.Int("threads"),
})
format, err := m.Load() format, err := m.Load()
if err != nil { if err != nil {
logger.Fatalf("load setting: %s", err) logger.Fatalf("load setting: %s", err)
......
...@@ -120,6 +120,7 @@ func mount(c *cli.Context) error { ...@@ -120,6 +120,7 @@ func mount(c *cli.Context) error {
OpenCache: time.Duration(c.Float64("open-cache") * 1e9), OpenCache: time.Duration(c.Float64("open-cache") * 1e9),
MountPoint: mp, MountPoint: mp,
Subdir: c.String("subdir"), Subdir: c.String("subdir"),
MaxDeletes: c.Int("max-deletes"),
} }
m := meta.NewClient(addr, metaConf) m := meta.NewClient(addr, metaConf)
format, err := m.Load() format, err := m.Load()
...@@ -277,6 +278,11 @@ func clientFlags() []cli.Flag { ...@@ -277,6 +278,11 @@ func clientFlags() []cli.Flag {
Value: 20, Value: 20,
Usage: "number of connections to upload", Usage: "number of connections to upload",
}, },
&cli.IntFlag{
Name: "max-deletes",
Value: 2,
Usage: "number of threads to delete objects",
},
&cli.IntFlag{ &cli.IntFlag{
Name: "buffer-size", Name: "buffer-size",
Value: 300, Value: 300,
......
...@@ -27,7 +27,7 @@ import ( ...@@ -27,7 +27,7 @@ import (
// nolint:errcheck // nolint:errcheck
func TestFileSystem(t *testing.T) { func TestFileSystem(t *testing.T) {
m := meta.NewClient("redis://127.0.0.1:6379/10", &meta.Config{}) m := meta.NewClient("redis://127.0.0.1:6379/10", &meta.Config{MaxDeletes: 1})
format := meta.Format{ format := meta.Format{
Name: "test", Name: "test",
BlockSize: 4096, BlockSize: 4096,
......
...@@ -26,6 +26,7 @@ type Config struct { ...@@ -26,6 +26,7 @@ type Config struct {
OpenCache time.Duration OpenCache time.Duration
MountPoint string MountPoint string
Subdir string Subdir string
MaxDeletes int
} }
type Format struct { type Format struct {
......
...@@ -162,7 +162,7 @@ func newRedisMeta(driver, addr string, conf *Config) (Meta, error) { ...@@ -162,7 +162,7 @@ func newRedisMeta(driver, addr string, conf *Config) (Meta, error) {
of: newOpenFiles(conf.OpenCache), of: newOpenFiles(conf.OpenCache),
removedFiles: make(map[Ino]bool), removedFiles: make(map[Ino]bool),
compacting: make(map[uint64]bool), compacting: make(map[uint64]bool),
deleting: make(chan int, 2), deleting: make(chan int, conf.MaxDeletes),
symlinks: &sync.Map{}, symlinks: &sync.Map{},
msgCallbacks: &msgCallbacks{ msgCallbacks: &msgCallbacks{
callbacks: make(map[uint32]MsgCallback), callbacks: make(map[uint32]MsgCallback),
...@@ -2535,6 +2535,9 @@ func (r *redisMeta) toDelete(inode Ino, length uint64) string { ...@@ -2535,6 +2535,9 @@ func (r *redisMeta) toDelete(inode Ino, length uint64) string {
} }
func (r *redisMeta) deleteSlice(ctx Context, chunkid uint64, size uint32) { func (r *redisMeta) deleteSlice(ctx Context, chunkid uint64, size uint32) {
if r.conf.MaxDeletes == 0 {
return
}
r.deleting <- 1 r.deleting <- 1
defer func() { <-r.deleting }() defer func() { <-r.deleting }()
err := r.newMsg(DeleteChunk, chunkid, size) err := r.newMsg(DeleteChunk, chunkid, size)
......
...@@ -32,7 +32,7 @@ import ( ...@@ -32,7 +32,7 @@ import (
) )
func TestRedisClient(t *testing.T) { func TestRedisClient(t *testing.T) {
var conf Config var conf = Config{MaxDeletes: 1}
_, err := newRedisMeta("http", "127.0.0.1:6379/7", &conf) _, err := newRedisMeta("http", "127.0.0.1:6379/7", &conf)
if err == nil { if err == nil {
t.Fatal("meta created with invalid url") t.Fatal("meta created with invalid url")
...@@ -680,7 +680,7 @@ func testCaseIncensi(t *testing.T, m Meta) { ...@@ -680,7 +680,7 @@ func testCaseIncensi(t *testing.T, m Meta) {
} }
func TestCompaction(t *testing.T) { func TestCompaction(t *testing.T) {
var conf Config var conf = Config{MaxDeletes: 1}
m, err := newRedisMeta("redis", "127.0.0.1:6379/8", &conf) m, err := newRedisMeta("redis", "127.0.0.1:6379/8", &conf)
if err != nil { if err != nil {
t.Skipf("redis is not available: %s", err) t.Skipf("redis is not available: %s", err)
...@@ -785,7 +785,7 @@ func testCompaction(t *testing.T, m Meta) { ...@@ -785,7 +785,7 @@ func testCompaction(t *testing.T, m Meta) {
} }
func TestConcurrentWrite(t *testing.T) { func TestConcurrentWrite(t *testing.T) {
var conf Config var conf = Config{MaxDeletes: 1}
m, err := newRedisMeta("redis", "127.0.0.1/9", &conf) m, err := newRedisMeta("redis", "127.0.0.1/9", &conf)
if err != nil { if err != nil {
t.Skipf("redis is not available: %s", err) t.Skipf("redis is not available: %s", err)
...@@ -836,7 +836,7 @@ func testConcurrentWrite(t *testing.T, m Meta) { ...@@ -836,7 +836,7 @@ func testConcurrentWrite(t *testing.T, m Meta) {
} }
func TestTruncateAndDelete(t *testing.T) { func TestTruncateAndDelete(t *testing.T) {
var conf Config var conf = Config{MaxDeletes: 1}
m, err := newRedisMeta("redis", "127.0.0.1/10", &conf) m, err := newRedisMeta("redis", "127.0.0.1/10", &conf)
if err != nil { if err != nil {
t.Skipf("redis is not available: %s", err) t.Skipf("redis is not available: %s", err)
......
...@@ -190,7 +190,7 @@ func newSQLMeta(driver, addr string, conf *Config) (Meta, error) { ...@@ -190,7 +190,7 @@ func newSQLMeta(driver, addr string, conf *Config) (Meta, error) {
of: newOpenFiles(conf.OpenCache), of: newOpenFiles(conf.OpenCache),
removedFiles: make(map[Ino]bool), removedFiles: make(map[Ino]bool),
compacting: make(map[uint64]bool), compacting: make(map[uint64]bool),
deleting: make(chan int, 2), deleting: make(chan int, conf.MaxDeletes),
symlinks: &sync.Map{}, symlinks: &sync.Map{},
msgCallbacks: &msgCallbacks{ msgCallbacks: &msgCallbacks{
callbacks: make(map[uint32]MsgCallback), callbacks: make(map[uint32]MsgCallback),
...@@ -2261,6 +2261,9 @@ func (m *dbMeta) cleanupSlices() { ...@@ -2261,6 +2261,9 @@ func (m *dbMeta) cleanupSlices() {
} }
func (m *dbMeta) deleteSlice(chunkid uint64, size uint32) { func (m *dbMeta) deleteSlice(chunkid uint64, size uint32) {
if m.conf.MaxDeletes == 0 {
return
}
m.deleting <- 1 m.deleting <- 1
defer func() { <-m.deleting }() defer func() { <-m.deleting }()
err := m.newMsg(DeleteChunk, chunkid, size) err := m.newMsg(DeleteChunk, chunkid, size)
......
...@@ -53,7 +53,7 @@ func resetDB(m *dbMeta) { ...@@ -53,7 +53,7 @@ func resetDB(m *dbMeta) {
func TestSQLiteClient(t *testing.T) { func TestSQLiteClient(t *testing.T) {
tmp := tempFile(t) tmp := tempFile(t)
defer os.Remove(tmp) defer os.Remove(tmp)
m, err := newSQLMeta("sqlite3", tmp, &Config{}) m, err := newSQLMeta("sqlite3", tmp, &Config{MaxDeletes: 1})
if err != nil { if err != nil {
t.Fatalf("create meta: %s", err) t.Fatalf("create meta: %s", err)
} }
...@@ -61,7 +61,7 @@ func TestSQLiteClient(t *testing.T) { ...@@ -61,7 +61,7 @@ func TestSQLiteClient(t *testing.T) {
} }
func TestMySQLClient(t *testing.T) { func TestMySQLClient(t *testing.T) {
m, err := newSQLMeta("mysql", "root:@/dev", &Config{}) m, err := newSQLMeta("mysql", "root:@/dev", &Config{MaxDeletes: 1})
if err != nil { if err != nil {
t.Fatalf("create meta: %s", err) t.Fatalf("create meta: %s", err)
} }
...@@ -80,7 +80,7 @@ func TestMySQLClient(t *testing.T) { ...@@ -80,7 +80,7 @@ func TestMySQLClient(t *testing.T) {
} }
func TestPostgresQLClient(t *testing.T) { func TestPostgresQLClient(t *testing.T) {
m, err := newSQLMeta("postgres", "localhost:5432/test?sslmode=disable", &Config{}) m, err := newSQLMeta("postgres", "localhost:5432/test?sslmode=disable", &Config{MaxDeletes: 1})
if err != nil { if err != nil {
t.Fatalf("create meta: %s", err) t.Fatalf("create meta: %s", err)
} }
...@@ -121,7 +121,7 @@ func TestLocksSQLite(t *testing.T) { ...@@ -121,7 +121,7 @@ func TestLocksSQLite(t *testing.T) {
func TestConcurrentWriteSQLite(t *testing.T) { func TestConcurrentWriteSQLite(t *testing.T) {
tmp := tempFile(t) tmp := tempFile(t)
defer os.Remove(tmp) defer os.Remove(tmp)
m, err := newSQLMeta("sqlite3", tmp, &Config{}) m, err := newSQLMeta("sqlite3", tmp, &Config{MaxDeletes: 1})
if err != nil { if err != nil {
t.Fatalf("create meta: %s", err) t.Fatalf("create meta: %s", err)
} }
...@@ -131,7 +131,7 @@ func TestConcurrentWriteSQLite(t *testing.T) { ...@@ -131,7 +131,7 @@ func TestConcurrentWriteSQLite(t *testing.T) {
func TestCompactionSQLite(t *testing.T) { func TestCompactionSQLite(t *testing.T) {
tmp := tempFile(t) tmp := tempFile(t)
defer os.Remove(tmp) defer os.Remove(tmp)
m, err := newSQLMeta("sqlite3", tmp, &Config{}) m, err := newSQLMeta("sqlite3", tmp, &Config{MaxDeletes: 1})
if err != nil { if err != nil {
t.Fatalf("create meta: %s", err) t.Fatalf("create meta: %s", err)
} }
...@@ -141,7 +141,7 @@ func TestCompactionSQLite(t *testing.T) { ...@@ -141,7 +141,7 @@ func TestCompactionSQLite(t *testing.T) {
func TestTruncateAndDeleteSQLite(t *testing.T) { func TestTruncateAndDeleteSQLite(t *testing.T) {
tmp := tempFile(t) tmp := tempFile(t)
defer os.Remove(tmp) defer os.Remove(tmp)
m, err := newSQLMeta("sqlite3", tmp, &Config{}) m, err := newSQLMeta("sqlite3", tmp, &Config{MaxDeletes: 1})
if err != nil { if err != nil {
t.Fatalf("create meta: %s", err) t.Fatalf("create meta: %s", err)
} }
......
...@@ -97,7 +97,7 @@ func newKVMeta(driver, addr string, conf *Config) (Meta, error) { ...@@ -97,7 +97,7 @@ func newKVMeta(driver, addr string, conf *Config) (Meta, error) {
of: newOpenFiles(conf.OpenCache), of: newOpenFiles(conf.OpenCache),
removedFiles: make(map[Ino]bool), removedFiles: make(map[Ino]bool),
compacting: make(map[uint64]bool), compacting: make(map[uint64]bool),
deleting: make(chan int, 2), deleting: make(chan int, conf.MaxDeletes),
symlinks: &sync.Map{}, symlinks: &sync.Map{},
msgCallbacks: &msgCallbacks{ msgCallbacks: &msgCallbacks{
callbacks: make(map[uint32]MsgCallback), callbacks: make(map[uint32]MsgCallback),
...@@ -2183,6 +2183,9 @@ func (r *kvMeta) cleanupZeroRef(chunkid uint64, size uint32) { ...@@ -2183,6 +2183,9 @@ func (r *kvMeta) cleanupZeroRef(chunkid uint64, size uint32) {
} }
func (m *kvMeta) deleteSlice(chunkid uint64, size uint32) { func (m *kvMeta) deleteSlice(chunkid uint64, size uint32) {
if m.conf.MaxDeletes == 0 {
return
}
m.deleting <- 1 m.deleting <- 1
defer func() { <-m.deleting }() defer func() { <-m.deleting }()
err := m.newMsg(DeleteChunk, chunkid, size) err := m.newMsg(DeleteChunk, chunkid, size)
......
...@@ -48,7 +48,7 @@ func TestMemKV(t *testing.T) { ...@@ -48,7 +48,7 @@ func TestMemKV(t *testing.T) {
} }
func TestTKVClient(t *testing.T) { func TestTKVClient(t *testing.T) {
m, err := newKVMeta("memkv", "test/jfs", &Config{}) m, err := newKVMeta("memkv", "test/jfs", &Config{MaxDeletes: 1})
// m, err := newKVMeta("tikv", "127.0.0.1:2379/jfs", &Config{}) // m, err := newKVMeta("tikv", "127.0.0.1:2379/jfs", &Config{})
if err != nil { if err != nil {
t.Fatalf("create meta: %s", err) t.Fatalf("create meta: %s", err)
......
...@@ -228,6 +228,7 @@ type javaConf struct { ...@@ -228,6 +228,7 @@ type javaConf struct {
UploadLimit int `json:"uploadLimit"` UploadLimit int `json:"uploadLimit"`
DownloadLimit int `json:"downloadLimit"` DownloadLimit int `json:"downloadLimit"`
MaxUploads int `json:"maxUploads"` MaxUploads int `json:"maxUploads"`
MaxDeletes int `json:"maxDeletes"`
GetTimeout int `json:"getTimeout"` GetTimeout int `json:"getTimeout"`
PutTimeout int `json:"putTimeout"` PutTimeout int `json:"putTimeout"`
FastResolve bool `json:"fastResolve"` FastResolve bool `json:"fastResolve"`
...@@ -318,10 +319,11 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp ...@@ -318,10 +319,11 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
addr := jConf.MetaURL addr := jConf.MetaURL
m := meta.NewClient(addr, &meta.Config{ m := meta.NewClient(addr, &meta.Config{
Retries: 10, Retries: 10,
Strict: true, Strict: true,
ReadOnly: jConf.ReadOnly, ReadOnly: jConf.ReadOnly,
OpenCache: time.Duration(jConf.OpenCache * 1e9), OpenCache: time.Duration(jConf.OpenCache * 1e9),
MaxDeletes: jConf.MaxDeletes,
}) })
format, err := m.Load() format, err := m.Load()
if err != nil { if err != nil {
......
...@@ -321,6 +321,7 @@ public class JuiceFileSystemImpl extends FileSystem { ...@@ -321,6 +321,7 @@ public class JuiceFileSystemImpl extends FileSystem {
obj.put("metacache", Boolean.valueOf(getConf(conf, "metacache", "true"))); obj.put("metacache", Boolean.valueOf(getConf(conf, "metacache", "true")));
obj.put("autoCreate", Boolean.valueOf(getConf(conf, "auto-create-cache-dir", "true"))); obj.put("autoCreate", Boolean.valueOf(getConf(conf, "auto-create-cache-dir", "true")));
obj.put("maxUploads", Integer.valueOf(getConf(conf, "max-uploads", "20"))); obj.put("maxUploads", Integer.valueOf(getConf(conf, "max-uploads", "20")));
obj.put("maxDeletes", Integer.valueOf(getConf(conf, "max-deletes", "2")));
obj.put("uploadLimit", Integer.valueOf(getConf(conf, "upload-limit", "0"))); obj.put("uploadLimit", Integer.valueOf(getConf(conf, "upload-limit", "0")));
obj.put("downloadLimit", Integer.valueOf(getConf(conf, "download-limit", "0"))); obj.put("downloadLimit", Integer.valueOf(getConf(conf, "download-limit", "0")));
obj.put("getTimeout", Integer.valueOf(getConf(conf, "get-timeout", getConf(conf, "object-timeout", "5")))); obj.put("getTimeout", Integer.valueOf(getConf(conf, "get-timeout", getConf(conf, "object-timeout", "5"))));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册