提交 bce825ff 编写于 作者: 7 710leo

Change push index from async to sync

上级 3c1ed52b
......@@ -7,7 +7,6 @@ import (
"github.com/didi/nightingale/src/modules/tsdb/backend/rpc"
"github.com/didi/nightingale/src/toolkits/str"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
)
......@@ -15,17 +14,8 @@ const (
IndexUpdateIncrTaskSleepInterval = time.Duration(10) * time.Second // 增量更新间隔时间, 默认30s
)
var (
semaUpdateIndexIncr *semaphore.Semaphore // 索引增量更新时并发控制
)
// 启动索引的 异步、增量更新 任务, 每隔一定时间,刷新cache中的数据到数据库中
func StartIndexUpdateIncrTask() {
if rpc.Config.MaxConns != 0 {
semaUpdateIndexIncr = semaphore.NewSemaphore(rpc.Config.MaxConns / 2)
} else {
semaUpdateIndexIncr = semaphore.NewSemaphore(10)
}
t1 := time.NewTicker(IndexUpdateIncrTaskSleepInterval)
for {
......@@ -64,21 +54,12 @@ func updateIndexIncr() int {
tmpList[i] = item
i = i + 1
if i == aggrNum {
semaUpdateIndexIncr.Acquire()
go func(items []*dataobj.TsdbItem) {
defer semaUpdateIndexIncr.Release()
rpc.Push2Index(rpc.INCRINDEX, items, IndexList.Get())
}(tmpList)
i = 0
rpc.Push2Index(rpc.INCRINDEX, tmpList, IndexList.Get())
}
}
if i != 0 {
semaUpdateIndexIncr.Acquire()
go func(items []*dataobj.TsdbItem) {
defer semaUpdateIndexIncr.Release()
rpc.Push2Index(rpc.INCRINDEX, items, IndexList.Get())
}(tmpList[:i])
rpc.Push2Index(rpc.INCRINDEX, tmpList[:i], IndexList.Get())
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册