From 39d31f8bbf8bb133d72fd4c54269810141e94a63 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 30 May 2023 17:41:28 +0800 Subject: [PATCH] Trigger checker while waiting collection/partition released (#24523) Signed-off-by: Congqi Xia --- internal/querycoordv2/job/job_release.go | 73 +++++++++++++----------- internal/querycoordv2/job/job_test.go | 32 ++++++++--- internal/querycoordv2/job/utils.go | 5 +- internal/querycoordv2/services.go | 2 + 4 files changed, 69 insertions(+), 43 deletions(-) diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index f25d7c132..550beb4a7 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -33,13 +34,14 @@ import ( type ReleaseCollectionJob struct { *BaseJob - req *querypb.ReleaseCollectionRequest - dist *meta.DistributionManager - meta *meta.Meta - broker meta.Broker - cluster session.Cluster - targetMgr *meta.TargetManager - targetObserver *observers.TargetObserver + req *querypb.ReleaseCollectionRequest + dist *meta.DistributionManager + meta *meta.Meta + broker meta.Broker + cluster session.Cluster + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver + checkerController *checkers.CheckerController } func NewReleaseCollectionJob(ctx context.Context, @@ -50,16 +52,18 @@ func NewReleaseCollectionJob(ctx context.Context, cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver, + checkerController *checkers.CheckerController, ) *ReleaseCollectionJob { return &ReleaseCollectionJob{ - BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), - req: req, - dist: dist, - meta: meta, - broker: broker, - cluster: cluster, - targetMgr: targetMgr, - targetObserver: targetObserver, + BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), + req: req, + dist: dist, + meta: meta, + broker: broker, + cluster: cluster, + targetMgr: targetMgr, + targetObserver: targetObserver, + checkerController: checkerController, } } @@ -93,7 +97,7 @@ func (job *ReleaseCollectionJob) Execute() error { job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) - waitCollectionReleased(job.dist, req.GetCollectionID()) + waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease))) metrics.QueryCoordReleaseCount.WithLabelValues(metrics.TotalLabel).Inc() @@ -105,13 +109,14 @@ type ReleasePartitionJob struct { *BaseJob releasePartitionsOnly bool - req *querypb.ReleasePartitionsRequest - dist *meta.DistributionManager - meta *meta.Meta - broker meta.Broker - cluster session.Cluster - targetMgr *meta.TargetManager - targetObserver *observers.TargetObserver + req *querypb.ReleasePartitionsRequest + dist *meta.DistributionManager + meta *meta.Meta + broker meta.Broker + cluster session.Cluster + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver + checkerController *checkers.CheckerController } func NewReleasePartitionJob(ctx context.Context, @@ -122,16 +127,18 @@ func NewReleasePartitionJob(ctx context.Context, cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver, + checkerController *checkers.CheckerController, ) *ReleasePartitionJob { return &ReleasePartitionJob{ - BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), - req: req, - dist: dist, - meta: meta, - broker: broker, - cluster: cluster, - targetMgr: targetMgr, - targetObserver: targetObserver, + BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), + req: req, + dist: dist, + meta: meta, + broker: broker, + cluster: cluster, + targetMgr: targetMgr, + targetObserver: targetObserver, + checkerController: checkerController, } } @@ -175,7 +182,7 @@ func (job *ReleasePartitionJob) Execute() error { job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() - waitCollectionReleased(job.dist, req.GetCollectionID()) + waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) } else { err := job.meta.CollectionManager.RemovePartition(toRelease...) if err != nil { @@ -184,7 +191,7 @@ func (job *ReleasePartitionJob) Execute() error { return utils.WrapError(msg, err) } job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...) - waitCollectionReleased(job.dist, req.GetCollectionID(), toRelease...) + waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...) } metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease))) return nil diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index 6093f06cd..1d8e364c3 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -31,6 +31,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" . "github.com/milvus-io/milvus/internal/querycoordv2/params" @@ -55,15 +56,16 @@ type JobSuite struct { loadTypes map[int64]querypb.LoadType // Dependencies - kv kv.MetaKv - store meta.Store - dist *meta.DistributionManager - meta *meta.Meta - cluster *session.MockCluster - targetMgr *meta.TargetManager - targetObserver *observers.TargetObserver - broker *meta.MockBroker - nodeMgr *session.NodeManager + kv kv.MetaKv + store meta.Store + dist *meta.DistributionManager + meta *meta.Meta + cluster *session.MockCluster + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver + broker *meta.MockBroker + nodeMgr *session.NodeManager + checkerController *checkers.CheckerController // Test objects scheduler *Scheduler @@ -174,6 +176,8 @@ func (suite *JobSuite) SetupTest() { suite.NoError(err) err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 3000) suite.NoError(err) + + suite.checkerController = &checkers.CheckerController{} } func (suite *JobSuite) TearDownTest() { @@ -856,6 +860,7 @@ func (suite *JobSuite) TestReleaseCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(job) err := job.Wait() @@ -877,6 +882,7 @@ func (suite *JobSuite) TestReleaseCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(job) err := job.Wait() @@ -905,6 +911,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(job) err := job.Wait() @@ -927,6 +934,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(job) err := job.Wait() @@ -951,6 +959,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(job) err := job.Wait() @@ -983,6 +992,7 @@ func (suite *JobSuite) TestDynamicRelease() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) return job } @@ -999,6 +1009,7 @@ func (suite *JobSuite) TestDynamicRelease() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) return job } @@ -1295,6 +1306,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(releaseCollectionJob) err := releaseCollectionJob.Wait() @@ -1313,6 +1325,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(releasePartitionJob) err = releasePartitionJob.Wait() @@ -1451,6 +1464,7 @@ func (suite *JobSuite) releaseAll() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(job) err := job.Wait() diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go index 7a58c6569..b2c0cdb75 100644 --- a/internal/querycoordv2/job/utils.go +++ b/internal/querycoordv2/job/utils.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/log" @@ -36,7 +37,7 @@ import ( // waitCollectionReleased blocks until // all channels and segments of given collection(partitions) are released, // empty partition list means wait for collection released -func waitCollectionReleased(dist *meta.DistributionManager, collection int64, partitions ...int64) { +func waitCollectionReleased(dist *meta.DistributionManager, checkerController *checkers.CheckerController, collection int64, partitions ...int64) { partitionSet := typeutil.NewUniqueSet(partitions...) for { var ( @@ -55,6 +56,8 @@ func waitCollectionReleased(dist *meta.DistributionManager, collection int64, pa break } + // trigger check more frequently + checkerController.Check() time.Sleep(200 * time.Millisecond) } } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 2f43dc17f..509070bc5 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -276,6 +276,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl s.cluster, s.targetMgr, s.targetObserver, + s.checkerController, ) s.jobScheduler.Add(releaseJob) err := releaseJob.Wait() @@ -400,6 +401,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart s.cluster, s.targetMgr, s.targetObserver, + s.checkerController, ) s.jobScheduler.Add(releaseJob) err := releaseJob.Wait() -- GitLab