未验证 提交 5b50731f 编写于 作者: S shaoyue 提交者: GitHub

Fix searchResultBufFlags & queryResultBufFlags memory leak (#11585)

Signed-off-by: Nshaoyue.chen <shaoyue.chen@zilliz.com>
上级 53c29088
......@@ -46,6 +46,8 @@ proxy:
maxDimension: 32768 # Maximum dimension of vector
maxShardNum: 256 # Maximum number of shards in a collection
maxTaskNum: 1024 # max task number of proxy task queue
bufFlagExpireTime: 3600 # second, the time to expire bufFlag from cache in collectResultLoop
bufFlagCleanupInterval: 600 # second, the interval to clean bufFlag cache in collectResultLoop
queryNode:
stats:
......
......@@ -91,6 +91,9 @@ proxy:
maxDimension: 32768 # Maximum dimension of vector
maxShardNum: 256 # Maximum number of shards in a collection
maxTaskNum: 1024 # max task number of proxy task queue
bufFlagExpireTime: 3600 # second, the time to expire bufFlag from cache in collectResultLoop
bufFlagCleanupInterval: 600 # second, the interval to clean bufFlag cache in collectResultLoop
# Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
queryCoord:
......
package proxy
import (
"strconv"
"time"
"github.com/patrickmn/go-cache"
)
type idCache struct {
cache *cache.Cache
}
func newIDCache(defaultExpiration, cleanupInterval time.Duration) *idCache {
c := cache.New(defaultExpiration, cleanupInterval)
return &idCache{
cache: c,
}
}
func (r *idCache) Set(id UniqueID, value bool) {
r.cache.Set(strconv.FormatInt(id, 36), value, 0)
}
func (r *idCache) Get(id UniqueID) (value bool, exists bool) {
valueRaw, exists := r.cache.Get(strconv.FormatInt(id, 36))
if valueRaw == nil {
return false, exists
}
return valueRaw.(bool), exists
}
package proxy
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestIDCache_SetGet(t *testing.T) {
cache := newIDCache(time.Hour, time.Hour)
// not exist before set
_, exist := cache.Get(1)
assert.False(t, exist)
cache.Set(1, true)
// exist after set & before expire
value, exist := cache.Get(1)
assert.True(t, exist)
assert.True(t, value)
cache = newIDCache(time.Millisecond, time.Hour)
cache.Set(1, true)
<-time.After(time.Millisecond)
// not exists after set & expire
_, exist = cache.Get(1)
assert.False(t, exist)
}
......@@ -59,6 +59,8 @@ type ParamTable struct {
MaxDimension int64
DefaultPartitionName string
DefaultIndexName string
BufFlagExpireTime time.Duration
BufFlagCleanupInterval time.Duration
// --- Channels ---
ClusterChannelPrefix string
......@@ -112,6 +114,8 @@ func (pt *ParamTable) Init() {
pt.initPulsarMaxMessageSize()
pt.initMaxTaskNum()
pt.initBufFlagExpireTime()
pt.initBufFlagCleanupInterval()
pt.initRoleName()
}
......@@ -282,3 +286,13 @@ func (pt *ParamTable) initMetaRootPath() {
func (pt *ParamTable) initMaxTaskNum() {
pt.MaxTaskNum = pt.ParseInt64WithDefault("proxy.maxTaskNum", 1024)
}
func (pt *ParamTable) initBufFlagExpireTime() {
expireTime := pt.ParseInt64WithDefault("proxy.bufFlagExpireTime", 3600)
pt.BufFlagExpireTime = time.Duration(expireTime) * time.Second
}
func (pt *ParamTable) initBufFlagCleanupInterval() {
interval := pt.ParseInt64WithDefault("proxy.bufFlagCleanupInterval", 600)
pt.BufFlagCleanupInterval = time.Duration(interval) * time.Second
}
......@@ -653,9 +653,9 @@ func (sched *taskScheduler) collectResultLoop() {
defer queryResultMsgStream.Close()
searchResultBufs := make(map[UniqueID]*searchResultBuf)
searchResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore searchResult
searchResultBufFlags := newIDCache(Params.BufFlagExpireTime, Params.BufFlagCleanupInterval) // if value is true, we can ignore searchResult
queryResultBufs := make(map[UniqueID]*queryResultBuf)
queryResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore queryResult
queryResultBufFlags := newIDCache(Params.BufFlagExpireTime, Params.BufFlagCleanupInterval) // if value is true, we can ignore queryResult
for {
select {
......@@ -674,9 +674,9 @@ func (sched *taskScheduler) collectResultLoop() {
if searchResultMsg, srOk := tsMsg.(*msgstream.SearchResultMsg); srOk {
reqID := searchResultMsg.Base.MsgID
reqIDStr := strconv.FormatInt(reqID, 10)
ignoreThisResult, ok := searchResultBufFlags[reqID]
ignoreThisResult, ok := searchResultBufFlags.Get(reqID)
if !ok {
searchResultBufFlags[reqID] = false
searchResultBufFlags.Set(reqID, false)
ignoreThisResult = false
}
if ignoreThisResult {
......@@ -688,7 +688,7 @@ func (sched *taskScheduler) collectResultLoop() {
if t == nil {
log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
delete(searchResultBufs, reqID)
searchResultBufFlags[reqID] = true
searchResultBufFlags.Set(reqID, true)
continue
}
......@@ -696,7 +696,7 @@ func (sched *taskScheduler) collectResultLoop() {
if !ok {
log.Debug("Proxy collectResultLoop type assert t as searchTask failed", zap.Any("ReqID", reqID))
delete(searchResultBufs, reqID)
searchResultBufFlags[reqID] = true
searchResultBufFlags.Set(reqID, true)
continue
}
......@@ -732,7 +732,7 @@ func (sched *taskScheduler) collectResultLoop() {
if resultBuf.readyToReduce() {
log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce")
searchResultBufFlags[reqID] = true
searchResultBufFlags.Set(reqID, true)
st.resultBuf <- resultBuf.resultBuf
delete(searchResultBufs, reqID)
}
......@@ -773,9 +773,9 @@ func (sched *taskScheduler) collectResultLoop() {
reqID := queryResultMsg.Base.MsgID
reqIDStr := strconv.FormatInt(reqID, 10)
ignoreThisResult, ok := queryResultBufFlags[reqID]
ignoreThisResult, ok := queryResultBufFlags.Get(reqID)
if !ok {
queryResultBufFlags[reqID] = false
queryResultBufFlags.Set(reqID, false)
ignoreThisResult = false
}
if ignoreThisResult {
......@@ -787,7 +787,7 @@ func (sched *taskScheduler) collectResultLoop() {
if t == nil {
log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
delete(queryResultBufs, reqID)
queryResultBufFlags[reqID] = true
queryResultBufFlags.Set(reqID, true)
continue
}
......@@ -795,7 +795,7 @@ func (sched *taskScheduler) collectResultLoop() {
if !ok {
log.Debug("Proxy collectResultLoop type assert t as queryTask failed")
delete(queryResultBufs, reqID)
queryResultBufFlags[reqID] = true
queryResultBufFlags.Set(reqID, true)
continue
}
......@@ -831,7 +831,7 @@ func (sched *taskScheduler) collectResultLoop() {
if resultBuf.readyToReduce() {
log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce")
queryResultBufFlags[reqID] = true
queryResultBufFlags.Set(reqID, true)
st.resultBuf <- resultBuf.resultBuf
delete(queryResultBufs, reqID)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册