From 580e3a57cfda8293655073ea77181499235ca70d Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 1 Jul 2021 14:32:16 +0800 Subject: [PATCH] stop add new node into cluster before validated (#6241) * stop add new new into cluster Signed-off-by: Congqi Xia * Fix startup & test cases Signed-off-by: Congqi Xia --- internal/datacoord/cluster.go | 14 +-- internal/datacoord/cluster_data_manager.go | 10 +-- internal/datacoord/cluster_test.go | 99 ++++++++++++++++++++-- 3 files changed, 102 insertions(+), 21 deletions(-) diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 224affa2f..91f5de39a 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -95,17 +95,18 @@ func newCluster(ctx context.Context, dataManager *clusterNodeManager, assignPolicy: defaultAssignPolicy(), } + c.candidateManager = newCandidateManager(20, c.validateDataNode, c.enableDataNode) + for _, opt := range opts { opt.apply(c) } - c.candidateManager = newCandidateManager(20, c.validateDataNode, c.enableDataNode) return c } // startup applies statup policy func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error { - deltaChange := c.dataManager.updateCluster(dataNodes) + /*deltaChange := c.dataManager.updateCluster(dataNodes) nodes, chanBuffer := c.dataManager.getDataNodes(false) var rets []*datapb.DataNodeInfo var err error @@ -117,7 +118,8 @@ func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error { //does not trigger new another refresh, pending evt will do } c.dataManager.updateDataNodes(rets, chanBuffer) - return nil + return nil*/ + return c.refresh(dataNodes) } // refresh rough refresh datanode status after event received @@ -150,8 +152,10 @@ func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error { } } } - _, buffer := c.dataManager.getDataNodes(true) - c.updateNodeWatch(restartNodes, buffer) + if len(restartNodes) > 0 { + _, buffer := c.dataManager.getDataNodes(true) + c.updateNodeWatch(restartNodes, buffer) + } // 3. offline do unregister unregisterNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.offlines)) // possible nodes info to unregister diff --git a/internal/datacoord/cluster_data_manager.go b/internal/datacoord/cluster_data_manager.go index c55908e0c..a8c1a340a 100644 --- a/internal/datacoord/cluster_data_manager.go +++ b/internal/datacoord/cluster_data_manager.go @@ -97,15 +97,6 @@ func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *cl continue } - newNode := &dataNodeInfo{ - info: &datapb.DataNodeInfo{ - Address: n.Address, - Version: n.Version, - Channels: []*datapb.ChannelStatus{}, - }, - status: online, - } - c.dataNodes[n.Address] = newNode newNodes = append(newNodes, n.Address) } @@ -166,6 +157,7 @@ func (c *clusterNodeManager) unregister(addr string) *datapb.DataNodeInfo { if !ok { return nil } + delete(c.dataNodes, addr) node.status = offline c.updateMetrics() return node.info diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go index be3b40d2c..11146171c 100644 --- a/internal/datacoord/cluster_test.go +++ b/internal/datacoord/cluster_test.go @@ -12,6 +12,8 @@ package datacoord import ( "context" + "errors" + "strings" "testing" memkv "github.com/milvus-io/milvus/internal/kv/mem" @@ -21,7 +23,8 @@ import ( func TestClusterCreate(t *testing.T) { cPolicy := newMockStartupPolicy() - cluster := createCluster(t, nil, withStartupPolicy(cPolicy)) + ch := make(chan struct{}, 1) + cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch)) addr := "localhost:8080" nodes := []*datapb.DataNodeInfo{ { @@ -32,6 +35,7 @@ func TestClusterCreate(t *testing.T) { } err := cluster.startup(nodes) assert.Nil(t, err) + <-ch dataNodes, _ := cluster.dataManager.getDataNodes(true) assert.EqualValues(t, 1, len(dataNodes)) assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) @@ -58,7 +62,8 @@ func TestRegister(t *testing.T) { func TestUnregister(t *testing.T) { cPolicy := newMockStartupPolicy() unregisterPolicy := newEmptyUnregisterPolicy() - cluster := createCluster(t, nil, withStartupPolicy(cPolicy), withUnregistorPolicy(unregisterPolicy)) + ch := make(chan struct{}, 1) + cluster := createCluster(t, nil, withStartupPolicy(cPolicy), withUnregistorPolicy(unregisterPolicy), mockValidatorOption(ch)) addr := "localhost:8080" nodes := []*datapb.DataNodeInfo{ { @@ -69,6 +74,7 @@ func TestUnregister(t *testing.T) { } err := cluster.startup(nodes) assert.Nil(t, err) + <-ch dataNodes, _ := cluster.dataManager.getDataNodes(true) assert.EqualValues(t, 1, len(dataNodes)) assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) @@ -78,14 +84,75 @@ func TestUnregister(t *testing.T) { Channels: []*datapb.ChannelStatus{}, }) dataNodes, _ = cluster.dataManager.getDataNodes(false) - assert.EqualValues(t, 1, len(dataNodes)) - assert.EqualValues(t, offline, cluster.dataManager.dataNodes[addr].status) - assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) + assert.EqualValues(t, 0, len(dataNodes)) +} + +func TestRefresh(t *testing.T) { + cPolicy := newMockStartupPolicy() + ch := make(chan struct{}, 1) + cluster := createCluster(t, nil, withStartupPolicy(cPolicy), clusterOption{ + apply: func(c *cluster) { + c.candidateManager.validate = func(dn *datapb.DataNodeInfo) error { + if strings.Contains(dn.Address, "inv") { + return errors.New("invalid dn") + } + return nil + } + c.candidateManager.enable = func(dn *datapb.DataNodeInfo) error { + err := c.enableDataNode(dn) + ch <- struct{}{} + return err + } + }, + }) + addr := "localhost:8080" + nodes := []*datapb.DataNodeInfo{ + { + Address: addr, + Version: 1, + Channels: []*datapb.ChannelStatus{}, + }, + { + Address: addr + "invalid", + Version: 1, + Channels: []*datapb.ChannelStatus{}, + }, + } + err := cluster.startup(nodes) + assert.Nil(t, err) + <-ch + dataNodes, _ := cluster.dataManager.getDataNodes(true) + if !assert.Equal(t, 1, len(dataNodes)) { + t.FailNow() + } + assert.Equal(t, addr, dataNodes[addr].Address) + addr2 := "localhost:8081" + nodes = []*datapb.DataNodeInfo{ + { + Address: addr2, + Version: 1, + Channels: []*datapb.ChannelStatus{}, + }, + { + Address: addr2 + "invalid", + Version: 1, + Channels: []*datapb.ChannelStatus{}, + }, + } + err = cluster.refresh(nodes) + assert.Nil(t, err) + <-ch + dataNodes, _ = cluster.dataManager.getDataNodes(true) + assert.Equal(t, 1, len(dataNodes)) + _, has := dataNodes[addr] + assert.False(t, has) + assert.Equal(t, addr2, dataNodes[addr2].Address) } func TestWatchIfNeeded(t *testing.T) { cPolicy := newMockStartupPolicy() - cluster := createCluster(t, nil, withStartupPolicy(cPolicy)) + ch := make(chan struct{}, 1) + cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch)) addr := "localhost:8080" nodes := []*datapb.DataNodeInfo{ { @@ -96,6 +163,7 @@ func TestWatchIfNeeded(t *testing.T) { } err := cluster.startup(nodes) assert.Nil(t, err) + <-ch dataNodes, _ := cluster.dataManager.getDataNodes(true) assert.EqualValues(t, 1, len(dataNodes)) assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address) @@ -112,7 +180,8 @@ func TestWatchIfNeeded(t *testing.T) { func TestFlushSegments(t *testing.T) { cPolicy := newMockStartupPolicy() - cluster := createCluster(t, nil, withStartupPolicy(cPolicy)) + ch := make(chan struct{}, 1) + cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch)) addr := "localhost:8080" nodes := []*datapb.DataNodeInfo{ { @@ -123,6 +192,7 @@ func TestFlushSegments(t *testing.T) { } err := cluster.startup(nodes) assert.Nil(t, err) + <-ch segments := []*datapb.SegmentInfo{ { ID: 0, @@ -134,6 +204,21 @@ func TestFlushSegments(t *testing.T) { cluster.flush(segments) } +func mockValidatorOption(ch chan<- struct{}) clusterOption { + return clusterOption{ + apply: func(c *cluster) { + c.candidateManager.validate = func(dn *datapb.DataNodeInfo) error { + return nil + } + c.candidateManager.enable = func(dn *datapb.DataNodeInfo) error { + err := c.enableDataNode(dn) + ch <- struct{}{} + return err + } + }, + } +} + func createCluster(t *testing.T, ch chan interface{}, options ...clusterOption) *cluster { kv := memkv.NewMemoryKV() sessionManager := newMockSessionManager(ch) -- GitLab