diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 53ca013d7f16bb1c13411ac1ba7150b9096ce5cf..febd7c4af5d33470296b0e0e183754162bfbf3e1 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -215,7 +215,7 @@ log: maxBackups: 20 format: text # text/json -msgChannel: +common: # Channel name generation rule: ${namePrefix}-${ChannelIdx} chanNamePrefix: cluster: "by-dev" @@ -241,12 +241,10 @@ msgChannel: dataNodeSubNamePrefix: "dataNode" dataCoordSubNamePrefix: "dataCoord" -common: defaultPartitionName: "_default" # default partition name for a collection defaultIndexName: "_default_idx" # default index name retentionDuration: 432000 # 5 days in seconds -knowhere: # Default value: auto # Valid values: [auto, avx512, avx2, avx, sse4_2] # This configuration is only used by querynode and indexnode, it selects CPU instruction set for Searching and Index-building. diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 365a8bc02013d01cb6b99d3ac97312545b9ba7eb..2c3945a6c04d84229fd09386f693dbcab1fe473e 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -289,7 +289,7 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) { // buildSubName generates a subscription name by concatenating DataNodeSubName, node ID and collection ID. func buildSubName(collectionID int64, nodeID int64) string { - return fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, nodeID, collectionID) + return fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, collectionID) } func (c *ChannelManager) unsubscribe(subName string, channel string) error { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 34192361f4a3c4c20a08d7ffe14ffb5ab8b7deb1..2a015de55fcef984bc5e99fa11a29e6ec0b73629 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -448,11 +448,11 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { log.Error("DataCoord failed to create timetick channel", zap.Error(err)) return } - ttMsgStream.AsConsumerWithPosition([]string{Params.MsgChannelCfg.DataCoordTimeTick}, - Params.MsgChannelCfg.DataCoordSubName, mqwrapper.SubscriptionPositionLatest) + ttMsgStream.AsConsumerWithPosition([]string{Params.CommonCfg.DataCoordTimeTick}, + Params.CommonCfg.DataCoordSubName, mqwrapper.SubscriptionPositionLatest) log.Info("DataCoord creates the timetick channel consumer", - zap.String("timeTickChannel", Params.MsgChannelCfg.DataCoordTimeTick), - zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName)) + zap.String("timeTickChannel", Params.CommonCfg.DataCoordTimeTick), + zap.String("subscription", Params.CommonCfg.DataCoordSubName)) ttMsgStream.Start() go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 5bec2e4e0dde1558bcedc47597f88d7d0086c96e..e7c5dcec144bc0f0d107b1f8cee6a49cadd461b4 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -61,7 +61,7 @@ func TestGetSegmentInfoChannel(t *testing.T) { resp, err := svr.GetSegmentInfoChannel(context.TODO()) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.EqualValues(t, Params.MsgChannelCfg.DataCoordSegmentInfo, resp.Value) + assert.EqualValues(t, Params.CommonCfg.DataCoordSegmentInfo, resp.Value) }) } @@ -248,7 +248,7 @@ func TestGetTimeTickChannel(t *testing.T) { resp, err := svr.GetTimeTickChannel(context.TODO()) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.EqualValues(t, Params.MsgChannelCfg.DataCoordTimeTick, resp.Value) + assert.EqualValues(t, Params.CommonCfg.DataCoordTimeTick, resp.Value) } func TestGetSegmentStates(t *testing.T) { @@ -1080,7 +1080,7 @@ func TestDataNodeTtChannel(t *testing.T) { ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO()) assert.Nil(t, err) - ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick}) + ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick}) ttMsgStream.Start() defer ttMsgStream.Close() info := &NodeInfo{ @@ -1148,7 +1148,7 @@ func TestDataNodeTtChannel(t *testing.T) { }) ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO()) assert.Nil(t, err) - ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick}) + ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick}) ttMsgStream.Start() defer ttMsgStream.Close() info := &NodeInfo{ @@ -1230,7 +1230,7 @@ func TestDataNodeTtChannel(t *testing.T) { ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO()) assert.Nil(t, err) - ttMsgStream.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick}) + ttMsgStream.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick}) ttMsgStream.Start() defer ttMsgStream.Close() node := &NodeInfo{ @@ -2286,7 +2286,7 @@ func (ms *MockClosePanicMsgstream) Consume() *msgstream.MsgPack { func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server { Params.Init() - Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) + Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) var err error factory := msgstream.NewPmsFactory() m := map[string]interface{}{ diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 42fbbc8fd08fe38778064a38ba8fcf31f0f16867..29ce951b13de705dd3b26cc68c593522cc0b89e4 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -48,7 +48,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRespon Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, - Value: Params.MsgChannelCfg.DataCoordTimeTick, + Value: Params.CommonCfg.DataCoordTimeTick, }, nil } @@ -271,7 +271,7 @@ func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, - Value: Params.MsgChannelCfg.DataCoordSegmentInfo, + Value: Params.CommonCfg.DataCoordSegmentInfo, }, nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 24ba7a8e70432d7de53b68154ba5ece4cc55ce03..89d5b39d06c8d908828c6f61aa114bea952d7ded 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -207,7 +207,7 @@ func (node *DataNode) initSession() error { // Init function does nothing now. func (node *DataNode) Init() error { log.Info("DataNode Init", - zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick), + zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick), ) if err := node.initSession(); err != nil { log.Error("DataNode init session failed", zap.Error(err)) @@ -226,7 +226,7 @@ func (node *DataNode) Init() error { return err } log.Info("DataNode Init successfully", - zap.String("MsgChannelSubName", Params.MsgChannelCfg.DataNodeSubName)) + zap.String("MsgChannelSubName", Params.CommonCfg.DataNodeSubName)) return nil } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index bc8cf34cc6382f4fbddf7e60c88b9169945b7b0d..b0548a8e5c3863ac30be54c22a9d1d5711d55c23 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -54,7 +54,7 @@ func TestMain(t *testing.M) { Params.DataNodeCfg.InitAlias("datanode-alias-1") Params.Init() // change to specific channel for test - Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) + Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) code := t.Run() os.Exit(code) } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 0423524d9d9927a969194df31cdaf9c31e1d2d00..59b14295917a3e67328d7b8cb4ac76b748158c49 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -283,7 +283,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI return nil } pChannelName := rootcoord.ToPhysicalChannel(vchanInfo.ChannelName) - deltaChannelName, err := rootcoord.ConvertChannelName(pChannelName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta) + deltaChannelName, err := rootcoord.ConvertChannelName(pChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) if err != nil { log.Error(err.Error()) return nil diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 49a34bed2c4b7b7c4ac9da9cb2a51795c4541f2c..6ed2c0844faf2cc06012add94c4d8a21304afc6c 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -35,7 +35,7 @@ import ( func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNodeConfig *nodeConfig) (*flowgraph.InputNode, error) { // subName should be unique, since pchannelName is shared among several collections // consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10) - consumeSubName := fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, Params.DataNodeCfg.NodeID, dmNodeConfig.collectionID) + consumeSubName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, Params.DataNodeCfg.NodeID, dmNodeConfig.collectionID) insertStream, err := dmNodeConfig.msFactory.NewTtMsgStream(ctx) if err != nil { return nil, err diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index eea70e6fecf992a1fa94448df63d35141344cfba..02323a9056695aca4a5f526a7ca75bce42e7dbc2 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -722,9 +722,9 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl if err != nil { return nil, err } - wTt.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick}) + wTt.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick}) metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc() - log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick)) + log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick)) var wTtMsgStream msgstream.MsgStream = wTt wTtMsgStream.Start() diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index ad11ee09e220c2074a9a17998ca4c1a130f728e0..201d6182ed42b732f06c6b947fd06fdd1d8d3750 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -82,9 +82,9 @@ func TestGrpcService(t *testing.T) { rootcoord.Params.Init() rootcoord.Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) rootcoord.Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal) - rootcoord.Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("msgChannel%d", randVal) - rootcoord.Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("timeTick%d", randVal) - rootcoord.Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("stateChannel%d", randVal) + rootcoord.Params.CommonCfg.RootCoordSubName = fmt.Sprintf("msgChannel%d", randVal) + rootcoord.Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("timeTick%d", randVal) + rootcoord.Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("stateChannel%d", randVal) rootcoord.Params.RootCoordCfg.MaxPartitionNum = 64 rootcoord.Params.CommonCfg.DefaultPartitionName = "_default" diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 5215fd6caebbe41dd38d2dec5d8a310fd1c1ade8..d6727d6a63e61a82113bf40625c799d7f8515504 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -141,9 +141,9 @@ func (i *IndexNode) initKnowhere() { C.IndexBuilderInit() // override index builder SIMD type - cSimdType := C.CString(Params.KnowhereCfg.SimdType) + cSimdType := C.CString(Params.CommonCfg.SimdType) cRealSimdType := C.IndexBuilderSetSimdType(cSimdType) - Params.KnowhereCfg.SimdType = C.GoString(cRealSimdType) + Params.CommonCfg.SimdType = C.GoString(cRealSimdType) C.free(unsafe.Pointer(cRealSimdType)) C.free(unsafe.Pointer(cSimdType)) } diff --git a/internal/indexnode/indexnode_mock.go b/internal/indexnode/indexnode_mock.go index 4dba97157b81a1c5aa57cac1fab7055a9f0c6b86..65c5f106d34ed2a019362dff482a198523fc4abe 100644 --- a/internal/indexnode/indexnode_mock.go +++ b/internal/indexnode/indexnode_mock.go @@ -354,7 +354,7 @@ func getMockSystemInfoMetrics( }, SystemConfigurations: metricsinfo.IndexNodeConfiguration{ MinioBucketName: Params.MinioCfg.BucketName, - SimdType: Params.KnowhereCfg.SimdType, + SimdType: Params.CommonCfg.SimdType, }, } diff --git a/internal/indexnode/metrics_info.go b/internal/indexnode/metrics_info.go index fc4ce5c6b399b46c7cd0da001111929a4c60fc5c..08216a63791635bb1f7c538b7cf1f53f29307bcb 100644 --- a/internal/indexnode/metrics_info.go +++ b/internal/indexnode/metrics_info.go @@ -52,7 +52,7 @@ func getSystemInfoMetrics( }, SystemConfigurations: metricsinfo.IndexNodeConfiguration{ MinioBucketName: Params.MinioCfg.BucketName, - SimdType: Params.KnowhereCfg.SimdType, + SimdType: Params.CommonCfg.SimdType, }, } diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 0f991cd95b81a4341edc2139e66862db5e020fca..b88c31e038cdbaf3c0b31eb624c51fde701534f6 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -817,7 +817,7 @@ func (sched *taskScheduler) collectResultLoop() { queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx) // proxy didn't need to walk through all the search results in channel, because it no longer has client connections. - consumeSubName := fmt.Sprintf("%s-%d", Params.MsgChannelCfg.ProxySubName, Params.ProxyCfg.ProxyID) + consumeSubName := fmt.Sprintf("%s-%d", Params.CommonCfg.ProxySubName, Params.ProxyCfg.ProxyID) queryResultMsgStream.AsConsumerWithPosition(Params.ProxyCfg.SearchResultChannelNames, consumeSubName, mqwrapper.SubscriptionPositionLatest) log.Debug("Proxy", zap.Strings("SearchResultChannelNames", Params.ProxyCfg.SearchResultChannelNames), zap.Any("consumeSubName", consumeSubName)) diff --git a/internal/querycoord/channel_unsubscribe.go b/internal/querycoord/channel_unsubscribe.go index fdedd12adddbfa6537b1a9c83b0fa51e77b37174..5b2284a809a4e8e406c5aeb3d5c17cb7797f6219 100644 --- a/internal/querycoord/channel_unsubscribe.go +++ b/internal/querycoord/channel_unsubscribe.go @@ -132,7 +132,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() { nodeID := channelInfo.NodeID for _, collectionChannels := range channelInfo.CollectionChannels { collectionID := collectionChannels.CollectionID - subName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, nodeID) + subName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, nodeID) err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels) if err != nil { log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID)) diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index c6b2474d4cc77bbf07de6e5665e2e3e0dec7bf83..5900747bd4eaef8c74db399d18d84a6e696d7134 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -72,7 +72,7 @@ func (qc *QueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.MsgChannelCfg.QueryCoordTimeTick, + Value: Params.CommonCfg.QueryCoordTimeTick, }, nil } @@ -84,7 +84,7 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.MsgChannelCfg.QueryNodeStats, + Value: Params.CommonCfg.QueryNodeStats, }, nil } diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 60ce145629ef5ed13c0fed12a72ee4463315320e..952091b9966352604810e63cba312de1b30ac9b3 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -784,8 +784,8 @@ func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryCh // all collection use the same query channel colIDForAssignChannel := UniqueID(0) - searchPrefix := Params.MsgChannelCfg.QueryCoordSearch - searchResultPrefix := Params.MsgChannelCfg.QueryCoordSearchResult + searchPrefix := Params.CommonCfg.QueryCoordSearch + searchResultPrefix := Params.CommonCfg.QueryCoordSearchResult allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10) allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10) log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel)) diff --git a/internal/querycoord/metrics_info.go b/internal/querycoord/metrics_info.go index 76c41f0d94d974737a6e3d19900e96cbc0c7de79..4ce20557c594c75e62e9ab16927de7073192f2b2 100644 --- a/internal/querycoord/metrics_info.go +++ b/internal/querycoord/metrics_info.go @@ -58,8 +58,8 @@ func getSystemInfoMetrics( ID: qc.session.ServerID, }, SystemConfigurations: metricsinfo.QueryCoordConfiguration{ - SearchChannelPrefix: Params.MsgChannelCfg.QueryCoordSearch, - SearchResultChannelPrefix: Params.MsgChannelCfg.QueryCoordSearchResult, + SearchChannelPrefix: Params.CommonCfg.QueryCoordSearch, + SearchResultChannelPrefix: Params.CommonCfg.QueryCoordSearchResult, }, }, ConnectedNodes: make([]metricsinfo.QueryNodeInfos, 0), diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index d9c75cf07319ae1fc212ee9068bfa6cc585859d0..aeebce6bfacc17b4a007831a0a7d6577b4443948 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -280,8 +280,8 @@ func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, rand.Seed(time.Now().UnixNano()) queryChannels := make([]*queryChannelInfo, 0) channelID := len(queryChannels) - searchPrefix := Params.MsgChannelCfg.QueryCoordSearch - searchResultPrefix := Params.MsgChannelCfg.QueryCoordSearchResult + searchPrefix := Params.CommonCfg.QueryCoordSearch + searchResultPrefix := Params.CommonCfg.QueryCoordSearchResult allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10) allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10) diff --git a/internal/querycoord/query_coord_test.go b/internal/querycoord/query_coord_test.go index 90253f73437465227edeaf84ca1c4f58033810bb..cde2c484149964c472170201b014b26985d4ec64 100644 --- a/internal/querycoord/query_coord_test.go +++ b/internal/querycoord/query_coord_test.go @@ -48,11 +48,11 @@ func setup() { func refreshParams() { rand.Seed(time.Now().UnixNano()) suffix := "-test-query-Coord" + strconv.FormatInt(rand.Int63(), 10) - Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + suffix - Params.MsgChannelCfg.QueryCoordTimeTick = Params.MsgChannelCfg.QueryCoordTimeTick + suffix + Params.CommonCfg.QueryNodeStats = Params.CommonCfg.QueryNodeStats + suffix + Params.CommonCfg.QueryCoordTimeTick = Params.CommonCfg.QueryCoordTimeTick + suffix Params.EtcdCfg.MetaRootPath = Params.EtcdCfg.MetaRootPath + suffix - Params.MsgChannelCfg.RootCoordDml = "Dml" - Params.MsgChannelCfg.RootCoordDelta = "delta" + Params.CommonCfg.RootCoordDml = "Dml" + Params.CommonCfg.RootCoordDelta = "delta" GlobalSegmentInfos = make(map[UniqueID]*querypb.SegmentInfo) } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 61e7689b050be22a223b761ac1887933843a4587..eac5e357721f43ca08a326b95020afc005aee915 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -1999,7 +1999,7 @@ func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int { } func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) { - deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta) + deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) if err != nil { return nil, err } diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 5a07c1190a9b9410f4b07c380f55e96f062a55a6..274576c43d4d58e76f23fef85fa859cf73a5f4f0 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -71,7 +71,7 @@ func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.String ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.MsgChannelCfg.QueryCoordTimeTick, + Value: Params.CommonCfg.QueryCoordTimeTick, }, nil } @@ -83,7 +83,7 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.MsgChannelCfg.QueryNodeStats, + Value: Params.CommonCfg.QueryNodeStats, }, nil } diff --git a/internal/querynode/metrics_info.go b/internal/querynode/metrics_info.go index 0c1c6fd16e3f5c5138d5ed708394b9337780e36f..0212d5cfdc8482e194c14559a8bda127a710842b 100644 --- a/internal/querynode/metrics_info.go +++ b/internal/querynode/metrics_info.go @@ -55,7 +55,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, RetrievePulsarBufSize: Params.QueryNodeCfg.RetrievePulsarBufSize, RetrieveResultReceiveBufSize: Params.QueryNodeCfg.RetrieveResultReceiveBufSize, - SimdType: Params.KnowhereCfg.SimdType, + SimdType: Params.CommonCfg.SimdType, }, } metricsinfo.FillDeployMetricsWithEnv(&nodeInfos.SystemInfo) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 559b448f8e4ac185e4a224fa62d86ae25193b0e0..6c7d2165b49da754016c67c3ed6c4ae843285b53 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -174,9 +174,9 @@ func (node *QueryNode) InitSegcore() { C.SegcoreSetChunkRows(cChunkRows) // override segcore SIMD type - cSimdType := C.CString(Params.KnowhereCfg.SimdType) + cSimdType := C.CString(Params.CommonCfg.SimdType) cRealSimdType := C.SegcoreSetSimdType(cSimdType) - Params.KnowhereCfg.SimdType = C.GoString(cRealSimdType) + Params.CommonCfg.SimdType = C.GoString(cRealSimdType) C.free(unsafe.Pointer(cRealSimdType)) C.free(unsafe.Pointer(cSimdType)) } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 5879a253362256ed82cd6e101911a8bd81a81540..700b975b2675ead966e2ea556176966caafa2114 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -237,7 +237,7 @@ func newMessageStreamFactory() (msgstream.Factory, error) { func TestMain(m *testing.M) { setup() - Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + strconv.Itoa(rand.Int()) + Params.CommonCfg.QueryNodeStats = Params.CommonCfg.QueryNodeStats + strconv.Itoa(rand.Int()) exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go index d8b2e01234a21109e98cf7635f0b4f6b093db00c..d2b9bca78218cc582ac04acf125d1c2f536b241b 100644 --- a/internal/querynode/stats_service.go +++ b/internal/querynode/stats_service.go @@ -50,7 +50,7 @@ func (sService *statsService) start() { sleepTimeInterval := Params.QueryNodeCfg.StatsPublishInterval // start pulsar - producerChannels := []string{Params.MsgChannelCfg.QueryNodeStats} + producerChannels := []string{Params.CommonCfg.QueryNodeStats} statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx) statsStream.AsProducer(producerChannels) diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index 687a23d365d326e71e90dda4872e0e2118aeeb36..c1c458c8d1af7ae18fe00fc16dd701f0d5cd2bc7 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -50,7 +50,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { const receiveBufSize = 1024 // start pulsar - producerChannels := []string{Params.MsgChannelCfg.QueryNodeStats} + producerChannels := []string{Params.CommonCfg.QueryNodeStats} msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 5d6c0482f1aaa7316b3f90f31cb71502ef250aa7..05f2bea4b1898310f2eae8ef044af994a3afbc63 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -159,7 +159,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error { return err } consumeChannels := []string{r.req.QueryChannel} - consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) + consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName) metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Inc() @@ -312,7 +312,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { } }() - consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) + consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) // group channels by to seeking or consuming channel2SeekPosition := make(map[string]*internalpb.MsgPosition) @@ -532,7 +532,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error { } channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels) - consumeSubName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) + consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.QueryNodeID) // channels as consumer for _, channel := range vDeltaChannels { fg := channel2FlowGraph[channel] diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 22e7a5e80c33020ea7326a3c5920659feb129c15..c5094baf3192bf340c91c373459976adb84911fa 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -441,18 +441,18 @@ func (c *Core) setMsgStreams() error { if Params.PulsarCfg.Address == "" { return fmt.Errorf("pulsar address is empty") } - if Params.MsgChannelCfg.RootCoordSubName == "" { + if Params.CommonCfg.RootCoordSubName == "" { return fmt.Errorf("RootCoordSubName is empty") } // rootcoord time tick channel - if Params.MsgChannelCfg.RootCoordTimeTick == "" { + if Params.CommonCfg.RootCoordTimeTick == "" { return fmt.Errorf("timeTickChannel is empty") } timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx) metrics.RootCoordNumOfMsgStream.Inc() - timeTickStream.AsProducer([]string{Params.MsgChannelCfg.RootCoordTimeTick}) - log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.MsgChannelCfg.RootCoordTimeTick)) + timeTickStream.AsProducer([]string{Params.CommonCfg.RootCoordTimeTick}) + log.Debug("RootCoord register timetick producer success", zap.String("channel name", Params.CommonCfg.RootCoordTimeTick)) c.SendTimeTick = func(t typeutil.Timestamp, reason string) error { msgPack := ms.MsgPack{} @@ -1186,7 +1186,7 @@ func (c *Core) Start() error { } log.Debug(typeutil.RootCoordRole, zap.Int64("node id", c.session.ServerID)) - log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.MsgChannelCfg.RootCoordTimeTick)) + log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.CommonCfg.RootCoordTimeTick)) c.startOnce.Do(func() { if err := c.proxyManager.WatchProxy(); err != nil { @@ -1261,7 +1261,7 @@ func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.MsgChannelCfg.RootCoordTimeTick, + Value: Params.CommonCfg.RootCoordTimeTick, }, nil } @@ -1272,7 +1272,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon ErrorCode: commonpb.ErrorCode_Success, Reason: "", }, - Value: Params.MsgChannelCfg.RootCoordStatistics, + Value: Params.CommonCfg.RootCoordStatistics, }, nil } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index aa87590d23a1c68f71f9449654cf8e3c184b5b99..a557861734c04ca6e34166092e14c4642560cd82 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -559,13 +559,13 @@ func TestRootCoord(t *testing.T) { core, err := NewCore(ctx, coreFactory) assert.Nil(t, err) randVal := rand.Int() - Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) - Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) + Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) + Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) - Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) - Params.MsgChannelCfg.RootCoordDml = fmt.Sprintf("rootcoord-dml-test-%d", randVal) - Params.MsgChannelCfg.RootCoordDelta = fmt.Sprintf("rootcoord-delta-test-%d", randVal) + Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) + Params.CommonCfg.RootCoordDml = fmt.Sprintf("rootcoord-dml-test-%d", randVal) + Params.CommonCfg.RootCoordDelta = fmt.Sprintf("rootcoord-delta-test-%d", randVal) etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) @@ -626,7 +626,7 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) timeTickStream, _ := tmpFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName) + timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) timeTickStream.Start() dmlStream, _ := tmpFactory.NewMsgStream(ctx) @@ -723,7 +723,7 @@ func TestRootCoord(t *testing.T) { createMeta, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) - dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.MsgChannelCfg.RootCoordSubName) + dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.CommonCfg.RootCoordSubName) dmlStream.Start() pChanMap := core.MetaTable.ListCollectionPhysicalChannels() @@ -2311,11 +2311,11 @@ func TestRootCoord2(t *testing.T) { randVal := rand.Int() - Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) - Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) + Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) + Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) - Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) + Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} err = core.SetDataCoord(ctx, dm) @@ -2361,7 +2361,7 @@ func TestRootCoord2(t *testing.T) { assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName) + timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) timeTickStream.Start() time.Sleep(100 * time.Millisecond) @@ -2404,7 +2404,7 @@ func TestRootCoord2(t *testing.T) { collInfo, err := core.MetaTable.GetCollectionByName(collName, 0) assert.Nil(t, err) dmlStream, _ := msFactory.NewMsgStream(ctx) - dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.MsgChannelCfg.RootCoordSubName) + dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.CommonCfg.RootCoordSubName) dmlStream.Start() msgs := getNotTtMsg(ctx, 1, dmlStream.Chan()) @@ -2589,11 +2589,11 @@ func TestCheckFlushedSegments(t *testing.T) { assert.Nil(t, err) randVal := rand.Int() - Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) - Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) + Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) + Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) - Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) + Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} err = core.SetDataCoord(ctx, dm) @@ -2642,7 +2642,7 @@ func TestCheckFlushedSegments(t *testing.T) { assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName) + timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) timeTickStream.Start() time.Sleep(100 * time.Millisecond) @@ -2755,11 +2755,11 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { core, err := NewCore(ctx, msFactory) assert.Nil(t, err) randVal := rand.Int() - Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) - Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) + Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) + Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) - Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) + Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} err = core.SetDataCoord(ctx, dm) @@ -2809,7 +2809,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) - timeTickStream.AsConsumer([]string{Params.MsgChannelCfg.RootCoordTimeTick}, Params.MsgChannelCfg.RootCoordSubName) + timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) timeTickStream.Start() time.Sleep(100 * time.Millisecond) diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index eedd95075aeca23de1fa7d189b02d33452cef371..cf600d92cb2d626a739a2dbc52d5bd5fa32f15a6 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -149,7 +149,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { chanNames[i] = ToPhysicalChannel(vchanNames[i]) deltaChanNames[i] = t.core.chanTimeTick.getDeltaChannelName() - deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta) + deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) if err1 != nil || deltaChanName != deltaChanNames[i] { return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i]) } @@ -363,7 +363,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { // remove delta channels deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames)) for i, chanName := range collMeta.PhysicalChannelNames { - if deltaChanNames[i], err = ConvertChannelName(chanName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta); err != nil { + if deltaChanNames[i], err = ConvertChannelName(chanName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta); err != nil { return err } } diff --git a/internal/rootcoord/timestamp_test.go b/internal/rootcoord/timestamp_test.go index 6eabd97ec96d36f186d8ccc73ce978d101fa9510..e5f7a728d98a0f501070f85ff7c13effc3500cf2 100644 --- a/internal/rootcoord/timestamp_test.go +++ b/internal/rootcoord/timestamp_test.go @@ -88,11 +88,11 @@ func BenchmarkAllocTimestamp(b *testing.B) { randVal := rand.Int() - Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("master-time-tick-%d", randVal) - Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("master-statistics-%d", randVal) + Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("master-time-tick-%d", randVal) + Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("master-statistics-%d", randVal) Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) - Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) + Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) err = core.SetDataCoord(ctx, &tbd{}) assert.Nil(b, err) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 1da67efc98b5283275ec0f17e05993725b42ffa7..81a6e48faf7e26f09e3814986fdbb7fcbe1d3470 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -87,9 +87,9 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp { func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { // initialize dml channels used for insert - dmlChannels := newDmlChannels(ctx, factory, Params.MsgChannelCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum) + dmlChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDml, Params.RootCoordCfg.DmlChannelNum) // initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels - deltaChannels := newDmlChannels(ctx, factory, Params.MsgChannelCfg.RootCoordDelta, Params.RootCoordCfg.DmlChannelNum) + deltaChannels := newDmlChannels(ctx, factory, Params.CommonCfg.RootCoordDelta, Params.RootCoordCfg.DmlChannelNum) // recover physical channels for all collections for collID, chanNames := range chanMap { @@ -99,7 +99,7 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact var err error deltaChanNames := make([]string, len(chanNames)) for i, chanName := range chanNames { - deltaChanNames[i], err = ConvertChannelName(chanName, Params.MsgChannelCfg.RootCoordDml, Params.MsgChannelCfg.RootCoordDelta) + deltaChanNames[i], err = ConvertChannelName(chanName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) if err != nil { log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName)) panic("invalid dml channel name " + chanName) diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index c72b6169d6ee5af9db4cf1ef4454138fd1f5216b..21da320ea6d02924f1bf2daec6f0369db8dcf411 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -45,8 +45,8 @@ func TestTimetickSync(t *testing.T) { //} Params.RootCoordCfg.DmlChannelNum = 2 - Params.MsgChannelCfg.RootCoordDml = "rootcoord-dml" - Params.MsgChannelCfg.RootCoordDelta = "rootcoord-delta" + Params.CommonCfg.RootCoordDml = "rootcoord-dml" + Params.CommonCfg.RootCoordDelta = "rootcoord-delta" ttSync := newTimeTickSync(ctx, sourceID, factory, nil) var wg sync.WaitGroup diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index e2c3ff7e6bcdbab2d07e3099bcebd4b68f6ac434..c70fb6ba39f407dfb4dd8ff8cb8b0703c29fa7df 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -137,11 +137,35 @@ func (gp *BaseTable) Load(key string) (string, error) { return gp.params.Load(strings.ToLower(key)) } +// Load2 loads an object with multiple @keys, return the first successful value. +// If all keys not exist, return error. +// This is to be compatible with old configuration file. +func (gp *BaseTable) Load2(keys []string) (string, error) { + for _, key := range keys { + if str, err := gp.params.Load(strings.ToLower(key)); err == nil { + return str, nil + } + } + return "", fmt.Errorf("invalid keys: %v", keys) +} + // LoadWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. func (gp *BaseTable) LoadWithDefault(key, defaultValue string) string { return gp.params.LoadWithDefault(strings.ToLower(key), defaultValue) } +// LoadWithDefault2 loads an object with multiple @keys, return the first successful value. +// If all keys not exist, return @defaultValue. +// This is to be compatible with old configuration file. +func (gp *BaseTable) LoadWithDefault2(keys []string, defaultValue string) string { + for _, key := range keys { + if str, err := gp.params.Load(strings.ToLower(key)); err == nil { + return str + } + } + return defaultValue +} + // LoadRange loads objects with range @startKey to @endKey with @limit number of objects. func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []string, error) { return gp.params.LoadRange(strings.ToLower(key), strings.ToLower(endKey), limit) diff --git a/internal/util/paramtable/base_table_test.go b/internal/util/paramtable/base_table_test.go index a6b811b844cb5a4b44dc1583df647876039f96d4..d4e9cda91a9d7e1d86ba7d646166a920b0f456d9 100644 --- a/internal/util/paramtable/base_table_test.go +++ b/internal/util/paramtable/base_table_test.go @@ -73,6 +73,23 @@ func TestBaseTable_LoadFromKVPair(t *testing.T) { v, err = baseParams.Load("k2") assert.Nil(t, err) assert.Equal(t, "v2", v) + + v, err = baseParams.Load2([]string{"k2_new"}) + assert.NotNil(t, err) + assert.Equal(t, "", v) + + v, err = baseParams.Load2([]string{"k2_new", "k2"}) + assert.Nil(t, err) + assert.Equal(t, "v2", v) + + v = baseParams.LoadWithDefault("k2_new", "v2_new") + assert.Equal(t, "v2_new", v) + + v = baseParams.LoadWithDefault2([]string{"k2_new"}, "v2_new") + assert.Equal(t, "v2_new", v) + + v = baseParams.LoadWithDefault2([]string{"k2_new", "k2"}, "v2_new") + assert.Equal(t, "v2", v) } func TestBaseTable_LoadRange(t *testing.T) { diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 6de27855a3a1b5c33257f3c06f15a09f4dcdc602..fb244a284c43d78878d972adaee7edd74ffb4319 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -34,9 +34,7 @@ type ComponentParam struct { ServiceParam once sync.Once - CommonCfg commonConfig - KnowhereCfg knowhereConfig - MsgChannelCfg msgChannelConfig + CommonCfg commonConfig RootCoordCfg rootCoordConfig ProxyCfg proxyConfig @@ -60,8 +58,6 @@ func (p *ComponentParam) Init() { p.ServiceParam.Init() p.CommonCfg.init(&p.BaseTable) - p.KnowhereCfg.init(&p.BaseTable) - p.MsgChannelCfg.init(&p.BaseTable) p.RootCoordCfg.init(&p.BaseTable) p.ProxyCfg.init(&p.BaseTable) @@ -84,54 +80,6 @@ func (p *ComponentParam) SetLogConfig(role string) { type commonConfig struct { Base *BaseTable - DefaultPartitionName string - DefaultIndexName string - RetentionDuration int64 -} - -func (p *commonConfig) init(base *BaseTable) { - p.Base = base - - p.initDefaultPartitionName() - p.initDefaultIndexName() - p.initRetentionDuration() -} - -func (p *commonConfig) initDefaultPartitionName() { - p.DefaultPartitionName = p.Base.LoadWithDefault("common.defaultPartitionName", "_default") -} - -func (p *commonConfig) initDefaultIndexName() { - p.DefaultIndexName = p.Base.LoadWithDefault("common.defaultIndexName", "_default_idx") -} - -func (p *commonConfig) initRetentionDuration() { - p.RetentionDuration = p.Base.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration) -} - -/////////////////////////////////////////////////////////////////////////////// -// --- knowhere --- -type knowhereConfig struct { - Base *BaseTable - - SimdType string -} - -func (p *knowhereConfig) init(base *BaseTable) { - p.Base = base - - p.initSimdType() -} - -func (p *knowhereConfig) initSimdType() { - p.SimdType = p.Base.LoadWithDefault("knowhere.simdType", "auto") -} - -/////////////////////////////////////////////////////////////////////////////// -// --- msgChannel --- -type msgChannelConfig struct { - Base *BaseTable - ClusterPrefix string ProxySubName string @@ -153,9 +101,15 @@ type msgChannelConfig struct { DataCoordSegmentInfo string DataCoordSubName string DataNodeSubName string + + DefaultPartitionName string + DefaultIndexName string + RetentionDuration int64 + + SimdType string } -func (p *msgChannelConfig) init(base *BaseTable) { +func (p *commonConfig) init(base *BaseTable) { p.Base = base // must init cluster prefix first @@ -180,18 +134,28 @@ func (p *msgChannelConfig) init(base *BaseTable) { p.initDataCoordSegmentInfo() p.initDataCoordSubName() p.initDataNodeSubName() + + p.initDefaultPartitionName() + p.initDefaultIndexName() + p.initRetentionDuration() + + p.initSimdType() } -func (p *msgChannelConfig) initClusterPrefix() { - str, err := p.Base.Load("msgChannel.chanNamePrefix.cluster") +func (p *commonConfig) initClusterPrefix() { + keys := []string{ + "common.chanNamePrefix.cluster", + "msgChannel.chanNamePrefix.cluster", + } + str, err := p.Base.Load2(keys) if err != nil { panic(err) } p.ClusterPrefix = str } -func (p *msgChannelConfig) initChanNamePrefix(cfg string) string { - value, err := p.Base.Load(cfg) +func (p *commonConfig) initChanNamePrefix(keys []string) string { + value, err := p.Base.Load2(keys) if err != nil { panic(err) } @@ -200,72 +164,156 @@ func (p *msgChannelConfig) initChanNamePrefix(cfg string) string { } // --- proxy --- -func (p *msgChannelConfig) initProxySubName() { - p.ProxySubName = p.initChanNamePrefix("msgChannel.subNamePrefix.proxySubNamePrefix") +func (p *commonConfig) initProxySubName() { + keys := []string{ + "common.subNamePrefix.proxySubNamePrefix", + "msgChannel.subNamePrefix.proxySubNamePrefix", + } + p.ProxySubName = p.initChanNamePrefix(keys) } // --- rootcoord --- -func (p *msgChannelConfig) initRootCoordTimeTick() { - p.RootCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordTimeTick") +func (p *commonConfig) initRootCoordTimeTick() { + keys := []string{ + "common.chanNamePrefix.rootCoordTimeTick", + "msgChannel.chanNamePrefix.rootCoordTimeTick", + } + p.RootCoordTimeTick = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initRootCoordStatistics() { - p.RootCoordStatistics = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordStatistics") +func (p *commonConfig) initRootCoordStatistics() { + keys := []string{ + "common.chanNamePrefix.rootCoordStatistics", + "msgChannel.chanNamePrefix.rootCoordStatistics", + } + p.RootCoordStatistics = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initRootCoordDml() { - p.RootCoordDml = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordDml") +func (p *commonConfig) initRootCoordDml() { + keys := []string{ + "common.chanNamePrefix.rootCoordDml", + "msgChannel.chanNamePrefix.rootCoordDml", + } + p.RootCoordDml = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initRootCoordDelta() { - p.RootCoordDelta = p.initChanNamePrefix("msgChannel.chanNamePrefix.rootCoordDelta") +func (p *commonConfig) initRootCoordDelta() { + keys := []string{ + "common.chanNamePrefix.rootCoordDelta", + "msgChannel.chanNamePrefix.rootCoordDelta", + } + p.RootCoordDelta = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initRootCoordSubName() { - p.RootCoordSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.rootCoordSubNamePrefix") +func (p *commonConfig) initRootCoordSubName() { + keys := []string{ + "common.subNamePrefix.rootCoordSubNamePrefix", + "msgChannel.subNamePrefix.rootCoordSubNamePrefix", + } + p.RootCoordSubName = p.initChanNamePrefix(keys) } // --- querycoord --- -func (p *msgChannelConfig) initQueryCoordSearch() { - p.QueryCoordSearch = p.initChanNamePrefix("msgChannel.chanNamePrefix.search") +func (p *commonConfig) initQueryCoordSearch() { + keys := []string{ + "common.chanNamePrefix.search", + "msgChannel.chanNamePrefix.search", + } + p.QueryCoordSearch = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initQueryCoordSearchResult() { - p.QueryCoordSearchResult = p.initChanNamePrefix("msgChannel.chanNamePrefix.searchResult") +func (p *commonConfig) initQueryCoordSearchResult() { + keys := []string{ + "common.chanNamePrefix.searchResult", + "msgChannel.chanNamePrefix.searchResult", + } + p.QueryCoordSearchResult = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initQueryCoordTimeTick() { - p.QueryCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryTimeTick") +func (p *commonConfig) initQueryCoordTimeTick() { + keys := []string{ + "common.chanNamePrefix.queryTimeTick", + "msgChannel.chanNamePrefix.queryTimeTick", + } + p.QueryCoordTimeTick = p.initChanNamePrefix(keys) } // --- querynode --- -func (p *msgChannelConfig) initQueryNodeStats() { - p.QueryNodeStats = p.initChanNamePrefix("msgChannel.chanNamePrefix.queryNodeStats") +func (p *commonConfig) initQueryNodeStats() { + keys := []string{ + "common.chanNamePrefix.queryNodeStats", + "msgChannel.chanNamePrefix.queryNodeStats", + } + p.QueryNodeStats = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initQueryNodeSubName() { - p.QueryNodeSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.queryNodeSubNamePrefix") +func (p *commonConfig) initQueryNodeSubName() { + keys := []string{ + "common.subNamePrefix.queryNodeSubNamePrefix", + "msgChannel.subNamePrefix.queryNodeSubNamePrefix", + } + p.QueryNodeSubName = p.initChanNamePrefix(keys) } // --- datacoord --- -func (p *msgChannelConfig) initDataCoordStatistic() { - p.DataCoordStatistic = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordStatistic") +func (p *commonConfig) initDataCoordStatistic() { + keys := []string{ + "common.chanNamePrefix.dataCoordStatistic", + "msgChannel.chanNamePrefix.dataCoordStatistic", + } + p.DataCoordStatistic = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initDataCoordTimeTick() { - p.DataCoordTimeTick = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordTimeTick") +func (p *commonConfig) initDataCoordTimeTick() { + keys := []string{ + "common.chanNamePrefix.dataCoordTimeTick", + "msgChannel.chanNamePrefix.dataCoordTimeTick", + } + p.DataCoordTimeTick = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initDataCoordSegmentInfo() { - p.DataCoordSegmentInfo = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordSegmentInfo") +func (p *commonConfig) initDataCoordSegmentInfo() { + keys := []string{ + "common.chanNamePrefix.dataCoordSegmentInfo", + "msgChannel.chanNamePrefix.dataCoordSegmentInfo", + } + p.DataCoordSegmentInfo = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initDataCoordSubName() { - p.DataCoordSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.dataCoordSubNamePrefix") +func (p *commonConfig) initDataCoordSubName() { + keys := []string{ + "common.subNamePrefix.dataCoordSubNamePrefix", + "msgChannel.subNamePrefix.dataCoordSubNamePrefix", + } + p.DataCoordSubName = p.initChanNamePrefix(keys) } -func (p *msgChannelConfig) initDataNodeSubName() { - p.DataNodeSubName = p.initChanNamePrefix("msgChannel.subNamePrefix.dataNodeSubNamePrefix") +func (p *commonConfig) initDataNodeSubName() { + keys := []string{ + "common.subNamePrefix.dataNodeSubNamePrefix", + "msgChannel.subNamePrefix.dataNodeSubNamePrefix", + } + p.DataNodeSubName = p.initChanNamePrefix(keys) +} + +func (p *commonConfig) initDefaultPartitionName() { + p.DefaultPartitionName = p.Base.LoadWithDefault("common.defaultPartitionName", "_default") +} + +func (p *commonConfig) initDefaultIndexName() { + p.DefaultIndexName = p.Base.LoadWithDefault("common.defaultIndexName", "_default_idx") +} + +func (p *commonConfig) initRetentionDuration() { + p.RetentionDuration = p.Base.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration) +} + +func (p *commonConfig) initSimdType() { + keys := []string{ + "common.simdType", + "knowhere.simdType", + } + p.SimdType = p.Base.LoadWithDefault2(keys, "auto") } /////////////////////////////////////////////////////////////////////////////// diff --git a/internal/util/paramtable/component_param_test.go b/internal/util/paramtable/component_param_test.go index 291b471c26b7385c84cdcce40cfb03802fd24cba..9213b26d20c638540db3450a3b8b0fb7ecae6228 100644 --- a/internal/util/paramtable/component_param_test.go +++ b/internal/util/paramtable/component_param_test.go @@ -40,17 +40,9 @@ func TestComponentParam(t *testing.T) { t.Logf("default index name = %s", Params.DefaultIndexName) assert.Equal(t, Params.RetentionDuration, int64(DefaultRetentionDuration)) - }) - - t.Run("test knowhereConfig", func(t *testing.T) { - Params := CParams.KnowhereCfg assert.NotEqual(t, Params.SimdType, "") t.Logf("knowhere simd type = %s", Params.SimdType) - }) - - t.Run("test msgChannelConfig", func(t *testing.T) { - Params := CParams.MsgChannelCfg // -- proxy -- assert.Equal(t, Params.ProxySubName, "by-dev-proxy")