未验证 提交 f92d223b 编写于 作者: D Davies Liu 提交者: GitHub

fix broken block cache (#47)

上级 62542c5f
......@@ -70,19 +70,6 @@ func (c *rChunk) index(off int) int {
return off / c.store.conf.BlockSize
}
// loadPage fetches block #indx of this chunk through the cached store,
// retrying up to three times with the original quadratic backoff
// (0s before the 2nd attempt, 1s before the 3rd). Unlike the previous
// version, it no longer sleeps after a successful fetch or after the
// final failed attempt, and it honors ctx cancellation while waiting.
// The returned Page wraps the raw block bytes; the caller owns it.
func (c *rChunk) loadPage(ctx context.Context, indx int) (b *Page, err error) {
	key := c.key(indx)
	var block []byte
	for i := 0; i < 3; i++ {
		block, err = c.store.Get(key)
		if block != nil {
			// same stop condition as before: a non-nil block ends the
			// retry loop even if err is also set
			break
		}
		if i < 2 {
			// back off before the next attempt; skip the zero-length
			// first delay, and abort early if the caller gave up
			if d := time.Second * time.Duration(i*i); d > 0 {
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case <-time.After(d):
				}
			}
		}
	}
	if err != nil {
		return nil, err
	}
	return NewPage(block), nil
}
func (c *rChunk) ReadAt(ctx context.Context, page *Page, off int) (n int, err error) {
p := page.Data
if len(p) == 0 {
......@@ -132,53 +119,44 @@ func (c *rChunk) ReadAt(ctx context.Context, page *Page, off int) (n int, err er
}
}
if !c.store.shouldCache(len(p)) {
if c.store.seekable && boff > 0 && len(p) <= blockSize/4 {
// partial read
st := time.Now()
in, err := c.store.storage.Get(key, int64(boff), int64(len(p)))
used := time.Since(st)
logger.Debugf("GET %s RANGE(%d,%d) (%s, %.3fs)", key, boff, len(p), err, used.Seconds())
if used > SlowRequest {
logger.Infof("slow request: GET %s (%s, %.3fs)", key, err, used.Seconds())
}
c.store.fetcher.fetch(key)
if err == nil {
defer in.Close()
return io.ReadFull(in, p)
}
}
block, err := c.store.group.Execute(key, func() (*Page, error) {
tmp := page
if boff > 0 || len(p) < blockSize {
tmp = NewOffPage(blockSize)
} else {
tmp.Acquire()
}
tmp.Acquire()
err := withTimeout(func() error {
defer tmp.Release()
return c.store.load(key, tmp, c.store.shouldCache(blockSize))
}, c.store.conf.GetTimeout)
return tmp, err
})
defer block.Release()
if err != nil {
return 0, err
if c.store.seekable && boff > 0 && len(p) <= blockSize/4 {
// partial read
st := time.Now()
in, err := c.store.storage.Get(key, int64(boff), int64(len(p)))
used := time.Since(st)
logger.Debugf("GET %s RANGE(%d,%d) (%s, %.3fs)", key, boff, len(p), err, used.Seconds())
if used > SlowRequest {
logger.Infof("slow request: GET %s (%s, %.3fs)", key, err, used.Seconds())
}
if block != page {
copy(p, block.Data[boff:])
c.store.fetcher.fetch(key)
if err == nil {
defer in.Close()
return io.ReadFull(in, p)
}
return len(p), nil
}
block, err := c.loadPage(ctx, indx)
block, err := c.store.group.Execute(key, func() (*Page, error) {
tmp := page
if boff > 0 || len(p) < blockSize {
tmp = NewOffPage(blockSize)
} else {
tmp.Acquire()
}
tmp.Acquire()
err := withTimeout(func() error {
defer tmp.Release()
return c.store.load(key, tmp, c.store.shouldCache(blockSize))
}, c.store.conf.GetTimeout)
return tmp, err
})
defer block.Release()
if err != nil {
return 0, err
}
defer block.Release()
n = copy(p, block.Data[boff:])
return n, nil
if block != page {
copy(p, block.Data[boff:])
}
return len(p), nil
}
func (c *rChunk) delete(indx int) error {
......@@ -639,47 +617,6 @@ func (store *cachedStore) load(key string, page *Page, cache bool) (err error) {
return nil
}
// Get reads a whole block, or a byte range of one, identified by key.
// A plain key names a full block; a composite key of the form
// "<objectKey>,<offset>,<length>" asks for <length> bytes starting at
// <offset>. The local block cache is consulted first; on a cache miss,
// or when the cached file turns out short/corrupt, the block is
// re-fetched from object storage via store.load (which also re-caches it).
// The whole operation is bounded by store.conf.GetTimeout.
func (store *cachedStore) Get(key string) (result []byte, err error) {
	err = withTimeout(func() error {
		var boff, limit int
		if strings.Contains(key, ",") {
			// composite key: split into object key, range offset, range length
			parts := strings.SplitN(key, ",", 3)
			key = parts[0]
			boff, _ = strconv.Atoi(parts[1])
			limit, _ = strconv.Atoi(parts[2])
		}
		size := parseObjOrigSize(key)
		if size == 0 || size > store.conf.BlockSize {
			// NOTE(review): Fatalf aborts the process on a malformed key —
			// presumably keys are always generated internally; confirm.
			logger.Fatalf("Invalid key: %s", key)
		}
		if limit == 0 {
			// no explicit range requested: return the entire block
			limit = size
		}
		r, err := store.bcache.load(key)
		if err == nil {
			// cache hit: read the requested range straight from the cached file
			// TODO: use page
			block := make([]byte, limit)
			n, err := r.ReadAt(block, int64(boff))
			r.Close()
			if err == nil {
				result = block
				return nil
			}
			if f, ok := r.(*os.File); ok {
				// cached copy is truncated or unreadable: evict it so the
				// fallback below repopulates the cache with a good copy
				logger.Errorf("short chunk %s: %d < %d", key, n, size)
				os.Remove(f.Name())
			}
		}
		// cache miss (or bad cached file): fetch the full block from object
		// storage, cache it, then slice out the requested range
		block := make([]byte, size)
		err = store.load(key, NewPage(block), true)
		if err == nil {
			result = block[boff : boff+limit]
		}
		return err
	}, store.conf.GetTimeout)
	return result, err
}
// NewCachedStore create a cached store.
func NewCachedStore(storage object.ObjectStorage, config Config) ChunkStore {
compressor := utils.NewCompressor(config.Compress)
......@@ -706,7 +643,13 @@ func NewCachedStore(storage object.ObjectStorage, config Config) ChunkStore {
config.Prefetch = 0 // disable prefetch if cache is disabled
}
store.fetcher = newPrefetcher(config.Prefetch, func(key string) {
store.Get(key)
size := parseObjOrigSize(key)
if size == 0 || size > store.conf.BlockSize {
return
}
p := NewOffPage(size)
defer p.Release()
store.load(key, p, true)
})
go store.uploadStaging()
return store
......
......@@ -80,6 +80,7 @@ func newCacheStore(dir string, cacheSize int64, limit, pendingPages int, config
if br < c.freeRatio || fr < c.freeRatio {
logger.Warnf("not enough space (%d%%) or inodes (%d%%) for caching: free ratio should be >= %d%%", int(br*100), int(fr*100), int(c.freeRatio*100))
}
go c.flush()
go c.checkFreeSpace()
go c.refreshCacheKeys()
return c
......@@ -230,6 +231,21 @@ func (cache *cacheStore) stagePath(key string) string {
return filepath.Join(cache.dir, stagingDir, key)
}
// flush drains the pending queue forever, persisting each queued block
// to its on-disk cache location. A block that is written successfully
// (and while the cache has capacity) is registered in the cache index;
// in every case the entry is removed from the in-memory pending map and
// its page is released.
func (cache *cacheStore) flush() {
	for {
		item := <-cache.pending
		data := item.page.Data
		// short-circuit: skip the disk write entirely when caching is disabled
		written := cache.capacity > 0 && cache.flushPage(cache.cachePath(item.key), data) == nil
		if written {
			cache.add(item.key, int32(len(data)), uint32(time.Now().Unix()))
		}
		cache.Lock()
		delete(cache.pages, item.key)
		cache.Unlock()
		item.page.Release()
	}
}
func (cache *cacheStore) add(key string, size int32, atime uint32) {
cache.Lock()
defer cache.Unlock()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册