未验证 提交 3e73775c 编写于 作者: Z zhenshan.cao 提交者: GitHub

Add ComputeProduceChannelIndexes for msgstream (#5364)

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 b688cc5f
......@@ -6,6 +6,7 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/antonmedv/expr v1.8.9
github.com/apache/pulsar-client-go v0.5.0
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4
github.com/coreos/etcd v3.3.13+incompatible
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
......@@ -13,6 +14,7 @@ require (
github.com/frankban/quicktest v1.10.2 // indirect
github.com/go-basic/ipv4 v1.0.0
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.4.3
github.com/google/btree v1.0.0
github.com/jarcoal/httpmock v1.0.8
......
......@@ -31,6 +31,8 @@ github.com/apache/pulsar-client-go v0.5.0 h1:cM2e6dXBa9OyPtvGHxZB1OlSOWQxsWzu45b
github.com/apache/pulsar-client-go v0.5.0/go.mod h1:yj6hIv/EZXf5GgJJ8I3T13Yx9yspj8aF2QrJ5kzuueM=
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd h1:P5kM7jcXJ7TaftX0/EMKiSJgvQc/ct+Fw0KMvcH3WuY=
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4 h1:orNYqmQGnSjgOauLWjHEp9/qIDT98xv/0Aa4Zet3/Y8=
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4/go.mod h1:V/LzksIyqd3KZuQ2SunvReTG/UkArhII1dAWY5U1sCE=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
......@@ -113,6 +115,7 @@ github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18h
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
......@@ -316,6 +319,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/protocolbuffers/protobuf v3.17.0+incompatible h1:MYhKKlaNOl8FB3F4u6oM2AlpcyLtT+p8Ec1w/9YeHss=
github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShEf0EaOCaTQYyB7d5nSbb181KtjlS+84=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
......
......@@ -15,7 +15,6 @@ import (
"context"
"errors"
"log"
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -80,6 +79,26 @@ func (mms *MemMsgStream) AsProducer(channels []string) {
}
}
func (mms *MemMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
if len(tsMsgs) <= 0 {
return nil
}
reBucketValues := make([][]int32, len(tsMsgs))
channelNum := uint32(len(mms.producers))
if channelNum == 0 {
return nil
}
for idx, tsMsg := range tsMsgs {
hashValues := tsMsg.HashKeys()
bucketValues := make([]int32, len(hashValues))
for index, hashValue := range hashValues {
bucketValues[index] = int32(hashValue % channelNum)
}
reBucketValues[idx] = bucketValues
}
return reBucketValues
}
func (mms *MemMsgStream) AsConsumer(channels []string, groupName string) {
for _, channelName := range channels {
consumer, err := Mmq.CreateConsumerGroup(groupName, channelName)
......@@ -101,26 +120,7 @@ func (mms *MemMsgStream) Produce(pack *MsgPack) error {
if len(mms.producers) <= 0 {
return errors.New("nil producer in msg stream")
}
reBucketValues := make([][]int32, len(tsMsgs))
for channelID, tsMsg := range tsMsgs {
hashValues := tsMsg.HashKeys()
bucketValues := make([]int32, len(hashValues))
for index, hashValue := range hashValues {
if tsMsg.Type() == commonpb.MsgType_SearchResult {
searchResult := tsMsg.(*SearchResultMsg)
channelID := searchResult.ResultChannelID
channelIDInt, _ := strconv.ParseInt(channelID, 10, 64)
if channelIDInt >= int64(len(mms.producers)) {
return errors.New("Failed to produce rmq msg to unKnow channel")
}
bucketValues[index] = int32(channelIDInt)
continue
}
bucketValues[index] = int32(hashValue % uint32(len(mms.producers)))
}
reBucketValues[channelID] = bucketValues
}
reBucketValues := mms.ComputeProduceChannelIndexes(pack.Msgs)
var result map[int32]*MsgPack
var err error
if mms.repackFunc != nil {
......
......@@ -15,7 +15,6 @@ import (
"context"
"errors"
"path/filepath"
"strconv"
"sync"
"time"
......@@ -165,35 +164,36 @@ func (ms *mqMsgStream) Close() {
}
}
func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
tsMsgs := msgPack.Msgs
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
if len(tsMsgs) <= 0 {
log.Debug("Warning: Receive empty msgPack")
return nil
}
if len(ms.producers) <= 0 {
return errors.New("nil producer in msg stream")
}
reBucketValues := make([][]int32, len(tsMsgs))
channelNum := uint32(len(ms.producerChannels))
if channelNum == 0 {
return nil
}
for idx, tsMsg := range tsMsgs {
hashValues := tsMsg.HashKeys()
bucketValues := make([]int32, len(hashValues))
for index, hashValue := range hashValues {
if tsMsg.Type() == commonpb.MsgType_SearchResult {
searchResult := tsMsg.(*SearchResultMsg)
channelID := searchResult.ResultChannelID
channelIDInt, _ := strconv.ParseInt(channelID, 10, 64)
if channelIDInt >= int64(len(ms.producers)) {
return errors.New("Failed to produce msg to unKnow channel")
}
bucketValues[index] = int32(hashValue % uint32(len(ms.producers)))
continue
}
bucketValues[index] = int32(hashValue % uint32(len(ms.producers)))
bucketValues[index] = int32(hashValue % channelNum)
}
reBucketValues[idx] = bucketValues
}
return reBucketValues
}
func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
tsMsgs := msgPack.Msgs
if len(tsMsgs) <= 0 {
log.Debug("Warning: Receive empty msgPack")
return nil
}
if len(ms.producers) <= 0 {
return errors.New("nil producer in msg stream")
}
reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs)
var result map[int32]*MsgPack
var err error
if ms.repackFunc != nil {
......
......@@ -40,7 +40,7 @@ type MsgStream interface {
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
SetRepackFunc(repackFunc RepackFunc)
ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32
Produce(*MsgPack) error
Broadcast(*MsgPack) error
Consume() *MsgPack
......
......@@ -39,6 +39,10 @@ func (ms *SimpleMsgStream) AsProducer(channels []string) {
func (ms *SimpleMsgStream) AsConsumer(channels []string, subName string) {
}
func (ms *SimpleMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
return nil
}
func (ms *SimpleMsgStream) SetRepackFunc(repackFunc RepackFunc) {
}
......
......@@ -15,7 +15,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"github.com/golang/protobuf/proto"
......@@ -366,7 +365,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
}
nilHits[i] = bs
}
resultChannelInt, _ := strconv.ParseInt(searchMsg.ResultChannelID, 10, 64)
resultChannelInt := 0
searchResultMsg := &msgstream.SearchResultMsg{
BaseMsg: msgstream.BaseMsg{Ctx: searchMsg.Ctx, HashValues: []uint32{uint32(resultChannelInt)}},
SearchResults: internalpb.SearchResults{
......@@ -447,7 +446,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
//log.Debug("hits msg = ", unMarshaledHit)
offset += len
}
resultChannelInt, _ := strconv.ParseInt(searchMsg.ResultChannelID, 10, 64)
resultChannelInt := 0
searchResultMsg := &msgstream.SearchResultMsg{
BaseMsg: msgstream.BaseMsg{Ctx: searchMsg.Ctx, HashValues: []uint32{uint32(resultChannelInt)}},
SearchResults: internalpb.SearchResults{
......@@ -517,7 +516,7 @@ func (s *searchCollection) publishFailedSearchResult(searchMsg *msgstream.Search
//log.Debug("Public fail SearchResult!")
msgPack := msgstream.MsgPack{}
resultChannelInt, _ := strconv.ParseInt(searchMsg.ResultChannelID, 10, 64)
resultChannelInt := 0
searchResultMsg := &msgstream.SearchResultMsg{
BaseMsg: msgstream.BaseMsg{HashValues: []uint32{uint32(resultChannelInt)}},
SearchResults: internalpb.SearchResults{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册