提交 3eca4b3d 编写于 作者: 7 710leo

Refactor: remove tsdb xxhash key

上级 436ae6c6
......@@ -35,7 +35,8 @@ $ docker-compose up -d
![dashboard](https://user-images.githubusercontent.com/19553554/78956965-8b9c6180-7b16-11ea-9747-6ed5e62b068d.png)
## Upgrading
If upgrade `version<1.4.0` to `v1.4.0`, follow the operating instructions in [v1.4.0](https://github.com/didi/nightingale/releases/tag/V1.4.0) release
- If upgrade `version<1.4.0` to `v1.4.0`, follow the operating instructions in [v1.4.0](https://github.com/didi/nightingale/releases/tag/V1.4.0) release
- If upgrade from `version>1.4.0 & version<2.3.0` to `v2.3.0`, need import this [sql](https://github.com/didi/nightingale/blob/master/sql/upgrade_2.3.0.sql)
## Team
......
......@@ -139,13 +139,13 @@ status()
build_one()
{
mod=$1
go build -ldflags "-X main.version=${version} -X main.buildTime=`date -u '+%Y-%m-%d_%I:%M:%S%p'` -X main.gitHash=`git rev-parse HEAD`" -mod=vendor -o n9e-${mod} --tags "md5" src/modules/${mod}/${mod}.go
go build -ldflags "-X main.version=${version} -X main.buildTime=`date -u '+%Y-%m-%d_%I:%M:%S%p'` -X main.gitHash=`git rev-parse HEAD`" -mod=vendor -o n9e-${mod} src/modules/${mod}/${mod}.go
}
build_docker()
{
mod=$1
go build -ldflags "-X main.version=${version} -X main.buildTime=`date -u '+%Y-%m-%d_%I:%M:%S%p'` -X main.gitHash=`git rev-parse HEAD`" -mod=vendor -o bin/n9e-${mod} --tags "md5" src/modules/${mod}/${mod}.go
go build -ldflags "-X main.version=${version} -X main.buildTime=`date -u '+%Y-%m-%d_%I:%M:%S%p'` -X main.gitHash=`git rev-parse HEAD`" -mod=vendor -o bin/n9e-${mod} src/modules/${mod}/${mod}.go
}
build()
......
......@@ -38,7 +38,7 @@ type (
)
type cache struct {
Items map[interface{}]*CS // [counter]ts,value
Items map[string]*CS // [counter]ts,value
sync.RWMutex
}
......@@ -64,18 +64,18 @@ func InitChunkSlot() {
}
ChunksSlots = &ChunksSlot{
Data: make([]map[interface{}][]*Chunk, size),
Data: make([]map[string][]*Chunk, size),
Size: size,
}
for i := 0; i < size; i++ {
ChunksSlots.Data[i] = make(map[interface{}][]*Chunk)
ChunksSlots.Data[i] = make(map[string][]*Chunk)
}
}
func NewCaches() caches {
c := make(caches, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
c[i] = &cache{Items: make(map[interface{}]*CS)}
c[i] = &cache{Items: make(map[string]*CS)}
}
return c
}
......@@ -97,7 +97,7 @@ func StartCleanup() {
}
}
func (c *caches) Push(seriesID interface{}, ts int64, value float64) error {
func (c *caches) Push(seriesID string, ts int64, value float64) error {
shard := c.getShard(seriesID)
existC, exist := Caches.exist(seriesID)
if exist {
......@@ -114,7 +114,7 @@ func (c *caches) Push(seriesID interface{}, ts int64, value float64) error {
return err
}
func (c *caches) Get(seriesID interface{}, from, to int64) ([]Iter, error) {
func (c *caches) Get(seriesID string, from, to int64) ([]Iter, error) {
existC, exist := Caches.exist(seriesID)
if !exist {
......@@ -129,7 +129,7 @@ func (c *caches) Get(seriesID interface{}, from, to int64) ([]Iter, error) {
return res, nil
}
func (c *caches) SetFlag(seriesID interface{}, flag uint32) error {
func (c *caches) SetFlag(seriesID string, flag uint32) error {
existC, exist := Caches.exist(seriesID)
if !exist {
return fmt.Errorf("non series exist")
......@@ -138,7 +138,7 @@ func (c *caches) SetFlag(seriesID interface{}, flag uint32) error {
return nil
}
func (c *caches) GetFlag(seriesID interface{}) uint32 {
func (c *caches) GetFlag(seriesID string) uint32 {
existC, exist := Caches.exist(seriesID)
if !exist {
return 0
......@@ -146,7 +146,7 @@ func (c *caches) GetFlag(seriesID interface{}) uint32 {
return existC.GetFlag()
}
func (c *caches) create(seriesID interface{}) *CS {
func (c *caches) create(seriesID string) *CS {
atomic.AddInt64(&TotalCount, 1)
shard := c.getShard(seriesID)
shard.Lock()
......@@ -157,7 +157,7 @@ func (c *caches) create(seriesID interface{}) *CS {
return newC
}
func (c *caches) exist(seriesID interface{}) (*CS, bool) {
func (c *caches) exist(seriesID string) (*CS, bool) {
shard := c.getShard(seriesID)
shard.RLock()
existC, exist := shard.Items[seriesID]
......@@ -166,7 +166,7 @@ func (c *caches) exist(seriesID interface{}) (*CS, bool) {
return existC, exist
}
func (c *caches) GetCurrentChunk(seriesID interface{}) (*Chunk, bool) {
func (c *caches) GetCurrentChunk(seriesID string) (*Chunk, bool) {
shard := c.getShard(seriesID)
if shard == nil {
return nil, false
......@@ -185,7 +185,7 @@ func (c caches) Count() int64 {
return atomic.LoadInt64(&TotalCount)
}
func (c caches) Remove(seriesID interface{}) {
func (c caches) Remove(seriesID string) {
atomic.AddInt64(&TotalCount, -1)
shard := c.getShard(seriesID)
shard.Lock()
......@@ -229,14 +229,6 @@ func (c caches) Cleanup(expiresInMinutes int) {
logger.Infof("cleanup %v Items, took %.2f ms\n", count, float64(time.Since(now).Nanoseconds())*1e-6)
}
func (c caches) getShard(key interface{}) *cache {
switch key.(type) {
case uint64:
return c[int(key.(uint64)%SHARD_COUNT)]
case string:
return c[utils.HashKey(key.(string))%SHARD_COUNT]
default: //不会出现此种情况
return nil
}
return nil
func (c caches) getShard(key string) *cache {
return c[utils.HashKey(key)%SHARD_COUNT]
}
......@@ -54,7 +54,7 @@ var ChunksSlots *ChunksSlot
type ChunksSlot struct {
sync.RWMutex
Data []map[interface{}][]*Chunk
Data []map[string][]*Chunk
Size int
}
......@@ -65,21 +65,21 @@ func (c *ChunksSlot) Len(idx int) int {
return len(c.Data[idx])
}
func (c *ChunksSlot) Get(idx int) map[interface{}][]*Chunk {
func (c *ChunksSlot) Get(idx int) map[string][]*Chunk {
c.Lock()
defer c.Unlock()
items := c.Data[idx]
ret := make(map[interface{}][]*Chunk)
ret := make(map[string][]*Chunk)
for k, v := range items {
ret[k] = v
}
c.Data[idx] = make(map[interface{}][]*Chunk)
c.Data[idx] = make(map[string][]*Chunk)
return ret
}
func (c *ChunksSlot) GetChunks(key interface{}) ([]*Chunk, bool) {
func (c *ChunksSlot) GetChunks(key string) ([]*Chunk, bool) {
c.Lock()
defer c.Unlock()
......@@ -96,7 +96,7 @@ func (c *ChunksSlot) GetChunks(key interface{}) ([]*Chunk, bool) {
return val, ok
}
func (c *ChunksSlot) PushChunks(key interface{}, vals []*Chunk) {
func (c *ChunksSlot) PushChunks(key string, vals []*Chunk) {
c.Lock()
defer c.Unlock()
idx, err := GetChunkIndex(key, c.Size)
......@@ -115,7 +115,7 @@ func (c *ChunksSlot) PushChunks(key interface{}, vals []*Chunk) {
c.Data[idx][key] = vals
}
func (c *ChunksSlot) Push(key interface{}, val *Chunk) {
func (c *ChunksSlot) Push(key string, val *Chunk) {
c.Lock()
defer c.Unlock()
idx, err := GetChunkIndex(key, c.Size)
......@@ -130,13 +130,6 @@ func (c *ChunksSlot) Push(key interface{}, val *Chunk) {
c.Data[idx][key] = append(c.Data[idx][key], val)
}
func GetChunkIndex(key interface{}, size int) (uint32, error) {
switch key.(type) {
case uint64:
return uint32(key.(uint64)) % uint32(size), nil
case string:
return utils.HashKey(key.(string)) % uint32(size), nil
default:
return 0, fmt.Errorf("undefined hashType:%v", key)
}
func GetChunkIndex(key string, size int) (uint32, error) {
return utils.HashKey(key) % uint32(size), nil
}
......@@ -21,7 +21,7 @@ func NewChunks(numOfChunks int) *CS {
return &CS{Chunks: cs}
}
func (cs *CS) Push(seriesID interface{}, ts int64, value float64) error {
func (cs *CS) Push(seriesID string, ts int64, value float64) error {
//找到当前chunk的起始时间
t0 := uint32(ts - (ts % int64(Config.SpanInSeconds)))
......
package cache
type Point struct {
Key interface{} `msg:"key"`
Timestamp int64 `msg:"timestamp"`
Value float64 `msg:"value"`
Key string `msg:"key"`
Timestamp int64 `msg:"timestamp"`
Value float64 `msg:"value"`
}
......@@ -17,11 +17,11 @@ type DsTypeAndStep struct {
// 索引缓存的元素数据结构
type IndexCacheItem struct {
UUID interface{}
UUID string
Item *dataobj.TsdbItem
}
func NewIndexCacheItem(uuid interface{}, item *dataobj.TsdbItem) *IndexCacheItem {
func NewIndexCacheItem(uuid string, item *dataobj.TsdbItem) *IndexCacheItem {
return &IndexCacheItem{UUID: uuid, Item: item}
}
......@@ -29,26 +29,26 @@ func NewIndexCacheItem(uuid interface{}, item *dataobj.TsdbItem) *IndexCacheItem
type IndexCacheBase struct {
sync.RWMutex
maxSize int
data map[interface{}]*dataobj.TsdbItem
data map[string]*dataobj.TsdbItem
}
func NewIndexCacheBase(max int) *IndexCacheBase {
return &IndexCacheBase{maxSize: max, data: make(map[interface{}]*dataobj.TsdbItem)}
return &IndexCacheBase{maxSize: max, data: make(map[string]*dataobj.TsdbItem)}
}
func (i *IndexCacheBase) Put(key interface{}, item *dataobj.TsdbItem) {
func (i *IndexCacheBase) Put(key string, item *dataobj.TsdbItem) {
i.Lock()
defer i.Unlock()
i.data[key] = item
}
func (i *IndexCacheBase) Get(key interface{}) *dataobj.TsdbItem {
func (i *IndexCacheBase) Get(key string) *dataobj.TsdbItem {
i.RLock()
defer i.RUnlock()
return i.data[key]
}
func (i *IndexCacheBase) ContainsKey(key interface{}) bool {
func (i *IndexCacheBase) ContainsKey(key string) bool {
i.RLock()
defer i.RUnlock()
return i.data[key] != nil
......@@ -60,16 +60,16 @@ func (i *IndexCacheBase) Size() int {
return len(i.data)
}
func (i *IndexCacheBase) Keys() []interface{} {
func (i *IndexCacheBase) Keys() []string {
i.RLock()
defer i.RUnlock()
count := len(i.data)
if count == 0 {
return []interface{}{}
return []string{}
}
keys := make([]interface{}, 0, count)
keys := make([]string, 0, count)
for key := range i.data {
keys = append(keys, key)
}
......@@ -77,7 +77,7 @@ func (i *IndexCacheBase) Keys() []interface{} {
return keys
}
func (i *IndexCacheBase) Remove(key interface{}) {
func (i *IndexCacheBase) Remove(key string) {
i.Lock()
defer i.Unlock()
delete(i.data, key)
......
......@@ -40,53 +40,28 @@ func Init(cfg IndexSection) {
logger.Info("index.Start ok")
}
func GetItemFronIndex(hash interface{}) *dataobj.TsdbItem {
switch hash.(type) {
case uint64:
indexedItemCache := IndexedItemCacheBigMap[hash.(uint64)%INDEX_SHARD]
return indexedItemCache.Get(hash)
case string:
indexedItemCache := IndexedItemCacheBigMap[utils.HashKey(hash.(string))%INDEX_SHARD]
return indexedItemCache.Get(hash)
}
return nil
func GetItemFronIndex(hash string) *dataobj.TsdbItem {
indexedItemCache := IndexedItemCacheBigMap[utils.HashKey(hash)%INDEX_SHARD]
return indexedItemCache.Get(hash)
}
func DeleteItemFronIndex(hash interface{}) *dataobj.TsdbItem {
switch hash.(type) {
case uint64:
indexedItemCache := IndexedItemCacheBigMap[hash.(uint64)%INDEX_SHARD]
indexedItemCache.Remove(hash)
case string:
indexedItemCache := IndexedItemCacheBigMap[utils.HashKey(hash.(string))%INDEX_SHARD]
indexedItemCache.Remove(hash)
}
return nil
func DeleteItemFronIndex(hash string) {
indexedItemCache := IndexedItemCacheBigMap[utils.HashKey(hash)%INDEX_SHARD]
indexedItemCache.Remove(hash)
return
}
// index收到一条新上报的监控数据,尝试用于增量更新索引
func ReceiveItem(item *dataobj.TsdbItem, hash interface{}) {
func ReceiveItem(item *dataobj.TsdbItem, hash string) {
if item == nil {
return
}
var indexedItemCache *IndexCacheBase
var unIndexedItemCache *IndexCacheBase
switch hash.(type) {
case uint64:
indexedItemCache = IndexedItemCacheBigMap[int(hash.(uint64)%INDEX_SHARD)]
unIndexedItemCache = UnIndexedItemCacheBigMap[int(hash.(uint64)%INDEX_SHARD)]
case string:
indexedItemCache = IndexedItemCacheBigMap[int(hashKey(hash.(string))%INDEX_SHARD)]
unIndexedItemCache = UnIndexedItemCacheBigMap[int(hashKey(hash.(string))%INDEX_SHARD)]
default:
logger.Error("undefined hash type", hash)
stats.Counter.Set("index.in.err", 1)
indexedItemCache = IndexedItemCacheBigMap[int(hashKey(hash)%INDEX_SHARD)]
unIndexedItemCache = UnIndexedItemCacheBigMap[int(hashKey(hash)%INDEX_SHARD)]
return
}
if indexedItemCache == nil {
stats.Counter.Set("index.in.err", 1)
logger.Error("indexedItemCache: ", reflect.TypeOf(hash), hash)
......
......@@ -30,7 +30,7 @@ const (
var (
Config MigrateSection
QueueCheck = QueueFilter{Data: make(map[interface{}]struct{})}
QueueCheck = QueueFilter{Data: make(map[string]struct{})}
TsdbQueues = make(map[string]*list.SafeListLimited)
NewTsdbQueues = make(map[string]*list.SafeListLimited)
......@@ -45,11 +45,11 @@ var (
)
type QueueFilter struct {
Data map[interface{}]struct{}
Data map[string]struct{}
sync.RWMutex
}
func (q *QueueFilter) Exists(key interface{}) bool {
func (q *QueueFilter) Exists(key string) bool {
q.RLock()
defer q.RUnlock()
......@@ -57,7 +57,7 @@ func (q *QueueFilter) Exists(key interface{}) bool {
return exsits
}
func (q *QueueFilter) Set(key interface{}) {
func (q *QueueFilter) Set(key string) {
q.Lock()
defer q.Unlock()
......
......@@ -410,7 +410,7 @@ func getRRD(f dataobj.RRDFile, worker chan struct{}, dataChan chan *dataobj.File
chunks, exists := cache.ChunksSlots.GetChunks(key)
if exists {
m := make(map[interface{}][]*cache.Chunk)
m := make(map[string][]*cache.Chunk)
m[key] = chunks
rrdtool.FlushRRD(m)
}
......
......@@ -55,7 +55,7 @@ func update(filename string, items []*dataobj.TsdbItem) error {
// flush to disk from memory
// 最新的数据在列表的最后面
func Flushrrd(seriesID interface{}, items []*dataobj.TsdbItem) error {
func Flushrrd(seriesID string, items []*dataobj.TsdbItem) error {
item := index.GetItemFronIndex(seriesID)
if items == nil || len(items) == 0 || item == nil {
return errors.New("empty items")
......
package rrdtool
import (
"fmt"
"io/ioutil"
"sync"
"sync/atomic"
......@@ -53,7 +52,7 @@ type fetch_t struct {
}
type flushfile_t struct {
seriesID interface{}
seriesID string
items []*dataobj.TsdbItem
}
......@@ -164,7 +163,7 @@ func FlushFinishd2Disk() {
case <-ticker:
idx = idx % slotNum
chunks := cache.ChunksSlots.Get(idx)
flushChunks := make(map[interface{}][]*cache.Chunk, 0)
flushChunks := make(map[string][]*cache.Chunk, 0)
for key, cs := range chunks {
if Config.Migrate {
item := index.GetItemFronIndex(key)
......@@ -205,14 +204,14 @@ func Persist() {
return
}
func FlushRRD(flushChunks map[interface{}][]*cache.Chunk) {
func FlushRRD(flushChunks map[string][]*cache.Chunk) {
sema := semaphore.NewSemaphore(Config.Concurrency)
var wg sync.WaitGroup
for key, chunks := range flushChunks {
//控制并发
sema.Acquire()
wg.Add(1)
go func(seriesID interface{}, chunks []*cache.Chunk) {
go func(seriesID string, chunks []*cache.Chunk) {
defer sema.Release()
defer wg.Done()
for _, c := range chunks {
......@@ -242,11 +241,11 @@ func FlushRRD(flushChunks map[interface{}][]*cache.Chunk) {
}
//todo items数据结构优化
func Commit(seriesID interface{}, items []*dataobj.TsdbItem) {
func Commit(seriesID string, items []*dataobj.TsdbItem) {
FlushFile(seriesID, items)
}
func FlushFile(seriesID interface{}, items []*dataobj.TsdbItem) error {
func FlushFile(seriesID string, items []*dataobj.TsdbItem) error {
done := make(chan error, 1)
index, err := getIndex(seriesID)
if err != nil {
......@@ -265,7 +264,7 @@ func FlushFile(seriesID interface{}, items []*dataobj.TsdbItem) error {
return <-done
}
func Fetch(filename string, seriesID interface{}, cf string, start, end int64, step int) ([]*dataobj.RRDData, error) {
func Fetch(filename string, seriesID string, cf string, start, end int64, step int) ([]*dataobj.RRDData, error) {
done := make(chan error, 1)
task := &io_task_t{
method: IO_TASK_M_FETCH,
......@@ -288,25 +287,17 @@ func Fetch(filename string, seriesID interface{}, cf string, start, end int64, s
return task.args.(*fetch_t).data, err
}
func getIndex(seriesID interface{}) (index int, err error) {
func getIndex(seriesID string) (index int, err error) {
batchNum := Config.IOWorkerNum
if batchNum <= 1 {
return 0, nil
}
switch seriesID.(type) {
case uint64:
return int(seriesID.(uint64) % uint64(batchNum)), nil
case string:
return int(utils.HashKey(seriesID.(string)) % uint32(batchNum)), nil
default:
return 0, fmt.Errorf("undefined hashType:%v", seriesID)
}
return
return int(utils.HashKey(seriesID) % uint32(batchNum)), nil
}
func ReadFile(filename string, seriesID interface{}) ([]byte, error) {
func ReadFile(filename string, seriesID string) ([]byte, error) {
done := make(chan error, 1)
task := &io_task_t{
method: IO_TASK_M_READ,
......
......@@ -11,26 +11,12 @@ import (
const RRDDIRS uint64 = 1000
func QueryRrdFile(seriesID interface{}, dsType string, step int) string {
switch seriesID.(type) {
case uint64:
return strconv.FormatUint(seriesID.(uint64)%RRDDIRS, 10) + "/" +
strconv.FormatUint(seriesID.(uint64), 10) + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
case string:
return seriesID.(string)[0:2] + "/" + seriesID.(string) + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
}
return ""
func QueryRrdFile(seriesID string, dsType string, step int) string {
return seriesID[0:2] + "/" + seriesID + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
}
func RrdFileName(baseDir string, seriesID interface{}, dsType string, step int) string {
switch seriesID.(type) {
case uint64:
return baseDir + "/" + strconv.FormatUint(seriesID.(uint64)%RRDDIRS, 10) + "/" +
strconv.FormatUint(seriesID.(uint64), 10) + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
case string:
return baseDir + "/" + seriesID.(string)[0:2] + "/" + seriesID.(string) + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
}
return ""
func RrdFileName(baseDir string, seriesID string, dsType string, step int) string {
return baseDir + "/" + seriesID[0:2] + "/" + seriesID + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd"
}
// WriteFile writes data to a file named by filename.
......
// +build xxhash
package str
import (
"bytes"
"strconv"
"strings"
"github.com/cespare/xxhash"
)
func Checksum(strs ...string) uint64 {
ret := bufferPool.Get().(*bytes.Buffer)
ret.Reset()
defer bufferPool.Put(ret)
count := len(strs)
if count == 0 {
return 0
}
ret.WriteString(strs[0])
for i := 1; i < count-1; i++ {
ret.WriteString(SEPERATOR)
ret.WriteString(strs[i])
}
if strs[count-1] != "" {
ret.WriteString(SEPERATOR)
ret.WriteString(strs[count-1])
}
return xxhash.Sum64(ret.Bytes())
}
func GetKey(filename string) uint64 {
arr := strings.Split(filename, "/")
if len(arr) < 2 {
return 0
}
a := strings.Split(arr[1], "_")
if len(a) > 1 {
key, _ := strconv.ParseUint(a[0], 10, 64)
return key
}
return 0
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册