提交 d9624955 编写于 作者: dengyihao's avatar dengyihao

enh demo.go performance

上级 60865397
...@@ -155,120 +155,138 @@ type tableInfo struct { ...@@ -155,120 +155,138 @@ type tableInfo struct {
vgId int32 vgId int32
} }
func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) { func GetSubTableInfo(db *sql.DB, dbname, stbname string) []tableInfo {
{ tbs := make([]tableInfo, 0, 512)
sql := fmt.Sprintf("create database if not exists %s update 2", dbname) sql := "show " + dbname + ".tables"
_, err := db.Exec(sql)
checkErr(err, sql)
}
{
sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable)
_, err := db.Exec(sql)
checkErr(err, sql)
}
{ row, err := db.Query(sql)
sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable) if err != nil {
_, err := db.Exec(sql)
checkErr(err, sql) checkErr(err, sql)
} }
sql := "show " + srcdbname + ".tables" for row.Next() {
tbs := make([]tableInfo, 0, 512)
rows, _ := db.Query(sql)
for rows.Next() {
var ( var (
tbname string tbname string
createTime string createTime string
columns int columns int
stbname string = "" stb string
uid int64 uid int64
tid int64 tid int64
vgId int32 vgId int32
) )
err := rows.Scan(&tbname, &createTime, &columns, &stbname, &uid, &tid, &vgId) err := row.Scan(&tbname, &createTime, &columns, &stb, &uid, &tid, &vgId)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
// ignore exceptable name if len(stbname) == 0 {
if strings.Compare(strings.ToLower(tbname), strings.ToLower(metatable)) == 0 || strings.Compare(tbname, exptable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(exptable)) == true { // skip normal table
if len(stb) == 0 || strings.Compare(stb, "") == 0 {
continue
}
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stb, uid: uid, tid: tid, vgId: vgId})
continue continue
} }
// ignore normal table if strings.Compare(stb, stbname) == 0 {
if len(stbname) == 0 || strings.Compare(stbname, "") == 0 { tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId})
continue
} }
}
row.Close()
return tbs
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) }
func GetStableField(db *sql.DB, dbname, stbname string) []string {
result := make([]string, 0, 10)
sql := "describe " + dbname + "." + stbname
row, err := db.Query(sql)
if err != nil {
checkErr(err, sql)
}
count := 0
for row.Next() {
var field string
var ty string
var tlen int32
var note string
row.Scan(&field, &ty, &tlen, &note)
// ignore time and tag col
if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col
result = append(result, field)
}
count = count + 1
} }
rows.Close() row.Close()
return result
}
fieldTs := time.Now().Add(time.Hour * -1000) func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) {
{
sql := fmt.Sprintf("create database if not exists %s update 2", dbname)
_, err := db.Exec(sql)
checkErr(err, sql)
}
{
sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable)
_, err := db.Exec(sql)
checkErr(err, sql)
}
{
sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable)
_, err := db.Exec(sql)
checkErr(err, sql)
}
tbs := GetSubTableInfo(db, srcdbname, "")
fmt.Printf("tbs size %d\n", len(tbs))
fieldDict := make(map[string][]string)
fieldTs := time.Now().Add(time.Hour * -1000)
for _, e := range tbs { for _, e := range tbs {
tbname := e.tbname tbname := e.tbname
columns := e.columns
stbname := e.stbname stbname := e.stbname
fields := make([]string, 0, columns)
{
subsql := "describe " + srcdbname + "." + stbname
subRows, err := db.Query(subsql)
if err != nil {
checkErr(err, subsql)
}
count := 0
for subRows.Next() {
var field string
var ty string
var len int32
var note string
subRows.Scan(&field, &ty, &len, &note)
// ignore time and tag col
if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col
fields = append(fields, field)
}
count = count + 1
} field, ok := fieldDict[stbname]
defer subRows.Close() if !ok {
field = GetStableField(db, srcdbname, stbname)
fieldDict[stbname] = field
} }
for _, f := range fields {
count := 0 for _, f := range field {
{ insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) {
checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", dbname, metatable, srcdbname, tbname, f) sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", metaDB, metaTable, srcDB, srcTable, srcCol)
checkRow, err := db.Query(checkSql) row, err := db.Query(sql)
if err != nil { if err != nil {
checkErr(err, checkSql) checkErr(err, sql)
} }
for checkRow.Next() { count := 0
for row.Next() {
count = count + 1 count = count + 1
break if count >= 1 {
} row.Close()
if count != 0 { break
continue }
} }
defer checkRow.Close()
}
if count == 0 { if count == 0 {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", dbname, metatable, fieldTs.UnixMilli(), srcdbname, tbname, f, 2, 2) sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold)
_, err := db.Exec(sql) _, err := db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
}
} }
} }
insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2)
fieldTs = fieldTs.Add(time.Millisecond * 2) fieldTs = fieldTs.Add(time.Millisecond * 2)
} }
key := fmt.Sprintf("%s_%s", srcdbname, stbname) key := fmt.Sprintf("%s_%s", srcdbname, stbname)
_, ok := tskinfo.subtask[key] _, ok = tskinfo.subtask[key]
if !ok { if !ok {
tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg} tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg}
} }
...@@ -287,133 +305,64 @@ func subTaskStart(d *demo) { ...@@ -287,133 +305,64 @@ func subTaskStart(d *demo) {
} }
} }
d.wg.Done() d.wg.Done()
} }
func (d *demo) Init() { func (d *demo) Init() {
d.heap = make(MinHeap, 0, 200) d.heap = make(MinHeap, 0, 200)
heap.Init(&d.heap) heap.Init(&d.heap)
d.metaDict = make(map[string]*schema) d.metaDict = make(map[string]*schema)
fieldTs := time.Now().Add(time.Hour * -1000) tbs := GetSubTableInfo(d.db, d.srcdbname, d.suptabname)
fields := GetStableField(d.db, d.srcdbname, d.suptabname)
sql := "show " + d.srcdbname + ".tables" lastRowDict := func(db *sql.DB, srcDB, stbname string) map[string]time.Time {
tbs := make([]tableInfo, 0, 512) result := make(map[string]time.Time)
rows, _ := d.db.Query(sql) sql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", srcDB, stbname)
for rows.Next() { row, err := d.db.Query(sql)
var (
tbname string
createTime string
columns int
stbname string = ""
uid int64
tid int64
vgId int32
)
err := rows.Scan(&tbname, &createTime, &columns, &stbname, &uid, &tid, &vgId)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
if strings.Compare(stbname, d.suptabname) == 0 { for row.Next() {
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) var ts time.Time
var tbname string
row.Scan(&ts, &tbname)
result[tbname] = ts
} }
row.Close()
} return result
rows.Close() }(d.db, d.srcdbname, d.suptabname)
for _, e := range tbs { for _, e := range tbs {
tbname := e.tbname tbname := e.tbname
columns := e.columns lastTime, ok := lastRowDict[tbname]
stbname := e.stbname if !ok {
lastTime = time.Now()
fields := make([]string, 0, columns)
// sub sql
{
subsql := "describe " + d.srcdbname + "." + stbname
subRows, err := d.db.Query(subsql)
if err != nil {
checkErr(err, subsql)
}
count := 0
for subRows.Next() {
var field string
var ty string
var len int32
var note string
subRows.Scan(&field, &ty, &len, &note)
// ignore time and tag col
if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col
fields = append(fields, field)
}
count = count + 1
}
defer subRows.Close()
}
lastTime := time.Now()
{
subsql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", d.srcdbname, stbname)
subRows, err := d.db.Query(subsql)
if err != nil {
checkErr(err, subsql)
}
for subRows.Next() {
var tbname string
subRows.Scan(&lastTime, &tbname)
}
subRows.Close()
} }
for i, f := range fields { for i, f := range fields {
col := fmt.Sprintf("%s %s %s", stbname, tbname, f) col := fmt.Sprintf("%s %s", tbname, f)
count := 0 var (
{ ts time.Time
dbname string
var ( tablename string
ts time.Time colname string
dbname string checkinterval int
tablename string threshold int
colname string )
checkinterval int checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f)
threshold int checkRow, err := d.db.Query(checkSql)
) if err != nil {
checkErr(err, checkSql)
checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f)
checkRow, err := d.db.Query(checkSql)
if err != nil {
checkErr(err, checkSql)
}
for checkRow.Next() {
_ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold)
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)}
count = count + 1
}
if count != 0 {
continue
}
defer checkRow.Close()
} }
if count == 0 { for checkRow.Next() {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, fieldTs.UnixMilli(), d.dbname, tbname, f, d.dInterval, d.dThreshold) _ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold)
_, err := d.db.Exec(sql) d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)}
if err != nil {
checkErr(err, sql)
}
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: d.dInterval, threshold: d.dThreshold}
} }
fieldTs = fieldTs.Add(time.Millisecond * 2) defer checkRow.Close()
} }
} }
now := time.Now() now := time.Now()
for k, v := range d.metaDict { for k, v := range d.metaDict {
durtion := fmt.Sprintf("%ds", v.interval) durtion := fmt.Sprintf("%ds", v.interval)
...@@ -426,12 +375,13 @@ func (d *demo) Init() { ...@@ -426,12 +375,13 @@ func (d *demo) Init() {
} }
type ValueRows struct { type ValueRows struct {
rows []interface{} column []interface{}
ts time.Time ts time.Time
tbname string
} }
func (d *demo) Update(stbname string, tbname string, col string, interval int32, threshold int32) { func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int32) {
key := fmt.Sprintf("%s %s %s", stbname, tbname, col) key := fmt.Sprintf("%s %s", tbname, col)
sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col) sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col)
rows, _ := d.db.Query(sql) rows, _ := d.db.Query(sql)
fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql) fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql)
...@@ -461,7 +411,6 @@ func (d *demo) Update(stbname string, tbname string, col string, interval int32, ...@@ -461,7 +411,6 @@ func (d *demo) Update(stbname string, tbname string, col string, interval int32,
schemadata.interval = interval schemadata.interval = interval
schemadata.threshold = threshold schemadata.threshold = threshold
} }
defer rows.Close() defer rows.Close()
} }
...@@ -479,72 +428,84 @@ func (d *demo) NextTimout() int32 { ...@@ -479,72 +428,84 @@ func (d *demo) NextTimout() int32 {
} }
} }
cacheTs := make(map[string]*ValueRows) lastRowGroup, colIdx := func(db *sql.DB, srcDB, stbname string) (map[string]*ValueRows, map[string]int) {
ts := time.Now() result := make(map[string]*ValueRows)
colIdx := make(map[string]int)
for _, e := range colArray {
//fmt.Println("key : ", e)
elem := d.metaDict[e]
var stbName string
var colName string
var tabName string
fmt.Sscanf(e, "%s %s %s", &stbName, &tabName, &colName)
cacheKey := fmt.Sprintf("%s__%s", d.dbname, stbName)
v, ok := cacheTs[cacheKey] sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", srcDB, stbname)
if ok { row, err := db.Query(sql)
ts = v.ts if err != nil {
v, err := v.rows[elem.idx].(driver.Valuer).Value() checkErr(err, sql)
if err != nil || v == nil { }
} tt, err := row.ColumnTypes()
} else { types := make([]reflect.Type, len(tt))
sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", d.srcdbname, stbName) for i, tp := range tt {
rows, err := d.db.Query(sql) st := tp.ScanType()
if err != nil { types[i] = st
checkErr(err, sql) }
} columns, _ := row.Columns()
tt, err := rows.ColumnTypes() for row.Next() {
types := make([]reflect.Type, len(tt))
for i, tp := range tt {
st := tp.ScanType()
types[i] = st
}
values := make([]interface{}, len(tt)) values := make([]interface{}, len(tt))
for i := range values { for i := range values {
values[i] = reflect.New(types[i]).Interface() values[i] = reflect.New(types[i]).Interface()
} }
row.Scan(values...)
for rows.Next() { ts, _ := values[0].(driver.Valuer).Value()
rows.Scan(values...) tts, _ := ts.(time.Time)
}
v, err := values[0].(driver.Valuer).Value() tbname, _ := values[len(tt)-1].(driver.Valuer).Value()
if err != nil { ttbname, _ := tbname.(string)
checkErr(err, "invalid timestamp")
}
cvttime, is := v.(time.Time) result[ttbname] = &ValueRows{column: values, ts: tts, tbname: ttbname}
}
if is { row.Close()
cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime}
ts = cvttime for i, v := range columns {
} else { colIdx[v] = i
cacheTs[cacheKey] = &ValueRows{rows: values, ts: ts} }
ts = ts return result, colIdx
}(d.db, d.srcdbname, d.suptabname)
for _, e := range colArray {
elem := d.metaDict[e]
var colName string
var tabName string
fmt.Sscanf(e, "%s %s", &tabName, &colName)
ts, update := func(rowGroup map[string]*ValueRows, colIdx map[string]int, tabName, colName string) (time.Time, bool) {
var ts time.Time
update := false
field := fmt.Sprintf("last_row(%s)", colName)
idx, ok1 := colIdx[field]
row, ok2 := rowGroup[tabName]
if ok1 && ok2 {
if row != nil {
v, _ := row.column[idx].(driver.Valuer).Value()
if v != nil {
ts = row.ts
update = true
}
}
} }
return ts, update
}(lastRowGroup, colIdx, tabName, colName)
rows.Close() if !update {
ts = elem.timestamp
} }
exceptTableName := fmt.Sprintf("%s_%s_%s", stbName, tabName, colName) //fmt.Printf("Get time tbname: %s, colname:%s, current %v, lasttime %v\n", tabName, colName, ts.UnixMilli(), elem.timestamp.UnixMilli())
exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName)
var dura time.Duration = ts.Sub(elem.timestamp) var dura time.Duration = ts.Sub(elem.timestamp)
cost := int32(dura.Seconds()) cost := int32(dura.Seconds())
if cost == 0 { if cost == 0 {
elem.timestamp = ts elem.timestamp = ts
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql) fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
if err != nil { if err != nil {
...@@ -552,10 +513,10 @@ func (d *demo) NextTimout() int32 { ...@@ -552,10 +513,10 @@ func (d *demo) NextTimout() int32 {
} }
} else { } else {
elem.timestamp = ts elem.timestamp = ts
if cost > elem.threshold { if cost > elem.threshold {
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql) fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册