diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index b2e37feecda05e0491c94c61a97c9b18c8441341..c6a16ae0a38fa2388be574ceb08a8c17ecccc3fd 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -779,16 +779,16 @@ func (m *MetaReplica) setDmChannelInfos(dmChannelWatchInfos []*querypb.DmChannel return nil } +// createQueryChannel creates topic names for search channel and search result channel +// Search channel's suffix is fixed with "-0" +// Search result channel's suffix is fixed with "-0" func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo { - // TODO::to remove - // all collection use the same query channel - colIDForAssignChannel := UniqueID(0) - - searchPrefix := Params.CommonCfg.QueryCoordSearch - searchResultPrefix := Params.CommonCfg.QueryCoordSearchResult - allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10) - allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10) - log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel)) + allocatedQueryChannel := fmt.Sprintf("%s-0", Params.CommonCfg.QueryCoordSearch) + allocatedQueryResultChannel := fmt.Sprintf("%s-0", Params.CommonCfg.QueryCoordSearchResult) + + log.Debug("query coordinator is creating query channel", + zap.String("query channel name", allocatedQueryChannel), + zap.String("query result channel name", allocatedQueryResultChannel)) seekPosition := &internalpb.MsgPosition{ ChannelName: allocatedQueryChannel, diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index d1c7c3969d74b0de8b047f458e96179b6c322469..9df95667449b560477de0096984cadb88ed60a83 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -354,3 +354,50 @@ func TestReloadMetaFromKV(t *testing.T) { segment := meta.segmentsInfo.getSegment(defaultSegmentID) assert.NotNil(t, segment) } + +func TestCreateQueryChannel(t *testing.T) { + refreshParams() + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + assert.Nil(t, err) + defer etcdCli.Close() + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + + nodeID := defaultQueryNodeID + segmentsInfo := newSegmentsInfo(kv) + segmentsInfo.segmentIDMap[defaultSegmentID] = &querypb.SegmentInfo{ + CollectionID: defaultCollectionID, + PartitionID: defaultPartitionID, + SegmentID: defaultSegmentID, + NodeID: nodeID, + } + + fixedQueryChannel := Params.CommonCfg.QueryCoordSearch + "-0" + fixedQueryResultChannel := Params.CommonCfg.QueryCoordSearchResult + "-0" + + tests := []struct { + inID UniqueID + outQueryChannel string + outResultChannel string + + description string + }{ + {0, fixedQueryChannel, fixedQueryResultChannel, "collection ID = 0"}, + {1, fixedQueryChannel, fixedQueryResultChannel, "collection ID = 1"}, + } + + m := &MetaReplica{ + client: kv, + collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, + queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{}, + dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{}, + segmentsInfo: segmentsInfo, + } + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + info := m.createQueryChannel(test.inID) + assert.Equal(t, info.GetQueryChannel(), test.outQueryChannel) + assert.Equal(t, info.GetQueryResultChannel(), test.outResultChannel) + }) + } + +} diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index a9d79c7490f3de0953d49ab0046b906443264dfb..c6421cb33b04763f0bd37e9a849e6f91f5395d45 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -24,7 +24,6 @@ import ( "math/rand" "os" "sort" - "strconv" "sync" "sync/atomic" "syscall" @@ -296,18 +295,6 @@ func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) { // NewQueryCoord creates a QueryCoord object. func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) { rand.Seed(time.Now().UnixNano()) - queryChannels := make([]*queryChannelInfo, 0) - channelID := len(queryChannels) - searchPrefix := Params.CommonCfg.QueryCoordSearch - searchResultPrefix := Params.CommonCfg.QueryCoordSearchResult - allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10) - allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10) - - queryChannels = append(queryChannels, &queryChannelInfo{ - requestChannel: allocatedQueryChannel, - responseChannel: allocatedQueryResultChannel, - }) - ctx1, cancel := context.WithCancel(ctx) service := &QueryCoord{ loopCtx: ctx1, @@ -317,7 +304,6 @@ func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, } service.UpdateStateCode(internalpb.StateCode_Abnormal) - log.Debug("query coordinator", zap.Any("queryChannels", queryChannels)) return service, nil } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 2a1aad44d0ab66a50a52407fdd5f08d9e8af21e1..53c0597e6353da46d74cfd72000d47c3963eb09c 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -375,10 +375,13 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { } log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toLoadPartitionIDs), zap.Int64("msgID", lct.Base.MsgID)) - loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0) - watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0) - var deltaChannelInfos []*datapb.VchannelInfo - var dmChannelInfos []*datapb.VchannelInfo + var ( + loadSegmentReqs = []*querypb.LoadSegmentsRequest{} + watchDmChannelReqs = []*querypb.WatchDmChannelsRequest{} + deltaChannelInfos = []*datapb.VchannelInfo{} + dmChannelInfos = []*datapb.VchannelInfo{} + ) + for _, partitionID := range toLoadPartitionIDs { vChannelInfos, binlogs, err := lct.broker.getRecoveryInfo(lct.ctx, collectionID, partitionID) if err != nil {