未验证 提交 39d31f8b 编写于 作者: C congqixia 提交者: GitHub

Trigger checker while waiting collection/partition released (#24523)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 b09e7aea
......@@ -23,6 +23,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
......@@ -33,13 +34,14 @@ import (
type ReleaseCollectionJob struct {
*BaseJob
req *querypb.ReleaseCollectionRequest
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
req *querypb.ReleaseCollectionRequest
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
}
func NewReleaseCollectionJob(ctx context.Context,
......@@ -50,16 +52,18 @@ func NewReleaseCollectionJob(ctx context.Context,
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
) *ReleaseCollectionJob {
return &ReleaseCollectionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
}
}
......@@ -93,7 +97,7 @@ func (job *ReleaseCollectionJob) Execute() error {
job.targetMgr.RemoveCollection(req.GetCollectionID())
job.targetObserver.ReleaseCollection(req.GetCollectionID())
waitCollectionReleased(job.dist, req.GetCollectionID())
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.TotalLabel).Inc()
......@@ -105,13 +109,14 @@ type ReleasePartitionJob struct {
*BaseJob
releasePartitionsOnly bool
req *querypb.ReleasePartitionsRequest
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
req *querypb.ReleasePartitionsRequest
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
}
func NewReleasePartitionJob(ctx context.Context,
......@@ -122,16 +127,18 @@ func NewReleasePartitionJob(ctx context.Context,
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
) *ReleasePartitionJob {
return &ReleasePartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
}
}
......@@ -175,7 +182,7 @@ func (job *ReleasePartitionJob) Execute() error {
job.targetMgr.RemoveCollection(req.GetCollectionID())
job.targetObserver.ReleaseCollection(req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
waitCollectionReleased(job.dist, req.GetCollectionID())
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
} else {
err := job.meta.CollectionManager.RemovePartition(toRelease...)
if err != nil {
......@@ -184,7 +191,7 @@ func (job *ReleasePartitionJob) Execute() error {
return utils.WrapError(msg, err)
}
job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...)
waitCollectionReleased(job.dist, req.GetCollectionID(), toRelease...)
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...)
}
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
return nil
......
......@@ -31,6 +31,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
......@@ -55,15 +56,16 @@ type JobSuite struct {
loadTypes map[int64]querypb.LoadType
// Dependencies
kv kv.MetaKv
store meta.Store
dist *meta.DistributionManager
meta *meta.Meta
cluster *session.MockCluster
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
broker *meta.MockBroker
nodeMgr *session.NodeManager
kv kv.MetaKv
store meta.Store
dist *meta.DistributionManager
meta *meta.Meta
cluster *session.MockCluster
targetMgr *meta.TargetManager
targetObserver *observers.TargetObserver
broker *meta.MockBroker
nodeMgr *session.NodeManager
checkerController *checkers.CheckerController
// Test objects
scheduler *Scheduler
......@@ -174,6 +176,8 @@ func (suite *JobSuite) SetupTest() {
suite.NoError(err)
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 3000)
suite.NoError(err)
suite.checkerController = &checkers.CheckerController{}
}
func (suite *JobSuite) TearDownTest() {
......@@ -856,6 +860,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
suite.scheduler.Add(job)
err := job.Wait()
......@@ -877,6 +882,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
suite.scheduler.Add(job)
err := job.Wait()
......@@ -905,6 +911,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
suite.scheduler.Add(job)
err := job.Wait()
......@@ -927,6 +934,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
suite.scheduler.Add(job)
err := job.Wait()
......@@ -951,6 +959,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
suite.scheduler.Add(job)
err := job.Wait()
......@@ -983,6 +992,7 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
return job
}
......@@ -999,6 +1009,7 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
return job
}
......@@ -1295,6 +1306,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
suite.scheduler.Add(releaseCollectionJob)
err := releaseCollectionJob.Wait()
......@@ -1313,6 +1325,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
suite.scheduler.Add(releasePartitionJob)
err = releasePartitionJob.Wait()
......@@ -1451,6 +1464,7 @@ func (suite *JobSuite) releaseAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
)
suite.scheduler.Add(job)
err := job.Wait()
......
......@@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
......@@ -36,7 +37,7 @@ import (
// waitCollectionReleased blocks until
// all channels and segments of given collection(partitions) are released,
// empty partition list means wait for collection released
func waitCollectionReleased(dist *meta.DistributionManager, collection int64, partitions ...int64) {
func waitCollectionReleased(dist *meta.DistributionManager, checkerController *checkers.CheckerController, collection int64, partitions ...int64) {
partitionSet := typeutil.NewUniqueSet(partitions...)
for {
var (
......@@ -55,6 +56,8 @@ func waitCollectionReleased(dist *meta.DistributionManager, collection int64, pa
break
}
// trigger check more frequently
checkerController.Check()
time.Sleep(200 * time.Millisecond)
}
}
......
......@@ -276,6 +276,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
s.cluster,
s.targetMgr,
s.targetObserver,
s.checkerController,
)
s.jobScheduler.Add(releaseJob)
err := releaseJob.Wait()
......@@ -400,6 +401,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
s.cluster,
s.targetMgr,
s.targetObserver,
s.checkerController,
)
s.jobScheduler.Add(releaseJob)
err := releaseJob.Wait()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册