未验证 提交 3e862ecd 编写于 作者: C Cai Yudong 提交者: GitHub

Merge msgChannelConfig and knowhereConfig into commonConfig (#15843)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 416ef3db
......@@ -215,7 +215,7 @@ log:
maxBackups: 20
format: text # text/json
msgChannel:
common:
# Channel name generation rule: ${namePrefix}-${ChannelIdx}
chanNamePrefix:
cluster: "by-dev"
......@@ -241,12 +241,10 @@ msgChannel:
dataNodeSubNamePrefix: "dataNode"
dataCoordSubNamePrefix: "dataCoord"
common:
defaultPartitionName: "_default" # default partition name for a collection
defaultIndexName: "_default_idx" # default index name
retentionDuration: 432000 # 5 days in seconds
knowhere:
# Default value: auto
# Valid values: [auto, avx512, avx2, avx, sse4_2]
# This configuration is only used by querynode and indexnode, it selects CPU instruction set for Searching and Index-building.
......
......@@ -289,7 +289,7 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
// buildSubName generates a subscription name by concatenating DataNodeSubName, node ID and collection ID.
func buildSubName(collectionID int64, nodeID int64) string {
return fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, nodeID, collectionID)
return fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, collectionID)
}
func (c *ChannelManager) unsubscribe(subName string, channel string) error {
......
......@@ -448,11 +448,11 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
log.Error("DataCoord failed to create timetick channel", zap.Error(err))
return
}
ttMsgStream.AsConsumerWithPosition([]string{Params.MsgChannelCfg.DataCoordTimeTick},
Params.MsgChannelCfg.DataCoordSubName, mqwrapper.SubscriptionPositionLatest)
ttMsgStream.AsConsumerWithPosition([]string{Params.CommonCfg.DataCoordTimeTick},
Params.CommonCfg.DataCoordSubName, mqwrapper.SubscriptionPositionLatest)
log.Info("DataCoord creates the timetick channel consumer",
zap.String("timeTickChannel", Params.MsgChannelCfg.DataCoordTimeTick),
zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName))
zap.String("timeTickChannel", Params.CommonCfg.DataCoordTimeTick),
zap.String("subscription", Params.CommonCfg.DataCoordSubName))
ttMsgStream.Start()
go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream)
......
......@@ -61,7 +61,7 @@ func TestGetSegmentInfoChannel(t *testing.T) {
resp, err := svr.GetSegmentInfoChannel(context.TODO())
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, Params.MsgChannelCfg.DataCoordSegmentInfo, resp.Value)
assert.EqualValues(t, Params.CommonCfg.DataCoordSegmentInfo, resp.Value)
})
}
......@@ -248,7 +248,7 @@ func TestGetTimeTickChannel(t *testing.T) {
resp, err := svr.GetTimeTickChannel(context.TODO())
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, Params.MsgChannelCfg.DataCoordTimeTick, resp.Value)
assert.EqualValues(t, Params.CommonCfg.DataCoordTimeTick, resp.Value)
}
func TestGetSegmentStates(t *testing.T) {
......@@ -1080,7 +1080,7 @@ func TestDataNodeTtChannel(t *testing.T) {
ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO())
assert.Nil(t, err)
ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick})
ttMsgStream.Start()
defer ttMsgStream.Close()
info := &NodeInfo{
......@@ -1148,7 +1148,7 @@ func TestDataNodeTtChannel(t *testing.T) {
})
ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO())
assert.Nil(t, err)
ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick})
ttMsgStream.Start()
defer ttMsgStream.Close()
info := &NodeInfo{
......@@ -1230,7 +1230,7 @@ func TestDataNodeTtChannel(t *testing.T) {
ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO())
assert.Nil(t, err)
ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick})
ttMsgStream.Start()
defer ttMsgStream.Close()
node := &NodeInfo{
......@@ -2286,7 +2286,7 @@ func (ms *MockClosePanicMsgstream) Consume() *msgstream.MsgPack {
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
Params.Init()
Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
var err error
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
......
......@@ -48,7 +48,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRespon
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: Params.MsgChannelCfg.DataCoordTimeTick,
Value: Params.CommonCfg.DataCoordTimeTick,
}, nil
}
......@@ -271,7 +271,7 @@ func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: Params.MsgChannelCfg.DataCoordSegmentInfo,
Value: Params.CommonCfg.DataCoordSegmentInfo,
}, nil
}
......
......@@ -207,7 +207,7 @@ func (node *DataNode) initSession() error {
// Init function does nothing now.
func (node *DataNode) Init() error {
log.Info("DataNode Init",
zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick),
zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick),
)
if err := node.initSession(); err != nil {
log.Error("DataNode init session failed", zap.Error(err))
......@@ -226,7 +226,7 @@ func (node *DataNode) Init() error {
return err
}
log.Info("DataNode Init successfully",
zap.String("MsgChannelSubName", Params.MsgChannelCfg.DataNodeSubName))
zap.String("MsgChannelSubName", Params.CommonCfg.DataNodeSubName))
return nil
}
......
......@@ -54,7 +54,7 @@ func TestMain(t *testing.M) {
Params.DataNodeCfg.InitAlias("datanode-alias-1")
Params.Init()
// change to specific channel for test
Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
code := t.Run()
os.Exit(code)
}
......
......@@ -283,7 +283,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
return nil
}
pChannelName := rootcoord.ToPhysicalChannel(vchanInfo.ChannelName)
deltaChannelName, err := rootcoord.ConvertChannelName(pChannelName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta)
deltaChannelName, err := rootcoord.ConvertChannelName(pChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
if err != nil {
log.Error(err.Error())
return nil
......
......@@ -35,7 +35,7 @@ import (
func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) {
// subName should be unique, since pchannelName is shared among several collections
// consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
consumeSubName := fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, Params.DataNodeCfg.NodeID, dmNodeConfig.collectionID)
consumeSubName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, Params.DataNodeCfg.NodeID, dmNodeConfig.collectionID)
insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx)
if err != nil {
return nil, err
......
......@@ -722,9 +722,9 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
if err != nil {
return nil, err
}
wTt.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
wTt.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick})
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick))
log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick))
var wTtMsgStream msgstream.MsgStream = wTt
wTtMsgStream.Start()
......
......@@ -82,9 +82,9 @@ func TestGrpcService(t *testing.T) {
rootcoord.Params.Init()
rootcoord.Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal)
rootcoord.Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal)
rootcoord.Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("msgChannel%d", randVal)
rootcoord.Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("timeTick%d", randVal)
rootcoord.Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("stateChannel%d", randVal)
rootcoord.Params.CommonCfg.RootCoordSubName = fmt.Sprintf("msgChannel%d", randVal)
rootcoord.Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("timeTick%d", randVal)
rootcoord.Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("stateChannel%d", randVal)
rootcoord.Params.RootCoordCfg.MaxPartitionNum = 64
rootcoord.Params.CommonCfg.DefaultPartitionName = "_default"
......
......@@ -141,9 +141,9 @@ func (i *IndexNode) initKnowhere() {
C.IndexBuilderInit()
// override index builder SIMD type
cSimdType := C.CString(Params.KnowhereCfg.SimdType)
cSimdType := C.CString(Params.CommonCfg.SimdType)
cRealSimdType := C.IndexBuilderSetSimdType(cSimdType)
Params.KnowhereCfg.SimdType = C.GoString(cRealSimdType)
Params.CommonCfg.SimdType = C.GoString(cRealSimdType)
C.free(unsafe.Pointer(cRealSimdType))
C.free(unsafe.Pointer(cSimdType))
}
......
......@@ -354,7 +354,7 @@ func getMockSystemInfoMetrics(
},
SystemConfigurations: metricsinfo.IndexNodeConfiguration{
MinioBucketName: Params.MinioCfg.BucketName,
SimdType: Params.KnowhereCfg.SimdType,
SimdType: Params.CommonCfg.SimdType,
},
}
......
......@@ -52,7 +52,7 @@ func getSystemInfoMetrics(
},
SystemConfigurations: metricsinfo.IndexNodeConfiguration{
MinioBucketName: Params.MinioCfg.BucketName,
SimdType: Params.KnowhereCfg.SimdType,
SimdType: Params.CommonCfg.SimdType,
},
}
......
......@@ -817,7 +817,7 @@ func (sched *taskScheduler) collectResultLoop() {
queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx)
// proxy didn't need to walk through all the search results in channel, because it no longer has client connections.
consumeSubName := fmt.Sprintf("%s-%d", Params.MsgChannelCfg.ProxySubName, Params.ProxyCfg.ProxyID)
consumeSubName := fmt.Sprintf("%s-%d", Params.CommonCfg.ProxySubName, Params.ProxyCfg.ProxyID)
queryResultMsgStream.AsConsumerWithPosition(Params.ProxyCfg.SearchResultChannelNames, consumeSubName, mqwrapper.SubscriptionPositionLatest)
log.Debug("Proxy", zap.Strings("SearchResultChannelNames", Params.ProxyCfg.SearchResultChannelNames),
zap.Any("consumeSubName", consumeSubName))
......
......@@ -132,7 +132,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
nodeID := channelInfo.NodeID
for _, collectionChannels := range channelInfo.CollectionChannels {
collectionID := collectionChannels.CollectionID
subName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, nodeID)
subName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, nodeID)
err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
if err != nil {
log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID))
......
......@@ -72,7 +72,7 @@ func (qc *QueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.MsgChannelCfg.QueryCoordTimeTick,
Value: Params.CommonCfg.QueryCoordTimeTick,
}, nil
}
......@@ -84,7 +84,7 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.MsgChannelCfg.QueryNodeStats,
Value: Params.CommonCfg.QueryNodeStats,
}, nil
}
......
......@@ -784,8 +784,8 @@ func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryCh
// all collection use the same query channel
colIDForAssignChannel := UniqueID(0)
searchPrefix := Params.MsgChannelCfg.QueryCoordSearch
searchResultPrefix := Params.MsgChannelCfg.QueryCoordSearchResult
searchPrefix := Params.CommonCfg.QueryCoordSearch
searchResultPrefix := Params.CommonCfg.QueryCoordSearchResult
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10)
log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel))
......
......@@ -58,8 +58,8 @@ func getSystemInfoMetrics(
ID: qc.session.ServerID,
},
SystemConfigurations: metricsinfo.QueryCoordConfiguration{
SearchChannelPrefix: Params.MsgChannelCfg.QueryCoordSearch,
SearchResultChannelPrefix: Params.MsgChannelCfg.QueryCoordSearchResult,
SearchChannelPrefix: Params.CommonCfg.QueryCoordSearch,
SearchResultChannelPrefix: Params.CommonCfg.QueryCoordSearchResult,
},
},
ConnectedNodes: make([]metricsinfo.QueryNodeInfos, 0),
......
......@@ -280,8 +280,8 @@ func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord,
rand.Seed(time.Now().UnixNano())
queryChannels := make([]*queryChannelInfo, 0)
channelID := len(queryChannels)
searchPrefix := Params.MsgChannelCfg.QueryCoordSearch
searchResultPrefix := Params.MsgChannelCfg.QueryCoordSearchResult
searchPrefix := Params.CommonCfg.QueryCoordSearch
searchResultPrefix := Params.CommonCfg.QueryCoordSearchResult
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
......
......@@ -48,11 +48,11 @@ func setup() {
func refreshParams() {
rand.Seed(time.Now().UnixNano())
suffix := "-test-query-Coord" + strconv.FormatInt(rand.Int63(), 10)
Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + suffix
Params.MsgChannelCfg.QueryCoordTimeTick = Params.MsgChannelCfg.QueryCoordTimeTick + suffix
Params.CommonCfg.QueryNodeStats = Params.CommonCfg.QueryNodeStats + suffix
Params.CommonCfg.QueryCoordTimeTick = Params.CommonCfg.QueryCoordTimeTick + suffix
Params.EtcdCfg.MetaRootPath = Params.EtcdCfg.MetaRootPath + suffix
Params.MsgChannelCfg.RootCoordDml = "Dml"
Params.MsgChannelCfg.RootCoordDelta = "delta"
Params.CommonCfg.RootCoordDml = "Dml"
Params.CommonCfg.RootCoordDelta = "delta"
GlobalSegmentInfos = make(map[UniqueID]*querypb.SegmentInfo)
}
......
......@@ -1999,7 +1999,7 @@ func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int {
}
func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) {
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta)
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
if err != nil {
return nil, err
}
......
......@@ -71,7 +71,7 @@ func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.String
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.MsgChannelCfg.QueryCoordTimeTick,
Value: Params.CommonCfg.QueryCoordTimeTick,
}, nil
}
......@@ -83,7 +83,7 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.MsgChannelCfg.QueryNodeStats,
Value: Params.CommonCfg.QueryNodeStats,
}, nil
}
......
......@@ -55,7 +55,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
RetrievePulsarBufSize: Params.QueryNodeCfg.RetrievePulsarBufSize,
RetrieveResultReceiveBufSize: Params.QueryNodeCfg.RetrieveResultReceiveBufSize,
SimdType: Params.KnowhereCfg.SimdType,
SimdType: Params.CommonCfg.SimdType,
},
}
metricsinfo.FillDeployMetricsWithEnv(&nodeInfos.SystemInfo)
......
......@@ -174,9 +174,9 @@ func (node *QueryNode) InitSegcore() {
C.SegcoreSetChunkRows(cChunkRows)
// override segcore SIMD type
cSimdType := C.CString(Params.KnowhereCfg.SimdType)
cSimdType := C.CString(Params.CommonCfg.SimdType)
cRealSimdType := C.SegcoreSetSimdType(cSimdType)
Params.KnowhereCfg.SimdType = C.GoString(cRealSimdType)
Params.CommonCfg.SimdType = C.GoString(cRealSimdType)
C.free(unsafe.Pointer(cRealSimdType))
C.free(unsafe.Pointer(cSimdType))
}
......
......@@ -237,7 +237,7 @@ func newMessageStreamFactory() (msgstream.Factory, error) {
func TestMain(m *testing.M) {
setup()
Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + strconv.Itoa(rand.Int())
Params.CommonCfg.QueryNodeStats = Params.CommonCfg.QueryNodeStats + strconv.Itoa(rand.Int())
exitCode := m.Run()
os.Exit(exitCode)
}
......
......@@ -50,7 +50,7 @@ func (sService *statsService) start() {
sleepTimeInterval := Params.QueryNodeCfg.StatsPublishInterval
// start pulsar
producerChannels := []string{Params.MsgChannelCfg.QueryNodeStats}
producerChannels := []string{Params.CommonCfg.QueryNodeStats}
statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx)
statsStream.AsProducer(producerChannels)
......
......@@ -50,7 +50,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
const receiveBufSize = 1024
// start pulsar
producerChannels := []string{Params.MsgChannelCfg.QueryNodeStats}
producerChannels := []string{Params.CommonCfg.QueryNodeStats}
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
......
......@@ -159,7 +159,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
return err
}
consumeChannels := []string{r.req.QueryChannel}
consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc()
......@@ -312,7 +312,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
}
}()
consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// group channels by to seeking or consuming
channel2SeekPosition := make(map[string]*internalpb.MsgPosition)
......@@ -532,7 +532,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
}
channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// channels as consumer
for _, channel := range vDeltaChannels {
fg := channel2FlowGraph[channel]
......
......@@ -441,18 +441,18 @@ func (c *Core) setMsgStreams() error {
if Params.PulsarCfg.Address == "" {
return fmt.Errorf("pulsar address is empty")
}
if Params.MsgChannelCfg.RootCoordSubName == "" {
if Params.CommonCfg.RootCoordSubName == "" {
return fmt.Errorf("RootCoordSubName is empty")
}
// rootcoord time tick channel
if Params.MsgChannelCfg.RootCoordTimeTick == "" {
if Params.CommonCfg.RootCoordTimeTick == "" {
return fmt.Errorf("timeTickChannel is empty")
}
timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx)
metrics.RootCoordNumOfMsgStream.Inc()
timeTickStream.AsProducer([]string{Params.MsgChannelCfg.RootCoordTimeTick})
log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.MsgChannelCfg.RootCoordTimeTick))
timeTickStream.AsProducer([]string{Params.CommonCfg.RootCoordTimeTick})
log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.CommonCfg.RootCoordTimeTick))
c.SendTimeTick = func(t typeutil.Timestamp, reason string) error {
msgPack := ms.MsgPack{}
......@@ -1186,7 +1186,7 @@ func (c *Core) Start() error {
}
log.Debug(typeutil.RootCoordRole, zap.Int64("node id", c.session.ServerID))
log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.MsgChannelCfg.RootCoordTimeTick))
log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.CommonCfg.RootCoordTimeTick))
c.startOnce.Do(func() {
if err := c.proxyManager.WatchProxy(); err != nil {
......@@ -1261,7 +1261,7 @@ func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.MsgChannelCfg.RootCoordTimeTick,
Value: Params.CommonCfg.RootCoordTimeTick,
}, nil
}
......@@ -1272,7 +1272,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.MsgChannelCfg.RootCoordStatistics,
Value: Params.CommonCfg.RootCoordStatistics,
}, nil
}
......
......@@ -559,13 +559,13 @@ func TestRootCoord(t *testing.T) {
core, err := NewCore(ctx, coreFactory)
assert.Nil(t, err)
randVal := rand.Int()
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath)
Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
Params.MsgChannelCfg.RootCoordDml = fmt.Sprintf("rootcoord-dml-test-%d", randVal)
Params.MsgChannelCfg.RootCoordDelta = fmt.Sprintf("rootcoord-delta-test-%d", randVal)
Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
Params.CommonCfg.RootCoordDml = fmt.Sprintf("rootcoord-dml-test-%d", randVal)
Params.CommonCfg.RootCoordDelta = fmt.Sprintf("rootcoord-delta-test-%d", randVal)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
assert.NoError(t, err)
......@@ -626,7 +626,7 @@ func TestRootCoord(t *testing.T) {
assert.Nil(t, err)
timeTickStream, _ := tmpFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName)
timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName)
timeTickStream.Start()
dmlStream, _ := tmpFactory.NewMsgStream(ctx)
......@@ -723,7 +723,7 @@ func TestRootCoord(t *testing.T) {
createMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.MsgChannelCfg.RootCoordSubName)
dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.CommonCfg.RootCoordSubName)
dmlStream.Start()
pChanMap := core.MetaTable.ListCollectionPhysicalChannels()
......@@ -2311,11 +2311,11 @@ func TestRootCoord2(t *testing.T) {
randVal := rand.Int()
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath)
Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
......@@ -2361,7 +2361,7 @@ func TestRootCoord2(t *testing.T) {
assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName)
timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
......@@ -2404,7 +2404,7 @@ func TestRootCoord2(t *testing.T) {
collInfo, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
dmlStream, _ := msFactory.NewMsgStream(ctx)
dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.MsgChannelCfg.RootCoordSubName)
dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.CommonCfg.RootCoordSubName)
dmlStream.Start()
msgs := getNotTtMsg(ctx, 1, dmlStream.Chan())
......@@ -2589,11 +2589,11 @@ func TestCheckFlushedSegments(t *testing.T) {
assert.Nil(t, err)
randVal := rand.Int()
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath)
Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
......@@ -2642,7 +2642,7 @@ func TestCheckFlushedSegments(t *testing.T) {
assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName)
timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
......@@ -2755,11 +2755,11 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
core, err := NewCore(ctx, msFactory)
assert.Nil(t, err)
randVal := rand.Int()
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath)
Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
......@@ -2809,7 +2809,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
assert.Nil(t, err)
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName)
timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName)
timeTickStream.Start()
time.Sleep(100 * time.Millisecond)
......
......@@ -149,7 +149,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
chanNames[i] = ToPhysicalChannel(vchanNames[i])
deltaChanNames[i] = t.core.chanTimeTick.getDeltaChannelName()
deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta)
deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
if err1 != nil || deltaChanName != deltaChanNames[i] {
return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i])
}
......@@ -363,7 +363,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
// remove delta channels
deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames))
for i, chanName := range collMeta.PhysicalChannelNames {
if deltaChanNames[i], err = ConvertChannelName(chanName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta); err != nil {
if deltaChanNames[i], err = ConvertChannelName(chanName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta); err != nil {
return err
}
}
......
......@@ -88,11 +88,11 @@ func BenchmarkAllocTimestamp(b *testing.B) {
randVal := rand.Int()
Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("master-time-tick-%d", randVal)
Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("master-statistics-%d", randVal)
Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("master-time-tick-%d", randVal)
Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("master-statistics-%d", randVal)
Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath)
Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath)
Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
err = core.SetDataCoord(ctx, &tbd{})
assert.Nil(b, err)
......
......@@ -87,9 +87,9 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
// initialize dml channels used for insert
dmlChannels := newDmlChannels(ctx, factory, Params.MsgChannelCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum)
dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum)
// initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels
deltaChannels := newDmlChannels(ctx, factory, Params.MsgChannelCfg.RootCoordDelta, Params.RootCoordCfg.DmlChannelNum)
deltaChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDelta, Params.RootCoordCfg.DmlChannelNum)
// recover physical channels for all collections
for collID, chanNames := range chanMap {
......@@ -99,7 +99,7 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
var err error
deltaChanNames := make([]string, len(chanNames))
for i, chanName := range chanNames {
deltaChanNames[i], err = ConvertChannelName(chanName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta)
deltaChanNames[i], err = ConvertChannelName(chanName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
if err != nil {
log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName))
panic("invalid dml channel name " + chanName)
......
......@@ -45,8 +45,8 @@ func TestTimetickSync(t *testing.T) {
//}
Params.RootCoordCfg.DmlChannelNum = 2
Params.MsgChannelCfg.RootCoordDml = "rootcoord-dml"
Params.MsgChannelCfg.RootCoordDelta = "rootcoord-delta"
Params.CommonCfg.RootCoordDml = "rootcoord-dml"
Params.CommonCfg.RootCoordDelta = "rootcoord-delta"
ttSync := newTimeTickSync(ctx, sourceID, factory, nil)
var wg sync.WaitGroup
......
......@@ -137,11 +137,35 @@ func (gp *BaseTable) Load(key string) (string, error) {
return gp.params.Load(strings.ToLower(key))
}
// Load2 loads an object with multiple @keys, return the first successful value.
// If all keys not exist, return error.
// This is to be compatible with old configuration file.
func (gp *BaseTable) Load2(keys []string) (string, error) {
for _, key := range keys {
if str, err := gp.params.Load(strings.ToLower(key)); err == nil {
return str, nil
}
}
return "", fmt.Errorf("invalid keys: %v", keys)
}
// LoadWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned.
func (gp *BaseTable) LoadWithDefault(key, defaultValue string) string {
return gp.params.LoadWithDefault(strings.ToLower(key), defaultValue)
}
// LoadWithDefault2 loads an object with multiple @keys, return the first successful value.
// If all keys not exist, return @defaultValue.
// This is to be compatible with old configuration file.
func (gp *BaseTable) LoadWithDefault2(keys []string, defaultValue string) string {
for _, key := range keys {
if str, err := gp.params.Load(strings.ToLower(key)); err == nil {
return str
}
}
return defaultValue
}
// LoadRange loads objects with range @startKey to @endKey with @limit number of objects.
func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []string, error) {
return gp.params.LoadRange(strings.ToLower(key), strings.ToLower(endKey), limit)
......
......@@ -73,6 +73,23 @@ func TestBaseTable_LoadFromKVPair(t *testing.T) {
v, err = baseParams.Load("k2")
assert.Nil(t, err)
assert.Equal(t, "v2", v)
v, err = baseParams.Load2([]string{"k2_new"})
assert.NotNil(t, err)
assert.Equal(t, "", v)
v, err = baseParams.Load2([]string{"k2_new", "k2"})
assert.Nil(t, err)
assert.Equal(t, "v2", v)
v = baseParams.LoadWithDefault("k2_new", "v2_new")
assert.Equal(t, "v2_new", v)
v = baseParams.LoadWithDefault2([]string{"k2_new"}, "v2_new")
assert.Equal(t, "v2_new", v)
v = baseParams.LoadWithDefault2([]string{"k2_new", "k2"}, "v2_new")
assert.Equal(t, "v2", v)
}
func TestBaseTable_LoadRange(t *testing.T) {
......
......@@ -34,9 +34,7 @@ type ComponentParam struct {
ServiceParam
once sync.Once
CommonCfg commonConfig
KnowhereCfg knowhereConfig
MsgChannelCfg msgChannelConfig
CommonCfg commonConfig
RootCoordCfg rootCoordConfig
ProxyCfg proxyConfig
......@@ -60,8 +58,6 @@ func (p *ComponentParam) Init() {
p.ServiceParam.Init()
p.CommonCfg.init(&p.BaseTable)
p.KnowhereCfg.init(&p.BaseTable)
p.MsgChannelCfg.init(&p.BaseTable)
p.RootCoordCfg.init(&p.BaseTable)
p.ProxyCfg.init(&p.BaseTable)
......@@ -84,54 +80,6 @@ func (p *ComponentParam) SetLogConfig(role string) {
type commonConfig struct {
Base *BaseTable
DefaultPartitionName string
DefaultIndexName string
RetentionDuration int64
}
func (p *commonConfig) init(base *BaseTable) {
p.Base = base
p.initDefaultPartitionName()
p.initDefaultIndexName()
p.initRetentionDuration()
}
func (p *commonConfig) initDefaultPartitionName() {
p.DefaultPartitionName = p.Base.LoadWithDefault("common.defaultPartitionName", "_default")
}
func (p *commonConfig) initDefaultIndexName() {
p.DefaultIndexName = p.Base.LoadWithDefault("common.defaultIndexName", "_default_idx")
}
func (p *commonConfig) initRetentionDuration() {
p.RetentionDuration = p.Base.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration)
}
///////////////////////////////////////////////////////////////////////////////
// --- knowhere ---
type knowhereConfig struct {
Base *BaseTable
SimdType string
}
func (p *knowhereConfig) init(base *BaseTable) {
p.Base = base
p.initSimdType()
}
func (p *knowhereConfig) initSimdType() {
p.SimdType = p.Base.LoadWithDefault("knowhere.simdType", "auto")
}
///////////////////////////////////////////////////////////////////////////////
// --- msgChannel ---
type msgChannelConfig struct {
Base *BaseTable
ClusterPrefix string
ProxySubName string
......@@ -153,9 +101,15 @@ type msgChannelConfig struct {
DataCoordSegmentInfo string
DataCoordSubName string
DataNodeSubName string
DefaultPartitionName string
DefaultIndexName string
RetentionDuration int64
SimdType string
}
func (p *msgChannelConfig) init(base *BaseTable) {
func (p *commonConfig) init(base *BaseTable) {
p.Base = base
// must init cluster prefix first
......@@ -180,18 +134,28 @@ func (p *msgChannelConfig) init(base *BaseTable) {
p.initDataCoordSegmentInfo()
p.initDataCoordSubName()
p.initDataNodeSubName()
p.initDefaultPartitionName()
p.initDefaultIndexName()
p.initRetentionDuration()
p.initSimdType()
}
func (p *msgChannelConfig) initClusterPrefix() {
str, err := p.Base.Load("msgChannel.chanNamePrefix.cluster")
func (p *commonConfig) initClusterPrefix() {
keys := []string{
"common.chanNamePrefix.cluster",
"msgChannel.chanNamePrefix.cluster",
}
str, err := p.Base.Load2(keys)
if err != nil {
panic(err)
}
p.ClusterPrefix = str
}
func (p *msgChannelConfig) initChanNamePrefix(cfg string) string {
value, err := p.Base.Load(cfg)
func (p *commonConfig) initChanNamePrefix(keys []string) string {
value, err := p.Base.Load2(keys)
if err != nil {
panic(err)
}
......@@ -200,72 +164,156 @@ func (p *msgChannelConfig) initChanNamePrefix(cfg string) string {
}
// --- proxy ---
func (p *msgChannelConfig) initProxySubName() {
p.ProxySubName = p.initChanNamePrefix("msgChannel.subNamePrefix.proxySubNamePrefix")
func (p *commonConfig) initProxySubName() {
keys := []string{
"common.subNamePrefix.proxySubNamePrefix",
"msgChannel.subNamePrefix.proxySubNamePrefix",
}
p.ProxySubName = p.initChanNamePrefix(keys)
}
// --- rootcoord ---
func (p *msgChannelConfig) initRootCoordTimeTick() {
p.RootCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordTimeTick")
func (p *commonConfig) initRootCoordTimeTick() {
keys := []string{
"common.chanNamePrefix.rootCoordTimeTick",
"msgChannel.chanNamePrefix.rootCoordTimeTick",
}
p.RootCoordTimeTick = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initRootCoordStatistics() {
p.RootCoordStatistics = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordStatistics")
func (p *commonConfig) initRootCoordStatistics() {
keys := []string{
"common.chanNamePrefix.rootCoordStatistics",
"msgChannel.chanNamePrefix.rootCoordStatistics",
}
p.RootCoordStatistics = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initRootCoordDml() {
p.RootCoordDml = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordDml")
func (p *commonConfig) initRootCoordDml() {
keys := []string{
"common.chanNamePrefix.rootCoordDml",
"msgChannel.chanNamePrefix.rootCoordDml",
}
p.RootCoordDml = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initRootCoordDelta() {
p.RootCoordDelta = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordDelta")
func (p *commonConfig) initRootCoordDelta() {
keys := []string{
"common.chanNamePrefix.rootCoordDelta",
"msgChannel.chanNamePrefix.rootCoordDelta",
}
p.RootCoordDelta = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initRootCoordSubName() {
p.RootCoordSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.rootCoordSubNamePrefix")
func (p *commonConfig) initRootCoordSubName() {
keys := []string{
"common.subNamePrefix.rootCoordSubNamePrefix",
"msgChannel.subNamePrefix.rootCoordSubNamePrefix",
}
p.RootCoordSubName = p.initChanNamePrefix(keys)
}
// --- querycoord ---
func (p *msgChannelConfig) initQueryCoordSearch() {
p.QueryCoordSearch = p.initChanNamePrefix("msgChannel.chanNamePrefix.search")
func (p *commonConfig) initQueryCoordSearch() {
keys := []string{
"common.chanNamePrefix.search",
"msgChannel.chanNamePrefix.search",
}
p.QueryCoordSearch = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initQueryCoordSearchResult() {
p.QueryCoordSearchResult = p.initChanNamePrefix("msgChannel.chanNamePrefix.searchResult")
func (p *commonConfig) initQueryCoordSearchResult() {
keys := []string{
"common.chanNamePrefix.searchResult",
"msgChannel.chanNamePrefix.searchResult",
}
p.QueryCoordSearchResult = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initQueryCoordTimeTick() {
p.QueryCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryTimeTick")
func (p *commonConfig) initQueryCoordTimeTick() {
keys := []string{
"common.chanNamePrefix.queryTimeTick",
"msgChannel.chanNamePrefix.queryTimeTick",
}
p.QueryCoordTimeTick = p.initChanNamePrefix(keys)
}
// --- querynode ---
func (p *msgChannelConfig) initQueryNodeStats() {
p.QueryNodeStats = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryNodeStats")
func (p *commonConfig) initQueryNodeStats() {
keys := []string{
"common.chanNamePrefix.queryNodeStats",
"msgChannel.chanNamePrefix.queryNodeStats",
}
p.QueryNodeStats = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initQueryNodeSubName() {
p.QueryNodeSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
func (p *commonConfig) initQueryNodeSubName() {
keys := []string{
"common.subNamePrefix.queryNodeSubNamePrefix",
"msgChannel.subNamePrefix.queryNodeSubNamePrefix",
}
p.QueryNodeSubName = p.initChanNamePrefix(keys)
}
// --- datacoord ---
func (p *msgChannelConfig) initDataCoordStatistic() {
p.DataCoordStatistic = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordStatistic")
func (p *commonConfig) initDataCoordStatistic() {
keys := []string{
"common.chanNamePrefix.dataCoordStatistic",
"msgChannel.chanNamePrefix.dataCoordStatistic",
}
p.DataCoordStatistic = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initDataCoordTimeTick() {
p.DataCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordTimeTick")
func (p *commonConfig) initDataCoordTimeTick() {
keys := []string{
"common.chanNamePrefix.dataCoordTimeTick",
"msgChannel.chanNamePrefix.dataCoordTimeTick",
}
p.DataCoordTimeTick = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initDataCoordSegmentInfo() {
p.DataCoordSegmentInfo = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordSegmentInfo")
func (p *commonConfig) initDataCoordSegmentInfo() {
keys := []string{
"common.chanNamePrefix.dataCoordSegmentInfo",
"msgChannel.chanNamePrefix.dataCoordSegmentInfo",
}
p.DataCoordSegmentInfo = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initDataCoordSubName() {
p.DataCoordSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.dataCoordSubNamePrefix")
func (p *commonConfig) initDataCoordSubName() {
keys := []string{
"common.subNamePrefix.dataCoordSubNamePrefix",
"msgChannel.subNamePrefix.dataCoordSubNamePrefix",
}
p.DataCoordSubName = p.initChanNamePrefix(keys)
}
func (p *msgChannelConfig) initDataNodeSubName() {
p.DataNodeSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.dataNodeSubNamePrefix")
func (p *commonConfig) initDataNodeSubName() {
keys := []string{
"common.subNamePrefix.dataNodeSubNamePrefix",
"msgChannel.subNamePrefix.dataNodeSubNamePrefix",
}
p.DataNodeSubName = p.initChanNamePrefix(keys)
}
func (p *commonConfig) initDefaultPartitionName() {
p.DefaultPartitionName = p.Base.LoadWithDefault("common.defaultPartitionName", "_default")
}
func (p *commonConfig) initDefaultIndexName() {
p.DefaultIndexName = p.Base.LoadWithDefault("common.defaultIndexName", "_default_idx")
}
func (p *commonConfig) initRetentionDuration() {
p.RetentionDuration = p.Base.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration)
}
func (p *commonConfig) initSimdType() {
keys := []string{
"common.simdType",
"knowhere.simdType",
}
p.SimdType = p.Base.LoadWithDefault2(keys, "auto")
}
///////////////////////////////////////////////////////////////////////////////
......
......@@ -40,17 +40,9 @@ func TestComponentParam(t *testing.T) {
t.Logf("default index name = %s", Params.DefaultIndexName)
assert.Equal(t, Params.RetentionDuration, int64(DefaultRetentionDuration))
})
t.Run("test knowhereConfig", func(t *testing.T) {
Params := CParams.KnowhereCfg
assert.NotEqual(t, Params.SimdType, "")
t.Logf("knowhere simd type = %s", Params.SimdType)
})
t.Run("test msgChannelConfig", func(t *testing.T) {
Params := CParams.MsgChannelCfg
// -- proxy --
assert.Equal(t, Params.ProxySubName, "by-dev-proxy")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册