diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index 76eb70e09bb68846a354f71c0e2ef1c2bf0a822d..c204f8d2234e9e94335da1f4c5a7b73f4de17d11 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -1,6 +1,8 @@ package main import ( + "os" + "github.com/zilliztech/milvus-distributed/cmd/distributed/roles" ) @@ -20,5 +22,6 @@ func initRoles(roles *roles.MilvusRoles) { func main() { var roles roles.MilvusRoles initRoles(&roles) + os.Setenv("QUERY_NODE_ID", "1") roles.Run(true) } diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index 83f0de46c5da29bfa9ee7f22e3106c2ca594f488..4515d94689e05a52cc4b6dedb2556b1af41ea9de 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -75,8 +75,7 @@ type RocksMQ struct { channels map[string]*Channel cgCtxs map[string]ConsumerGroupContext idAllocator allocator.GIDAllocator - produceMu sync.Mutex - consumeMu sync.Mutex + channelMu map[string]*sync.Mutex notify map[string][]*Consumer //ctx context.Context @@ -110,6 +109,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*RocksMQ, erro } rmq.channels = make(map[string]*Channel) rmq.notify = make(map[string][]*Consumer) + rmq.channelMu = make(map[string]*sync.Mutex) return rmq, nil } @@ -148,6 +148,7 @@ func (rmq *RocksMQ) CreateChannel(channelName string) error { endOffset: 0, } rmq.channels[channelName] = channel + rmq.channelMu[channelName] = new(sync.Mutex) return nil } @@ -200,8 +201,8 @@ func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) e } func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error { - rmq.produceMu.Lock() - defer rmq.produceMu.Unlock() + rmq.channelMu[channelName].Lock() + defer rmq.channelMu[channelName].Unlock() msgLen := len(messages) idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) @@ -260,8 +261,8 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro } func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) { - rmq.consumeMu.Lock() - defer rmq.consumeMu.Unlock() + rmq.channelMu[channelName].Lock() + defer rmq.channelMu[channelName].Unlock() metaKey := groupName + "/" + channelName + "/current_id" currentID, err := rmq.kv.Load(metaKey) if err != nil {