From a45ca89ecb4d1a79e5c40d677777b8ffa4befb74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E9=94=AE=E5=86=AC?= Date: Thu, 7 May 2020 21:42:39 +0800 Subject: [PATCH] No 'this' anymore (#135) --- src/modules/collector/config/config.go | 2 +- src/modules/collector/log/worker/cached.go | 101 +++++------ src/modules/collector/log/worker/counter.go | 185 ++++++++++---------- src/modules/collector/log/worker/worker.go | 48 ++--- 4 files changed, 168 insertions(+), 168 deletions(-) diff --git a/src/modules/collector/config/config.go b/src/modules/collector/config/config.go index 6b64419c..8a91f978 100644 --- a/src/modules/collector/config/config.go +++ b/src/modules/collector/config/config.go @@ -76,7 +76,7 @@ func Parse(conf string) error { err = viper.Unmarshal(&Config) if err != nil { - return fmt.Errorf("Unmarshal %v", err) + return fmt.Errorf("unmarshal config error:%v", err) } return nil diff --git a/src/modules/collector/log/worker/cached.go b/src/modules/collector/log/worker/cached.go index 0065ad7b..5f7fe579 100644 --- a/src/modules/collector/log/worker/cached.go +++ b/src/modules/collector/log/worker/cached.go @@ -13,7 +13,7 @@ import ( ) // cached时间周期 -const CACHED_DURATION = 60 +const cachedDuration = 60 type counterCache struct { sync.RWMutex @@ -31,13 +31,6 @@ func init() { go CleanLoop() } -func (this *counterCache) AddPoint(tms int64, value float64) { - this.Lock() - tmsStr := fmt.Sprintf("%d", tms) - this.Points[tmsStr] = value - this.Unlock() -} - func PostToCache(paramPoints []*dataobj.MetricValue) { for _, point := range paramPoints { globalPushPoints.AddPoint(point) @@ -62,69 +55,76 @@ func GetCachedAll() string { return string(str) } -func (this *counterCache) GetKeys() []string { - this.RLock() +func (cc *counterCache) AddPoint(tms int64, value float64) { + cc.Lock() + tmsStr := fmt.Sprintf("%d", tms) + cc.Points[tmsStr] = value + cc.Unlock() +} + +func (cc *counterCache) GetKeys() []string { + cc.RLock() retList := make([]string, 0) - for k := range this.Points { + for k := range cc.Points { retList = append(retList, k) } - this.RUnlock() + cc.RUnlock() return retList } -func (this *counterCache) RemoveTms(tms string) { - this.Lock() - delete(this.Points, tms) - this.Unlock() +func (cc *counterCache) RemoveTms(tms string) { + cc.Lock() + delete(cc.Points, tms) + cc.Unlock() } -func (this *pushPointsCache) AddCounter(counter string) { - this.Lock() +func (pc *pushPointsCache) AddCounter(counter string) { + pc.Lock() tmp := new(counterCache) - tmp.Points = make(map[string]float64, 0) - this.Counters[counter] = tmp - this.Unlock() + tmp.Points = make(map[string]float64) + pc.Counters[counter] = tmp + pc.Unlock() } -func (this *pushPointsCache) GetCounters() []string { +func (pc *pushPointsCache) GetCounters() []string { ret := make([]string, 0) - this.RLock() - for k := range this.Counters { + pc.RLock() + for k := range pc.Counters { ret = append(ret, k) } - this.RUnlock() + pc.RUnlock() return ret } -func (this *pushPointsCache) RemoveCounter(counter string) { - this.Lock() - delete(this.Counters, counter) - this.Unlock() +func (pc *pushPointsCache) RemoveCounter(counter string) { + pc.Lock() + delete(pc.Counters, counter) + pc.Unlock() } -func (this *pushPointsCache) GetCounterObj(key string) (*counterCache, bool) { - this.RLock() - Points, ok := this.Counters[key] - this.RUnlock() +func (pc *pushPointsCache) GetCounterObj(key string) (*counterCache, bool) { + pc.RLock() + Points, ok := pc.Counters[key] + pc.RUnlock() return Points, ok } -func (this *pushPointsCache) AddPoint(point *dataobj.MetricValue) { +func (pc *pushPointsCache) AddPoint(point *dataobj.MetricValue) { counter := calcCounter(point) - if _, ok := this.GetCounterObj(counter); !ok { - this.AddCounter(counter) + if _, ok := pc.GetCounterObj(counter); !ok { + pc.AddCounter(counter) } - counterPoints, exists := this.GetCounterObj(counter) + counterPoints, exists := pc.GetCounterObj(counter) if exists { counterPoints.AddPoint(point.Timestamp, point.Value) } } -func (this *pushPointsCache) CleanOld() { - counters := this.GetCounters() +func (pc *pushPointsCache) CleanOld() { + counters := pc.GetCounters() for _, counter := range counters { - counterObj, exists := this.GetCounterObj(counter) + counterObj, exists := pc.GetCounterObj(counter) if !exists { continue } @@ -132,16 +132,17 @@ func (this *pushPointsCache) CleanOld() { //如果列表为空,清理掉这个counter if len(tmsList) == 0 { - this.RemoveCounter(counter) - } else { - for _, tmsStr := range tmsList { - tms, err := strconv.Atoi(tmsStr) - if err != nil { - logger.Errorf("clean cached point, atoi error : [%v]", err) - counterObj.RemoveTms(tmsStr) - } else if (time.Now().Unix() - int64(tms)) > CACHED_DURATION { - counterObj.RemoveTms(tmsStr) - } + pc.RemoveCounter(counter) + continue + } + + for _, tmsStr := range tmsList { + tms, err := strconv.Atoi(tmsStr) + if err != nil { + logger.Errorf("clean cached point, atoi error : [%v]", err) + counterObj.RemoveTms(tmsStr) + } else if (time.Now().Unix() - int64(tms)) > cachedDuration { + counterObj.RemoveTms(tmsStr) } } } diff --git a/src/modules/collector/log/worker/counter.go b/src/modules/collector/log/worker/counter.go index 3d035278..3e14c53f 100644 --- a/src/modules/collector/log/worker/counter.go +++ b/src/modules/collector/log/worker/counter.go @@ -116,58 +116,58 @@ func AlignStepTms(step, tms int64) int64 { return newTms } -func (this *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error) { - this.RLock() - point, ok := this.TagstringMap[tagstring] - this.RUnlock() - - if !ok { - return nil, fmt.Errorf("tagstring [%s] not exists!", tagstring) - } - return point, nil +func (pc *PointCounter) UpdateCnt() { + atomic.AddInt64(&pc.Count, 1) } -func (this *PointCounter) UpdateCnt() { - atomic.AddInt64(&this.Count, 1) +func (pc *PointCounter) UpdateSum(value float64) { + addFloat64(&pc.Sum, value) } -func (this *PointCounter) UpdateSum(value float64) { - addFloat64(&this.Sum, value) -} - -func (this *PointCounter) UpdateMaxMin(value float64) { +func (pc *PointCounter) UpdateMaxMin(value float64) { // 这里要用到结构体的小锁 // sum和cnt可以不用锁,但是最大最小没办法做到原子操作 // 只能引入锁 - this.RLock() - if math.IsNaN(this.Max) || value > this.Max { - this.RUnlock() - this.Lock() - if math.IsNaN(this.Max) || value > this.Max { - this.Max = value + pc.RLock() + if math.IsNaN(pc.Max) || value > pc.Max { + pc.RUnlock() + pc.Lock() + if math.IsNaN(pc.Max) || value > pc.Max { + pc.Max = value } - this.Unlock() + pc.Unlock() } else { - this.RUnlock() + pc.RUnlock() } - this.RLock() - if math.IsNaN(this.Min) || value < this.Min { - this.RUnlock() - this.Lock() - if math.IsNaN(this.Min) || value < this.Min { - this.Min = value + pc.RLock() + if math.IsNaN(pc.Min) || value < pc.Min { + pc.RUnlock() + pc.Lock() + if math.IsNaN(pc.Min) || value < pc.Min { + pc.Min = value } - this.Unlock() + pc.Unlock() } else { - this.RUnlock() + pc.RUnlock() } } -func (this *PointsCounter) Update(tagstring string, value float64) error { - pointCount, err := this.GetBytagstring(tagstring) +func (psc *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error) { + psc.RLock() + point, ok := psc.TagstringMap[tagstring] + psc.RUnlock() + + if !ok { + return nil, fmt.Errorf("tagstring [%s] not exists", tagstring) + } + return point, nil +} + +func (psc *PointsCounter) Update(tagstring string, value float64) error { + pointCount, err := psc.GetBytagstring(tagstring) if err != nil { - this.Lock() + psc.Lock() tmp := new(PointCounter) tmp.Count = 0 tmp.Sum = 0 @@ -177,10 +177,10 @@ func (this *PointsCounter) Update(tagstring string, value float64) error { } tmp.Max = math.NaN() tmp.Min = math.NaN() - this.TagstringMap[tagstring] = tmp - this.Unlock() + psc.TagstringMap[tagstring] = tmp + psc.Unlock() - pointCount, err = this.GetBytagstring(tagstring) + pointCount, err = psc.GetBytagstring(tagstring) // 如果还是拿不到,就出错返回吧 if err != nil { return fmt.Errorf("when update, cannot get pointCount after add [tagstring:%s]", tagstring) @@ -221,60 +221,60 @@ func addFloat64(val *float64, delta float64) (new float64) { return } -func (this *StrategyCounter) GetTmsList() []int64 { - tmsList := []int64{} - this.RLock() - for tms := range this.TmsPoints { +func (sc *StrategyCounter) GetTmsList() []int64 { + var tmsList []int64 + sc.RLock() + for tms := range sc.TmsPoints { tmsList = append(tmsList, tms) } - this.RUnlock() + sc.RUnlock() return tmsList } -func (this *StrategyCounter) DeleteTms(tms int64) { - this.Lock() - delete(this.TmsPoints, tms) - this.Unlock() +func (sc *StrategyCounter) DeleteTms(tms int64) { + sc.Lock() + delete(sc.TmsPoints, tms) + sc.Unlock() } -func (this *StrategyCounter) GetByTms(tms int64) (*PointsCounter, error) { - this.RLock() - psCount, ok := this.TmsPoints[tms] +func (sc *StrategyCounter) GetByTms(tms int64) (*PointsCounter, error) { + sc.RLock() + psCount, ok := sc.TmsPoints[tms] if !ok { - this.RUnlock() + sc.RUnlock() return nil, fmt.Errorf("no this tms:%v", tms) } - this.RUnlock() + sc.RUnlock() return psCount, nil } -func (this *StrategyCounter) AddTms(tms int64) error { - this.Lock() - _, ok := this.TmsPoints[tms] +func (sc *StrategyCounter) AddTms(tms int64) error { + sc.Lock() + _, ok := sc.TmsPoints[tms] if !ok { tmp := new(PointsCounter) tmp.TagstringMap = make(map[string]*PointCounter, 0) - this.TmsPoints[tms] = tmp + sc.TmsPoints[tms] = tmp } - this.Unlock() + sc.Unlock() return nil } // 只做更新和删除,添加 由数据驱动 -func (this *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy) { +func (gc *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy) { var delCount, upCount int // 先以count的ID为准,更新count // 若ID没有了, 那就删掉 - for _, id := range this.GetIDs() { - this.RLock() - sCount, ok := this.StrategyCounts[id] - this.RUnlock() + for _, id := range gc.GetIDs() { + gc.RLock() + sCount, ok := gc.StrategyCounts[id] + gc.RUnlock() if !ok || sCount.Strategy == nil { //证明此策略无效,或已被删除 //删一下 delCount = delCount + 1 - this.deleteByID(id) + gc.deleteByID(id) continue } @@ -286,65 +286,65 @@ func (this *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy //需要清空缓存 upCount = upCount + 1 logger.Infof("strategy [%d] changed, clean data", id) - this.cleanStrategyData(id) + gc.cleanStrategyData(id) sCount.Strategy = newStrategy } else { - this.upStrategy(newStrategy) + gc.upStrategy(newStrategy) } } logger.Infof("Update global count done, [del:%d][update:%d]", delCount, upCount) } -func (this *GlobalCounter) AddStrategyCount(st *stra.Strategy) { - this.Lock() - if _, ok := this.StrategyCounts[st.ID]; !ok { +func (gc *GlobalCounter) AddStrategyCount(st *stra.Strategy) { + gc.Lock() + if _, ok := gc.StrategyCounts[st.ID]; !ok { tmp := new(StrategyCounter) tmp.Strategy = st tmp.TmsPoints = make(map[int64]*PointsCounter, 0) - this.StrategyCounts[st.ID] = tmp + gc.StrategyCounts[st.ID] = tmp } - this.Unlock() + gc.Unlock() } -func (this *GlobalCounter) upStrategy(st *stra.Strategy) { - this.Lock() - if _, ok := this.StrategyCounts[st.ID]; ok { - this.StrategyCounts[st.ID].Strategy = st +func (gc *GlobalCounter) upStrategy(st *stra.Strategy) { + gc.Lock() + if _, ok := gc.StrategyCounts[st.ID]; ok { + gc.StrategyCounts[st.ID].Strategy = st } - this.Unlock() + gc.Unlock() } -func (this *GlobalCounter) GetStrategyCountByID(id int64) (*StrategyCounter, error) { - this.RLock() - stCount, ok := this.StrategyCounts[id] +func (gc *GlobalCounter) GetStrategyCountByID(id int64) (*StrategyCounter, error) { + gc.RLock() + stCount, ok := gc.StrategyCounts[id] if !ok { - this.RUnlock() + gc.RUnlock() return nil, fmt.Errorf("No this ID") } - this.RUnlock() + gc.RUnlock() return stCount, nil } -func (this *GlobalCounter) GetIDs() []int64 { - this.RLock() +func (gc *GlobalCounter) GetIDs() []int64 { + gc.RLock() rList := make([]int64, 0) - for k := range this.StrategyCounts { + for k := range gc.StrategyCounts { rList = append(rList, k) } - this.RUnlock() + gc.RUnlock() return rList } -func (this *GlobalCounter) deleteByID(id int64) { - this.Lock() - delete(this.StrategyCounts, id) - this.Unlock() +func (gc *GlobalCounter) deleteByID(id int64) { + gc.Lock() + delete(gc.StrategyCounts, id) + gc.Unlock() } -func (this *GlobalCounter) cleanStrategyData(id int64) { - this.RLock() - sCount, ok := this.StrategyCounts[id] - this.RUnlock() +func (gc *GlobalCounter) cleanStrategyData(id int64) { + gc.RLock() + sCount, ok := gc.StrategyCounts[id] + gc.RUnlock() if !ok || sCount == nil { return } @@ -361,5 +361,4 @@ func countEqual(A *stra.Strategy, B *stra.Strategy) bool { return true } return false - } diff --git a/src/modules/collector/log/worker/worker.go b/src/modules/collector/log/worker/worker.go index 03780595..2913d23d 100644 --- a/src/modules/collector/log/worker/worker.go +++ b/src/modules/collector/log/worker/worker.go @@ -40,30 +40,6 @@ type WorkerGroup struct { TimeFormatStrategy string } -func (this WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64) { - return this.LatestTms, this.MaxDelay -} - -func (this *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) { - latest := atomic.LoadInt64(&this.LatestTms) - - if latest < tms { - swapped := atomic.CompareAndSwapInt64(&this.LatestTms, latest, tms) - if swapped { - logger.Debugf("[work group:%s][set latestTms:%d]", this.Workers[0].Mark, tms) - } - } - - if delay == 0 { - return - } - - newest := atomic.LoadInt64(&this.MaxDelay) - if newest < delay { - atomic.CompareAndSwapInt64(&this.MaxDelay, newest, delay) - } -} - /* * filepath和stream依赖外部,其他的都自己创建 */ @@ -94,6 +70,30 @@ func NewWorkerGroup(filePath string, stream chan string) *WorkerGroup { return wg } +func (wg WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64) { + return wg.LatestTms, wg.MaxDelay +} + +func (wg *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) { + latest := atomic.LoadInt64(&wg.LatestTms) + + if latest < tms { + swapped := atomic.CompareAndSwapInt64(&wg.LatestTms, latest, tms) + if swapped { + logger.Debugf("[work group:%s][set latestTms:%d]", wg.Workers[0].Mark, tms) + } + } + + if delay == 0 { + return + } + + newest := atomic.LoadInt64(&wg.MaxDelay) + if newest < delay { + atomic.CompareAndSwapInt64(&wg.MaxDelay, newest, delay) + } +} + func (wg *WorkerGroup) Start() { for _, worker := range wg.Workers { worker.Start() -- GitLab