提交 8f95d502 编写于 作者: C chenjiandongx

No 'this' anymore

上级 f609a84c
...@@ -76,7 +76,7 @@ func Parse(conf string) error { ...@@ -76,7 +76,7 @@ func Parse(conf string) error {
err = viper.Unmarshal(&Config) err = viper.Unmarshal(&Config)
if err != nil { if err != nil {
return fmt.Errorf("Unmarshal %v", err) return fmt.Errorf("unmarshal config error:%v", err)
} }
return nil return nil
......
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
) )
// cached时间周期 // cached时间周期
const CACHED_DURATION = 60 const cachedDuration = 60
type counterCache struct { type counterCache struct {
sync.RWMutex sync.RWMutex
...@@ -31,13 +31,6 @@ func init() { ...@@ -31,13 +31,6 @@ func init() {
go CleanLoop() go CleanLoop()
} }
func (this *counterCache) AddPoint(tms int64, value float64) {
this.Lock()
tmsStr := fmt.Sprintf("%d", tms)
this.Points[tmsStr] = value
this.Unlock()
}
func PostToCache(paramPoints []*dataobj.MetricValue) { func PostToCache(paramPoints []*dataobj.MetricValue) {
for _, point := range paramPoints { for _, point := range paramPoints {
globalPushPoints.AddPoint(point) globalPushPoints.AddPoint(point)
...@@ -62,69 +55,76 @@ func GetCachedAll() string { ...@@ -62,69 +55,76 @@ func GetCachedAll() string {
return string(str) return string(str)
} }
func (this *counterCache) GetKeys() []string { func (cc *counterCache) AddPoint(tms int64, value float64) {
this.RLock() cc.Lock()
tmsStr := fmt.Sprintf("%d", tms)
cc.Points[tmsStr] = value
cc.Unlock()
}
func (cc *counterCache) GetKeys() []string {
cc.RLock()
retList := make([]string, 0) retList := make([]string, 0)
for k := range this.Points { for k := range cc.Points {
retList = append(retList, k) retList = append(retList, k)
} }
this.RUnlock() cc.RUnlock()
return retList return retList
} }
func (this *counterCache) RemoveTms(tms string) { func (cc *counterCache) RemoveTms(tms string) {
this.Lock() cc.Lock()
delete(this.Points, tms) delete(cc.Points, tms)
this.Unlock() cc.Unlock()
} }
func (this *pushPointsCache) AddCounter(counter string) { func (pc *pushPointsCache) AddCounter(counter string) {
this.Lock() pc.Lock()
tmp := new(counterCache) tmp := new(counterCache)
tmp.Points = make(map[string]float64, 0) tmp.Points = make(map[string]float64)
this.Counters[counter] = tmp pc.Counters[counter] = tmp
this.Unlock() pc.Unlock()
} }
func (this *pushPointsCache) GetCounters() []string { func (pc *pushPointsCache) GetCounters() []string {
ret := make([]string, 0) ret := make([]string, 0)
this.RLock() pc.RLock()
for k := range this.Counters { for k := range pc.Counters {
ret = append(ret, k) ret = append(ret, k)
} }
this.RUnlock() pc.RUnlock()
return ret return ret
} }
func (this *pushPointsCache) RemoveCounter(counter string) { func (pc *pushPointsCache) RemoveCounter(counter string) {
this.Lock() pc.Lock()
delete(this.Counters, counter) delete(pc.Counters, counter)
this.Unlock() pc.Unlock()
} }
func (this *pushPointsCache) GetCounterObj(key string) (*counterCache, bool) { func (pc *pushPointsCache) GetCounterObj(key string) (*counterCache, bool) {
this.RLock() pc.RLock()
Points, ok := this.Counters[key] Points, ok := pc.Counters[key]
this.RUnlock() pc.RUnlock()
return Points, ok return Points, ok
} }
func (this *pushPointsCache) AddPoint(point *dataobj.MetricValue) { func (pc *pushPointsCache) AddPoint(point *dataobj.MetricValue) {
counter := calcCounter(point) counter := calcCounter(point)
if _, ok := this.GetCounterObj(counter); !ok { if _, ok := pc.GetCounterObj(counter); !ok {
this.AddCounter(counter) pc.AddCounter(counter)
} }
counterPoints, exists := this.GetCounterObj(counter) counterPoints, exists := pc.GetCounterObj(counter)
if exists { if exists {
counterPoints.AddPoint(point.Timestamp, point.Value) counterPoints.AddPoint(point.Timestamp, point.Value)
} }
} }
func (this *pushPointsCache) CleanOld() { func (pc *pushPointsCache) CleanOld() {
counters := this.GetCounters() counters := pc.GetCounters()
for _, counter := range counters { for _, counter := range counters {
counterObj, exists := this.GetCounterObj(counter) counterObj, exists := pc.GetCounterObj(counter)
if !exists { if !exists {
continue continue
} }
...@@ -132,16 +132,17 @@ func (this *pushPointsCache) CleanOld() { ...@@ -132,16 +132,17 @@ func (this *pushPointsCache) CleanOld() {
//如果列表为空,清理掉这个counter //如果列表为空,清理掉这个counter
if len(tmsList) == 0 { if len(tmsList) == 0 {
this.RemoveCounter(counter) pc.RemoveCounter(counter)
} else { continue
for _, tmsStr := range tmsList { }
tms, err := strconv.Atoi(tmsStr)
if err != nil { for _, tmsStr := range tmsList {
logger.Errorf("clean cached point, atoi error : [%v]", err) tms, err := strconv.Atoi(tmsStr)
counterObj.RemoveTms(tmsStr) if err != nil {
} else if (time.Now().Unix() - int64(tms)) > CACHED_DURATION { logger.Errorf("clean cached point, atoi error : [%v]", err)
counterObj.RemoveTms(tmsStr) counterObj.RemoveTms(tmsStr)
} } else if (time.Now().Unix() - int64(tms)) > cachedDuration {
counterObj.RemoveTms(tmsStr)
} }
} }
} }
......
...@@ -116,58 +116,58 @@ func AlignStepTms(step, tms int64) int64 { ...@@ -116,58 +116,58 @@ func AlignStepTms(step, tms int64) int64 {
return newTms return newTms
} }
func (this *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error) { func (pc *PointCounter) UpdateCnt() {
this.RLock() atomic.AddInt64(&pc.Count, 1)
point, ok := this.TagstringMap[tagstring]
this.RUnlock()
if !ok {
return nil, fmt.Errorf("tagstring [%s] not exists!", tagstring)
}
return point, nil
} }
func (this *PointCounter) UpdateCnt() { func (pc *PointCounter) UpdateSum(value float64) {
atomic.AddInt64(&this.Count, 1) addFloat64(&pc.Sum, value)
} }
func (this *PointCounter) UpdateSum(value float64) { func (pc *PointCounter) UpdateMaxMin(value float64) {
addFloat64(&this.Sum, value)
}
func (this *PointCounter) UpdateMaxMin(value float64) {
// 这里要用到结构体的小锁 // 这里要用到结构体的小锁
// sum和cnt可以不用锁,但是最大最小没办法做到原子操作 // sum和cnt可以不用锁,但是最大最小没办法做到原子操作
// 只能引入锁 // 只能引入锁
this.RLock() pc.RLock()
if math.IsNaN(this.Max) || value > this.Max { if math.IsNaN(pc.Max) || value > pc.Max {
this.RUnlock() pc.RUnlock()
this.Lock() pc.Lock()
if math.IsNaN(this.Max) || value > this.Max { if math.IsNaN(pc.Max) || value > pc.Max {
this.Max = value pc.Max = value
} }
this.Unlock() pc.Unlock()
} else { } else {
this.RUnlock() pc.RUnlock()
} }
this.RLock() pc.RLock()
if math.IsNaN(this.Min) || value < this.Min { if math.IsNaN(pc.Min) || value < pc.Min {
this.RUnlock() pc.RUnlock()
this.Lock() pc.Lock()
if math.IsNaN(this.Min) || value < this.Min { if math.IsNaN(pc.Min) || value < pc.Min {
this.Min = value pc.Min = value
} }
this.Unlock() pc.Unlock()
} else { } else {
this.RUnlock() pc.RUnlock()
} }
} }
func (this *PointsCounter) Update(tagstring string, value float64) error { func (psc *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error) {
pointCount, err := this.GetBytagstring(tagstring) psc.RLock()
point, ok := psc.TagstringMap[tagstring]
psc.RUnlock()
if !ok {
return nil, fmt.Errorf("tagstring [%s] not exists", tagstring)
}
return point, nil
}
func (psc *PointsCounter) Update(tagstring string, value float64) error {
pointCount, err := psc.GetBytagstring(tagstring)
if err != nil { if err != nil {
this.Lock() psc.Lock()
tmp := new(PointCounter) tmp := new(PointCounter)
tmp.Count = 0 tmp.Count = 0
tmp.Sum = 0 tmp.Sum = 0
...@@ -177,10 +177,10 @@ func (this *PointsCounter) Update(tagstring string, value float64) error { ...@@ -177,10 +177,10 @@ func (this *PointsCounter) Update(tagstring string, value float64) error {
} }
tmp.Max = math.NaN() tmp.Max = math.NaN()
tmp.Min = math.NaN() tmp.Min = math.NaN()
this.TagstringMap[tagstring] = tmp psc.TagstringMap[tagstring] = tmp
this.Unlock() psc.Unlock()
pointCount, err = this.GetBytagstring(tagstring) pointCount, err = psc.GetBytagstring(tagstring)
// 如果还是拿不到,就出错返回吧 // 如果还是拿不到,就出错返回吧
if err != nil { if err != nil {
return fmt.Errorf("when update, cannot get pointCount after add [tagstring:%s]", tagstring) return fmt.Errorf("when update, cannot get pointCount after add [tagstring:%s]", tagstring)
...@@ -221,60 +221,60 @@ func addFloat64(val *float64, delta float64) (new float64) { ...@@ -221,60 +221,60 @@ func addFloat64(val *float64, delta float64) (new float64) {
return return
} }
func (this *StrategyCounter) GetTmsList() []int64 { func (sc *StrategyCounter) GetTmsList() []int64 {
tmsList := []int64{} var tmsList []int64
this.RLock() sc.RLock()
for tms := range this.TmsPoints { for tms := range sc.TmsPoints {
tmsList = append(tmsList, tms) tmsList = append(tmsList, tms)
} }
this.RUnlock() sc.RUnlock()
return tmsList return tmsList
} }
func (this *StrategyCounter) DeleteTms(tms int64) { func (sc *StrategyCounter) DeleteTms(tms int64) {
this.Lock() sc.Lock()
delete(this.TmsPoints, tms) delete(sc.TmsPoints, tms)
this.Unlock() sc.Unlock()
} }
func (this *StrategyCounter) GetByTms(tms int64) (*PointsCounter, error) { func (sc *StrategyCounter) GetByTms(tms int64) (*PointsCounter, error) {
this.RLock() sc.RLock()
psCount, ok := this.TmsPoints[tms] psCount, ok := sc.TmsPoints[tms]
if !ok { if !ok {
this.RUnlock() sc.RUnlock()
return nil, fmt.Errorf("no this tms:%v", tms) return nil, fmt.Errorf("no this tms:%v", tms)
} }
this.RUnlock() sc.RUnlock()
return psCount, nil return psCount, nil
} }
func (this *StrategyCounter) AddTms(tms int64) error { func (sc *StrategyCounter) AddTms(tms int64) error {
this.Lock() sc.Lock()
_, ok := this.TmsPoints[tms] _, ok := sc.TmsPoints[tms]
if !ok { if !ok {
tmp := new(PointsCounter) tmp := new(PointsCounter)
tmp.TagstringMap = make(map[string]*PointCounter, 0) tmp.TagstringMap = make(map[string]*PointCounter, 0)
this.TmsPoints[tms] = tmp sc.TmsPoints[tms] = tmp
} }
this.Unlock() sc.Unlock()
return nil return nil
} }
// 只做更新和删除,添加 由数据驱动 // 只做更新和删除,添加 由数据驱动
func (this *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy) { func (gc *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy) {
var delCount, upCount int var delCount, upCount int
// 先以count的ID为准,更新count // 先以count的ID为准,更新count
// 若ID没有了, 那就删掉 // 若ID没有了, 那就删掉
for _, id := range this.GetIDs() { for _, id := range gc.GetIDs() {
this.RLock() gc.RLock()
sCount, ok := this.StrategyCounts[id] sCount, ok := gc.StrategyCounts[id]
this.RUnlock() gc.RUnlock()
if !ok || sCount.Strategy == nil { if !ok || sCount.Strategy == nil {
//证明此策略无效,或已被删除 //证明此策略无效,或已被删除
//删一下 //删一下
delCount = delCount + 1 delCount = delCount + 1
this.deleteByID(id) gc.deleteByID(id)
continue continue
} }
...@@ -286,65 +286,65 @@ func (this *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy ...@@ -286,65 +286,65 @@ func (this *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy
//需要清空缓存 //需要清空缓存
upCount = upCount + 1 upCount = upCount + 1
logger.Infof("strategy [%d] changed, clean data", id) logger.Infof("strategy [%d] changed, clean data", id)
this.cleanStrategyData(id) gc.cleanStrategyData(id)
sCount.Strategy = newStrategy sCount.Strategy = newStrategy
} else { } else {
this.upStrategy(newStrategy) gc.upStrategy(newStrategy)
} }
} }
logger.Infof("Update global count done, [del:%d][update:%d]", delCount, upCount) logger.Infof("Update global count done, [del:%d][update:%d]", delCount, upCount)
} }
func (this *GlobalCounter) AddStrategyCount(st *stra.Strategy) { func (gc *GlobalCounter) AddStrategyCount(st *stra.Strategy) {
this.Lock() gc.Lock()
if _, ok := this.StrategyCounts[st.ID]; !ok { if _, ok := gc.StrategyCounts[st.ID]; !ok {
tmp := new(StrategyCounter) tmp := new(StrategyCounter)
tmp.Strategy = st tmp.Strategy = st
tmp.TmsPoints = make(map[int64]*PointsCounter, 0) tmp.TmsPoints = make(map[int64]*PointsCounter, 0)
this.StrategyCounts[st.ID] = tmp gc.StrategyCounts[st.ID] = tmp
} }
this.Unlock() gc.Unlock()
} }
func (this *GlobalCounter) upStrategy(st *stra.Strategy) { func (gc *GlobalCounter) upStrategy(st *stra.Strategy) {
this.Lock() gc.Lock()
if _, ok := this.StrategyCounts[st.ID]; ok { if _, ok := gc.StrategyCounts[st.ID]; ok {
this.StrategyCounts[st.ID].Strategy = st gc.StrategyCounts[st.ID].Strategy = st
} }
this.Unlock() gc.Unlock()
} }
func (this *GlobalCounter) GetStrategyCountByID(id int64) (*StrategyCounter, error) { func (gc *GlobalCounter) GetStrategyCountByID(id int64) (*StrategyCounter, error) {
this.RLock() gc.RLock()
stCount, ok := this.StrategyCounts[id] stCount, ok := gc.StrategyCounts[id]
if !ok { if !ok {
this.RUnlock() gc.RUnlock()
return nil, fmt.Errorf("No this ID") return nil, fmt.Errorf("No this ID")
} }
this.RUnlock() gc.RUnlock()
return stCount, nil return stCount, nil
} }
func (this *GlobalCounter) GetIDs() []int64 { func (gc *GlobalCounter) GetIDs() []int64 {
this.RLock() gc.RLock()
rList := make([]int64, 0) rList := make([]int64, 0)
for k := range this.StrategyCounts { for k := range gc.StrategyCounts {
rList = append(rList, k) rList = append(rList, k)
} }
this.RUnlock() gc.RUnlock()
return rList return rList
} }
func (this *GlobalCounter) deleteByID(id int64) { func (gc *GlobalCounter) deleteByID(id int64) {
this.Lock() gc.Lock()
delete(this.StrategyCounts, id) delete(gc.StrategyCounts, id)
this.Unlock() gc.Unlock()
} }
func (this *GlobalCounter) cleanStrategyData(id int64) { func (gc *GlobalCounter) cleanStrategyData(id int64) {
this.RLock() gc.RLock()
sCount, ok := this.StrategyCounts[id] sCount, ok := gc.StrategyCounts[id]
this.RUnlock() gc.RUnlock()
if !ok || sCount == nil { if !ok || sCount == nil {
return return
} }
...@@ -361,5 +361,4 @@ func countEqual(A *stra.Strategy, B *stra.Strategy) bool { ...@@ -361,5 +361,4 @@ func countEqual(A *stra.Strategy, B *stra.Strategy) bool {
return true return true
} }
return false return false
} }
...@@ -40,30 +40,6 @@ type WorkerGroup struct { ...@@ -40,30 +40,6 @@ type WorkerGroup struct {
TimeFormatStrategy string TimeFormatStrategy string
} }
func (this WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64) {
return this.LatestTms, this.MaxDelay
}
func (this *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) {
latest := atomic.LoadInt64(&this.LatestTms)
if latest < tms {
swapped := atomic.CompareAndSwapInt64(&this.LatestTms, latest, tms)
if swapped {
logger.Debugf("[work group:%s][set latestTms:%d]", this.Workers[0].Mark, tms)
}
}
if delay == 0 {
return
}
newest := atomic.LoadInt64(&this.MaxDelay)
if newest < delay {
atomic.CompareAndSwapInt64(&this.MaxDelay, newest, delay)
}
}
/* /*
* filepath和stream依赖外部,其他的都自己创建 * filepath和stream依赖外部,其他的都自己创建
*/ */
...@@ -94,6 +70,30 @@ func NewWorkerGroup(filePath string, stream chan string) *WorkerGroup { ...@@ -94,6 +70,30 @@ func NewWorkerGroup(filePath string, stream chan string) *WorkerGroup {
return wg return wg
} }
func (wg WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64) {
return wg.LatestTms, wg.MaxDelay
}
func (wg *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64) {
latest := atomic.LoadInt64(&wg.LatestTms)
if latest < tms {
swapped := atomic.CompareAndSwapInt64(&wg.LatestTms, latest, tms)
if swapped {
logger.Debugf("[work group:%s][set latestTms:%d]", wg.Workers[0].Mark, tms)
}
}
if delay == 0 {
return
}
newest := atomic.LoadInt64(&wg.MaxDelay)
if newest < delay {
atomic.CompareAndSwapInt64(&wg.MaxDelay, newest, delay)
}
}
func (wg *WorkerGroup) Start() { func (wg *WorkerGroup) Start() {
for _, worker := range wg.Workers { for _, worker := range wg.Workers {
worker.Start() worker.Start()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册