未验证 提交 87a4ddc7 编写于 作者: wei liu 提交者: GitHub

fix rg e2e (#22187)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
上级 5d83781d
......@@ -4514,7 +4514,7 @@ func getErrResponse(err error, method string) *commonpb.Status {
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
ErrorCode: commonpb.ErrorCode_IllegalArgument,
Reason: err.Error(),
}
}
......@@ -4525,13 +4525,6 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop
}
method := "DropResourceGroup"
if err := ValidateResourceGroupName(request.GetResourceGroup()); err != nil {
log.Warn("DropResourceGroup failed",
zap.Error(err),
)
return getErrResponse(err, method), nil
}
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropResourceGroup")
defer sp.End()
tr := timerecord.NewTimeRecorder(method)
......
......@@ -301,7 +301,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
ResourceGroup: "...",
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
})
t.Run("drop resource group", func(t *testing.T) {
......@@ -309,7 +309,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
ResourceGroup: "...",
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("transfer node", func(t *testing.T) {
......@@ -319,7 +319,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
NumNode: 1,
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
})
t.Run("transfer replica", func(t *testing.T) {
......@@ -330,6 +330,6 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
CollectionName: "collection1",
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
})
}
......@@ -117,7 +117,7 @@ func validateCollectionNameOrAlias(entity, entityType string) error {
func ValidateResourceGroupName(entity string) error {
if entity == "" {
return fmt.Errorf("resource group name %s should not be empty", entity)
return errors.New("resource group name couldn't be empty")
}
invalidMsg := fmt.Sprintf("Invalid resource group name %s.", entity)
......
......@@ -172,12 +172,6 @@ func (job *LoadCollectionJob) PreExecute() error {
return ErrCollectionLoaded
}
if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) {
msg := "no enough nodes to create replicas"
log.Warn(msg)
return utils.WrapError(msg, ErrNoEnoughNode)
}
return nil
}
......@@ -387,12 +381,6 @@ func (job *LoadPartitionJob) PreExecute() error {
return ErrCollectionLoaded
}
if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) {
msg := "no enough nodes to create replicas"
log.Warn(msg)
return utils.WrapError(msg, ErrNoEnoughNode)
}
return nil
}
......
......@@ -343,7 +343,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrNoEnoughNode)
suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
}
}
......@@ -605,7 +605,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrNoEnoughNode)
suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
}
}
......
......@@ -51,6 +51,7 @@ var (
)
var DefaultResourceGroupName = "__default_resource_group"
var DefaultResourceGroupCapacity = 1000000
type ResourceGroup struct {
nodes UniqueSet
......@@ -67,54 +68,30 @@ func NewResourceGroup(capacity int) *ResourceGroup {
}
// assign node to resource group
func (rg *ResourceGroup) assignNode(id int64) error {
func (rg *ResourceGroup) assignNode(id int64, deltaCapacity int) error {
if rg.containsNode(id) {
return ErrNodeAlreadyAssign
}
rg.nodes.Insert(id)
rg.capacity++
rg.capacity += deltaCapacity
return nil
}
// unassign node from resource group
func (rg *ResourceGroup) unassignNode(id int64) error {
func (rg *ResourceGroup) unassignNode(id int64, deltaCapacity int) error {
if !rg.containsNode(id) {
// remove non exist node should be tolerable
return nil
}
rg.nodes.Remove(id)
rg.capacity--
rg.capacity += deltaCapacity
return nil
}
func (rg *ResourceGroup) handleNodeUp(id int64) error {
if rg.containsNode(id) {
return ErrNodeAlreadyAssign
}
if len(rg.nodes) >= rg.capacity {
// handleNodeUp can grow the capacity
rg.capacity++
}
rg.nodes.Insert(id)
return nil
}
func (rg *ResourceGroup) handleNodeDown(id int64) error {
if !rg.containsNode(id) {
// remove non exist node should be tolerable
return nil
}
rg.nodes.Remove(id)
return nil
}
func (rg *ResourceGroup) LackOfNodes() int {
return rg.capacity - len(rg.nodes)
}
......@@ -141,7 +118,7 @@ type ResourceManager struct {
func NewResourceManager(store Store, nodeMgr *session.NodeManager) *ResourceManager {
groupMap := make(map[string]*ResourceGroup)
groupMap[DefaultResourceGroupName] = NewResourceGroup(0)
groupMap[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupCapacity)
return &ResourceManager{
groups: groupMap,
store: store,
......@@ -241,9 +218,14 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
newNodes := rm.groups[rgName].GetNodes()
newNodes = append(newNodes, node)
deltaCapacity := 1
if rgName == DefaultResourceGroupName {
// default rg capacity won't be changed
deltaCapacity = 0
}
err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: int32(rm.groups[rgName].GetCapacity()) + 1,
Capacity: int32(rm.groups[rgName].GetCapacity() + deltaCapacity),
Nodes: newNodes,
})
if err != nil {
......@@ -255,7 +237,7 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
return err
}
err = rm.groups[rgName].assignNode(node)
err = rm.groups[rgName].assignNode(node, deltaCapacity)
if err != nil {
return err
}
......@@ -290,7 +272,7 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
return ErrRGNotExist
}
if rm.nodeMgr.Get(node) == nil {
if rm.nodeMgr.Get(node) == nil || !rm.groups[rgName].containsNode(node) {
// remove non exist node should be tolerable
return nil
}
......@@ -302,9 +284,15 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
}
}
deltaCapacity := -1
if rgName == DefaultResourceGroupName {
// default rg capacity won't be changed
deltaCapacity = 0
}
err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: int32(rm.groups[rgName].GetCapacity()) - 1,
Capacity: int32(rm.groups[rgName].GetCapacity() + deltaCapacity),
Nodes: newNodes,
})
if err != nil {
......@@ -317,7 +305,7 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
}
rm.checkRGNodeStatus(rgName)
err = rm.groups[rgName].unassignNode(node)
err = rm.groups[rgName].unassignNode(node, deltaCapacity)
if err != nil {
return err
}
......@@ -461,7 +449,7 @@ func (rm *ResourceManager) HandleNodeUp(node int64) (string, error) {
}
// assign new node to default rg
rm.groups[DefaultResourceGroupName].handleNodeUp(node)
rm.groups[DefaultResourceGroupName].assignNode(node, 0)
log.Info("HandleNodeUp: add node to default resource group",
zap.String("rgName", DefaultResourceGroupName),
zap.Int64("node", node),
......@@ -483,7 +471,7 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) {
zap.String("rgName", rgName),
zap.Int64("node", node),
)
return rgName, rm.groups[rgName].handleNodeDown(node)
return rgName, rm.groups[rgName].unassignNode(node, 0)
}
return "", ErrNodeNotAssignToRG
......@@ -509,14 +497,23 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) err
return err
}
deltaFromCapacity := -1
if from == DefaultResourceGroupName {
deltaFromCapacity = 0
}
deltaToCapacity := 1
if to == DefaultResourceGroupName {
deltaToCapacity = 0
}
for _, node := range movedNodes {
err := rm.groups[from].unassignNode(node)
err := rm.groups[from].unassignNode(node, deltaFromCapacity)
if err != nil {
// interrupt transfer, unreachable logic path
return err
}
err = rm.groups[to].assignNode(node)
err = rm.groups[to].assignNode(node, deltaToCapacity)
if err != nil {
// interrupt transfer, unreachable logic path
return err
......@@ -544,15 +541,27 @@ func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode i
}
}
fromCapacity := rm.groups[from].GetCapacity()
if from != DefaultResourceGroupName {
// default rg capacity won't be changed
fromCapacity = rm.groups[from].GetCapacity() - numNode
}
fromRG := &querypb.ResourceGroup{
Name: from,
Capacity: int32(rm.groups[from].GetCapacity() - numNode),
Capacity: int32(fromCapacity),
Nodes: fromNodeList,
}
toCapacity := rm.groups[to].GetCapacity()
if from != DefaultResourceGroupName {
// default rg capacity won't be changed
toCapacity = rm.groups[to].GetCapacity() + numNode
}
toRG := &querypb.ResourceGroup{
Name: to,
Capacity: int32(rm.groups[to].GetCapacity() + numNode),
Capacity: int32(toCapacity),
Nodes: toNodeList,
}
......@@ -580,7 +589,7 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error)
return i + 1, err
}
err = rm.groups[rgName].handleNodeUp(node)
err = rm.groups[rgName].assignNode(node, 0)
if err != nil {
// roll back, unreachable logic path
rm.assignNode(DefaultResourceGroupName, node)
......@@ -599,10 +608,18 @@ func (rm *ResourceManager) Recover() error {
}
for _, rg := range rgs {
rm.groups[rg.GetName()] = NewResourceGroup(0)
for _, node := range rg.GetNodes() {
rm.groups[rg.GetName()].assignNode(node)
if rg.GetName() == DefaultResourceGroupName {
rm.groups[rg.GetName()] = NewResourceGroup(DefaultResourceGroupCapacity)
for _, node := range rg.GetNodes() {
rm.groups[rg.GetName()].assignNode(node, 0)
}
} else {
rm.groups[rg.GetName()] = NewResourceGroup(0)
for _, node := range rg.GetNodes() {
rm.groups[rg.GetName()].assignNode(node, 1)
}
}
rm.checkRGNodeStatus(rg.GetName())
log.Info("Recover resource group",
zap.String("rgName", rg.GetName()),
......@@ -623,7 +640,7 @@ func (rm *ResourceManager) checkRGNodeStatus(rgName string) {
zap.Int64("nodeID", node),
)
rm.groups[rgName].handleNodeDown(node)
rm.groups[rgName].unassignNode(node, 0)
}
}
}
......
......@@ -111,7 +111,6 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
err = suite.manager.AssignNode("rg1", 1)
suite.NoError(err)
err = suite.manager.AssignNode("rg2", 1)
println(err.Error())
suite.ErrorIs(err, ErrNodeAlreadyAssign)
// transfer node between rgs
......@@ -175,14 +174,14 @@ func (suite *ResourceManagerSuite) TestHandleNodeUp() {
suite.NoError(err)
defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(0, defaultRG.GetCapacity())
suite.Equal(DefaultResourceGroupCapacity, defaultRG.GetCapacity())
suite.manager.HandleNodeUp(101)
rg, err = suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
suite.Equal(rg.GetCapacity(), 3)
suite.Equal(len(rg.GetNodes()), 2)
suite.False(suite.manager.ContainsNode("rg1", 101))
suite.Equal(1, defaultRG.GetCapacity())
suite.Equal(DefaultResourceGroupCapacity, defaultRG.GetCapacity())
}
func (suite *ResourceManagerSuite) TestRecover() {
......@@ -311,13 +310,13 @@ func (suite *ResourceManagerSuite) TestDefaultResourceGroup() {
}
defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(defaultRG.GetCapacity(), 0)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 0)
suite.manager.HandleNodeUp(1)
suite.manager.HandleNodeUp(2)
suite.manager.HandleNodeUp(3)
suite.Equal(defaultRG.GetCapacity(), 3)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 3)
// shutdown node 1 and 2
......@@ -326,18 +325,18 @@ func (suite *ResourceManagerSuite) TestDefaultResourceGroup() {
defaultRG, err = suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(defaultRG.GetCapacity(), 3)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 1)
suite.manager.HandleNodeUp(4)
suite.manager.HandleNodeUp(5)
suite.Equal(defaultRG.GetCapacity(), 3)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 3)
suite.manager.HandleNodeUp(7)
suite.manager.HandleNodeUp(8)
suite.manager.HandleNodeUp(9)
suite.Equal(defaultRG.GetCapacity(), 6)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 6)
}
......
......@@ -1060,12 +1060,19 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
}
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup())
if len(replicas) > 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
len(replicas), req.GetSourceResourceGroup())), nil
}
// for now, we don't support to transfer replica of same collection to same resource group
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup())
replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup())
if len(replicas) < int(req.GetNumReplica()) {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("found [%d] replicas of collection[%d] in source resource group[%s]",
len(replicas), req.GetCollectionID(), req.GetSourceResourceGroup())), nil
fmt.Sprintf("only found [%d] replicas in source resource group[%s]",
len(replicas), req.GetSourceResourceGroup())), nil
}
err := s.transferReplica(req.GetTargetResourceGroup(), replicas[:req.GetNumReplica()])
......
......@@ -561,7 +561,7 @@ func (suite *ServiceSuite) TestTransferReplica() {
NumReplica: 2,
})
suite.NoError(err)
suite.Contains(resp.Reason, "found [0] replicas of collection[1] in source resource group")
suite.Contains(resp.Reason, "only found [0] replicas in source resource group")
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: "rgg",
......@@ -591,6 +591,11 @@ func (suite *ServiceSuite) TestTransferReplica() {
ID: 222,
ResourceGroup: meta.DefaultResourceGroupName,
}, typeutil.NewUniqueSet(2)))
suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
CollectionID: 1,
ID: 333,
ResourceGroup: meta.DefaultResourceGroupName,
}, typeutil.NewUniqueSet(3)))
suite.server.nodeMgr.Add(session.NewNodeInfo(1001, "localhost"))
suite.server.nodeMgr.Add(session.NewNodeInfo(1002, "localhost"))
......@@ -611,6 +616,14 @@ func (suite *ServiceSuite) TestTransferReplica() {
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
suite.Len(suite.server.meta.GetByResourceGroup("rg3"), 2)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg3",
CollectionID: 1,
NumReplica: 2,
})
suite.NoError(err)
suite.Contains(resp.Reason, "dynamically increase replica num is unsupported")
// server unhealthy
server.status.Store(commonpb.StateCode_Abnormal)
......
......@@ -33,7 +33,7 @@ var (
ErrGetNodesFromRG = errors.New("failed to get node from rg")
ErrNoReplicaFound = errors.New("no replica found during assign nodes")
ErrReplicasInconsistent = errors.New("all replicas should belong to same collection during assign nodes")
ErrUseWrongNumRG = errors.New("resource num can only be 0, 1 or same as replica number")
ErrUseWrongNumRG = errors.New("resource group num can only be 0, 1 or same as replica number")
)
func GetReplicaNodesInfo(replicaMgr *meta.ReplicaManager, nodeMgr *session.NodeManager, replicaID int64) []*session.NodeInfo {
......
......@@ -215,11 +215,11 @@ class ResponseChecker:
if check_items.get("num_available_node", None):
assert rg.num_available_node == check_items["num_available_node"]
if check_items.get("num_loaded_replica", None):
assert rg.num_loaded_replica == check_items["num_loaded_replica"]
assert dict(rg.num_loaded_replica).items() >= check_items["num_loaded_replica"].items()
if check_items.get("num_outgoing_node", None):
assert rg.num_outgoing_node == check_items["num_outgoing_node"]
assert dict(rg.num_outgoing_node).items() >= check_items["num_outgoing_node"].items()
if check_items.get("num_incoming_node", None):
assert rg.num_incoming_node == check_items["num_incoming_node"]
assert dict(rg.num_incoming_node).items() >= check_items["num_incoming_node"].items()
return True
@staticmethod
......
......@@ -38,6 +38,7 @@ another_float_vec_field_name = "float_vector1"
default_binary_vec_field_name = "binary_vector"
default_partition_name = "_default"
default_resource_group_name = '__default_resource_group'
default_resource_group_capacity = 1000000
default_tag = "1970_01_01"
row_count = "row_count"
default_length = 65535
......
......@@ -12,7 +12,7 @@ allure-pytest==2.7.0
pytest-print==0.2.1
pytest-level==0.1.1
pytest-xdist==2.5.0
pymilvus==2.3.0.dev34
pymilvus==2.3.0.dev35
pytest-rerunfailures==9.1.1
git+https://github.com/Projectplace/pytest-tags
ndg-httpsclient
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册