未验证 提交 232ef575 编写于 作者: D Davies Liu 提交者: GitHub

retry transactions with random backoff (#34)

上级 9f8a7563
......@@ -20,6 +20,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
......@@ -345,12 +346,10 @@ func errno(err error) syscall.Errno {
func (r *redisMeta) txn(txf func(tx *redis.Tx) error, keys ...string) syscall.Errno {
var err error
for i := 0; i < 10; i++ {
for i := 0; i < 50; i++ {
err = r.rdb.Watch(c, txf, keys...)
if err == nil {
return 0
}
if err == redis.TxFailedErr {
time.Sleep(time.Microsecond * 100 * time.Duration(rand.Int()%(i+1)))
continue
}
return errno(err)
......@@ -1230,6 +1229,7 @@ func (r *redisMeta) NewChunk(ctx Context, inode Ino, indx uint32, offset uint32,
}
func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Slice) syscall.Errno {
return r.txn(func(tx *redis.Tx) error {
// TODO: refcount for chunkid
var attr Attr
......@@ -1270,7 +1270,7 @@ func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
go r.compact(inode, indx)
}
return err
}, r.inodeKey(inode), r.chunkKey(inode, indx))
}, r.inodeKey(inode))
}
func (r *redisMeta) delChunks(inode Ino, start, end, maxchunkid uint64) string {
......
......@@ -265,3 +265,42 @@ func TestCompaction(t *testing.T) {
t.Fatalf("size of slice should be 5000, but got %d", chunks[0].Size)
}
}
func TestConcurrentWrite(t *testing.T) {
var conf RedisConfig
m, err := NewRedisMeta("redis://127.0.0.1/9", &conf)
if err != nil {
t.Logf("redis is not available: %s", err)
t.Skip()
}
m.Init(meta.Format{Name: "test"})
ctx := meta.Background
var inode meta.Ino
var attr = &meta.Attr{}
m.Unlink(ctx, 1, "f")
if st := m.Create(ctx, 1, "f", 0650, 022, &inode, attr); st != 0 {
t.Fatalf("create file %s", st)
}
defer m.Unlink(ctx, 1, "f")
var errno syscall.Errno
var g sync.WaitGroup
for i := 0; i <= 8; i++ {
g.Add(1)
go func(indx uint32) {
defer g.Done()
for j := 0; j < 1000; j++ {
var slice = meta.Slice{1, 100, 0, 100}
st := m.Write(ctx, inode, indx, 0, slice)
if st != 0 {
errno = st
break
}
}
}(uint32(i))
}
g.Wait()
if errno != 0 {
t.Fatal()
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册