diff --git a/src/dataobj/metric.go b/src/dataobj/metric.go index 2966c65bbf92214f13a9253ad9796330e8959cc5..8a8c02d2be6b6152e2a84cf9b9720077186c42e2 100644 --- a/src/dataobj/metric.go +++ b/src/dataobj/metric.go @@ -10,10 +10,11 @@ import ( ) const ( - GAUGE = "GAUGE" - COUNTER = "COUNTER" - DERIVE = "DERIVE" - SPLIT = "/" + GAUGE = "GAUGE" + COUNTER = "COUNTER" + SUBTRACT = "SUBTRACT" + DERIVE = "DERIVE" + SPLIT = "/" ) type MetricValue struct { diff --git a/src/modules/collector/sys/funcs/push.go b/src/modules/collector/sys/funcs/push.go index ca9547e5e1c195ffe5e854d460e930159b5593c6..6eebd2c82b61d107709919820e8f2785268c4163 100644 --- a/src/modules/collector/sys/funcs/push.go +++ b/src/modules/collector/sys/funcs/push.go @@ -37,8 +37,14 @@ func Push(metricItems []*dataobj.MetricValue) error { continue } if item.CounterType == dataobj.COUNTER { - if err := CounterToGauge(item); err != nil { - logger.Warning(err) + item = CounterToGauge(item) + if item == nil { + continue + } + } + if item.CounterType == dataobj.SUBTRACT { + item = SubtractToGauge(item) + if item == nil { continue } } @@ -143,25 +149,59 @@ func rpcClient(addr string) (*rpc.Client, error) { return client, nil } -func CounterToGauge(item *dataobj.MetricValue) error { +func CounterToGauge(item *dataobj.MetricValue) *dataobj.MetricValue { key := item.PK() old, exists := cache.MetricHistory.Get(key) cache.MetricHistory.Set(key, *item) if !exists { - return fmt.Errorf("not found old item:%v", item) + logger.Debugf("not found old item:%v, maybe this is the first item", item) + return nil } if old.Value > item.Value { - return fmt.Errorf("item:%v old value:%v greater than new value:%v", item, old.Value, item.Value) + logger.Warningf("item:%v old value:%v greater than new value:%v", item, old.Value, item.Value) + return nil } if old.Timestamp >= item.Timestamp { - return fmt.Errorf("item:%v old timestamp:%v greater than new timestamp:%v", item, old.Timestamp, item.Timestamp) + logger.Warningf("item:%v old timestamp:%v greater than new timestamp:%v", item, old.Timestamp, item.Timestamp) + return nil } item.ValueUntyped = (item.Value - old.Value) / float64(item.Timestamp-old.Timestamp) item.CounterType = dataobj.GAUGE - return nil + return item +} + +func SubtractToGauge(item *dataobj.MetricValue) *dataobj.MetricValue { + key := item.PK() + + old, exists := cache.MetricHistory.Get(key) + cache.MetricHistory.Set(key, *item) + + if !exists { + logger.Debugf("not found old item:%v, maybe this is the first item", item) + return nil + } + + if old.Value > item.Value { + logger.Warningf("item:%v old value:%v greater than new value:%v", item, old.Value, item.Value) + return nil + } + + if old.Timestamp >= item.Timestamp { + logger.Warningf("item:%v old timestamp:%v greater than new timestamp:%v", item, old.Timestamp, item.Timestamp) + return nil + } + + if old.Timestamp <= item.Timestamp-2*item.Step { + logger.Warningf("item:%v old timestamp:%v too old <= %v = (new timestamp: %v - 2 * step: %v), maybe some point lost", item, old.Timestamp, item.Timestamp-2*item.Step, item.Timestamp, item.Step) + return nil + } + + item.ValueUntyped = item.Value - old.Value + item.CounterType = dataobj.GAUGE + return item }