未验证 提交 71a7fef5 编写于 作者: Y yah01 提交者: GitHub

Consume threads of the number of nq (#24410)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 6209d5d7
......@@ -3,6 +3,7 @@ package tasks
import (
"context"
"fmt"
"sync"
"go.uber.org/atomic"
......@@ -25,7 +26,9 @@ type Scheduler struct {
queryProcessQueue chan *QueryTask
queryWaitQueue chan *QueryTask
pool *conc.Pool[any]
pool *conc.Pool[any]
runningThreadNum int
cond *sync.Cond
}
func NewScheduler() *Scheduler {
......@@ -39,6 +42,7 @@ func NewScheduler() *Scheduler {
// queryProcessQueue: make(chan),
pool: conc.NewPool[any](maxReadConcurrency, ants.WithPreAlloc(true)),
cond: sync.NewCond(&sync.Mutex{}),
}
}
......@@ -151,7 +155,23 @@ func (s *Scheduler) processAll(ctx context.Context) {
}
func (s *Scheduler) process(t Task) {
s.cond.L.Lock()
for s.runningThreadNum >= s.pool.Cap() {
s.cond.Wait()
}
s.runningThreadNum += t.Weight()
s.cond.L.Unlock()
s.pool.Submit(func() (any, error) {
defer func() {
s.cond.L.Lock()
defer s.cond.L.Unlock()
s.runningThreadNum -= t.Weight()
if s.runningThreadNum < s.pool.Cap() {
s.cond.Broadcast()
}
}()
metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
err := t.Execute()
......
......@@ -25,6 +25,7 @@ type Task interface {
Done(err error)
Canceled() error
Wait() error
Weight() int
}
type SearchTask struct {
......@@ -235,6 +236,10 @@ func (t *SearchTask) Wait() error {
return <-t.notifier
}
func (t *SearchTask) Weight() int {
return int(t.nq)
}
func (t *SearchTask) Result() *internalpb.SearchResults {
return t.result
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册