未验证 提交 ad41e2a0 编写于 作者: X XuanYang-cn 提交者: GitHub

Tidy getQueryChannel for readability (#16096)

- Remove useless codes in query_coord.go
- Tidy slice initialization in LoadCollection task
- Add ut for getQueryChannel
Signed-off-by: Nyangxuan <xuan.yang@zilliz.com>
上级 97e5d779
......@@ -779,16 +779,16 @@ func (m *MetaReplica) setDmChannelInfos(dmChannelWatchInfos []*querypb.DmChannel
return nil
}
// createQueryChannel creates topic names for search channel and search result channel
// Search channel's suffix is fixed with "-0"
// Search result channel's suffix is fixed with "-0"
func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo {
// TODO::to remove
// all collection use the same query channel
colIDForAssignChannel := UniqueID(0)
searchPrefix := Params.CommonCfg.QueryCoordSearch
searchResultPrefix := Params.CommonCfg.QueryCoordSearchResult
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10)
log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel))
allocatedQueryChannel := fmt.Sprintf("%s-0", Params.CommonCfg.QueryCoordSearch)
allocatedQueryResultChannel := fmt.Sprintf("%s-0", Params.CommonCfg.QueryCoordSearchResult)
log.Debug("query coordinator is creating query channel",
zap.String("query channel name", allocatedQueryChannel),
zap.String("query result channel name", allocatedQueryResultChannel))
seekPosition := &internalpb.MsgPosition{
ChannelName: allocatedQueryChannel,
......
......@@ -354,3 +354,50 @@ func TestReloadMetaFromKV(t *testing.T) {
segment := meta.segmentsInfo.getSegment(defaultSegmentID)
assert.NotNil(t, segment)
}
func TestCreateQueryChannel(t *testing.T) {
refreshParams()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
assert.Nil(t, err)
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
nodeID := defaultQueryNodeID
segmentsInfo := newSegmentsInfo(kv)
segmentsInfo.segmentIDMap[defaultSegmentID] = &querypb.SegmentInfo{
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentID: defaultSegmentID,
NodeID: nodeID,
}
fixedQueryChannel := Params.CommonCfg.QueryCoordSearch + "-0"
fixedQueryResultChannel := Params.CommonCfg.QueryCoordSearchResult + "-0"
tests := []struct {
inID UniqueID
outQueryChannel string
outResultChannel string
description string
}{
{0, fixedQueryChannel, fixedQueryResultChannel, "collection ID = 0"},
{1, fixedQueryChannel, fixedQueryResultChannel, "collection ID = 1"},
}
m := &MetaReplica{
client: kv,
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: segmentsInfo,
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
info := m.createQueryChannel(test.inID)
assert.Equal(t, info.GetQueryChannel(), test.outQueryChannel)
assert.Equal(t, info.GetQueryResultChannel(), test.outResultChannel)
})
}
}
......@@ -24,7 +24,6 @@ import (
"math/rand"
"os"
"sort"
"strconv"
"sync"
"sync/atomic"
"syscall"
......@@ -296,18 +295,6 @@ func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) {
// NewQueryCoord creates a QueryCoord object.
func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) {
rand.Seed(time.Now().UnixNano())
queryChannels := make([]*queryChannelInfo, 0)
channelID := len(queryChannels)
searchPrefix := Params.CommonCfg.QueryCoordSearch
searchResultPrefix := Params.CommonCfg.QueryCoordSearchResult
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
queryChannels = append(queryChannels, &queryChannelInfo{
requestChannel: allocatedQueryChannel,
responseChannel: allocatedQueryResultChannel,
})
ctx1, cancel := context.WithCancel(ctx)
service := &QueryCoord{
loopCtx: ctx1,
......@@ -317,7 +304,6 @@ func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord,
}
service.UpdateStateCode(internalpb.StateCode_Abnormal)
log.Debug("query coordinator", zap.Any("queryChannels", queryChannels))
return service, nil
}
......
......@@ -375,10 +375,13 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
}
log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toLoadPartitionIDs), zap.Int64("msgID", lct.Base.MsgID))
loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0)
var deltaChannelInfos []*datapb.VchannelInfo
var dmChannelInfos []*datapb.VchannelInfo
var (
loadSegmentReqs = []*querypb.LoadSegmentsRequest{}
watchDmChannelReqs = []*querypb.WatchDmChannelsRequest{}
deltaChannelInfos = []*datapb.VchannelInfo{}
dmChannelInfos = []*datapb.VchannelInfo{}
)
for _, partitionID := range toLoadPartitionIDs {
vChannelInfos, binlogs, err := lct.broker.getRecoveryInfo(lct.ctx, collectionID, partitionID)
if err != nil {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册