From 69cc68caffa6c508a3c097e0b9a57e2176a3c3bb Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 16 Mar 2023 14:25:55 +0800 Subject: [PATCH] support first or random strategy (#22766) Signed-off-by: Wei Liu --- internal/proxy/meta_cache.go | 17 ++++++++++--- internal/proxy/meta_cache_test.go | 40 +++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 914c07639..1974804ca 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -19,6 +19,7 @@ package proxy import ( "context" "fmt" + "math/rand" "strconv" "sync" "time" @@ -123,12 +124,22 @@ type shardLeadersReader struct { // Shuffle returns the shuffled shard leader list. func (it shardLeadersReader) Shuffle() map[string][]nodeInfo { result := make(map[string][]nodeInfo) + rand.Seed(time.Now().UnixNano()) for channel, leaders := range it.leaders.shardLeaders { l := len(leaders) - shuffled := make([]nodeInfo, 0, len(leaders)) - for i := 0; i < l; i++ { - shuffled = append(shuffled, leaders[(i+int(it.idx))%l]) + // shuffle all replica at random order + shuffled := make([]nodeInfo, l) + for i, randIndex := range rand.Perm(l) { + shuffled[i] = leaders[randIndex] } + + // make each copy has same probability to be first replica + for index, leader := range shuffled { + if leader == leaders[int(it.idx)%l] { + shuffled[0], shuffled[index] = shuffled[index], shuffled[0] + } + } + result[channel] = shuffled } return result diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index 72f68d23d..74012c2ba 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/cockroachdb/errors" + uatomic "go.uber.org/atomic" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -880,3 +881,42 @@ func TestMetaCache_ExpireShardLeaderCache(t *testing.T) { return len(nodeInfos["channel-1"]) == 3 && len(nodeInfos["channel-2"]) == 3 }, 3*time.Second, 1*time.Second) } + +func TestGlobalMetaCache_ShuffleShardLeaders(t *testing.T) { + shards := map[string][]nodeInfo{ + "channel-1": { + { + nodeID: 1, + address: "localhost:9000", + }, + { + nodeID: 2, + address: "localhost:9000", + }, + { + nodeID: 3, + address: "localhost:9000", + }, + }, + } + sl := &shardLeaders{ + deprecated: uatomic.NewBool(false), + idx: uatomic.NewInt64(5), + shardLeaders: shards, + } + + reader := sl.GetReader() + result := reader.Shuffle() + assert.Len(t, result["channel-1"], 3) + assert.Equal(t, int64(1), result["channel-1"][0].nodeID) + + reader = sl.GetReader() + result = reader.Shuffle() + assert.Len(t, result["channel-1"], 3) + assert.Equal(t, int64(2), result["channel-1"][0].nodeID) + + reader = sl.GetReader() + result = reader.Shuffle() + assert.Len(t, result["channel-1"], 3) + assert.Equal(t, int64(3), result["channel-1"][0].nodeID) +} -- GitLab