未验证 提交 63b6a4a6 编写于 作者: X Xiaofan 提交者: GitHub

use single instance ppol (#25159)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 e48e9b1c
......@@ -100,7 +100,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
nodeID = UniqueID(119)
channelNamePrefix = t.Name()
waitFor = time.Second
waitFor = time.Second * 10
tick = time.Millisecond * 10
)
......
......@@ -21,7 +21,6 @@ import (
"fmt"
"math"
"path"
"runtime"
"sync"
"time"
......@@ -122,7 +121,7 @@ type ChannelMeta struct {
metaService *metaService
chunkManager storage.ChunkManager
workerPool *conc.Pool[struct{}]
workerPool *conc.Pool[any]
closed *atomic.Bool
}
......@@ -143,8 +142,6 @@ var _ Channel = &ChannelMeta{}
func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoord, cm storage.ChunkManager) *ChannelMeta {
metaService := newMetaService(rc, collID)
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0), conc.WithPreAlloc(false), conc.WithNonBlocking(false))
channel := ChannelMeta{
collectionID: collID,
collSchema: schema,
......@@ -161,7 +158,7 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
metaService: metaService,
chunkManager: cm,
workerPool: pool,
workerPool: getOrCreateStatsPool(),
closed: atomic.NewBool(false),
}
......@@ -338,7 +335,7 @@ func (c *ChannelMeta) submitLoadStatsTask(s *Segment, statsBinlogs []*datapb.Fie
}
// do submitting in a goroutine in case of task pool is full
go func() {
c.workerPool.Submit(func() (struct{}, error) {
c.workerPool.Submit(func() (any, error) {
stats, err := c.loadStats(context.Background(), s.segmentID, s.collectionID, statsBinlogs, ts)
if err != nil {
// TODO if not retryable, add rebuild statslog logic
......@@ -349,7 +346,7 @@ func (c *ChannelMeta) submitLoadStatsTask(s *Segment, statsBinlogs []*datapb.Fie
c.submitLoadStatsTask(s, statsBinlogs, ts)
}
return struct{}{}, err
return nil, err
}
// get segment lock here
// it's ok that segment is dropped here
......@@ -357,10 +354,8 @@ func (c *ChannelMeta) submitLoadStatsTask(s *Segment, statsBinlogs []*datapb.Fie
defer c.segMu.Unlock()
s.historyStats = append(s.historyStats, stats...)
s.setLoadingLazy(false)
log.Info("lazy loading segment statslog complete")
return struct{}{}, nil
return nil, nil
})
}()
}
......
......@@ -1197,6 +1197,13 @@ func (s *ChannelMetaMockSuite) TestAddSegment_SkipBFLoad() {
s.True(seg.isLoadingLazy())
s.True(seg.isPKExist(&storage.Int64PrimaryKey{Value: 100}))
})
}
func (s *ChannelMetaMockSuite) TestAddSegment_SkipBFLoad2() {
Params.Save(Params.DataNodeCfg.SkipBFStatsLoad.Key, "true")
defer func() {
Params.Save(Params.DataNodeCfg.SkipBFStatsLoad.Key, "false")
}()
s.Run("transient_error", func() {
defer s.SetupTest()
......
package datanode
import (
"runtime"
"sync"
"github.com/milvus-io/milvus/pkg/util/conc"
......@@ -9,6 +10,9 @@ import (
var ioPool *conc.Pool[any]
var ioPoolInitOnce sync.Once
var statsPool *conc.Pool[any]
var statsPoolInitOnce sync.Once
func initIOPool() {
capacity := Params.DataNodeCfg.IOConcurrency.GetAsInt()
if capacity > 32 {
......@@ -22,3 +26,12 @@ func getOrCreateIOPool() *conc.Pool[any] {
ioPoolInitOnce.Do(initIOPool)
return ioPool
}
func initStatsPool() {
statsPool = conc.NewPool[any](runtime.GOMAXPROCS(0), conc.WithPreAlloc(false), conc.WithNonBlocking(false))
}
func getOrCreateStatsPool() *conc.Pool[any] {
statsPoolInitOnce.Do(initStatsPool)
return statsPool
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册