未验证 提交 a04b66af 编写于 作者: D Davies Liu 提交者: GitHub

Sync stagging block in writeback mode (#255)

上级 75227abc
......@@ -163,12 +163,12 @@ func (g *GateWay) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, er
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
GetTimeout: time.Second * time.Duration(c.Int("get-timeout")),
PutTimeout: time.Second * time.Duration(c.Int("put-timeout")),
MaxUpload: c.Int("max-uploads"),
AsyncUpload: c.Bool("writeback"),
Prefetch: c.Int("prefetch"),
BufferSize: c.Int("buffer-size") << 20,
GetTimeout: time.Second * time.Duration(c.Int("get-timeout")),
PutTimeout: time.Second * time.Duration(c.Int("put-timeout")),
MaxUpload: c.Int("max-uploads"),
Writeback: c.Bool("writeback"),
Prefetch: c.Int("prefetch"),
BufferSize: c.Int("buffer-size") << 20,
CacheDir: c.String("cache-dir"),
CacheSize: int64(c.Int("cache-size")),
......
......@@ -159,12 +159,12 @@ func mount(c *cli.Context) error {
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
GetTimeout: time.Second * time.Duration(c.Int("get-timeout")),
PutTimeout: time.Second * time.Duration(c.Int("put-timeout")),
MaxUpload: c.Int("max-uploads"),
AsyncUpload: c.Bool("writeback"),
Prefetch: c.Int("prefetch"),
BufferSize: c.Int("buffer-size") << 20,
GetTimeout: time.Second * time.Duration(c.Int("get-timeout")),
PutTimeout: time.Second * time.Duration(c.Int("put-timeout")),
MaxUpload: c.Int("max-uploads"),
Writeback: c.Bool("writeback"),
Prefetch: c.Int("prefetch"),
BufferSize: c.Int("buffer-size") << 20,
CacheDir: c.String("cache-dir"),
CacheSize: int64(c.Int("cache-size")),
......
......@@ -464,7 +464,7 @@ func (c *wChunk) upload(indx int) {
logger.Fatalf("block length does not match: %v != %v", off, blen)
}
}
if c.store.conf.AsyncUpload {
if c.store.conf.Writeback {
stagingPath, err := c.store.bcache.stage(key, block.Data, c.store.shouldCache(blen))
if err != nil {
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
......@@ -541,7 +541,7 @@ type Config struct {
AutoCreate bool
Compress string
MaxUpload int
AsyncUpload bool
Writeback bool
Partitions int
BlockSize int
UploadLimit int
......
......@@ -143,7 +143,7 @@ func (cache *cacheStore) curFreeRatio() (float32, float32) {
return float32(free) / float32(total), float32(ffree) / float32(files)
}
func (cache *cacheStore) flushPage(path string, data []byte) error {
func (cache *cacheStore) flushPage(path string, data []byte, sync bool) error {
cache.createDir(filepath.Dir(path))
tmp := path + ".tmp"
f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE, cache.mode)
......@@ -158,6 +158,15 @@ func (cache *cacheStore) flushPage(path string, data []byte) error {
_ = os.Remove(tmp)
return err
}
if sync {
err = f.Sync()
if err != nil {
logger.Warnf("sync stagging file %s: %s", tmp, err)
_ = f.Close()
_ = os.Remove(tmp)
return err
}
}
err = f.Close()
if err != nil {
logger.Infof("Close cache file %s: %s", tmp, err)
......@@ -238,7 +247,7 @@ func (cache *cacheStore) flush() {
for {
w := <-cache.pending
path := cache.cachePath(w.key)
if cache.capacity > 0 && cache.flushPage(path, w.page.Data) == nil {
if cache.capacity > 0 && cache.flushPage(path, w.page.Data, false) == nil {
cache.add(w.key, int32(len(w.page.Data)), uint32(time.Now().Unix()))
}
cache.Lock()
......@@ -270,7 +279,7 @@ func (cache *cacheStore) add(key string, size int32, atime uint32) {
func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string, error) {
stagingPath := cache.stagePath(key)
err := cache.flushPage(stagingPath, data)
err := cache.flushPage(stagingPath, data, true)
if err == nil && cache.capacity > 0 && keepCache {
path := cache.cachePath(key)
cache.createDir(filepath.Dir(path))
......
......@@ -102,7 +102,7 @@ func TestAsyncStore(t *testing.T) {
f, _ := os.Create(p)
f.WriteString("good")
f.Close()
conf.AsyncUpload = true
conf.Writeback = true
conf.UploadLimit = 0
_ = NewCachedStore(mem, conf)
time.Sleep(time.Millisecond * 10) // wait for scan to finish
......
......@@ -281,7 +281,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
CacheFullBlock: jConf.CacheFullBlock,
MaxUpload: jConf.MaxUploads,
Prefetch: 3,
AsyncUpload: jConf.Writeback,
Writeback: jConf.Writeback,
Partitions: format.Partitions,
UploadLimit: jConf.UploadLimit,
GetTimeout: time.Second * time.Duration(jConf.GetTimeout),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册