diff --git a/configs/advanced/component.yaml b/configs/advanced/component.yaml
index 559db4798bd5735b509cf7240e3b602bb465b002..0d010d1d81aa34ae8982b42c433240c415adb862 100644
--- a/configs/advanced/component.yaml
+++ b/configs/advanced/component.yaml
@@ -46,6 +46,8 @@ proxy:
   maxDimension: 32768 # Maximum dimension of vector
   maxShardNum: 256 # Maximum number of shards in a collection
   maxTaskNum: 1024 # max task number of proxy task queue
+  bufFlagExpireTime: 3600 # second, the time to expire bufFlag from cache in collectResultLoop
+  bufFlagCleanupInterval: 600 # second, the interval to clean bufFlag cache in collectResultLoop
 
 queryNode:
   stats:
diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index f59e4cae90cb0463b28801282d2c87dc4bb450c6..8f0e66d44aea175b6fad8b2babae163cf507d86a 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -91,6 +91,9 @@ proxy:
   maxDimension: 32768 # Maximum dimension of vector
   maxShardNum: 256 # Maximum number of shards in a collection
   maxTaskNum: 1024 # max task number of proxy task queue
+  bufFlagExpireTime: 3600 # second, the time to expire bufFlag from cache in collectResultLoop
+  bufFlagCleanupInterval: 600 # second, the interval to clean bufFlag cache in collectResultLoop
+
 
 # Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
 queryCoord:
diff --git a/go.mod b/go.mod
index 161f68136e89f2bf7aba1574d9e69e774cf4a0f9..2780cb7e811028fd2bc748921f9e466f7408164a 100644
--- a/go.mod
+++ b/go.mod
@@ -24,6 +24,7 @@ require (
	github.com/minio/minio-go/v7 v7.0.10
	github.com/mitchellh/mapstructure v1.4.1
	github.com/opentracing/opentracing-go v1.2.0
+	github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
	github.com/pierrec/lz4 v2.5.2+incompatible // indirect
	github.com/pkg/errors v0.9.1
	github.com/prometheus/client_golang v1.11.0
diff --git a/go.sum b/go.sum
index bdeeb7df5a3c55afe0c1e8d414fab12bc80e91d1..db8929ea902318fcc7b8d9c59e8cbe4b0023865f 100644
--- a/go.sum
+++ b/go.sum
@@ -406,6 +406,8 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt
 github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
 github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
+github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
+github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ=
 github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
diff --git a/internal/proxy/id_cache.go b/internal/proxy/id_cache.go
new file mode 100644
index 0000000000000000000000000000000000000000..7dec3affd315df4fa8280fbc67944952a20fb37c
--- /dev/null
+++ b/internal/proxy/id_cache.go
@@ -0,0 +1,31 @@
+package proxy
+
+import (
+	"strconv"
+	"time"
+
+	"github.com/patrickmn/go-cache"
+)
+
+type idCache struct {
+	cache *cache.Cache
+}
+
+func newIDCache(defaultExpiration, cleanupInterval time.Duration) *idCache {
+	c := cache.New(defaultExpiration, cleanupInterval)
+	return &idCache{
+		cache: c,
+	}
+}
+
+func (r *idCache) Set(id UniqueID, value bool) {
+	r.cache.Set(strconv.FormatInt(id, 36), value, 0)
+}
+
+func (r *idCache) Get(id UniqueID) (value bool, exists bool) {
+	valueRaw, exists := r.cache.Get(strconv.FormatInt(id, 36))
+	if valueRaw == nil {
+		return false, exists
+	}
+	return valueRaw.(bool), exists
+}
diff --git a/internal/proxy/id_cache_test.go b/internal/proxy/id_cache_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..2b2b52b57f541b3d175aa6c949b0b99ab4ee4a61
--- /dev/null
+++ b/internal/proxy/id_cache_test.go
@@ -0,0 +1,27 @@
+package proxy
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestIDCache_SetGet(t *testing.T) {
+	cache := newIDCache(time.Hour, time.Hour)
+	// not exist before set
+	_, exist := cache.Get(1)
+	assert.False(t, exist)
+	cache.Set(1, true)
+	// exist after set & before expire
+	value, exist := cache.Get(1)
+	assert.True(t, exist)
+	assert.True(t, value)
+
+	cache = newIDCache(time.Millisecond, time.Hour)
+	cache.Set(1, true)
+	<-time.After(time.Millisecond)
+	// not exists after set & expire
+	_, exist = cache.Get(1)
+	assert.False(t, exist)
+}
diff --git a/internal/proxy/param_table.go b/internal/proxy/param_table.go
index 3cba838462e465964db372c37a3b4d28442c5eeb..90d3b8ca74abca4ebbcc8e86c861af93604e8346 100644
--- a/internal/proxy/param_table.go
+++ b/internal/proxy/param_table.go
@@ -59,6 +59,8 @@ type ParamTable struct {
 	MaxDimension         int64
 	DefaultPartitionName string
 	DefaultIndexName     string
+	BufFlagExpireTime      time.Duration
+	BufFlagCleanupInterval time.Duration
 
 	// --- Channels ---
 	ClusterChannelPrefix string
@@ -112,6 +114,8 @@
 	pt.initPulsarMaxMessageSize()
 	pt.initMaxTaskNum()
+	pt.initBufFlagExpireTime()
+	pt.initBufFlagCleanupInterval()
 
 	pt.initRoleName()
 }
 
@@ -282,3 +286,13 @@ func (pt *ParamTable) initMetaRootPath() {
 func (pt *ParamTable) initMaxTaskNum() {
 	pt.MaxTaskNum = pt.ParseInt64WithDefault("proxy.maxTaskNum", 1024)
 }
+
+func (pt *ParamTable) initBufFlagExpireTime() {
+	expireTime := pt.ParseInt64WithDefault("proxy.bufFlagExpireTime", 3600)
+	pt.BufFlagExpireTime = time.Duration(expireTime) * time.Second
+}
+
+func (pt *ParamTable) initBufFlagCleanupInterval() {
+	interval := pt.ParseInt64WithDefault("proxy.bufFlagCleanupInterval", 600)
+	pt.BufFlagCleanupInterval = time.Duration(interval) * time.Second
+}
diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go
index 022359ea442552580668b6cf6c5590e0b768612e..4fd8602d2ca6e46bd07d2485b94b6705d3b8d350 100644
--- a/internal/proxy/task_scheduler.go
+++ b/internal/proxy/task_scheduler.go
@@ -653,9 +653,9 @@ func (sched *taskScheduler) collectResultLoop() {
 	defer queryResultMsgStream.Close()
 
 	searchResultBufs := make(map[UniqueID]*searchResultBuf)
-	searchResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore searchResult
+	searchResultBufFlags := newIDCache(Params.BufFlagExpireTime, Params.BufFlagCleanupInterval) // if value is true, we can ignore searchResult
 	queryResultBufs := make(map[UniqueID]*queryResultBuf)
-	queryResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore queryResult
+	queryResultBufFlags := newIDCache(Params.BufFlagExpireTime, Params.BufFlagCleanupInterval) // if value is true, we can ignore queryResult
 
 	for {
 		select {
@@ -674,9 +674,9 @@
 				if searchResultMsg, srOk := tsMsg.(*msgstream.SearchResultMsg); srOk {
 					reqID := searchResultMsg.Base.MsgID
 					reqIDStr := strconv.FormatInt(reqID, 10)
-					ignoreThisResult, ok := searchResultBufFlags[reqID]
+					ignoreThisResult, ok := searchResultBufFlags.Get(reqID)
 					if !ok {
-						searchResultBufFlags[reqID] = false
+						searchResultBufFlags.Set(reqID, false)
 						ignoreThisResult = false
 					}
 					if ignoreThisResult {
@@ -688,7 +688,7 @@
 					if t == nil {
 						log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
 						delete(searchResultBufs, reqID)
-						searchResultBufFlags[reqID] = true
+						searchResultBufFlags.Set(reqID, true)
 						continue
 					}
 
@@ -696,7 +696,7 @@
 					if !ok {
 						log.Debug("Proxy collectResultLoop type assert t as searchTask failed", zap.Any("ReqID", reqID))
 						delete(searchResultBufs, reqID)
-						searchResultBufFlags[reqID] = true
+						searchResultBufFlags.Set(reqID, true)
 						continue
 					}
 
@@ -732,7 +732,7 @@
 
 					if resultBuf.readyToReduce() {
 						log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce")
-						searchResultBufFlags[reqID] = true
+						searchResultBufFlags.Set(reqID, true)
 						st.resultBuf <- resultBuf.resultBuf
 						delete(searchResultBufs, reqID)
 					}
@@ -773,9 +773,9 @@
 
 					reqID := queryResultMsg.Base.MsgID
 					reqIDStr := strconv.FormatInt(reqID, 10)
-					ignoreThisResult, ok := queryResultBufFlags[reqID]
+					ignoreThisResult, ok := queryResultBufFlags.Get(reqID)
 					if !ok {
-						queryResultBufFlags[reqID] = false
+						queryResultBufFlags.Set(reqID, false)
 						ignoreThisResult = false
 					}
 					if ignoreThisResult {
@@ -787,7 +787,7 @@
 					if t == nil {
 						log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
 						delete(queryResultBufs, reqID)
-						queryResultBufFlags[reqID] = true
+						queryResultBufFlags.Set(reqID, true)
 						continue
 					}
 
@@ -795,7 +795,7 @@
 					if !ok {
 						log.Debug("Proxy collectResultLoop type assert t as queryTask failed")
 						delete(queryResultBufs, reqID)
-						queryResultBufFlags[reqID] = true
+						queryResultBufFlags.Set(reqID, true)
 						continue
 					}
 
@@ -831,7 +831,7 @@
 
 					if resultBuf.readyToReduce() {
 						log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce")
-						queryResultBufFlags[reqID] = true
+						queryResultBufFlags.Set(reqID, true)
 						st.resultBuf <- resultBuf.resultBuf
 						delete(queryResultBufs, reqID)
 					}
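
Not part of the diff: a minimal Go sketch of the lookup-and-mark pattern that collectResultLoop now applies to the TTL-backed idCache added above. The helper name exampleBufFlagUsage and the literal durations (standing in for Params.BufFlagExpireTime and Params.BufFlagCleanupInterval) are illustrative assumptions; newIDCache and UniqueID come from the proxy package as introduced in this change.

package proxy

import "time"

// exampleBufFlagUsage is a hypothetical illustration of how collectResultLoop
// consults and updates the buf-flag cache introduced by this change.
func exampleBufFlagUsage(reqID UniqueID) {
	// Literal durations stand in for Params.BufFlagExpireTime and
	// Params.BufFlagCleanupInterval (config defaults: 3600s and 600s).
	flags := newIDCache(3600*time.Second, 600*time.Second)

	// First result seen for an unknown request ID: record that it cannot be ignored yet.
	ignoreThisResult, ok := flags.Get(reqID)
	if !ok {
		flags.Set(reqID, false)
		ignoreThisResult = false
	}
	_ = ignoreThisResult

	// Once the request has been reduced (or its task is gone), flip the flag so
	// late results carrying the same reqID are dropped. Unlike the old
	// map[UniqueID]bool, the entry expires after bufFlagExpireTime and is purged
	// by go-cache's cleanup loop, so abandoned request IDs no longer accumulate
	// in the proxy's memory.
	flags.Set(reqID, true)
}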