提交 38266b85 编写于 作者: X Xiangyu Wang 提交者: yefu.chen

Lock channel at produce and consume

Signed-off-by: NXiangyu Wang <xiangyu.wang@zilliz.com>
上级 882c7aaa
package main
import (
"os"
"github.com/zilliztech/milvus-distributed/cmd/distributed/roles"
)
......@@ -20,5 +22,6 @@ func initRoles(roles *roles.MilvusRoles) {
func main() {
var roles roles.MilvusRoles
initRoles(&roles)
os.Setenv("QUERY_NODE_ID", "1")
roles.Run(true)
}
......@@ -75,8 +75,7 @@ type RocksMQ struct {
channels map[string]*Channel
cgCtxs map[string]ConsumerGroupContext
idAllocator allocator.GIDAllocator
produceMu sync.Mutex
consumeMu sync.Mutex
channelMu map[string]*sync.Mutex
notify map[string][]*Consumer
//ctx context.Context
......@@ -110,6 +109,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*RocksMQ, erro
}
rmq.channels = make(map[string]*Channel)
rmq.notify = make(map[string][]*Consumer)
rmq.channelMu = make(map[string]*sync.Mutex)
return rmq, nil
}
......@@ -148,6 +148,7 @@ func (rmq *RocksMQ) CreateChannel(channelName string) error {
endOffset: 0,
}
rmq.channels[channelName] = channel
rmq.channelMu[channelName] = new(sync.Mutex)
return nil
}
......@@ -200,8 +201,8 @@ func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) e
}
func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error {
rmq.produceMu.Lock()
defer rmq.produceMu.Unlock()
rmq.channelMu[channelName].Lock()
defer rmq.channelMu[channelName].Unlock()
msgLen := len(messages)
idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
......@@ -260,8 +261,8 @@ func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) erro
}
func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) {
rmq.consumeMu.Lock()
defer rmq.consumeMu.Unlock()
rmq.channelMu[channelName].Lock()
defer rmq.channelMu[channelName].Unlock()
metaKey := groupName + "/" + channelName + "/current_id"
currentID, err := rmq.kv.Load(metaKey)
if err != nil {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册