From bce825ff32c4e0d1d7dfbcd82a8713b9e09ced4a Mon Sep 17 00:00:00 2001 From: 710leo <710leo@gmail.com> Date: Mon, 29 Jun 2020 23:57:15 +0800 Subject: [PATCH] Change push index from async to sync --- src/modules/tsdb/index/update_incr.go | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/src/modules/tsdb/index/update_incr.go b/src/modules/tsdb/index/update_incr.go index e61d4e6a..5c74c2e2 100644 --- a/src/modules/tsdb/index/update_incr.go +++ b/src/modules/tsdb/index/update_incr.go @@ -7,7 +7,6 @@ import ( "github.com/didi/nightingale/src/modules/tsdb/backend/rpc" "github.com/didi/nightingale/src/toolkits/str" - "github.com/toolkits/pkg/concurrent/semaphore" "github.com/toolkits/pkg/logger" ) @@ -15,17 +14,8 @@ const ( IndexUpdateIncrTaskSleepInterval = time.Duration(10) * time.Second // 增量更新间隔时间, 默认30s ) -var ( - semaUpdateIndexIncr *semaphore.Semaphore // 索引增量更新时并发控制 -) - // 启动索引的 异步、增量更新 任务, 每隔一定时间,刷新cache中的数据到数据库中 func StartIndexUpdateIncrTask() { - if rpc.Config.MaxConns != 0 { - semaUpdateIndexIncr = semaphore.NewSemaphore(rpc.Config.MaxConns / 2) - } else { - semaUpdateIndexIncr = semaphore.NewSemaphore(10) - } t1 := time.NewTicker(IndexUpdateIncrTaskSleepInterval) for { @@ -64,21 +54,12 @@ func updateIndexIncr() int { tmpList[i] = item i = i + 1 if i == aggrNum { - semaUpdateIndexIncr.Acquire() - go func(items []*dataobj.TsdbItem) { - defer semaUpdateIndexIncr.Release() - rpc.Push2Index(rpc.INCRINDEX, items, IndexList.Get()) - }(tmpList) - i = 0 + rpc.Push2Index(rpc.INCRINDEX, tmpList, IndexList.Get()) } } if i != 0 { - semaUpdateIndexIncr.Acquire() - go func(items []*dataobj.TsdbItem) { - defer semaUpdateIndexIncr.Release() - rpc.Push2Index(rpc.INCRINDEX, items, IndexList.Get()) - }(tmpList[:i]) + rpc.Push2Index(rpc.INCRINDEX, tmpList[:i], IndexList.Get()) } } -- GitLab