未验证 提交 a2b383b5 编写于 作者: J jaime 提交者: GitHub

Fix alter collection hang (#20686)

Signed-off-by: Nyun.zhang <yun.zhang@zilliz.com>
Signed-off-by: Nyun.zhang <yun.zhang@zilliz.com>
上级 c65306bc
......@@ -1389,8 +1389,7 @@ func (s *Server) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmen
}, nil
}
func (s *Server) BroadcastAlteredCollection(ctx context.Context,
req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
errResp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
......@@ -1405,26 +1404,26 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context,
// get collection info from cache
clonedColl := s.meta.GetClonedCollectionInfo(req.CollectionID)
// try to reload collection from RootCoord
if clonedColl == nil {
err := s.loadCollectionFromRootCoord(ctx, req.CollectionID)
if err != nil {
log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", req.CollectionID), zap.Error(err))
errResp.Reason = fmt.Sprintf("failed to load collection from rootcoord, collectionID:%d", req.CollectionID)
return errResp, nil
}
}
clonedColl = s.meta.GetClonedCollectionInfo(req.CollectionID)
if clonedColl == nil {
return nil, fmt.Errorf("get collection from cache failed, collectionID:%d", req.CollectionID)
}
properties := make(map[string]string)
for _, pair := range req.Properties {
properties[pair.GetKey()] = pair.GetValue()
}
// cache miss and update cache
if clonedColl == nil {
collInfo := &collectionInfo{
ID: req.GetCollectionID(),
Schema: req.GetSchema(),
Partitions: req.GetPartitionIDs(),
StartPositions: req.GetStartPositions(),
Properties: properties,
}
s.meta.AddCollection(collInfo)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
clonedColl.Properties = properties
s.meta.AddCollection(clonedColl)
return &commonpb.Status{
......
package datacoord
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/stretchr/testify/assert"
)
func TestBroadcastAlteredCollection(t *testing.T) {
t.Run("test server is closed", func(t *testing.T) {
s := &Server{}
s.stateCode.Store(commonpb.StateCode_Initializing)
ctx := context.Background()
resp, err := s.BroadcastAlteredCollection(ctx, nil)
assert.NotNil(t, resp.Reason)
assert.Nil(t, err)
})
t.Run("test meta non exist", func(t *testing.T) {
s := &Server{meta: &meta{collections: make(map[UniqueID]*collectionInfo, 1)}}
s.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background()
req := &datapb.AlterCollectionRequest{
CollectionID: 1,
PartitionIDs: []int64{1},
Properties: []*commonpb.KeyValuePair{{Key: "k", Value: "v"}},
}
resp, err := s.BroadcastAlteredCollection(ctx, req)
assert.NotNil(t, resp)
assert.NoError(t, err)
assert.Equal(t, 1, len(s.meta.collections))
})
t.Run("test update meta", func(t *testing.T) {
s := &Server{meta: &meta{collections: map[UniqueID]*collectionInfo{
1: {ID: 1},
}}}
s.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background()
req := &datapb.AlterCollectionRequest{
CollectionID: 1,
PartitionIDs: []int64{1},
Properties: []*commonpb.KeyValuePair{{Key: "k", Value: "v"}},
}
assert.Nil(t, s.meta.collections[1].Properties)
resp, err := s.BroadcastAlteredCollection(ctx, req)
assert.NotNil(t, resp)
assert.NoError(t, err)
assert.NotNil(t, s.meta.collections[1].Properties)
})
}
......@@ -277,7 +277,7 @@ func (ds *DataCoordFactory) MarkSegmentsDropped(context.Context, *datapb.MarkSeg
}, nil
}
func (ds *DataCoordFactory) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (ds *DataCoordFactory) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
......
......@@ -770,12 +770,8 @@ func (c *Client) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmen
}
// BroadcastAlteredCollection is the DataCoord client side code for BroadcastAlteredCollection call.
func (c *Client) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (c *Client) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
......
......@@ -403,7 +403,7 @@ func (s *Server) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmen
return s.dataCoord.MarkSegmentsDropped(ctx, req)
}
func (s *Server) BroadcastAlteredCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (s *Server) BroadcastAlteredCollection(ctx context.Context, request *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
return s.dataCoord.BroadcastAlteredCollection(ctx, request)
}
......
......@@ -225,7 +225,7 @@ func (m *MockDataCoord) MarkSegmentsDropped(ctx context.Context, req *datapb.Mar
return m.markSegmentsDroppedResp, m.err
}
func (m *MockDataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (m *MockDataCoord) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
return m.broadCastResp, m.err
}
......
......@@ -604,7 +604,7 @@ func (m *MockDataCoord) ReleaseSegmentLock(ctx context.Context, req *datapb.Rele
return nil, nil
}
func (m *MockDataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (m *MockDataCoord) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
return nil, nil
}
......
......@@ -66,7 +66,7 @@ service DataCoord {
rpc UnsetIsImportingState(UnsetIsImportingStateRequest) returns(common.Status) {}
rpc MarkSegmentsDropped(MarkSegmentsDroppedRequest) returns(common.Status) {}
rpc BroadcastAlteredCollection(milvus.AlterCollectionRequest) returns (common.Status) {}
rpc BroadcastAlteredCollection(AlterCollectionRequest) returns (common.Status) {}
rpc CheckHealth(milvus.CheckHealthRequest) returns (milvus.CheckHealthResponse) {}
}
......@@ -635,3 +635,12 @@ message SegmentReferenceLock {
int64 nodeID = 2;
repeated int64 segmentIDs = 3;
}
message AlterCollectionRequest {
int64 collectionID = 1;
schema.CollectionSchema schema = 2;
repeated int64 partitionIDs = 3;
repeated common.KeyDataPair start_positions = 4;
repeated common.KeyValuePair properties = 5;
}
......@@ -127,7 +127,7 @@ func (coord *DataCoordMock) MarkSegmentsDropped(ctx context.Context, req *datapb
panic("implement me")
}
func (coord *DataCoordMock) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (coord *DataCoordMock) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
panic("implement me")
}
......
......@@ -5,6 +5,9 @@ import (
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
......@@ -234,7 +237,30 @@ func (b *ServerBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID
func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
log.Info("broadcasting request to alter collection", zap.String("collection name", req.GetCollectionName()), zap.Int64("collection id", req.GetCollectionID()))
resp, err := b.s.dataCoord.BroadcastAlteredCollection(ctx, req)
colMeta, err := b.s.meta.GetCollectionByID(ctx, req.GetCollectionID(), typeutil.MaxTimestamp)
if err != nil {
return err
}
partitionIDs := make([]int64, len(colMeta.Partitions))
for _, p := range colMeta.Partitions {
partitionIDs = append(partitionIDs, p.PartitionID)
}
dcReq := &datapb.AlterCollectionRequest{
CollectionID: req.GetCollectionID(),
Schema: &schemapb.CollectionSchema{
Name: colMeta.Name,
Description: colMeta.Description,
AutoID: colMeta.AutoID,
Fields: model.MarshalFieldModels(colMeta.Fields),
},
PartitionIDs: partitionIDs,
StartPositions: colMeta.StartPositions,
Properties: req.GetProperties(),
}
resp, err := b.s.dataCoord.BroadcastAlteredCollection(ctx, dcReq)
if err != nil {
return err
}
......
......@@ -2,8 +2,11 @@ package rootcoord
import (
"context"
"errors"
"testing"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
......@@ -280,8 +283,43 @@ func TestServerBroker_GetSegmentIndexState(t *testing.T) {
}
func TestServerBroker_BroadcastAlteredCollection(t *testing.T) {
collMeta := &model.Collection{
CollectionID: 1,
StartPositions: []*commonpb.KeyDataPair{
{
Key: "0",
Data: []byte("0"),
},
},
Partitions: []*model.Partition{
{
PartitionID: 2,
PartitionName: "test_partition_name_1",
PartitionCreatedTimestamp: 0,
},
},
}
t.Run("get meta fail", func(t *testing.T) {
c := newTestCore(withInvalidDataCoord())
c.meta = &mockMetaTable{
GetCollectionByIDFunc: func(ctx context.Context, collectionID UniqueID, ts Timestamp) (*model.Collection, error) {
return nil, errors.New("err")
},
}
b := newServerBroker(c)
ctx := context.Background()
err := b.BroadcastAlteredCollection(ctx, &milvuspb.AlterCollectionRequest{})
assert.Error(t, err)
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withInvalidDataCoord())
c.meta = &mockMetaTable{
GetCollectionByIDFunc: func(ctx context.Context, collectionID UniqueID, ts Timestamp) (*model.Collection, error) {
return collMeta, nil
},
}
b := newServerBroker(c)
ctx := context.Background()
err := b.BroadcastAlteredCollection(ctx, &milvuspb.AlterCollectionRequest{})
......@@ -290,6 +328,11 @@ func TestServerBroker_BroadcastAlteredCollection(t *testing.T) {
t.Run("non success error code on execute", func(t *testing.T) {
c := newTestCore(withFailedDataCoord())
c.meta = &mockMetaTable{
GetCollectionByIDFunc: func(ctx context.Context, collectionID UniqueID, ts Timestamp) (*model.Collection, error) {
return collMeta, nil
},
}
b := newServerBroker(c)
ctx := context.Background()
err := b.BroadcastAlteredCollection(ctx, &milvuspb.AlterCollectionRequest{})
......@@ -298,9 +341,18 @@ func TestServerBroker_BroadcastAlteredCollection(t *testing.T) {
t.Run("success", func(t *testing.T) {
c := newTestCore(withValidDataCoord())
c.meta = &mockMetaTable{
GetCollectionByIDFunc: func(ctx context.Context, collectionID UniqueID, ts Timestamp) (*model.Collection, error) {
return collMeta, nil
},
}
b := newServerBroker(c)
ctx := context.Background()
err := b.BroadcastAlteredCollection(ctx, &milvuspb.AlterCollectionRequest{})
req := &milvuspb.AlterCollectionRequest{
CollectionID: 1,
}
err := b.BroadcastAlteredCollection(ctx, req)
assert.NoError(t, err)
})
}
......@@ -162,7 +162,7 @@ type mockDataCoord struct {
FlushFunc func(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error)
ImportFunc func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error)
UnsetIsImportingStateFunc func(ctx context.Context, req *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)
broadCastAlteredCollectionFunc func(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)
broadCastAlteredCollectionFunc func(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error)
}
func newMockDataCoord() *mockDataCoord {
......@@ -197,7 +197,7 @@ func (m *mockDataCoord) UnsetIsImportingState(ctx context.Context, req *datapb.U
return m.UnsetIsImportingStateFunc(ctx, req)
}
func (m *mockDataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (m *mockDataCoord) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
return m.broadCastAlteredCollectionFunc(ctx, req)
}
......@@ -623,7 +623,7 @@ func withInvalidDataCoord() Opt {
dc.UnsetIsImportingStateFunc = func(ctx context.Context, req *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
return nil, errors.New("error mock UnsetIsImportingState")
}
dc.broadCastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
dc.broadCastAlteredCollectionFunc = func(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
return nil, errors.New("error mock broadCastAlteredCollection")
}
return withDataCoord(dc)
......@@ -664,7 +664,7 @@ func withFailedDataCoord() Opt {
Reason: "mock UnsetIsImportingState error",
}, nil
}
dc.broadCastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
dc.broadCastAlteredCollectionFunc = func(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
return failStatus(commonpb.ErrorCode_UnexpectedError, "mock broadcast altered collection error"), nil
}
return withDataCoord(dc)
......@@ -702,7 +702,7 @@ func withValidDataCoord() Opt {
dc.UnsetIsImportingStateFunc = func(ctx context.Context, req *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
return succStatus(), nil
}
dc.broadCastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
dc.broadCastAlteredCollectionFunc = func(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
return succStatus(), nil
}
return withDataCoord(dc)
......
......@@ -327,7 +327,7 @@ type DataCoord interface {
// MarkSegmentsDropped marks the given segments as `dropped` state.
MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error)
BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)
BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error)
CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)
}
......
......@@ -169,7 +169,7 @@ func (m *GrpcDataCoordClient) MarkSegmentsDropped(context.Context, *datapb.MarkS
return &commonpb.Status{}, m.Err
}
func (m *GrpcDataCoordClient) BroadcastAlteredCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
func (m *GrpcDataCoordClient) BroadcastAlteredCollection(ctx context.Context, in *datapb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册