未验证 提交 7efb02a4 编写于 作者: C Cai Yudong 提交者: GitHub

Use PulsarConfig in GlobalParams for all components (#15046)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 e2d16ed8
......@@ -256,7 +256,7 @@ func (s *Server) Init() error {
func (s *Server) Start() error {
var err error
m := map[string]interface{}{
"PulsarAddress": Params.DataCoordCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = s.msFactory.SetParams(m)
......
......@@ -2237,7 +2237,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se
var err error
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.DataCoordCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024,
}
......
......@@ -221,7 +221,7 @@ func (node *DataNode) Init() error {
Params.DataNodeCfg.Refresh()
m := map[string]interface{}{
"PulsarAddress": Params.DataNodeCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024,
}
......
......@@ -235,7 +235,7 @@ func TestDataNode(t *testing.T) {
// pulsar produce
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.DataNodeCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
......
......@@ -177,7 +177,7 @@ func TestDataSyncService_Start(t *testing.T) {
defer cancel()
// init data node
pulsarURL := Params.DataNodeCfg.PulsarAddress
pulsarURL := Params.PulsarCfg.Address
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1")
......
......@@ -76,7 +76,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.DataNodeCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
......@@ -166,7 +166,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.DataNodeCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
......@@ -372,7 +372,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.DataNodeCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
......@@ -646,7 +646,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.DataNodeCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
......
......@@ -177,7 +177,7 @@ func (node *Proxy) Init() error {
}
m := map[string]interface{}{
"PulsarAddress": Params.ProxyCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"PulsarBufSize": 1024}
log.Debug("set parameters for ms factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", m))
if err := node.msFactory.SetParams(m); err != nil {
......
......@@ -891,7 +891,7 @@ func (it *insertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
return 0
}
threshold := Params.ProxyCfg.PulsarMaxMessageSize
threshold := Params.PulsarCfg.MaxMessageSize
// not accurate
/* #nosec G103 */
getFixedSizeOfInsertMsg := func(msg *msgstream.InsertMsg) int {
......
......@@ -469,7 +469,7 @@ func TestGrpcRequest(t *testing.T) {
clusterSession.Register()
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"PulsarAddress": Params.QueryCoordCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = factory.SetParams(m)
......
......@@ -191,7 +191,7 @@ func (qc *QueryCoord) Init() error {
// Start function starts the goroutines to watch the meta and node updates
func (qc *QueryCoord) Start() error {
m := map[string]interface{}{
"PulsarAddress": Params.QueryCoordCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err := qc.msFactory.SetParams(m)
......
......@@ -928,7 +928,7 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": receiveBufSize,
"pulsarAddress": Params.QueryNodeCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err := msFactory.SetParams(m)
if err != nil {
......
......@@ -382,7 +382,7 @@ func genEtcdKV() (*etcdkv.EtcdKV, error) {
func genFactory() (msgstream.Factory, error) {
const receiveBufSize = 1024
pulsarURL := Params.QueryNodeCfg.PulsarAddress
pulsarURL := Params.PulsarCfg.Address
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": receiveBufSize,
......
......@@ -126,7 +126,7 @@ func updateTSafe(queryCollection *queryCollection, timestamp Timestamp) error {
func TestQueryCollection_withoutVChannel(t *testing.T) {
ctx := context.Background()
m := map[string]interface{}{
"PulsarAddress": Params.QueryNodeCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
factory := msgstream.NewPmsFactory()
......
......@@ -250,7 +250,7 @@ func (node *QueryNode) Init() error {
func (node *QueryNode) Start() error {
var err error
m := map[string]interface{}{
"PulsarAddress": Params.QueryNodeCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = node.msFactory.SetParams(m)
......
......@@ -223,7 +223,7 @@ func makeNewChannelNames(names []string, suffix string) []string {
func newMessageStreamFactory() (msgstream.Factory, error) {
const receiveBufSize = 1024
pulsarURL := Params.QueryNodeCfg.PulsarAddress
pulsarURL := Params.PulsarCfg.Address
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": receiveBufSize,
......
......@@ -31,7 +31,7 @@ func TestStatsService_start(t *testing.T) {
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"PulsarAddress": Params.QueryNodeCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
msFactory.SetParams(m)
......@@ -55,7 +55,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": receiveBufSize,
"pulsarAddress": Params.QueryNodeCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
......
......@@ -42,7 +42,7 @@ func TestDmlChannels(t *testing.T) {
Params.Init()
m := map[string]interface{}{
"pulsarAddress": Params.RootCoordCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err := factory.SetParams(m)
......
......@@ -443,8 +443,8 @@ func (c *Core) setDdMsgSendFlag(b bool) error {
}
func (c *Core) setMsgStreams() error {
if Params.RootCoordCfg.PulsarAddress == "" {
return fmt.Errorf("pulsarAddress is empty")
if Params.PulsarCfg.Address == "" {
return fmt.Errorf("pulsar address is empty")
}
if Params.RootCoordCfg.MsgChannelSubName == "" {
return fmt.Errorf("msgChannelSubName is empty")
......@@ -1021,7 +1021,7 @@ func (c *Core) Init() error {
}
m := map[string]interface{}{
"PulsarAddress": Params.RootCoordCfg.PulsarAddress,
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
if initError = c.msFactory.SetParams(m); initError != nil {
......
......@@ -619,7 +619,7 @@ func TestRootCoord(t *testing.T) {
tmpFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.RootCoordCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = tmpFactory.SetParams(m)
......@@ -2355,7 +2355,7 @@ func TestRootCoord2(t *testing.T) {
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.RootCoordCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
......@@ -2636,7 +2636,7 @@ func TestCheckFlushedSegments(t *testing.T) {
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.RootCoordCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
......@@ -2803,7 +2803,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.RootCoordCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
......
......@@ -122,7 +122,7 @@ func BenchmarkAllocTimestamp(b *testing.B) {
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.RootCoordCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(b, err)
......
......@@ -38,7 +38,7 @@ func TestTimetickSync(t *testing.T) {
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.RootCoordCfg.PulsarAddress,
"pulsarAddress": Params.PulsarCfg.Address,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err := factory.SetParams(m)
......
......@@ -53,6 +53,7 @@ type GlobalParamTable struct {
once sync.Once
BaseParams BaseParamTable
PulsarCfg pulsarConfig
//CommonCfg commonConfig
//KnowhereCfg knowhereConfig
//MsgChannelCfg msgChannelConfig
......@@ -78,6 +79,7 @@ func (p *GlobalParamTable) InitOnce() {
func (p *GlobalParamTable) Init() {
p.BaseParams.Init()
p.PulsarCfg.init(&p.BaseParams)
//p.CommonCfg.init(&p.BaseParams)
//p.KnowhereCfg.init(&p.BaseParams)
//p.MsgChannelCfg.init(&p.BaseParams)
......@@ -100,6 +102,43 @@ func (p *GlobalParamTable) SetLogConfig(role string) {
// TODO: considering remove it: comment a large block of code is not a good practice, old code can be found with git
///////////////////////////////////////////////////////////////////////////////
// --- pulsar ---
type pulsarConfig struct {
BaseParams *BaseParamTable
Address string
MaxMessageSize int
}
func (p *pulsarConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initAddress()
p.initMaxMessageSize()
}
func (p *pulsarConfig) initAddress() {
addr, err := p.BaseParams.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.Address = addr
}
func (p *pulsarConfig) initMaxMessageSize() {
maxMessageSizeStr, err := p.BaseParams.Load("pulsar.maxMessageSize")
if err != nil {
p.MaxMessageSize = SuggestPulsarMaxMessageSize
} else {
maxMessageSize, err := strconv.Atoi(maxMessageSizeStr)
if err != nil {
p.MaxMessageSize = SuggestPulsarMaxMessageSize
} else {
p.MaxMessageSize = maxMessageSize
}
}
}
// --- common ---
//type commonConfig struct {
// BaseParams *BaseParamTable
......@@ -294,8 +333,6 @@ type rootCoordConfig struct {
Address string
Port int
PulsarAddress string
ClusterChannelPrefix string
MsgChannelSubName string
TimeTickChannel string
......@@ -316,8 +353,6 @@ type rootCoordConfig struct {
func (p *rootCoordConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initPulsarAddress()
// Has to init global msgchannel prefix before other channel names
p.initClusterMsgChannelPrefix()
p.initMsgChannelSubName()
......@@ -333,14 +368,6 @@ func (p *rootCoordConfig) init(bp *BaseParamTable) {
p.initDefaultIndexName()
}
func (p *rootCoordConfig) initPulsarAddress() {
addr, err := p.BaseParams.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = addr
}
func (p *rootCoordConfig) initClusterMsgChannelPrefix() {
config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster")
if err != nil {
......@@ -429,8 +456,6 @@ type proxyConfig struct {
Alias string
PulsarAddress string
RocksmqPath string // not used in Proxy
ProxyID UniqueID
......@@ -456,8 +481,6 @@ type proxyConfig struct {
MaxTaskNum int64
PulsarMaxMessageSize int
RetentionDuration int64
CreatedTime time.Time
......@@ -467,7 +490,6 @@ type proxyConfig struct {
func (p *proxyConfig) init(bp *BaseParamTable) {
p.BaseParams = bp
p.initPulsarAddress()
p.initRocksmqPath()
p.initTimeTickInterval()
......@@ -483,8 +505,6 @@ func (p *proxyConfig) init(bp *BaseParamTable) {
p.initDefaultPartitionName()
p.initDefaultIndexName()
p.initPulsarMaxMessageSize()
p.initMaxTaskNum()
p.initBufFlagExpireTime()
p.initBufFlagCleanupInterval()
......@@ -501,14 +521,6 @@ func (p *proxyConfig) InitAlias(alias string) {
p.Alias = alias
}
func (p *proxyConfig) initPulsarAddress() {
ret, err := p.BaseParams.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = ret
}
func (p *proxyConfig) initRocksmqPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
......@@ -599,20 +611,6 @@ func (p *proxyConfig) initDefaultIndexName() {
p.DefaultIndexName = name
}
func (p *proxyConfig) initPulsarMaxMessageSize() {
maxMessageSizeStr, err := p.BaseParams.Load("pulsar.maxMessageSize")
if err != nil {
p.PulsarMaxMessageSize = SuggestPulsarMaxMessageSize
} else {
maxMessageSize, err := strconv.Atoi(maxMessageSizeStr)
if err != nil {
p.PulsarMaxMessageSize = SuggestPulsarMaxMessageSize
} else {
p.PulsarMaxMessageSize = maxMessageSize
}
}
}
func (p *proxyConfig) initMaxTaskNum() {
p.MaxTaskNum = p.BaseParams.ParseInt64WithDefault("proxy.maxTaskNum", 1024)
}
......@@ -666,9 +664,6 @@ type queryCoordConfig struct {
DmlChannelPrefix string
DeltaChannelPrefix string
// --- Pulsar ---
PulsarAddress string
//---- Handoff ---
AutoHandoff bool
......@@ -696,9 +691,6 @@ func (p *queryCoordConfig) init(bp *BaseParamTable) {
p.initMinioUseSSLStr()
p.initMinioBucketName()
//--- Pulsar ----
p.initPulsarAddress()
//---- Handoff ---
p.initAutoHandoff()
......@@ -801,14 +793,6 @@ func (p *queryCoordConfig) initMinioBucketName() {
p.MinioBucketName = bucketName
}
func (p *queryCoordConfig) initPulsarAddress() {
addr, err := p.BaseParams.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = addr
}
func (p *queryCoordConfig) initAutoHandoff() {
handoff, err := p.BaseParams.Load("queryCoord.autoHandoff")
if err != nil {
......@@ -879,8 +863,7 @@ func (p *queryCoordConfig) initDeltaChannelName() {
type queryNodeConfig struct {
BaseParams *BaseParamTable
PulsarAddress string
RocksmqPath string
RocksmqPath string
Alias string
QueryNodeIP string
......@@ -950,7 +933,6 @@ func (p *queryNodeConfig) init(bp *BaseParamTable) {
p.initMinioUseSSLStr()
p.initMinioBucketName()
p.initPulsarAddress()
p.initRocksmqPath()
p.initGracefulTime()
......@@ -1053,14 +1035,6 @@ func (p *queryNodeConfig) initMinioBucketName() {
p.MinioBucketName = bucketName
}
func (p *queryNodeConfig) initPulsarAddress() {
url, err := p.BaseParams.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = url
}
func (p *queryNodeConfig) initRocksmqPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
......@@ -1184,9 +1158,6 @@ type dataCoordConfig struct {
MinioBucketName string
MinioRootPath string
// --- Pulsar ---
PulsarAddress string
// --- Rocksmq ---
RocksmqPath string
......@@ -1222,7 +1193,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p.initChannelWatchPrefix()
p.initPulsarAddress()
p.initRocksmqPath()
p.initSegmentMaxSize()
......@@ -1254,14 +1224,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p.initGCDropTolerance()
}
func (p *dataCoordConfig) initPulsarAddress() {
addr, err := p.BaseParams.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = addr
}
func (p *dataCoordConfig) initRocksmqPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
......@@ -1436,9 +1398,6 @@ type dataNodeConfig struct {
DmlChannelName string
DeltaChannelName string
// Pulsar address
PulsarAddress string
// Rocksmq path
RocksmqPath string
......@@ -1475,7 +1434,6 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) {
p.initStatsBinlogRootPath()
p.initDeleteBinlogRootPath()
p.initPulsarAddress()
p.initRocksmqPath()
// Must init global msgchannel prefix before other channel names
......@@ -1542,14 +1500,6 @@ func (p *dataNodeConfig) initDeleteBinlogRootPath() {
p.DeleteBinlogRootPath = path.Join(rootPath, "delta_log")
}
func (p *dataNodeConfig) initPulsarAddress() {
url, err := p.BaseParams.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = url
}
func (p *dataNodeConfig) initRocksmqPath() {
path, err := p.BaseParams.Load("_RocksmqPath")
if err != nil {
......
......@@ -15,7 +15,6 @@ import (
"log"
"os"
"path"
"strings"
"testing"
"time"
......@@ -33,12 +32,18 @@ func TestGlobalParamTable(t *testing.T) {
var GlobalParams GlobalParamTable
GlobalParams.Init()
t.Run("test pulsarConfig", func(t *testing.T) {
Params := GlobalParams.PulsarCfg
assert.NotEqual(t, Params.Address, "")
t.Logf("pulsar address = %s", Params.Address)
assert.Equal(t, Params.MaxMessageSize, SuggestPulsarMaxMessageSize)
})
t.Run("test rootCoordConfig", func(t *testing.T) {
Params := GlobalParams.RootCoordCfg
assert.NotEqual(t, Params.PulsarAddress, "")
t.Logf("pulsar address = %s", Params.PulsarAddress)
assert.Equal(t, Params.MsgChannelSubName, "by-dev-rootCoord")
t.Logf("msg channel sub name = %s", Params.MsgChannelSubName)
......@@ -75,8 +80,6 @@ func TestGlobalParamTable(t *testing.T) {
t.Run("test proxyConfig", func(t *testing.T) {
Params := GlobalParams.ProxyCfg
t.Logf("PulsarAddress: %s", Params.PulsarAddress)
t.Logf("RocksmqPath: %s", Params.RocksmqPath)
t.Logf("TimeTickInterval: %v", Params.TimeTickInterval)
......@@ -101,8 +104,6 @@ func TestGlobalParamTable(t *testing.T) {
t.Logf("DefaultIndexName: %s", Params.DefaultIndexName)
t.Logf("PulsarMaxMessageSize: %d", Params.PulsarMaxMessageSize)
//t.Logf("RoleName: %s", typeutil.ProxyRole)
t.Logf("MaxTaskNum: %d", Params.MaxTaskNum)
......@@ -166,11 +167,6 @@ func TestGlobalParamTable(t *testing.T) {
t.Run("test queryNodeConfig", func(t *testing.T) {
Params := GlobalParams.QueryNodeCfg
address := Params.PulsarAddress
split := strings.Split(address, ":")
assert.Equal(t, "pulsar", split[0])
assert.Equal(t, "6650", split[len(split)-1])
cacheSize := Params.CacheSize
assert.Equal(t, int64(32), cacheSize)
err := os.Setenv("CACHE_SIZE", "2")
......@@ -265,9 +261,6 @@ func TestGlobalParamTable(t *testing.T) {
path1 := Params.InsertBinlogRootPath
log.Println("InsertBinlogRootPath:", path1)
address := Params.PulsarAddress
log.Println("PulsarAddress:", address)
path1 = Params.ClusterChannelPrefix
assert.Equal(t, path1, "by-dev")
log.Println("ClusterChannelPrefix:", Params.ClusterChannelPrefix)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册