提交 1c274de3 编写于 作者: B bigsheeper 提交者: yefu.chen

Improve mutex in queryservice

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 dfb68a7e
......@@ -44,10 +44,14 @@ func (qn *queryNodeInfo) AddDmChannels(channels []string, collectionID UniqueID)
qn.channels2Col[collectionID] = append(qn.channels2Col[collectionID], channels...)
}
func (qn *queryNodeInfo) getChannels2Col() map[UniqueID][]string {
func (qn *queryNodeInfo) getNumChannels() int {
qn.mu.Lock()
defer qn.mu.Unlock()
return qn.channels2Col
numChannels := 0
for _, chs := range qn.channels2Col {
numChannels += len(chs)
}
return numChannels
}
func (qn *queryNodeInfo) AddSegments(segmentIDs []UniqueID, collectionID UniqueID) {
......@@ -60,6 +64,22 @@ func (qn *queryNodeInfo) AddSegments(segmentIDs []UniqueID, collectionID UniqueI
qn.segments[collectionID] = append(qn.segments[collectionID], segmentIDs...)
}
func (qn *queryNodeInfo) getSegmentsLength() int {
qn.mu.Lock()
defer qn.mu.Unlock()
return len(qn.segments)
}
func (qn *queryNodeInfo) getNumSegments() int {
qn.mu.Lock()
defer qn.mu.Unlock()
numSegments := 0
for _, ids := range qn.segments {
numSegments += len(ids)
}
return numSegments
}
func (qn *queryNodeInfo) AddQueryChannel(ctx context.Context, in *querypb.AddQueryChannelRequest) (*commonpb.Status, error) {
return qn.client.AddQueryChannel(ctx, in)
}
......
......@@ -756,10 +756,7 @@ func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) er
func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[int64][]string {
maxNumChannels := 0
for _, node := range qs.queryNodes {
numChannels := 0
for _, chs := range node.getChannels2Col() {
numChannels += len(chs)
}
numChannels := node.getNumChannels()
if numChannels > maxNumChannels {
maxNumChannels = numChannels
}
......@@ -771,7 +768,7 @@ func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[int6
lastOffset := offset
if !loopAll {
for id, node := range qs.queryNodes {
if len(node.segments) >= maxNumChannels {
if node.getSegmentsLength() >= maxNumChannels {
continue
}
if _, ok := res[id]; !ok {
......@@ -804,10 +801,7 @@ func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[int6
func (qs *QueryService) shuffleSegmentsToQueryNode(segmentIDs []UniqueID) map[int64][]UniqueID {
maxNumSegments := 0
for _, node := range qs.queryNodes {
numSegments := 0
for _, ids := range node.segments {
numSegments += len(ids)
}
numSegments := node.getNumSegments()
if numSegments > maxNumSegments {
maxNumSegments = numSegments
}
......@@ -828,7 +822,7 @@ func (qs *QueryService) shuffleSegmentsToQueryNode(segmentIDs []UniqueID) map[in
lastOffset := offset
if !loopAll {
for id, node := range qs.queryNodes {
if len(node.segments) >= maxNumSegments {
if node.getSegmentsLength() >= maxNumSegments {
continue
}
if _, ok := res[id]; !ok {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册