Unverified Commit 9e1667e0 authored by 祝威廉, committed by GitHub

Delay uploading staging files when writeback is enabled, with new parameter delay-upload (#736)

fix validateParams in Config

scan pendingKeys instead of the staging directory

refactor

fix typo

fix some errors

change cleanDelayStaging interval to 1m

make sure staging files are cleaned before cache eviction

upload blocks staged longer than delay-upload && check disk space

fix some issues

add license header for alg.go

fix "using goroutines on loop iterator variables" warning

iterate over pendingKeys with a maxLen limit

Parent 8a2661e3
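For context: the new --delay-upload flag only takes effect together with --writeback, since only writeback mode stages blocks on local disk before uploading them. A hypothetical invocation (the metadata URL and mount point are placeholders, not from this patch) would be `juicefs mount --writeback --delay-upload 30m redis://127.0.0.1:6379/1 /mnt/jfs`, which keeps each written block in the local staging directory for at least 30 minutes before it is uploaded to object storage.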
@@ -113,16 +113,21 @@ func mount(c *cli.Context) error {
prometheus.DefaultRegisterer = prometheus.WrapRegistererWith(metricLabels,
prometheus.WrapRegistererWithPrefix("juicefs_", prometheus.DefaultRegisterer))
+if !c.Bool("writeback") && c.IsSet("delay-upload") {
+logger.Warnf("writeback should be enabled before configuring delay-upload")
+}
chunkConf := chunk.Config{
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
-GetTimeout: time.Second * time.Duration(c.Int("get-timeout")),
-PutTimeout: time.Second * time.Duration(c.Int("put-timeout")),
-MaxUpload: c.Int("max-uploads"),
-Writeback: c.Bool("writeback"),
-Prefetch: c.Int("prefetch"),
-BufferSize: c.Int("buffer-size") << 20,
+GetTimeout: time.Second * time.Duration(c.Int("get-timeout")),
+PutTimeout: time.Second * time.Duration(c.Int("put-timeout")),
+MaxUpload: c.Int("max-uploads"),
+Writeback: c.Bool("writeback"),
+DelayUpload: c.Duration("delay-upload"),
+Prefetch: c.Int("prefetch"),
+BufferSize: c.Int("buffer-size") << 20,
CacheDir: c.String("cache-dir"),
CacheSize: int64(c.Int("cache-size")),
@@ -131,6 +136,7 @@ func mount(c *cli.Context) error {
CacheFullBlock: !c.Bool("cache-partial-only"),
AutoCreate: true,
}
if chunkConf.CacheDir != "memory" {
ds := utils.SplitDir(chunkConf.CacheDir)
for i := range ds {
@@ -293,6 +299,11 @@ func clientFlags() []cli.Flag {
Name: "writeback",
Usage: "upload objects in background",
},
+&cli.DurationFlag{
+Name: "delay-upload",
+Value: time.Duration(0),
+Usage: "delay before uploading staged objects; valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"",
+},
&cli.StringFlag{
Name: "cache-dir",
Value: defaultCacheDir,
......
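The --delay-upload flag above is a cli.DurationFlag, so values are parsed with the semantics of Go's time.ParseDuration, and compound values like "2h45m" are also accepted. A small standalone sketch (my illustration, not part of the patch) of what the flag will and won't accept:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Strings accepted by time.ParseDuration, and hence by --delay-upload.
	for _, s := range []string{"0s", "90s", "1.5h", "2h45m", "10"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			fmt.Printf("%-6s -> rejected: %v\n", s, err) // bare "10" has no unit
			continue
		}
		fmt.Printf("%-6s -> %v\n", s, d) // e.g. "90s" prints as 1m30s
	}
}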
/*
* JuiceFS, Copyright (C) 2020 Juicedata, Inc.
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package chunk

// TwoRandomEvict implements two-random-choices eviction: Go's randomized map
// iteration supplies random pairs of keys, and the older key (smaller atime)
// of each pair is evicted until roughly goal bytes have been freed.
func TwoRandomEvict(keys map[string]cacheItem, goal int64) *[]string {
var todel []string
var freed int64
var cnt int
var lastKey string
var lastValue cacheItem
for key, value := range keys {
if cnt == 0 || lastValue.atime > value.atime {
lastKey = key
lastValue = value
}
cnt++
if cnt > 1 {
delete(keys, lastKey)
freed += int64(lastValue.size + 4096) // +4096: presumably per-file disk overhead, as in the cache size accounting
todel = append(todel, lastKey)
cnt = 0
if freed > goal {
break
}
}
}
return &todel
}
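A test-style sketch (my illustration, not part of the patch) of the intended behaviour of TwoRandomEvict; the cacheItem field types are assumed from how size and atime are used elsewhere in this change:

package chunk

import "testing"

// With a goal of one block, a single pair is compared and the older key of
// that pair is evicted; the newest key can never lose a pairwise comparison.
func TestTwoRandomEvictPrefersOlder(t *testing.T) {
	keys := map[string]cacheItem{
		"old":   {size: 4 << 20, atime: 1000},
		"newer": {size: 4 << 20, atime: 2000},
		"new":   {size: 4 << 20, atime: 3000},
	}
	todel := TwoRandomEvict(keys, 4<<20)
	if len(*todel) == 0 {
		t.Fatal("expected at least one key to be evicted")
	}
	for _, k := range *todel {
		if k == "new" {
			t.Errorf("evicted the most recently used key %q", k)
		}
	}
}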
@@ -437,7 +437,7 @@ func (c *wChunk) asyncUpload(key string, block *Page, stagingPath string) {
// release the memory and wait
block.Release()
c.store.pendingMutex.Lock()
-c.store.pendingKeys[key] = true
+c.store.pendingKeys[key] = int32(blockSize)
c.store.pendingMutex.Unlock()
defer func() {
c.store.pendingMutex.Lock()
@@ -454,7 +454,7 @@ func (c *wChunk) asyncUpload(key string, block *Page, stagingPath string) {
c.store.pendingMutex.Lock()
ok := c.store.pendingKeys[key]
c.store.pendingMutex.Unlock()
-if ok {
+if ok > 0 {
logger.Errorf("read staging file %s: %s", stagingPath, err)
} else {
logger.Debugf("%s is not needed, drop it", key)
@@ -530,7 +530,13 @@ func (c *wChunk) upload(indx int) {
c.syncUpload(key, block)
} else {
c.errors <- nil
-go c.asyncUpload(key, block, stagingPath)
+if c.store.conf.DelayUpload.Seconds() == 0 {
+go c.asyncUpload(key, block, stagingPath)
+} else {
+c.store.pendingMutex.Lock()
+c.store.pendingKeys[key] = int32(blen)
+c.store.pendingMutex.Unlock()
+}
}
} else {
c.syncUpload(key, block)
@@ -604,6 +610,7 @@ type Config struct {
Compress string
MaxUpload int
Writeback bool
+DelayUpload time.Duration
Partitions int
BlockSize int
GetTimeout time.Duration
@@ -621,7 +628,7 @@ type cachedStore struct {
conf Config
group *Controller
currentUpload chan bool
-pendingKeys map[string]bool
+pendingKeys map[string]int32
pendingMutex sync.Mutex
compressor compress.Compressor
seekable bool
@@ -698,7 +705,7 @@ func NewCachedStore(storage object.ObjectStorage, config Config) ChunkStore {
compressor: compressor,
seekable: compressor.CompressBound(0) == 0,
bcache: newCacheManager(&config),
-pendingKeys: make(map[string]bool),
+pendingKeys: make(map[string]int32),
group: &Controller{},
}
if config.CacheSize == 0 {
@@ -742,6 +749,14 @@ func NewCachedStore(storage object.ObjectStorage, config Config) ChunkStore {
return float64(used)
}))
go store.uploadStaging()
+if store.conf.CacheDir != "memory" {
+go func() {
+for {
+store.cleanDelayStaging()
+time.Sleep(time.Minute)
+}
+}()
+}
return store
}
@@ -755,6 +770,118 @@ func parseObjOrigSize(key string) int {
return l
}
// uploadStagingFile compresses and uploads a single staged block, retrying
// with quadratic backoff until the put succeeds, then removes the staging
// file; it returns the uncompressed block size (0 if read/compress failed).
func (store *cachedStore) uploadStagingFile(key string, stagingPath string) int {
block, err := ioutil.ReadFile(stagingPath)
if err != nil {
logger.Errorf("open %s: %s", stagingPath, err)
return 0
}
buf := make([]byte, store.compressor.CompressBound(len(block)))
n, err := store.compressor.Compress(buf, block)
if err != nil {
logger.Errorf("compress chunk %s: %s", stagingPath, err)
return 0
}
compressed := buf[:n]
if strings.Count(key, "_") == 1 {
// add size at the end
key = fmt.Sprintf("%s_%d", key, len(block))
}
try := 0
for {
err := store.storage.Put(key, bytes.NewReader(compressed))
if err == nil {
break
}
logger.Infof("upload %s: %s (try %d)", key, err, try)
try++
time.Sleep(time.Second * time.Duration(try*try))
}
store.bcache.uploaded(key, len(block))
os.Remove(stagingPath)
return len(block)
}
// computeFreeGoal returns how many bytes must be freed to bring the free
// ratio of innerStore's disk from br back up to freeRatio, capped at the
// bytes this store currently uses.
func computeFreeGoal(innerStore *cacheStore, br float32, freeRatio float32) int64 {
total, _, _, _ := getDiskUsage(innerStore.dir)
var goal int64
toFree := int64(float32(total) * (freeRatio - br))
if toFree > innerStore.used {
goal = innerStore.used
} else {
goal = toFree
}
return goal
}
// cleanDelayStaging uploads staged blocks that have been pending longer than
// delay-upload, and, when free disk space runs low, uploads older staged
// blocks early to reclaim space.
func (store *cachedStore) cleanDelayStaging() {
cacheStoreKeys := make(map[*cacheStore]map[string]cacheItem)
store.pendingMutex.Lock()
// scan at most 10000 pending keys per round
maxLen := len(store.pendingKeys)
if maxLen > 10000 {
maxLen = 10000
}
var count = 0
for key, blockSize := range store.pendingKeys {
if count >= maxLen {
break
}
innerStore := store.bcache.getStore(key)
if _, ok := cacheStoreKeys[innerStore]; !ok {
cacheStoreKeys[innerStore] = make(map[string]cacheItem)
}
innerStore.Lock()
cacheStoreKeys[innerStore][key] = cacheItem{
size: blockSize,
atime: innerStore.keys[key].atime,
}
innerStore.Unlock()
count += 1
}
store.pendingMutex.Unlock()
uploadByKey := func(key string, stagingPath string) {
store.currentUpload <- true
go func(key string, stagingPath string) {
store.uploadStagingFile(key, stagingPath)
store.pendingMutex.Lock()
delete(store.pendingKeys, key)
store.pendingMutex.Unlock()
<-store.currentUpload
}(key, stagingPath)
}
for innerStore, keys := range cacheStoreKeys {
go func(innerStore *cacheStore, keys map[string]cacheItem) {
// upload every block that has been staged longer than delay-upload
leftKeys := make(map[string]cacheItem)
for key, ci := range keys {
isTimeout := time.Now().Unix()-int64(ci.atime) > int64(store.conf.DelayUpload.Seconds())
if isTimeout {
uploadByKey(key, innerStore.stagePath(key))
} else {
leftKeys[key] = ci
}
}
// if free disk space is below freeRatio, upload older staged blocks early to free space
br, fr := innerStore.curFreeRatio()
freeRatio := innerStore.freeRatio * 1.5
if br >= freeRatio && fr >= freeRatio {
return
}
goal := computeFreeGoal(innerStore, br, freeRatio)
toDelKeys := TwoRandomEvict(leftKeys, goal)
for _, key := range *toDelKeys {
uploadByKey(key, innerStore.stagePath(key))
}
}(innerStore, keys)
}
}
func (store *cachedStore) uploadStaging() {
staging := store.bcache.scanStaging()
for key, path := range staging {
@@ -763,35 +890,7 @@ func (store *cachedStore) uploadStaging() {
defer func() {
<-store.currentUpload
}()
-block, err := ioutil.ReadFile(stagingPath)
-if err != nil {
-logger.Errorf("open %s: %s", stagingPath, err)
-return
-}
-buf := make([]byte, store.compressor.CompressBound(len(block)))
-n, err := store.compressor.Compress(buf, block)
-if err != nil {
-logger.Errorf("compress chunk %s: %s", stagingPath, err)
-return
-}
-compressed := buf[:n]
-if strings.Count(key, "_") == 1 {
-// add size at the end
-key = fmt.Sprintf("%s_%d", key, len(block))
-}
-try := 0
-for {
-err := store.storage.Put(key, bytes.NewReader(compressed))
-if err == nil {
-break
-}
-logger.Infof("upload %s: %s (try %d)", key, err, try)
-try++
-time.Sleep(time.Second * time.Duration(try*try))
-}
-store.bcache.uploaded(key, len(block))
-os.Remove(stagingPath)
+store.uploadStagingFile(key, stagingPath)
}(key, path)
}
}
......
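One of the commit messages above mentions fixing "goroutines on loop iterator variables"; the cure used in this patch is to pass the loop variables as arguments, as cleanDelayStaging does with go func(innerStore, keys){...}(innerStore, keys). A minimal standalone illustration (my sketch, not part of the patch) of the pitfall and the fix, for Go versions current at the time of this commit, before Go 1.22 changed loop-variable scoping:

package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []string{"a", "b", "c"}
	var wg sync.WaitGroup

	// Buggy: all goroutines capture the same loop variable and may
	// observe only its final value ("c" printed three times).
	for _, it := range items {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("buggy:", it)
		}()
	}
	wg.Wait()

	// Fixed: pass the iterator variable as an argument, as the patch
	// does for innerStore/keys and key/stagingPath.
	for _, it := range items {
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			fmt.Println("fixed:", it)
		}(it)
	}
	wg.Wait()
}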
@@ -513,6 +513,7 @@ type CacheManager interface {
scanStaging() map[string]string
stats() (int64, int64)
usedMemory() int64
+getStore(key string) *cacheStore
}
func newCacheManager(config *Config) CacheManager {
......
@@ -41,6 +41,10 @@ func newMemStore(config *Config) *memcache {
return c
}
+func (c *memcache) getStore(key string) *cacheStore {
+return nil
+}
func (c *memcache) usedMemory() int64 {
c.Lock()
defer c.Unlock()
......