未验证 提交 4a32b842 编写于 作者: Z zhenshan.cao 提交者: GitHub

Improve the check logic of channel remove (#23473)

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 3ad9ff7a
...@@ -243,7 +243,7 @@ func (c *ChannelManager) unwatchDroppedChannels() { ...@@ -243,7 +243,7 @@ func (c *ChannelManager) unwatchDroppedChannels() {
nodeChannels := c.store.GetNodesChannels() nodeChannels := c.store.GetNodesChannels()
for _, nodeChannel := range nodeChannels { for _, nodeChannel := range nodeChannels {
for _, ch := range nodeChannel.Channels { for _, ch := range nodeChannel.Channels {
if !c.h.CheckShouldDropChannel(ch.Name) { if !c.h.CheckShouldDropChannel(ch.Name, ch.CollectionID) {
continue continue
} }
err := c.remove(nodeChannel.NodeID, ch) err := c.remove(nodeChannel.NodeID, ch)
...@@ -766,7 +766,7 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err ...@@ -766,7 +766,7 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err
reallocates := &NodeChannelInfo{originNodeID, []*channel{ch}} reallocates := &NodeChannelInfo{originNodeID, []*channel{ch}}
if c.isMarkedDrop(channelName) { if c.isMarkedDrop(channelName, ch.CollectionID) {
if err := c.remove(originNodeID, ch); err != nil { if err := c.remove(originNodeID, ch); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
} }
...@@ -813,7 +813,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) ...@@ -813,7 +813,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}} reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
if c.isMarkedDrop(channelName) { if c.isMarkedDrop(channelName, chToCleanUp.CollectionID) {
if err := c.remove(nodeID, chToCleanUp); err != nil { if err := c.remove(nodeID, chToCleanUp); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
} }
...@@ -871,8 +871,8 @@ func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID) ...@@ -871,8 +871,8 @@ func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID)
return false, 0 return false, 0
} }
func (c *ChannelManager) isMarkedDrop(channelName string) bool { func (c *ChannelManager) isMarkedDrop(channelName string, collectionID UniqueID) bool {
return c.h.CheckShouldDropChannel(channelName) return c.h.CheckShouldDropChannel(channelName, collectionID)
} }
func getReleaseOp(nodeID UniqueID, ch *channel) ChannelOpSet { func getReleaseOp(nodeID UniqueID, ch *channel) ChannelOpSet {
......
...@@ -18,6 +18,7 @@ package datacoord ...@@ -18,6 +18,7 @@ package datacoord
import ( import (
"context" "context"
"time"
"github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/tsoutil"
"go.uber.org/zap" "go.uber.org/zap"
...@@ -27,6 +28,7 @@ import ( ...@@ -27,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
...@@ -36,7 +38,7 @@ type Handler interface { ...@@ -36,7 +38,7 @@ type Handler interface {
GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
// GetDataVChanPositions gets the information recovery needed of a channel for DataNode // GetDataVChanPositions gets the information recovery needed of a channel for DataNode
GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
CheckShouldDropChannel(channel string) bool CheckShouldDropChannel(channel string, collectionID UniqueID) bool
FinishDropChannel(channel string) error FinishDropChannel(channel string) error
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
} }
...@@ -316,6 +318,26 @@ func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo { ...@@ -316,6 +318,26 @@ func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
} }
} }
// HasCollection returns whether the collection exist from user's perspective.
func (h *ServerHandler) HasCollection(ctx context.Context, collectionID UniqueID) (bool, error) {
var hasCollection bool
ctx2, cancel := context.WithTimeout(ctx, time.Minute*30)
defer cancel()
if err := retry.Do(ctx2, func() error {
has, err := h.s.hasCollection(ctx2, collectionID)
if err != nil {
log.RatedInfo(60, "datacoord ServerHandler HasCollection retry failed", zap.Error(err))
return err
}
hasCollection = has
return nil
}, retry.Attempts(100000)); err != nil {
log.Error("datacoord ServerHandler HasCollection finally failed")
panic("datacoord ServerHandler HasCollection finally failed")
}
return hasCollection, nil
}
// GetCollection returns collection info with specified collection id // GetCollection returns collection info with specified collection id
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) { func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
coll := h.s.meta.GetCollection(collectionID) coll := h.s.meta.GetCollection(collectionID)
...@@ -332,9 +354,20 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID ...@@ -332,9 +354,20 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
} }
// CheckShouldDropChannel returns whether specified channel is marked to be removed // CheckShouldDropChannel returns whether specified channel is marked to be removed
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool { func (h *ServerHandler) CheckShouldDropChannel(channel string, collectionID UniqueID) bool {
return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) || if h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) {
!h.s.meta.catalog.ChannelExists(h.s.ctx, channel) return true
}
// collectionID parse from channelName
has, err := h.HasCollection(h.s.ctx, collectionID)
if err != nil {
log.Info("datacoord ServerHandler CheckShouldDropChannel hasCollection failed", zap.Error(err))
return false
}
log.Info("datacoord ServerHandler CheckShouldDropChannel hasCollection", zap.Bool("shouldDropChannel", !has),
zap.String("channel", channel))
return !has
} }
// FinishDropChannel cleans up the remove flag for channels // FinishDropChannel cleans up the remove flag for channels
......
...@@ -860,7 +860,7 @@ func (h *mockHandler) GetDataVChanPositions(channel *channel, partitionID Unique ...@@ -860,7 +860,7 @@ func (h *mockHandler) GetDataVChanPositions(channel *channel, partitionID Unique
} }
} }
func (h *mockHandler) CheckShouldDropChannel(channel string) bool { func (h *mockHandler) CheckShouldDropChannel(channel string, collectionID UniqueID) bool {
return false return false
} }
......
...@@ -1016,6 +1016,54 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i ...@@ -1016,6 +1016,54 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
return nil return nil
} }
// hasCollection communicates with RootCoord and check whether this collection exist from the user's perspective.
func (s *Server) hasCollection(ctx context.Context, collectionID int64) (bool, error) {
resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
DbName: "",
CollectionID: collectionID,
})
if err != nil {
return false, err
}
if resp == nil {
return false, errNilResponse
}
if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
return true, nil
}
if resp.Status.ErrorCode == commonpb.ErrorCode_CollectionNotExists {
return false, nil
}
return false, fmt.Errorf("code:%s, reason:%s", resp.Status.GetErrorCode().String(), resp.Status.GetReason())
}
// hasCollectionInternal communicates with RootCoord and check whether this collection's meta exist in rootcoord.
func (s *Server) hasCollectionInternal(ctx context.Context, collectionID int64) (bool, error) {
resp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
DbName: "",
CollectionID: collectionID,
})
if err != nil {
return false, err
}
if resp == nil {
return false, errNilResponse
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
return false, nil
}
return true, nil
}
func (s *Server) reCollectSegmentStats(ctx context.Context) { func (s *Server) reCollectSegmentStats(ctx context.Context) {
if s.channelManager == nil { if s.channelManager == nil {
log.Error("null channel manager found, which should NOT happen in non-testing environment") log.Error("null channel manager found, which should NOT happen in non-testing environment")
......
...@@ -60,6 +60,7 @@ import ( ...@@ -60,6 +60,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
...@@ -2352,7 +2353,30 @@ func TestGetQueryVChanPositions(t *testing.T) { ...@@ -2352,7 +2353,30 @@ func TestGetQueryVChanPositions(t *testing.T) {
} }
func TestShouldDropChannel(t *testing.T) { func TestShouldDropChannel(t *testing.T) {
svr := newTestServer(t, nil) type myRootCoord struct {
mocks.RootCoord
}
myRoot := &myRootCoord{}
myRoot.EXPECT().Init().Return(nil)
myRoot.EXPECT().Start().Return(nil)
myRoot.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
Count: 1,
}, nil)
myRoot.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ID: int64(tsoutil.ComposeTSByTime(time.Now(), 0)),
Count: 1,
}, nil)
var crt rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, etcdClient *clientv3.Client) (types.RootCoord, error) {
return myRoot, nil
}
opt := WithRootCoordCreator(crt)
svr := newTestServer(t, nil, opt)
defer closeTestServer(t, svr) defer closeTestServer(t, svr)
schema := newTestSchema() schema := newTestSchema()
svr.meta.AddCollection(&collectionInfo{ svr.meta.AddCollection(&collectionInfo{
...@@ -2375,122 +2399,53 @@ func TestShouldDropChannel(t *testing.T) { ...@@ -2375,122 +2399,53 @@ func TestShouldDropChannel(t *testing.T) {
}, },
}, },
}) })
/*
s1 := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
StartPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
Timestamp: 0,
},
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
}
s2 := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
CompactionFrom: []int64{4, 5},
StartPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
s3 := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
StartPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 2,
},
}
s4 := &datapb.SegmentInfo{
ID: 4,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
}*/
/*
t.Run("channel without segments", func(t *testing.T) {
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
})
t.Run("channel with all dropped segments", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s1))
require.NoError(t, err)
r := svr.handler.CheckShouldDropChannel("ch1") t.Run("channel name not in kv, collection not exist", func(t *testing.T) {
assert.True(t, r) //myRoot.code = commonpb.ErrorCode_CollectionNotExists
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists},
CollectionID: -1,
}, nil).Once()
assert.True(t, svr.handler.CheckShouldDropChannel("ch99", -1))
}) })
t.Run("channel with all dropped segments and flushed compacted segments", func(t *testing.T) { t.Run("channel name not in kv, collection exist", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s2)) myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
require.Nil(t, err) Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
r := svr.handler.CheckShouldDropChannel("ch1") CollectionID: 0,
assert.False(t, r) }, nil).Once()
assert.False(t, svr.handler.CheckShouldDropChannel("ch99", 0))
}) })
t.Run("channel with other state segments", func(t *testing.T) { t.Run("collection name in kv, collection exist", func(t *testing.T) {
err := svr.meta.DropSegment(2) myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
require.Nil(t, err) Return(&milvuspb.DescribeCollectionResponse{
err = svr.meta.AddSegment(NewSegmentInfo(s3)) Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
require.Nil(t, err) CollectionID: 0,
}, nil).Once()
r := svr.handler.CheckShouldDropChannel("ch1") assert.False(t, svr.handler.CheckShouldDropChannel("ch1", 0))
assert.False(t, r)
}) })
t.Run("channel with dropped segment and with segment without start position", func(t *testing.T) { t.Run("collection name in kv, collection not exist", func(t *testing.T) {
err := svr.meta.DropSegment(3) myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
require.Nil(t, err) Return(&milvuspb.DescribeCollectionResponse{
err = svr.meta.AddSegment(NewSegmentInfo(s4)) Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists},
require.Nil(t, err) CollectionID: -1,
}, nil).Once()
r := svr.handler.CheckShouldDropChannel("ch1") assert.True(t, svr.handler.CheckShouldDropChannel("ch1", -1))
assert.True(t, r)
})
*/
t.Run("channel name not in kv", func(t *testing.T) {
assert.True(t, svr.handler.CheckShouldDropChannel("ch99"))
}) })
t.Run("channel in remove flag", func(t *testing.T) { t.Run("channel in remove flag, collection exist", func(t *testing.T) {
err := svr.meta.catalog.MarkChannelDeleted(context.TODO(), "ch1") err := svr.meta.catalog.MarkChannelDeleted(context.TODO(), "ch1")
require.NoError(t, err) require.NoError(t, err)
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
assert.True(t, svr.handler.CheckShouldDropChannel("ch1")) Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
CollectionID: 0,
}, nil).Once()
assert.True(t, svr.handler.CheckShouldDropChannel("ch1", 0))
}) })
} }
...@@ -3956,7 +3911,7 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { ...@@ -3956,7 +3911,7 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
assert.Nil(t, err) assert.Nil(t, err)
svr := CreateServer(context.TODO(), factory, opts...) svr := CreateServer(context.TODO(), factory)
svr.SetEtcdClient(etcdCli) svr.SetEtcdClient(etcdCli)
svr.dataNodeCreator = func(ctx context.Context, addr string) (types.DataNode, error) { svr.dataNodeCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, receiveCh) return newMockDataNodeClient(0, receiveCh)
...@@ -3965,6 +3920,10 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { ...@@ -3965,6 +3920,10 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
return newMockRootCoordService(), nil return newMockRootCoordService(), nil
} }
for _, opt := range opts {
opt(svr)
}
err = svr.Init() err = svr.Init()
assert.Nil(t, err) assert.Nil(t, err)
if Params.DataCoordCfg.EnableActiveStandby.GetAsBool() { if Params.DataCoordCfg.EnableActiveStandby.GetAsBool() {
......
...@@ -209,10 +209,10 @@ func TestFlowGraphManager(t *testing.T) { ...@@ -209,10 +209,10 @@ func TestFlowGraphManager(t *testing.T) {
fm.dropAll() fm.dropAll()
const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-" const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-"
for _, test := range tests {
var baseParams = &Params.BaseTable var baseParams = &Params.BaseTable
baseParams.Save(Params.DataNodeCfg.MemoryWatermark.Key, fmt.Sprintf("%f", test.watermark))
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncEnable.Key, fmt.Sprintf("%t", true)) baseParams.Save(Params.DataNodeCfg.MemoryForceSyncEnable.Key, fmt.Sprintf("%t", true))
for _, test := range tests {
baseParams.Save(Params.DataNodeCfg.MemoryWatermark.Key, fmt.Sprintf("%f", test.watermark))
for i, memorySize := range test.memorySizes { for i, memorySize := range test.memorySizes {
vchannel := fmt.Sprintf("%s%d", channelPrefix, i) vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
vchan := &datapb.VchannelInfo{ vchan := &datapb.VchannelInfo{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册