未验证 提交 69cc68ca 编写于 作者: W wei liu 提交者: GitHub

support first or random strategy (#22766)

Signed-off-by: NWei Liu <wei.liu@zilliz.com>
上级 80c6adb3
......@@ -19,6 +19,7 @@ package proxy
import (
"context"
"fmt"
"math/rand"
"strconv"
"sync"
"time"
......@@ -123,12 +124,22 @@ type shardLeadersReader struct {
// Shuffle returns the shuffled shard leader list.
func (it shardLeadersReader) Shuffle() map[string][]nodeInfo {
result := make(map[string][]nodeInfo)
rand.Seed(time.Now().UnixNano())
for channel, leaders := range it.leaders.shardLeaders {
l := len(leaders)
shuffled := make([]nodeInfo, 0, len(leaders))
for i := 0; i < l; i++ {
shuffled = append(shuffled, leaders[(i+int(it.idx))%l])
// shuffle all replica at random order
shuffled := make([]nodeInfo, l)
for i, randIndex := range rand.Perm(l) {
shuffled[i] = leaders[randIndex]
}
// make each copy has same probability to be first replica
for index, leader := range shuffled {
if leader == leaders[int(it.idx)%l] {
shuffled[0], shuffled[index] = shuffled[index], shuffled[0]
}
}
result[channel] = shuffled
}
return result
......
......@@ -25,6 +25,7 @@ import (
"time"
"github.com/cockroachdb/errors"
uatomic "go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
......@@ -880,3 +881,42 @@ func TestMetaCache_ExpireShardLeaderCache(t *testing.T) {
return len(nodeInfos["channel-1"]) == 3 && len(nodeInfos["channel-2"]) == 3
}, 3*time.Second, 1*time.Second)
}
func TestGlobalMetaCache_ShuffleShardLeaders(t *testing.T) {
shards := map[string][]nodeInfo{
"channel-1": {
{
nodeID: 1,
address: "localhost:9000",
},
{
nodeID: 2,
address: "localhost:9000",
},
{
nodeID: 3,
address: "localhost:9000",
},
},
}
sl := &shardLeaders{
deprecated: uatomic.NewBool(false),
idx: uatomic.NewInt64(5),
shardLeaders: shards,
}
reader := sl.GetReader()
result := reader.Shuffle()
assert.Len(t, result["channel-1"], 3)
assert.Equal(t, int64(1), result["channel-1"][0].nodeID)
reader = sl.GetReader()
result = reader.Shuffle()
assert.Len(t, result["channel-1"], 3)
assert.Equal(t, int64(2), result["channel-1"][0].nodeID)
reader = sl.GetReader()
result = reader.Shuffle()
assert.Len(t, result["channel-1"], 3)
assert.Equal(t, int64(3), result["channel-1"][0].nodeID)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册