未验证 提交 bc62ca1f 编写于 作者: C congqixia 提交者: GitHub

Release collection resources when all partition released (#18569)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
上级 d62381af
......@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type task interface {
......@@ -594,6 +595,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
r.node.metaReplica.removeExcludedSegments(r.req.CollectionID)
r.node.queryShardService.releaseCollection(r.req.CollectionID)
r.node.ShardClusterService.releaseCollection(r.req.CollectionID)
err = r.node.metaReplica.removeCollection(r.req.CollectionID)
if err != nil {
return err
......@@ -607,29 +609,84 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
// releasePartitionsTask
func (r *releasePartitionsTask) Execute(ctx context.Context) error {
log.Info("Execute release partition task",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
zap.Int64("collectionID", r.req.GetCollectionID()),
zap.Int64s("partitionIDs", r.req.GetPartitionIDs()))
_, err := r.node.metaReplica.getCollectionByID(r.req.CollectionID)
coll, err := r.node.metaReplica.getCollectionByID(r.req.CollectionID)
if err != nil {
return fmt.Errorf("release partitions failed, collectionID = %d, err = %s", r.req.CollectionID, err)
// skip error if collection not found, do clean up job below
log.Warn("failed to get collection for release partitions", zap.Int64("collectionID", r.req.GetCollectionID()),
zap.Int64s("partitionIDs", r.req.GetPartitionIDs()))
}
log.Info("start release partition", zap.Any("collectionID", r.req.CollectionID))
log.Info("start release partition", zap.Int64("collectionID", r.req.GetCollectionID()), zap.Int64s("partitionIDs", r.req.GetPartitionIDs()))
for _, id := range r.req.PartitionIDs {
// remove partition from streaming and historical
hasPartition := r.node.metaReplica.hasPartition(id)
if hasPartition {
err := r.node.metaReplica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Warn(err.Error())
// shall be false if coll is nil
releaseAll := r.isAllPartitionsReleased(coll)
if releaseAll {
// set release time
log.Info("set release time", zap.Int64("collectionID", r.req.CollectionID))
coll.setReleaseTime(r.req.Base.Timestamp, true)
// remove all flow graphs of the target collection
vChannels := coll.getVChannels()
vDeltaChannels := coll.getVDeltaChannels()
r.node.dataSyncService.removeFlowGraphsByDMLChannels(vChannels)
r.node.dataSyncService.removeFlowGraphsByDeltaChannels(vDeltaChannels)
// remove all tSafes of the target collection
for _, channel := range vChannels {
r.node.tSafeReplica.removeTSafe(channel)
}
for _, channel := range vDeltaChannels {
r.node.tSafeReplica.removeTSafe(channel)
}
log.Info("Release tSafe in releaseCollectionTask",
zap.Int64("collectionID", r.req.CollectionID),
zap.Strings("vChannels", vChannels),
zap.Strings("vDeltaChannels", vDeltaChannels),
)
r.node.metaReplica.removeExcludedSegments(r.req.CollectionID)
r.node.queryShardService.releaseCollection(r.req.CollectionID)
r.node.ShardClusterService.releaseCollection(r.req.CollectionID)
err = r.node.metaReplica.removeCollection(r.req.CollectionID)
if err != nil {
log.Warn("failed to remove collection", zap.Int64("collectionID", r.req.GetCollectionID()),
zap.Int64s("partitionIDs", r.req.GetPartitionIDs()), zap.Error(err))
}
} else {
for _, id := range r.req.PartitionIDs {
// remove partition from streaming and historical
hasPartition := r.node.metaReplica.hasPartition(id)
if hasPartition {
err := r.node.metaReplica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Warn(err.Error())
}
}
}
}
log.Info("Release partition task done",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
zap.Int64("collectionID", r.req.CollectionID),
zap.Int64s("partitionIDs", r.req.PartitionIDs))
return nil
}
// isAllPartitionsReleased reports whether the partition IDs carried by the
// release request cover every partition ID recorded on coll.
//
// A nil coll always yields false, so callers may pass the result of a failed
// collection lookup directly.
func (r *releasePartitionsTask) isAllPartitionsReleased(coll *Collection) bool {
	if coll == nil {
		return false
	}

	requested := r.req.GetPartitionIDs()
	loaded := coll.partitionIDs

	// Fewer requested IDs than loaded partitions can never cover them all,
	// so the set comparison below is unnecessary in that case.
	if len(requested) < len(loaded) {
		return false
	}

	// Build a set of the requested IDs, then check that every loaded
	// partition is a member of it.
	requestedSet := make(typeutil.UniqueSet)
	for i := range requested {
		requestedSet.Insert(requested[i])
	}
	return requestedSet.Contain(loaded...)
}
......@@ -21,6 +21,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -30,7 +33,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
func TestTask_watchDmChannelsTask(t *testing.T) {
......@@ -746,6 +748,28 @@ func TestTask_releasePartitionTask(t *testing.T) {
assert.NoError(t, err)
})
t.Run("test isAllPartitionsReleased", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
node: node,
}
coll, err := node.metaReplica.getCollectionByID(defaultCollectionID)
require.NoError(t, err)
assert.False(t, task.isAllPartitionsReleased(nil))
assert.True(t, task.isAllPartitionsReleased(coll))
node.metaReplica.addPartition(defaultCollectionID, -1)
assert.False(t, task.isAllPartitionsReleased(coll))
node.metaReplica.removePartition(defaultPartitionID)
node.metaReplica.removePartition(-1)
assert.True(t, task.isAllPartitionsReleased(coll))
})
t.Run("test execute", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
......@@ -776,7 +800,37 @@ func TestTask_releasePartitionTask(t *testing.T) {
assert.NoError(t, err)
err = task.Execute(ctx)
assert.Error(t, err)
assert.NoError(t, err)
})
t.Run("test execute no partition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
node: node,
}
err = node.metaReplica.removePartition(defaultPartitionID)
assert.NoError(t, err)
err = task.Execute(ctx)
assert.NoError(t, err)
})
t.Run("test execute non-exist partition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
req := genReleasePartitionsRequest()
req.PartitionIDs = []int64{-1}
task := releasePartitionsTask{
req: req,
node: node,
}
err = task.Execute(ctx)
assert.NoError(t, err)
})
t.Run("test execute remove deltaVChannel", func(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册