未验证 提交 4bccc855 编写于 作者: C Cai Yudong 提交者: GitHub

Use MsgChannelConfig in GlobalParams for all components (#15163)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 d7a961c7
......@@ -214,7 +214,6 @@ msgChannel:
rootCoordDelta: "rootcoord-delta"
search: "search"
searchResult: "searchResult"
proxyTimeTick: "proxyTimeTick"
queryTimeTick: "queryTimeTick"
queryNodeStats: "query-node-stats"
# Cmd for loadIndex, flush, etc...
......
......@@ -279,7 +279,7 @@ func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) {
}
func subscriptionGenerator(collectionID int64, nodeID int64) string {
return fmt.Sprintf("%s-%s-%d-%d", Params.DataNodeCfg.ClusterChannelPrefix, Params.DataNodeCfg.SubscriptionNamePrefix, nodeID, collectionID)
return fmt.Sprintf("%s-%d-%d", Params.DataNodeCfg.DataNodeSubName, nodeID, collectionID)
}
func (c *ChannelManager) unsubscribe(subscriptionName string, channel string) error {
......
......@@ -447,11 +447,11 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
log.Error("DataCoord failed to create timetick channel", zap.Error(err))
return
}
ttMsgStream.AsConsumerWithPosition([]string{Params.DataCoordCfg.TimeTickChannelName},
Params.DataCoordCfg.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
ttMsgStream.AsConsumerWithPosition([]string{Params.MsgChannelCfg.DataCoordTimeTick},
Params.MsgChannelCfg.DataCoordSubName, mqclient.SubscriptionPositionLatest)
log.Debug("DataCoord creates the timetick channel consumer",
zap.String("timeTickChannel", Params.DataCoordCfg.TimeTickChannelName),
zap.String("subscription", Params.DataCoordCfg.DataCoordSubscriptionName))
zap.String("timeTickChannel", Params.MsgChannelCfg.DataCoordTimeTick),
zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName))
ttMsgStream.Start()
go func() {
......
......@@ -58,7 +58,7 @@ func TestGetSegmentInfoChannel(t *testing.T) {
resp, err := svr.GetSegmentInfoChannel(context.TODO())
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, Params.DataCoordCfg.SegmentInfoChannelName, resp.Value)
assert.EqualValues(t, Params.MsgChannelCfg.DataCoordSegmentInfo, resp.Value)
})
}
......@@ -245,7 +245,7 @@ func TestGetTimeTickChannel(t *testing.T) {
resp, err := svr.GetTimeTickChannel(context.TODO())
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, Params.DataCoordCfg.TimeTickChannelName, resp.Value)
assert.EqualValues(t, Params.MsgChannelCfg.DataCoordTimeTick, resp.Value)
}
func TestGetSegmentStates(t *testing.T) {
......@@ -1062,7 +1062,7 @@ func TestDataNodeTtChannel(t *testing.T) {
ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO())
assert.Nil(t, err)
ttMsgStream.AsProducer([]string{Params.DataCoordCfg.TimeTickChannelName})
ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
ttMsgStream.Start()
defer ttMsgStream.Close()
info := &NodeInfo{
......@@ -1130,7 +1130,7 @@ func TestDataNodeTtChannel(t *testing.T) {
})
ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO())
assert.Nil(t, err)
ttMsgStream.AsProducer([]string{Params.DataCoordCfg.TimeTickChannelName})
ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
ttMsgStream.Start()
defer ttMsgStream.Close()
info := &NodeInfo{
......@@ -1212,7 +1212,7 @@ func TestDataNodeTtChannel(t *testing.T) {
ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO())
assert.Nil(t, err)
ttMsgStream.AsProducer([]string{Params.DataCoordCfg.TimeTickChannelName})
ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
ttMsgStream.Start()
defer ttMsgStream.Close()
node := &NodeInfo{
......@@ -2232,7 +2232,7 @@ func TestGetFlushState(t *testing.T) {
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
Params.Init()
Params.DataCoordCfg.TimeTickChannelName = Params.DataCoordCfg.TimeTickChannelName + strconv.Itoa(rand.Int())
Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
var err error
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
......
......@@ -48,7 +48,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRespon
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: Params.DataCoordCfg.TimeTickChannelName,
Value: Params.MsgChannelCfg.DataCoordTimeTick,
}, nil
}
......@@ -269,7 +269,7 @@ func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: Params.DataCoordCfg.SegmentInfoChannelName,
Value: Params.MsgChannelCfg.DataCoordSegmentInfo,
}, nil
}
......
......@@ -207,7 +207,7 @@ func (node *DataNode) initSession() error {
// Init function does nothing now.
func (node *DataNode) Init() error {
log.Debug("DataNode Init",
zap.String("TimeTickChannelName", Params.DataNodeCfg.TimeTickChannelName),
zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick),
)
if err := node.initSession(); err != nil {
log.Error("DataNode init session failed", zap.Error(err))
......@@ -227,7 +227,7 @@ func (node *DataNode) Init() error {
return err
}
log.Debug("DataNode Init",
zap.String("MsgChannelSubName", Params.DataNodeCfg.MsgChannelSubName))
zap.String("MsgChannelSubName", Params.DataNodeCfg.DataNodeSubName))
return nil
}
......
......@@ -54,7 +54,7 @@ func TestMain(t *testing.M) {
Params.DataNodeCfg.InitAlias("datanode-alias-1")
Params.Init()
// change to specific channel for test
Params.DataNodeCfg.TimeTickChannelName = Params.DataNodeCfg.TimeTickChannelName + strconv.Itoa(rand.Int())
Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
code := t.Run()
os.Exit(code)
}
......
......@@ -284,7 +284,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
return nil
}
pChannelName := rootcoord.ToPhysicalChannel(vchanInfo.ChannelName)
deltaChannelName, err := rootcoord.ConvertChannelName(pChannelName, Params.DataNodeCfg.DmlChannelName, Params.DataNodeCfg.DeltaChannelName)
deltaChannelName, err := rootcoord.ConvertChannelName(pChannelName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta)
if err != nil {
log.Error(err.Error())
return nil
......
......@@ -34,7 +34,7 @@ import (
func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) {
// subName should be unique, since pchannelName is shared among several collections
// consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
consumeSubName := fmt.Sprintf("%s-%d", Params.DataNodeCfg.MsgChannelSubName, dmNodeConfig.collectionID)
consumeSubName := fmt.Sprintf("%s-%d", Params.DataNodeCfg.DataNodeSubName, dmNodeConfig.collectionID)
insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx)
if err != nil {
return nil, err
......
......@@ -715,8 +715,8 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
if err != nil {
return nil, err
}
wTt.AsProducer([]string{Params.DataNodeCfg.TimeTickChannelName})
log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.DataNodeCfg.TimeTickChannelName))
wTt.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick))
var wTtMsgStream msgstream.MsgStream = wTt
wTtMsgStream.Start()
......
......@@ -82,9 +82,9 @@ func TestGrpcService(t *testing.T) {
rootcoord.Params.Init()
rootcoord.Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal)
rootcoord.Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal)
rootcoord.Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("msgChannel%d", randVal)
rootcoord.Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("timeTick%d", randVal)
rootcoord.Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("stateChannel%d", randVal)
rootcoord.Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("msgChannel%d", randVal)
rootcoord.Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("timeTick%d", randVal)
rootcoord.Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("stateChannel%d", randVal)
rootcoord.Params.RootCoordCfg.MaxPartitionNum = 64
rootcoord.Params.CommonCfg.DefaultPartitionName = "_default"
......
......@@ -132,7 +132,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
nodeID := channelInfo.NodeID
for _, collectionChannels := range channelInfo.CollectionChannels {
collectionID := collectionChannels.CollectionID
subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, nodeID)
subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, nodeID)
err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
if err != nil {
log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID))
......
......@@ -71,7 +71,7 @@ func (qc *QueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.QueryCoordCfg.TimeTickChannelName,
Value: Params.MsgChannelCfg.QueryCoordTimeTick,
}, nil
}
......@@ -83,7 +83,7 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.QueryCoordCfg.StatsChannelName,
Value: Params.MsgChannelCfg.QueryNodeStats,
}, nil
}
......
......@@ -837,8 +837,8 @@ func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryCh
// all collection use the same query channel
colIDForAssignChannel := UniqueID(0)
searchPrefix := Params.QueryCoordCfg.SearchChannelPrefix
searchResultPrefix := Params.QueryCoordCfg.SearchResultChannelPrefix
searchPrefix := Params.MsgChannelCfg.QueryCoordSearch
searchResultPrefix := Params.MsgChannelCfg.QueryCoordSearchResult
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10)
log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel))
......
......@@ -58,8 +58,8 @@ func getSystemInfoMetrics(
ID: qc.session.ServerID,
},
SystemConfigurations: metricsinfo.QueryCoordConfiguration{
SearchChannelPrefix: Params.QueryCoordCfg.SearchChannelPrefix,
SearchResultChannelPrefix: Params.QueryCoordCfg.SearchResultChannelPrefix,
SearchChannelPrefix: Params.MsgChannelCfg.QueryCoordSearch,
SearchResultChannelPrefix: Params.MsgChannelCfg.QueryCoordSearchResult,
},
},
ConnectedNodes: make([]metricsinfo.QueryNodeInfos, 0),
......
......@@ -273,8 +273,8 @@ func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord,
rand.Seed(time.Now().UnixNano())
queryChannels := make([]*queryChannelInfo, 0)
channelID := len(queryChannels)
searchPrefix := Params.QueryCoordCfg.SearchChannelPrefix
searchResultPrefix := Params.QueryCoordCfg.SearchResultChannelPrefix
searchPrefix := Params.MsgChannelCfg.QueryCoordSearch
searchResultPrefix := Params.MsgChannelCfg.QueryCoordSearchResult
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
......
......@@ -46,11 +46,11 @@ func setup() {
func refreshParams() {
rand.Seed(time.Now().UnixNano())
suffix := "-test-query-Coord" + strconv.FormatInt(rand.Int63(), 10)
Params.QueryCoordCfg.StatsChannelName = Params.QueryCoordCfg.StatsChannelName + suffix
Params.QueryCoordCfg.TimeTickChannelName = Params.QueryCoordCfg.TimeTickChannelName + suffix
Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + suffix
Params.MsgChannelCfg.QueryCoordTimeTick = Params.MsgChannelCfg.QueryCoordTimeTick + suffix
Params.BaseParams.MetaRootPath = Params.BaseParams.MetaRootPath + suffix
Params.QueryCoordCfg.DmlChannelPrefix = "Dml"
Params.QueryCoordCfg.DeltaChannelPrefix = "delta"
Params.MsgChannelCfg.RootCoordDml = "Dml"
Params.MsgChannelCfg.RootCoordDelta = "delta"
GlobalSegmentInfos = make(map[UniqueID]*querypb.SegmentInfo)
}
......
......@@ -2081,7 +2081,7 @@ func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int {
}
func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) {
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.QueryCoordCfg.DmlChannelPrefix, Params.QueryCoordCfg.DeltaChannelPrefix)
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta)
if err != nil {
return nil, err
}
......
......@@ -71,7 +71,7 @@ func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.String
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.QueryNodeCfg.QueryTimeTickChannelName,
Value: Params.MsgChannelCfg.QueryCoordTimeTick,
}, nil
}
......@@ -83,7 +83,7 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.QueryNodeCfg.StatsChannelName,
Value: Params.MsgChannelCfg.QueryNodeStats,
}, nil
}
......
......@@ -234,7 +234,7 @@ func newMessageStreamFactory() (msgstream.Factory, error) {
func TestMain(m *testing.M) {
setup()
Params.QueryNodeCfg.StatsChannelName = Params.QueryNodeCfg.StatsChannelName + strconv.Itoa(rand.Int())
Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + strconv.Itoa(rand.Int())
exitCode := m.Run()
os.Exit(exitCode)
}
......
......@@ -55,7 +55,7 @@ func (sService *statsService) start() {
sleepTimeInterval := Params.QueryNodeCfg.StatsPublishInterval
// start pulsar
producerChannels := []string{Params.QueryNodeCfg.StatsChannelName}
producerChannels := []string{Params.MsgChannelCfg.QueryNodeStats}
statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx)
statsStream.AsProducer(producerChannels)
......
......@@ -50,7 +50,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
const receiveBufSize = 1024
// start pulsar
producerChannels := []string{Params.QueryNodeCfg.StatsChannelName}
producerChannels := []string{Params.MsgChannelCfg.QueryNodeStats}
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
......
......@@ -157,7 +157,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
return err
}
consumeChannels := []string{r.req.QueryChannel}
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
if r.req.SeekPosition == nil || len(r.req.SeekPosition.MsgID) == 0 {
......@@ -301,7 +301,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
}
}()
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// group channels by to seeking or consuming
channel2SeekPosition := make(map[string]*internalpb.MsgPosition)
......@@ -525,7 +525,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
}
channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// channels as consumer
for _, channel := range vDeltaChannels {
fg := channel2FlowGraph[channel]
......
......@@ -435,17 +435,17 @@ func (c *Core) setMsgStreams() error {
if Params.PulsarCfg.Address == "" {
return fmt.Errorf("pulsar address is empty")
}
if Params.RootCoordCfg.MsgChannelSubName == "" {
return fmt.Errorf("msgChannelSubName is empty")
if Params.MsgChannelCfg.RootCoordSubName == "" {
return fmt.Errorf("RootCoordSubName is empty")
}
// rootcoord time tick channel
if Params.RootCoordCfg.TimeTickChannel == "" {
if Params.MsgChannelCfg.RootCoordTimeTick == "" {
return fmt.Errorf("timeTickChannel is empty")
}
timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx)
timeTickStream.AsProducer([]string{Params.RootCoordCfg.TimeTickChannel})
log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.RootCoordCfg.TimeTickChannel))
timeTickStream.AsProducer([]string{Params.MsgChannelCfg.RootCoordTimeTick})
log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.MsgChannelCfg.RootCoordTimeTick))
c.SendTimeTick = func(t typeutil.Timestamp, reason string) error {
msgPack := ms.MsgPack{}
......@@ -1177,7 +1177,7 @@ func (c *Core) Start() error {
}
log.Debug(typeutil.RootCoordRole, zap.Int64("node id", c.session.ServerID))
log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.RootCoordCfg.TimeTickChannel))
log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.MsgChannelCfg.RootCoordTimeTick))
c.startOnce.Do(func() {
if err := c.proxyManager.WatchProxy(); err != nil {
......@@ -1252,7 +1252,7 @@ func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.RootCoordCfg.TimeTickChannel,
Value: Params.MsgChannelCfg.RootCoordTimeTick,
}, nil
}
......@@ -1263,7 +1263,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.RootCoordCfg.StatisticsChannel,
Value: Params.MsgChannelCfg.RootCoordStatistics,
}, nil
}
......
......@@ -559,13 +559,13 @@ func TestRootCoord(t *testing.T) {
core, err := NewCore(ctx, coreFactory)
assert.Nil(t, err)
randVal := rand.Int()
Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath)
Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath)
Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
Params.RootCoordCfg.DmlChannelName = fmt.Sprintf("rootcoord-dml-test-%d", randVal)
Params.RootCoordCfg.DeltaChannelName = fmt.Sprintf("rootcoord-delta-test-%d", randVal)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
Params.MsgChannelCfg.RootCoordDml = fmt.Sprintf("rootcoord-dml-test-%d", randVal)
Params.MsgChannelCfg.RootCoordDelta = fmt.Sprintf("rootcoord-delta-test-%d", randVal)
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.NoError(t, err)
......@@ -626,7 +626,7 @@ func TestRootCoord(t *testing.T) {
assert.Nil(t, err)
timeTickStream, _ := tmpFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.RootCoordCfg.TimeTickChannel}, Params.RootCoordCfg.MsgChannelSubName)
timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName)
timeTickStream.Start()
dmlStream, _ := tmpFactory.NewMsgStream(ctx)
......@@ -723,7 +723,7 @@ func TestRootCoord(t *testing.T) {
createMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.RootCoordCfg.MsgChannelSubName)
dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.MsgChannelCfg.RootCoordSubName)
dmlStream.Start()
pChanMap := core.MetaTable.ListCollectionPhysicalChannels()
......@@ -2311,11 +2311,11 @@ func TestRootCoord2(t *testing.T) {
randVal := rand.Int()
Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath)
Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath)
Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
......@@ -2361,7 +2361,7 @@ func TestRootCoord2(t *testing.T) {
assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.RootCoordCfg.TimeTickChannel}, Params.RootCoordCfg.MsgChannelSubName)
timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
......@@ -2404,7 +2404,7 @@ func TestRootCoord2(t *testing.T) {
collInfo, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
dmlStream, _ := msFactory.NewMsgStream(ctx)
dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.RootCoordCfg.MsgChannelSubName)
dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.MsgChannelCfg.RootCoordSubName)
dmlStream.Start()
msgs := getNotTtMsg(ctx, 1, dmlStream.Chan())
......@@ -2589,11 +2589,11 @@ func TestCheckFlushedSegments(t *testing.T) {
assert.Nil(t, err)
randVal := rand.Int()
Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath)
Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath)
Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
......@@ -2642,7 +2642,7 @@ func TestCheckFlushedSegments(t *testing.T) {
assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.RootCoordCfg.TimeTickChannel}, Params.RootCoordCfg.MsgChannelSubName)
timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
......@@ -2755,11 +2755,11 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
core, err := NewCore(ctx, msFactory)
assert.Nil(t, err)
randVal := rand.Int()
Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath)
Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath)
Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
......@@ -2809,7 +2809,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.RootCoordCfg.TimeTickChannel}, Params.RootCoordCfg.MsgChannelSubName)
timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
......
......@@ -147,7 +147,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
chanNames[i] = ToPhysicalChannel(vchanNames[i])
deltaChanNames[i] = t.core.chanTimeTick.getDeltaChannelName()
deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DeltaChannelName)
deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta)
if err1 != nil || deltaChanName != deltaChanNames[i] {
return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i])
}
......@@ -361,7 +361,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
// remove delta channels
deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames))
for i, chanName := range collMeta.PhysicalChannelNames {
if deltaChanNames[i], err = ConvertChannelName(chanName, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DeltaChannelName); err != nil {
if deltaChanNames[i], err = ConvertChannelName(chanName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta); err != nil {
return err
}
}
......
......@@ -88,11 +88,11 @@ func BenchmarkAllocTimestamp(b *testing.B) {
randVal := rand.Int()
Params.RootCoordCfg.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal)
Params.RootCoordCfg.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal)
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("master-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("master-statistics-%d", randVal)
Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath)
Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath)
Params.RootCoordCfg.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
err = core.SetDataCoord(ctx, &tbd{})
assert.Nil(b, err)
......
......@@ -87,9 +87,9 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
// initialize dml channels used for insert
dmlChannels := newDmlChannels(ctx, factory, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DmlChannelNum)
dmlChannels := newDmlChannels(ctx, factory, Params.MsgChannelCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum)
// initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels
deltaChannels := newDmlChannels(ctx, factory, Params.RootCoordCfg.DeltaChannelName, Params.RootCoordCfg.DmlChannelNum)
deltaChannels := newDmlChannels(ctx, factory, Params.MsgChannelCfg.RootCoordDelta, Params.RootCoordCfg.DmlChannelNum)
// recover physical channels for all collections
for collID, chanNames := range chanMap {
......@@ -99,7 +99,7 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
var err error
deltaChanNames := make([]string, len(chanNames))
for i, chanName := range chanNames {
deltaChanNames[i], err = ConvertChannelName(chanName, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DeltaChannelName)
deltaChanNames[i], err = ConvertChannelName(chanName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta)
if err != nil {
log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName))
panic("invalid dml channel name " + chanName)
......
......@@ -45,8 +45,8 @@ func TestTimetickSync(t *testing.T) {
//}
Params.RootCoordCfg.DmlChannelNum = 2
Params.RootCoordCfg.DmlChannelName = "rootcoord-dml"
Params.RootCoordCfg.DeltaChannelName = "rootcoord-delta"
Params.MsgChannelCfg.RootCoordDml = "rootcoord-dml"
Params.MsgChannelCfg.RootCoordDelta = "rootcoord-delta"
ttSync := newTimeTickSync(ctx, sourceID, factory, nil)
var wg sync.WaitGroup
......
......@@ -86,23 +86,55 @@ func TestGlobalParamTable(t *testing.T) {
t.Logf("knowhere simd type = %s", Params.SimdType)
})
t.Run("test rootCoordConfig", func(t *testing.T) {
Params := GlobalParams.RootCoordCfg
t.Run("test knowhereConfig", func(t *testing.T) {
Params := GlobalParams.MsgChannelCfg
// -- rootcoord --
assert.Equal(t, Params.RootCoordTimeTick, "by-dev-rootcoord-timetick")
t.Logf("rootcoord timetick channel = %s", Params.RootCoordTimeTick)
assert.Equal(t, Params.RootCoordStatistics, "by-dev-rootcoord-statistics")
t.Logf("rootcoord statistics channel = %s", Params.RootCoordStatistics)
assert.Equal(t, Params.RootCoordDml, "by-dev-rootcoord-dml")
t.Logf("rootcoord dml channel = %s", Params.RootCoordDml)
assert.Equal(t, Params.RootCoordDelta, "by-dev-rootcoord-delta")
t.Logf("rootcoord delta channel = %s", Params.RootCoordDelta)
assert.Equal(t, Params.MsgChannelSubName, "by-dev-rootCoord")
t.Logf("msg channel sub name = %s", Params.MsgChannelSubName)
assert.Equal(t, Params.RootCoordSubName, "by-dev-rootCoord")
t.Logf("rootcoord subname = %s", Params.RootCoordSubName)
assert.Equal(t, Params.TimeTickChannel, "by-dev-rootcoord-timetick")
t.Logf("master time tick channel = %s", Params.TimeTickChannel)
// -- querycoord --
assert.Equal(t, Params.QueryCoordSearch, "by-dev-search")
t.Logf("querycoord search channel = %s", Params.QueryCoordSearch)
assert.Equal(t, Params.StatisticsChannel, "by-dev-rootcoord-statistics")
t.Logf("master statistics channel = %s", Params.StatisticsChannel)
assert.Equal(t, Params.QueryCoordSearchResult, "by-dev-searchResult")
t.Logf("querycoord search result channel = %s", Params.QueryCoordSearchResult)
assert.Equal(t, Params.DmlChannelName, "by-dev-rootcoord-dml")
t.Logf("dml channel = %s", Params.DmlChannelName)
assert.Equal(t, Params.QueryCoordTimeTick, "by-dev-queryTimeTick")
t.Logf("querycoord timetick channel = %s", Params.QueryCoordTimeTick)
assert.Equal(t, Params.DeltaChannelName, "by-dev-rootcoord-delta")
t.Logf("delta channel = %s", Params.DeltaChannelName)
// -- querynode --
assert.Equal(t, Params.QueryNodeStats, "by-dev-query-node-stats")
t.Logf("querynode stats channel = %s", Params.QueryNodeStats)
// -- datacoord --
assert.Equal(t, Params.DataCoordInsert, "by-dev-insert-channel-")
t.Logf("datacoord insert channel = %s", Params.DataCoordInsert)
assert.Equal(t, Params.DataCoordTimeTick, "by-dev-datacoord-timetick-channel")
t.Logf("datacoord timetick channel = %s", Params.DataCoordTimeTick)
assert.Equal(t, Params.DataCoordSegmentInfo, "by-dev-segment-info-channel")
t.Logf("datacoord segment info channel = %s", Params.DataCoordSegmentInfo)
assert.Equal(t, Params.DataCoordSubName, "by-dev-dataCoord")
t.Logf("datacoord subname = %s", Params.DataCoordSubName)
})
t.Run("test rootCoordConfig", func(t *testing.T) {
Params := GlobalParams.RootCoordCfg
assert.NotEqual(t, Params.MaxPartitionNum, 0)
t.Logf("master MaxPartitionNum = %d", Params.MaxPartitionNum)
......@@ -124,9 +156,6 @@ func TestGlobalParamTable(t *testing.T) {
assert.Equal(t, Params.ProxySubName, "by-dev-proxy-0")
t.Logf("ProxySubName: %s", Params.ProxySubName)
assert.Equal(t, Params.ProxyTimeTickChannelNames, []string{"by-dev-proxyTimeTick-0"})
t.Logf("ProxyTimeTickChannelNames: %v", Params.ProxyTimeTickChannelNames)
t.Logf("MsgStreamTimeTickBufSize: %d", Params.MsgStreamTimeTickBufSize)
t.Logf("MaxNameLength: %d", Params.MaxNameLength)
......@@ -180,19 +209,7 @@ func TestGlobalParamTable(t *testing.T) {
})
t.Run("test queryCoordConfig", func(t *testing.T) {
Params := GlobalParams.QueryCoordCfg
assert.Equal(t, Params.SearchChannelPrefix, "by-dev-search")
t.Logf("QueryCoord search channel = %s", Params.SearchChannelPrefix)
assert.Equal(t, Params.SearchResultChannelPrefix, "by-dev-searchResult")
t.Logf("QueryCoord search result channel = %s", Params.SearchResultChannelPrefix)
assert.Equal(t, Params.StatsChannelName, "by-dev-query-node-stats")
t.Logf("QueryCoord stats channel = %s", Params.StatsChannelName)
assert.Equal(t, Params.TimeTickChannelName, "by-dev-queryTimeTick")
t.Logf("QueryCoord time tick channel = %s", Params.TimeTickChannelName)
//Params := GlobalParams.QueryCoordCfg
})
t.Run("test queryNodeConfig", func(t *testing.T) {
......@@ -228,38 +245,20 @@ func TestGlobalParamTable(t *testing.T) {
assert.Equal(t, int32(1024), maxParallelism)
Params.QueryNodeID = 3
Params.initMsgChannelSubName()
name := Params.MsgChannelSubName
Params.initQueryNodeSubName()
name := Params.QueryNodeSubName
assert.Equal(t, name, "by-dev-queryNode")
name = Params.StatsChannelName
assert.Equal(t, name, "by-dev-query-node-stats")
ch := Params.QueryTimeTickChannelName
assert.Equal(t, ch, "by-dev-queryTimeTick")
})
t.Run("test dataCoordConfig", func(t *testing.T) {
Params := GlobalParams.DataCoordCfg
assert.Equal(t, Params.InsertChannelPrefixName, "by-dev-insert-channel-")
t.Logf("DataCoord insert channel = %s", Params.InsertChannelPrefixName)
assert.Equal(t, Params.TimeTickChannelName, "by-dev-datacoord-timetick-channel")
t.Logf("DataCoord timetick channel = %s", Params.TimeTickChannelName)
assert.Equal(t, Params.SegmentInfoChannelName, "by-dev-segment-info-channel")
t.Logf("DataCoord segment info channel = %s", Params.SegmentInfoChannelName)
assert.Equal(t, Params.DataCoordSubscriptionName, "by-dev-dataCoord")
t.Logf("DataCoord subscription channel = %s", Params.DataCoordSubscriptionName)
//Params := GlobalParams.DataCoordCfg
})
t.Run("test dataNodeConfig", func(t *testing.T) {
Params := GlobalParams.DataNodeCfg
Params.NodeID = 2
Params.initMsgChannelSubName()
Params.Refresh()
id := Params.NodeID
log.Println("NodeID:", id)
......@@ -279,17 +278,9 @@ func TestGlobalParamTable(t *testing.T) {
path1 := Params.InsertBinlogRootPath
log.Println("InsertBinlogRootPath:", path1)
path1 = Params.ClusterChannelPrefix
assert.Equal(t, path1, "by-dev")
log.Println("ClusterChannelPrefix:", Params.ClusterChannelPrefix)
name := Params.TimeTickChannelName
assert.Equal(t, name, "by-dev-datacoord-timetick-channel")
log.Println("TimeTickChannelName:", name)
name = Params.MsgChannelSubName
name := Params.DataNodeSubName
assert.Equal(t, name, "by-dev-dataNode-2")
log.Println("MsgChannelSubName:", name)
log.Println("DataNodeSubName:", name)
Params.CreatedTime = time.Now()
log.Println("CreatedTime: ", Params.CreatedTime)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册