From 71a7fef5c52a84328a01722b902139518b4d85ff Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 26 May 2023 17:33:27 +0800 Subject: [PATCH] Consume threads of the number of nq (#24410) Signed-off-by: yah01 --- internal/querynodev2/tasks/scheduler.go | 22 +++++++++++++++++++++- internal/querynodev2/tasks/task.go | 5 +++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/internal/querynodev2/tasks/scheduler.go b/internal/querynodev2/tasks/scheduler.go index 404d5aea8..fd78c9803 100644 --- a/internal/querynodev2/tasks/scheduler.go +++ b/internal/querynodev2/tasks/scheduler.go @@ -3,6 +3,7 @@ package tasks import ( "context" "fmt" + "sync" "go.uber.org/atomic" @@ -25,7 +26,9 @@ type Scheduler struct { queryProcessQueue chan *QueryTask queryWaitQueue chan *QueryTask - pool *conc.Pool[any] + pool *conc.Pool[any] + runningThreadNum int + cond *sync.Cond } func NewScheduler() *Scheduler { @@ -39,6 +42,7 @@ func NewScheduler() *Scheduler { // queryProcessQueue: make(chan), pool: conc.NewPool[any](maxReadConcurrency, ants.WithPreAlloc(true)), + cond: sync.NewCond(&sync.Mutex{}), } } @@ -151,7 +155,23 @@ func (s *Scheduler) processAll(ctx context.Context) { } func (s *Scheduler) process(t Task) { + s.cond.L.Lock() + for s.runningThreadNum >= s.pool.Cap() { + s.cond.Wait() + } + s.runningThreadNum += t.Weight() + s.cond.L.Unlock() + s.pool.Submit(func() (any, error) { + defer func() { + s.cond.L.Lock() + defer s.cond.L.Unlock() + s.runningThreadNum -= t.Weight() + if s.runningThreadNum < s.pool.Cap() { + s.cond.Broadcast() + } + }() + metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() err := t.Execute() diff --git a/internal/querynodev2/tasks/task.go b/internal/querynodev2/tasks/task.go index cc1f5f5d8..322e08b35 100644 --- a/internal/querynodev2/tasks/task.go +++ b/internal/querynodev2/tasks/task.go @@ -25,6 +25,7 @@ type Task interface { Done(err error) Canceled() error Wait() error + Weight() int } type SearchTask struct { @@ -235,6 +236,10 @@ func (t *SearchTask) Wait() error { return <-t.notifier } +func (t *SearchTask) Weight() int { + return int(t.nq) +} + func (t *SearchTask) Result() *internalpb.SearchResults { return t.result } -- GitLab