提交 833f1d49 编写于 作者: G groot 提交者: yefu.chen

Let singlenode use rocksmq

Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 e2d8358c
......@@ -12,6 +12,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq"
)
func newMsgFactory(localMsg bool) msgstream.Factory {
......@@ -276,4 +277,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
}
log.Printf("exit msg stream service")
}
defer rocksmq.CloseRocksMQ()
}
......@@ -20,5 +20,5 @@ func initRoles(roles *roles.MilvusRoles) {
func main() {
var roles roles.MilvusRoles
initRoles(&roles)
roles.Run(false)
roles.Run(true)
}
......@@ -146,3 +146,16 @@ func (kv *RocksdbKV) MultiRemove(keys []string) error {
err := kv.db.Write(kv.writeOptions, writeBatch)
return err
}
func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Clear()
for k, v := range saves {
writeBatch.Put([]byte(k), []byte(v))
}
for _, key := range removals {
writeBatch.Delete([]byte(key))
}
err := kv.db.Write(kv.writeOptions, writeBatch)
return err
}
......@@ -6,6 +6,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq"
)
type Factory struct {
......@@ -37,5 +38,7 @@ func NewFactory() msgstream.Factory {
ReceiveBufSize: 64,
RmqBufSize: 64,
}
rocksmq.InitRocksMQ("/tmp/milvus_rdb")
return f
}
......@@ -62,6 +62,7 @@ func newRmqMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64
rmqBufSize: rmqBufSize,
consumerChannels: consumerChannels,
consumerReflects: consumerReflects,
consumerLock: &sync.Mutex{},
}
return stream, nil
......@@ -99,28 +100,25 @@ func (ms *RmqMsgStream) SetRepackFunc(repackFunc RepackFunc) {
func (ms *RmqMsgStream) AsProducer(channels []string) {
for _, channel := range channels {
// TODO(yhz): Here may allow to create an existing channel
if err := rocksmq.Rmq.CreateChannel(channel); err != nil {
errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
panic(errMsg)
err := rocksmq.Rmq.CreateChannel(channel)
if err == nil {
ms.producers = append(ms.producers, channel)
}
ms.producers = append(ms.producers, channel)
}
}
func (ms *RmqMsgStream) AsConsumer(channels []string, groupName string) {
for _, channelName := range channels {
consumer, err := rocksmq.Rmq.CreateConsumerGroup(groupName, channelName)
if err != nil {
panic(err.Error())
if err == nil {
consumer.MsgNum = make(chan int, ms.rmqBufSize)
ms.consumers = append(ms.consumers, *consumer)
ms.consumerChannels = append(ms.consumerChannels, channelName)
ms.consumerReflects = append(ms.consumerReflects, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(consumer.MsgNum),
})
}
consumer.MsgNum = make(chan int, ms.rmqBufSize)
ms.consumers = append(ms.consumers, *consumer)
ms.consumerChannels = append(ms.consumerChannels, channelName)
ms.consumerReflects = append(ms.consumerReflects, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(consumer.MsgNum),
})
}
}
......
......@@ -16,12 +16,13 @@ import (
"context"
"errors"
"fmt"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"io"
"log"
"sync/atomic"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
......@@ -371,8 +372,11 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
return status, errors.New(errMsg)
}
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarTtMsgStream)
if !ok {
switch t := node.dataSyncService.dmStream.(type) {
case *pulsarms.PulsarTtMsgStream:
case *rmqms.RmqTtMsgStream:
default:
_ = t
errMsg := "type assertion failed for dm message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
......@@ -385,7 +389,7 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
// add request channel
consumeChannels := in.ChannelIDs
consumeSubName := Params.MsgChannelSubName
fgDMMsgStream.AsConsumer(consumeChannels, consumeSubName)
node.dataSyncService.dmStream.AsConsumer(consumeChannels, consumeSubName)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
......
package rocksmq
import (
"sync"
rocksdbkv "github.com/zilliztech/milvus-distributed/internal/kv/rocksdb"
)
var Rmq *RocksMQ
var once sync.Once
type Consumer struct {
GroupName string
......@@ -14,6 +21,27 @@ func InitRmq(rocksdbName string, idAllocator IDAllocator) error {
return err
}
func GetRmq() *RocksMQ {
return Rmq
func InitRocksMQ(rocksdbName string) error {
var err error
once.Do(func() {
kvname := rocksdbName + "_kv"
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvname)
if err != nil {
panic(err)
}
idAllocator := NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
Rmq, err = NewRocksMQ(rocksdbName, idAllocator)
if err != nil {
panic(err)
}
})
return err
}
func CloseRocksMQ() {
if Rmq != nil && Rmq.store != nil {
Rmq.store.Close()
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册