From 38266b8569b6b8bf25b3dd199611a363e80d3c38 Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Thu, 25 Feb 2021 17:44:29 +0800 Subject: [PATCH] Lock channel at produce and consume Signed-off-by: Xiangyu Wang --- cmd/singlenode/main.go | 3 +++ internal/util/rocksmq/rocksmq.go | 13 +++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index 76eb70e09..c204f8d22 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -1,6 +1,8 @@ package main import ( + "os" + "github.com/zilliztech/milvus-distributed/cmd/distributed/roles" ) @@ -20,5 +22,6 @@ func initRoles(roles *roles.MilvusRoles) { func main() { var roles roles.MilvusRoles initRoles(&roles) + os.Setenv("QUERY_NODE_ID", "1") roles.Run(true) } diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index 83f0de46c..4515d9468 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -75,8 +75,7 @@ type RocksMQ struct { channels map[string]*Channel cgCtxs map[string]ConsumerGroupContext idAllocator allocator.GIDAllocator - produceMu sync.Mutex - consumeMu sync.Mutex + channelMu map[string]*sync.Mutex notify map[string][]*Consumer //ctx context.Context @@ -110,6 +109,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*RocksMQ, erro } rmq.channels = make(map[string]*Channel) rmq.notify = make(map[string][]*Consumer) + rmq.channelMu = make(map[string]*sync.Mutex) return rmq, nil } @@ -148,6 +148,7 @@ func (rmq *RocksMQ) CreateChannel(channelName string) error { endOffset: 0, } rmq.channels[channelName] = channel + rmq.channelMu[channelName] = new(sync.Mutex) return nil } @@ -200,8 +201,8 @@ func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) e } func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error { - rmq.produceMu.Lock() - defer rmq.produceMu.Unlock() + rmq.channelMu[channelName].Lock() + defer rmq.channelMu[channelName].Unlock() msgLen := len(messages) idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) @@ -260,8 +261,8 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro } func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) { - rmq.consumeMu.Lock() - defer rmq.consumeMu.Unlock() + rmq.channelMu[channelName].Lock() + defer rmq.channelMu[channelName].Unlock() metaKey := groupName + "/" + channelName + "/current_id" currentID, err := rmq.kv.Load(metaKey) if err != nil { -- GitLab