未验证 提交 7748407e 编写于 作者: D Davies Liu 提交者: GitHub

Add Pessimistic lock to reduce conflict on Redis transaction (#40)

上级 3247ea4b
......@@ -20,6 +20,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"hash/fnv"
"math/rand"
"os"
"strconv"
......@@ -59,8 +60,9 @@ type RedisConfig struct {
type redisMeta struct {
sync.Mutex
conf *RedisConfig
rdb *redis.Client
conf *RedisConfig
rdb *redis.Client
txlocks [1024]sync.Mutex // Pessimistic locks to reduce conflict on Redis
sid int64
openFiles map[Ino]int
......@@ -345,6 +347,11 @@ func errno(err error) syscall.Errno {
func (r *redisMeta) txn(txf func(tx *redis.Tx) error, keys ...string) syscall.Errno {
var err error
var khash = fnv.New32()
_, _ = khash.Write([]byte(keys[0]))
l := &r.txlocks[int(khash.Sum32())%len(r.txlocks)]
l.Lock()
defer l.Unlock()
for i := 0; i < 50; i++ {
err = r.rdb.Watch(c, txf, keys...)
if err == redis.TxFailedErr {
......@@ -1228,7 +1235,6 @@ func (r *redisMeta) NewChunk(ctx Context, inode Ino, indx uint32, offset uint32,
}
func (r *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Slice) syscall.Errno {
return r.txn(func(tx *redis.Tx) error {
// TODO: refcount for chunkid
var attr Attr
......
......@@ -273,7 +273,13 @@ func TestConcurrentWrite(t *testing.T) {
t.Logf("redis is not available: %s", err)
t.Skip()
}
err = m.Init(meta.Format{Name: "test"})
m.OnMsg(meta.DeleteChunk, func(args ...interface{}) error {
return nil
})
m.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
return nil
})
_ = m.Init(meta.Format{Name: "test"})
if err != nil {
t.Fatalf("Failed to initialize meta: %s", err)
}
......@@ -288,7 +294,7 @@ func TestConcurrentWrite(t *testing.T) {
var errno syscall.Errno
var g sync.WaitGroup
for i := 0; i <= 8; i++ {
for i := 0; i <= 20; i++ {
g.Add(1)
go func(indx uint32) {
defer g.Done()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册