未验证 提交 ca129d43 编写于 作者: X xiyichan 提交者: GitHub

Support configurable msgstream (#16131)

Signed-off-by: Nxiyichan <2863768433@qq.com>
上级 478890a7
...@@ -67,7 +67,10 @@ func init() { ...@@ -67,7 +67,10 @@ func init() {
func newMsgFactory(localMsg bool) msgstream.Factory { func newMsgFactory(localMsg bool) msgstream.Factory {
if localMsg { if localMsg {
return msgstream.NewRmsFactory() if Params.RocksmqEnable() {
return msgstream.NewRmsFactory()
}
return msgstream.NewPmsFactory()
} }
return msgstream.NewPmsFactory() return msgstream.NewPmsFactory()
} }
......
...@@ -213,7 +213,7 @@ type MsgStream interface { ...@@ -213,7 +213,7 @@ type MsgStream interface {
} }
type Factory interface { type Factory interface {
SetParams(params map[string]interface{}) error Init(params *paramtable.ComponentParam) error
NewMsgStream(ctx context.Context) (MsgStream, error) NewMsgStream(ctx context.Context) (MsgStream, error)
NewTtMsgStream(ctx context.Context) (MsgStream, error) NewTtMsgStream(ctx context.Context) (MsgStream, error)
NewQueryMsgStream(ctx context.Context) (MsgStream, error) NewQueryMsgStream(ctx context.Context) (MsgStream, error)
......
...@@ -259,11 +259,7 @@ func (s *Server) Init() error { ...@@ -259,11 +259,7 @@ func (s *Server) Init() error {
// 4. set server state to Healthy // 4. set server state to Healthy
func (s *Server) Start() error { func (s *Server) Start() error {
var err error var err error
m := map[string]interface{}{ err = s.msFactory.Init(&Params)
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = s.msFactory.SetParams(m)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -2320,12 +2320,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se ...@@ -2320,12 +2320,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
var err error var err error
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = factory.Init(&Params)
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024,
}
err = factory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
......
...@@ -217,13 +217,7 @@ func (node *DataNode) Init() error { ...@@ -217,13 +217,7 @@ func (node *DataNode) Init() error {
return err return err
} }
m := map[string]interface{}{ if err := node.msFactory.Init(&Params); err != nil {
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024,
}
if err := node.msFactory.SetParams(m); err != nil {
log.Warn("DataNode Init msFactory SetParams failed, use default", log.Warn("DataNode Init msFactory SetParams failed, use default",
zap.Error(err)) zap.Error(err))
return err return err
......
...@@ -209,11 +209,7 @@ func TestDataNode(t *testing.T) { ...@@ -209,11 +209,7 @@ func TestDataNode(t *testing.T) {
// pulsar produce // pulsar produce
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = msFactory.Init(&Params)
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.NoError(t, err) assert.NoError(t, err)
insertStream, err := msFactory.NewMsgStream(node1.ctx) insertStream, err := msFactory.NewMsgStream(node1.ctx)
assert.NoError(t, err) assert.NoError(t, err)
......
...@@ -181,7 +181,6 @@ func TestDataSyncService_Start(t *testing.T) { ...@@ -181,7 +181,6 @@ func TestDataSyncService_Start(t *testing.T) {
defer cancel() defer cancel()
// init data node // init data node
pulsarURL := Params.PulsarCfg.Address
Factory := &MetaFactory{} Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1") collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1")
...@@ -196,11 +195,7 @@ func TestDataSyncService_Start(t *testing.T) { ...@@ -196,11 +195,7 @@ func TestDataSyncService_Start(t *testing.T) {
allocFactory := NewAllocatorFactory(1) allocFactory := NewAllocatorFactory(1)
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = msFactory.Init(&Params)
"pulsarAddress": pulsarURL,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
insertChannelName := "data_sync_service_test_dml" insertChannelName := "data_sync_service_test_dml"
......
...@@ -21,6 +21,8 @@ import ( ...@@ -21,6 +21,8 @@ import (
"errors" "errors"
"testing" "testing"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
...@@ -28,17 +30,16 @@ import ( ...@@ -28,17 +30,16 @@ import (
) )
type mockMsgStreamFactory struct { type mockMsgStreamFactory struct {
SetParamsReturnNil bool InitReturnNil bool
NewMsgStreamNoError bool NewMsgStreamNoError bool
} }
var _ msgstream.Factory = &mockMsgStreamFactory{} var _ msgstream.Factory = &mockMsgStreamFactory{}
func (mm *mockMsgStreamFactory) SetParams(params map[string]interface{}) error { func (mm *mockMsgStreamFactory) Init(params *paramtable.ComponentParam) error {
if !mm.SetParamsReturnNil { if !mm.InitReturnNil {
return errors.New("Set Params Error") return errors.New("Init Error")
} }
return nil return nil
} }
......
...@@ -78,11 +78,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { ...@@ -78,11 +78,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
fm := NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) fm := NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
...@@ -168,11 +164,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { ...@@ -168,11 +164,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
...@@ -372,11 +364,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { ...@@ -372,11 +364,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
colRep.metaService = newMetaService(mockRootCoord, collMeta.ID) colRep.metaService = newMetaService(mockRootCoord, collMeta.ID)
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
flushPacks := []*segmentFlushPack{} flushPacks := []*segmentFlushPack{}
...@@ -649,11 +637,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { ...@@ -649,11 +637,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
fm := NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) fm := NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
......
...@@ -19,12 +19,13 @@ package msgstream ...@@ -19,12 +19,13 @@ package msgstream
import ( import (
"context" "context"
"github.com/milvus-io/milvus/internal/util/paramtable"
rmqimplserver "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" rmqimplserver "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
puslarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar" puslarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar"
rmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq" rmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq"
"github.com/mitchellh/mapstructure"
) )
// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go) // PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
...@@ -36,12 +37,11 @@ type PmsFactory struct { ...@@ -36,12 +37,11 @@ type PmsFactory struct {
PulsarBufSize int64 PulsarBufSize int64
} }
// SetParams is used to set parameters for PmsFactory // Init is used to set parameters for PmsFactory
func (f *PmsFactory) SetParams(params map[string]interface{}) error { func (f *PmsFactory) Init(params *paramtable.ComponentParam) error {
err := mapstructure.Decode(params, f) f.PulsarBufSize = 1024
if err != nil { f.ReceiveBufSize = 1024
return err f.PulsarAddress = params.PulsarCfg.Address
}
return nil return nil
} }
...@@ -86,12 +86,10 @@ type RmsFactory struct { ...@@ -86,12 +86,10 @@ type RmsFactory struct {
RmqBufSize int64 RmqBufSize int64
} }
// SetParams is used to set parameters for RmsFactory // Init is used to set parameters for RmsFactory
func (f *RmsFactory) SetParams(params map[string]interface{}) error { func (f *RmsFactory) Init(params *paramtable.ComponentParam) error {
err := mapstructure.Decode(params, f) f.RmqBufSize = 1024
if err != nil { f.ReceiveBufSize = 1024
return err
}
return nil return nil
} }
......
...@@ -27,13 +27,7 @@ import ( ...@@ -27,13 +27,7 @@ import (
func TestPmsFactory(t *testing.T) { func TestPmsFactory(t *testing.T) {
pmsFactory := NewPmsFactory() pmsFactory := NewPmsFactory()
pulsarAddress, _ := Params.Load("_PulsarAddress") pmsFactory.Init(&Params)
m := map[string]interface{}{
"PulsarAddress": pulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024,
}
pmsFactory.SetParams(m)
ctx := context.Background() ctx := context.Background()
_, err := pmsFactory.NewMsgStream(ctx) _, err := pmsFactory.NewMsgStream(ctx)
...@@ -46,17 +40,10 @@ func TestPmsFactory(t *testing.T) { ...@@ -46,17 +40,10 @@ func TestPmsFactory(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
} }
func TestPmsFactory_SetParams(t *testing.T) { func TestPmsFactory_Init(t *testing.T) {
pmsFactory := (*PmsFactory)(nil) rmsFactory := NewRmsFactory()
err := rmsFactory.Init(&Params)
pulsarAddress, _ := Params.Load("_PulsarAddress") assert.Nil(t, err)
m := map[string]interface{}{
"PulsarAddress": pulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024,
}
err := pmsFactory.SetParams(m)
assert.NotNil(t, err)
} }
func TestRmsFactory(t *testing.T) { func TestRmsFactory(t *testing.T) {
...@@ -65,11 +52,7 @@ func TestRmsFactory(t *testing.T) { ...@@ -65,11 +52,7 @@ func TestRmsFactory(t *testing.T) {
rmsFactory := NewRmsFactory() rmsFactory := NewRmsFactory()
m := map[string]interface{}{ rmsFactory.Init(&Params)
"ReceiveBufSize": 1024,
"RmqBufSize": 1024,
}
rmsFactory.SetParams(m)
ctx := context.Background() ctx := context.Background()
_, err := rmsFactory.NewMsgStream(ctx) _, err := rmsFactory.NewMsgStream(ctx)
...@@ -82,13 +65,8 @@ func TestRmsFactory(t *testing.T) { ...@@ -82,13 +65,8 @@ func TestRmsFactory(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
} }
func TestRmsFactory_SetParams(t *testing.T) { func TestRmsFactory_Init(t *testing.T) {
rmsFactory := (*RmsFactory)(nil) rmsFactory := NewRmsFactory()
err := rmsFactory.Init(&Params)
m := map[string]interface{}{ assert.Nil(t, err)
"ReceiveBufSize": 1024,
"RmqBufSize": 1024,
}
err := rmsFactory.SetParams(m)
assert.NotNil(t, err)
} }
...@@ -48,7 +48,7 @@ import ( ...@@ -48,7 +48,7 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
) )
var Params paramtable.BaseTable var Params paramtable.ComponentParam
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
Params.Init() Params.Init()
......
...@@ -19,6 +19,8 @@ package msgstream ...@@ -19,6 +19,8 @@ package msgstream
import ( import (
"context" "context"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
...@@ -75,7 +77,7 @@ type MsgStream interface { ...@@ -75,7 +77,7 @@ type MsgStream interface {
// Factory is an interface that can be used to generate a new msgstream object // Factory is an interface that can be used to generate a new msgstream object
type Factory interface { type Factory interface {
SetParams(params map[string]interface{}) error Init(params *paramtable.ComponentParam) error
NewMsgStream(ctx context.Context) (MsgStream, error) NewMsgStream(ctx context.Context) (MsgStream, error)
NewTtMsgStream(ctx context.Context) (MsgStream, error) NewTtMsgStream(ctx context.Context) (MsgStream, error)
NewQueryMsgStream(ctx context.Context) (MsgStream, error) NewQueryMsgStream(ctx context.Context) (MsgStream, error)
......
...@@ -22,6 +22,8 @@ import ( ...@@ -22,6 +22,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
...@@ -373,7 +375,7 @@ func newSimpleMockMsgStream() *simpleMockMsgStream { ...@@ -373,7 +375,7 @@ func newSimpleMockMsgStream() *simpleMockMsgStream {
type simpleMockMsgStreamFactory struct { type simpleMockMsgStreamFactory struct {
} }
func (factory *simpleMockMsgStreamFactory) SetParams(params map[string]interface{}) error { func (factory *simpleMockMsgStreamFactory) Init(param *paramtable.ComponentParam) error {
return nil return nil
} }
......
...@@ -182,18 +182,15 @@ func (node *Proxy) Init() error { ...@@ -182,18 +182,15 @@ func (node *Proxy) Init() error {
log.Debug("create query channel for Proxy done", zap.String("QueryResultChannel", resp.QueryResultChannel)) log.Debug("create query channel for Proxy done", zap.String("QueryResultChannel", resp.QueryResultChannel))
} }
m := map[string]interface{}{ log.Debug("set parameters for ms factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.ServiceParam))
"PulsarAddress": Params.PulsarCfg.Address, if err := node.msFactory.Init(&Params); err != nil {
"PulsarBufSize": 1024}
log.Debug("set parameters for ms factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", m))
if err := node.msFactory.SetParams(m); err != nil {
log.Warn("failed to set parameters for ms factory", log.Warn("failed to set parameters for ms factory",
zap.Error(err), zap.Error(err),
zap.String("role", typeutil.ProxyRole), zap.String("role", typeutil.ProxyRole),
zap.Any("parameters", m)) zap.Any("parameters", Params.ServiceParam))
return err return err
} }
log.Debug("set parameters for ms factory done", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", m)) log.Debug("set parameters for ms factory done", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.ServiceParam))
log.Debug("create id allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID)) log.Debug("create id allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, Params.ProxyCfg.ProxyID) idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, Params.ProxyCfg.ProxyID)
......
...@@ -96,11 +96,7 @@ func Test_HandleChannelUnsubscribeLoop(t *testing.T) { ...@@ -96,11 +96,7 @@ func Test_HandleChannelUnsubscribeLoop(t *testing.T) {
defer etcdCli.Close() defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
m := map[string]interface{}{ factory.Init(&Params)
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
factory.SetParams(m)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -457,11 +457,7 @@ func TestGrpcRequest(t *testing.T) { ...@@ -457,11 +457,7 @@ func TestGrpcRequest(t *testing.T) {
clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false)
clusterSession.Register() clusterSession.Register()
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = factory.Init(&Params)
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = factory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
idAllocator := func() (UniqueID, error) { idAllocator := func() (UniqueID, error) {
return 0, nil return 0, nil
...@@ -652,11 +648,7 @@ func TestSetNodeState(t *testing.T) { ...@@ -652,11 +648,7 @@ func TestSetNodeState(t *testing.T) {
clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false)
clusterSession.Register() clusterSession.Register()
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = factory.Init(&Params)
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = factory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
idAllocator := func() (UniqueID, error) { idAllocator := func() (UniqueID, error) {
return 0, nil return 0, nil
......
...@@ -222,11 +222,7 @@ func (qc *QueryCoord) Init() error { ...@@ -222,11 +222,7 @@ func (qc *QueryCoord) Init() error {
// Start function starts the goroutines to watch the meta and node updates // Start function starts the goroutines to watch the meta and node updates
func (qc *QueryCoord) Start() error { func (qc *QueryCoord) Start() error {
m := map[string]interface{}{ err := qc.msFactory.Init(&Params)
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err := qc.msFactory.SetParams(m)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -24,6 +24,8 @@ import ( ...@@ -24,6 +24,8 @@ import (
"math/rand" "math/rand"
"strconv" "strconv"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/indexcgowrapper" "github.com/milvus-io/milvus/internal/util/indexcgowrapper"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
...@@ -623,13 +625,8 @@ func genEtcdKV() (*etcdkv.EtcdKV, error) { ...@@ -623,13 +625,8 @@ func genEtcdKV() (*etcdkv.EtcdKV, error) {
func genFactory() (msgstream.Factory, error) { func genFactory() (msgstream.Factory, error) {
const receiveBufSize = 1024 const receiveBufSize = 1024
pulsarURL := Params.PulsarCfg.Address
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err := msFactory.Init(&Params)
"receiveBufSize": receiveBufSize,
"pulsarAddress": pulsarURL,
"pulsarBufSize": 1024}
err := msFactory.SetParams(m)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -640,11 +637,7 @@ func genInvalidFactory() (msgstream.Factory, error) { ...@@ -640,11 +637,7 @@ func genInvalidFactory() (msgstream.Factory, error) {
const receiveBufSize = 1024 const receiveBufSize = 1024
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err := msFactory.Init(&Params)
"receiveBufSize": receiveBufSize,
"pulsarAddress": "",
"pulsarBufSize": 1024}
err := msFactory.SetParams(m)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -1835,7 +1828,7 @@ type mockMsgStreamFactory struct { ...@@ -1835,7 +1828,7 @@ type mockMsgStreamFactory struct {
var _ msgstream.Factory = &mockMsgStreamFactory{} var _ msgstream.Factory = &mockMsgStreamFactory{}
func (mm *mockMsgStreamFactory) SetParams(params map[string]interface{}) error { func (mm *mockMsgStreamFactory) Init(params *paramtable.ComponentParam) error {
return nil return nil
} }
......
...@@ -125,12 +125,8 @@ func updateTSafe(queryCollection *queryCollection, timestamp Timestamp) error { ...@@ -125,12 +125,8 @@ func updateTSafe(queryCollection *queryCollection, timestamp Timestamp) error {
func TestQueryCollection_withoutVChannel(t *testing.T) { func TestQueryCollection_withoutVChannel(t *testing.T) {
ctx := context.Background() ctx := context.Background()
m := map[string]interface{}{
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
err := factory.SetParams(m) err := factory.Init(&Params)
assert.Nil(t, err) assert.Nil(t, err)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -331,12 +331,7 @@ func (node *QueryNode) Init() error { ...@@ -331,12 +331,7 @@ func (node *QueryNode) Init() error {
// Start mainly start QueryNode's query service. // Start mainly start QueryNode's query service.
func (node *QueryNode) Start() error { func (node *QueryNode) Start() error {
var err error err := node.msFactory.Init(&Params)
m := map[string]interface{}{
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = node.msFactory.SetParams(m)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -227,13 +227,8 @@ func makeNewChannelNames(names []string, suffix string) []string { ...@@ -227,13 +227,8 @@ func makeNewChannelNames(names []string, suffix string) []string {
func newMessageStreamFactory() (msgstream.Factory, error) { func newMessageStreamFactory() (msgstream.Factory, error) {
const receiveBufSize = 1024 const receiveBufSize = 1024
pulsarURL := Params.PulsarCfg.Address
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err := msFactory.Init(&Params)
"receiveBufSize": receiveBufSize,
"pulsarAddress": pulsarURL,
"pulsarBufSize": 1024}
err := msFactory.SetParams(m)
return msFactory, err return msFactory, err
} }
......
...@@ -30,11 +30,7 @@ func TestStatsService_start(t *testing.T) { ...@@ -30,11 +30,7 @@ func TestStatsService_start(t *testing.T) {
initTestMeta(t, node, 0, 0) initTestMeta(t, node, 0, 0)
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ msFactory.Init(&Params)
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
msFactory.SetParams(m)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, msFactory) node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, msFactory)
node.statsService.start() node.statsService.start()
node.Stop() node.Stop()
...@@ -53,11 +49,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { ...@@ -53,11 +49,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
producerChannels := []string{Params.CommonCfg.QueryNodeStats} producerChannels := []string{Params.CommonCfg.QueryNodeStats}
msFactory := msgstream.NewPmsFactory() msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": receiveBufSize,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
statsStream, err := msFactory.NewMsgStream(node.queryNodeLoopCtx) statsStream, err := msFactory.NewMsgStream(node.queryNodeLoopCtx)
......
...@@ -41,11 +41,7 @@ func TestDmlChannels(t *testing.T) { ...@@ -41,11 +41,7 @@ func TestDmlChannels(t *testing.T) {
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
Params.Init() Params.Init()
m := map[string]interface{}{ err := factory.Init(&Params)
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err := factory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
dml := newDmlChannels(ctx, factory, dmlChanPrefix, totalDmlChannelNum) dml := newDmlChannels(ctx, factory, dmlChanPrefix, totalDmlChannelNum)
......
...@@ -1055,11 +1055,7 @@ func (c *Core) Init() error { ...@@ -1055,11 +1055,7 @@ func (c *Core) Init() error {
return tsoAllocator.GetLastSavedTime() return tsoAllocator.GetLastSavedTime()
} }
m := map[string]interface{}{ if initError = c.msFactory.Init(&Params); initError != nil {
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
if initError = c.msFactory.SetParams(m); initError != nil {
return return
} }
......
...@@ -628,11 +628,7 @@ func TestRootCoord(t *testing.T) { ...@@ -628,11 +628,7 @@ func TestRootCoord(t *testing.T) {
tmpFactory := msgstream.NewPmsFactory() tmpFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err = tmpFactory.Init(&Params)
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = tmpFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
timeTickStream, _ := tmpFactory.NewMsgStream(ctx) timeTickStream, _ := tmpFactory.NewMsgStream(ctx)
...@@ -2420,11 +2416,7 @@ func TestRootCoord2(t *testing.T) { ...@@ -2420,11 +2416,7 @@ func TestRootCoord2(t *testing.T) {
err = core.Register() err = core.Register()
assert.Nil(t, err) assert.Nil(t, err)
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx) timeTickStream, _ := msFactory.NewMsgStream(ctx)
...@@ -2708,11 +2700,7 @@ func TestCheckFlushedSegments(t *testing.T) { ...@@ -2708,11 +2700,7 @@ func TestCheckFlushedSegments(t *testing.T) {
err = core.Register() err = core.Register()
assert.Nil(t, err) assert.Nil(t, err)
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx) timeTickStream, _ := msFactory.NewMsgStream(ctx)
...@@ -2875,11 +2863,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { ...@@ -2875,11 +2863,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
err = core.Register() err = core.Register()
assert.Nil(t, err) assert.Nil(t, err)
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx) timeTickStream, _ := msFactory.NewMsgStream(ctx)
......
...@@ -120,11 +120,7 @@ func BenchmarkAllocTimestamp(b *testing.B) { ...@@ -120,11 +120,7 @@ func BenchmarkAllocTimestamp(b *testing.B) {
err = core.Start() err = core.Start()
assert.Nil(b, err) assert.Nil(b, err)
m := map[string]interface{}{ err = msFactory.Init(&Params)
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(b, err) assert.Nil(b, err)
b.ResetTimer() b.ResetTimer()
......
...@@ -33,11 +33,7 @@ func TestTimetickSync(t *testing.T) { ...@@ -33,11 +33,7 @@ func TestTimetickSync(t *testing.T) {
sourceID := int64(100) sourceID := int64(100)
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
m := map[string]interface{}{ err := factory.Init(&Params)
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err := factory.SetParams(m)
assert.Nil(t, err) assert.Nil(t, err)
//chanMap := map[typeutil.UniqueID][]string{ //chanMap := map[typeutil.UniqueID][]string{
......
...@@ -21,6 +21,8 @@ import ( ...@@ -21,6 +21,8 @@ import (
"os" "os"
"testing" "testing"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
...@@ -28,8 +30,8 @@ import ( ...@@ -28,8 +30,8 @@ import (
func TestInputNode(t *testing.T) { func TestInputNode(t *testing.T) {
os.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestInputNode") os.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestInputNode")
msFactory := msgstream.NewRmsFactory() msFactory := msgstream.NewRmsFactory()
m := map[string]interface{}{} var Params paramtable.ComponentParam
err := msFactory.SetParams(m) err := msFactory.Init(&Params)
assert.Nil(t, err) assert.Nil(t, err)
msgStream, _ := msFactory.NewMsgStream(context.TODO()) msgStream, _ := msFactory.NewMsgStream(context.TODO())
......
...@@ -24,6 +24,8 @@ import ( ...@@ -24,6 +24,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
...@@ -57,8 +59,8 @@ func generateMsgPack() msgstream.MsgPack { ...@@ -57,8 +59,8 @@ func generateMsgPack() msgstream.MsgPack {
func TestNodeCtx_Start(t *testing.T) { func TestNodeCtx_Start(t *testing.T) {
os.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestNodeStart") os.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestNodeStart")
msFactory := msgstream.NewRmsFactory() msFactory := msgstream.NewRmsFactory()
m := map[string]interface{}{} var Params paramtable.ComponentParam
err := msFactory.SetParams(m) err := msFactory.Init(&Params)
assert.Nil(t, err) assert.Nil(t, err)
msgStream, _ := msFactory.NewMsgStream(context.TODO()) msgStream, _ := msFactory.NewMsgStream(context.TODO())
......
...@@ -75,6 +75,14 @@ func (p *ComponentParam) SetLogConfig(role string) { ...@@ -75,6 +75,14 @@ func (p *ComponentParam) SetLogConfig(role string) {
p.BaseTable.SetLogConfig() p.BaseTable.SetLogConfig()
} }
func (p *ComponentParam) RocksmqEnable() bool {
return p.RocksmqCfg.Path != ""
}
func (p *ComponentParam) PulsarEnable() bool {
return p.PulsarCfg.Address != ""
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// --- common --- // --- common ---
type commonConfig struct { type commonConfig struct {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册