From d9624955158ea47d55c3fda6f44cfc8163ed36a5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 19 Nov 2022 14:59:11 +0800 Subject: [PATCH] enh demo.go performance --- examples/go/demo.go | 425 ++++++++++++++++++++------------------------ 1 file changed, 193 insertions(+), 232 deletions(-) diff --git a/examples/go/demo.go b/examples/go/demo.go index 688fa7ba53..d29d3d736b 100644 --- a/examples/go/demo.go +++ b/examples/go/demo.go @@ -155,120 +155,138 @@ type tableInfo struct { vgId int32 } -func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) { - { - sql := fmt.Sprintf("create database if not exists %s update 2", dbname) - _, err := db.Exec(sql) - checkErr(err, sql) - } - { - sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable) - _, err := db.Exec(sql) - checkErr(err, sql) - } +func GetSubTableInfo(db *sql.DB, dbname, stbname string) []tableInfo { + tbs := make([]tableInfo, 0, 512) + sql := "show " + dbname + ".tables" - { - sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable) - _, err := db.Exec(sql) + row, err := db.Query(sql) + if err != nil { checkErr(err, sql) } - sql := "show " + srcdbname + ".tables" - tbs := make([]tableInfo, 0, 512) - rows, _ := db.Query(sql) - for rows.Next() { + for row.Next() { var ( tbname string createTime string columns int - stbname string = "" + stb string uid int64 tid int64 vgId int32 ) - err := rows.Scan(&tbname, &createTime, &columns, &stbname, &uid, &tid, &vgId) + err := row.Scan(&tbname, &createTime, &columns, &stb, &uid, &tid, &vgId) if err != nil { checkErr(err, sql) } - // ignore exceptable name - if strings.Compare(strings.ToLower(tbname), strings.ToLower(metatable)) == 0 || strings.Compare(tbname, exptable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(exptable)) == true { + if len(stbname) == 0 { + // skip normal table + if len(stb) == 0 || strings.Compare(stb, "") == 0 { + continue + } + tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stb, uid: uid, tid: tid, vgId: vgId}) continue } - // ignore normal table - if len(stbname) == 0 || strings.Compare(stbname, "") == 0 { - continue + if strings.Compare(stb, stbname) == 0 { + tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) } + } + row.Close() + return tbs - tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) +} +func GetStableField(db *sql.DB, dbname, stbname string) []string { + result := make([]string, 0, 10) + sql := "describe " + dbname + "." + stbname + row, err := db.Query(sql) + if err != nil { + checkErr(err, sql) + } + count := 0 + for row.Next() { + var field string + var ty string + var tlen int32 + var note string + row.Scan(&field, &ty, &tlen, ¬e) + + // ignore time and tag col + if count != 0 && strings.Compare(note, "TAG") != 0 { + // skip first and skip tag col + result = append(result, field) + } + count = count + 1 } - rows.Close() + row.Close() + return result +} - fieldTs := time.Now().Add(time.Hour * -1000) +func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) { + { + sql := fmt.Sprintf("create database if not exists %s update 2", dbname) + _, err := db.Exec(sql) + checkErr(err, sql) + } + { + sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable) + _, err := db.Exec(sql) + checkErr(err, sql) + } + + { + sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable) + _, err := db.Exec(sql) + checkErr(err, sql) + } + + tbs := GetSubTableInfo(db, srcdbname, "") + fmt.Printf("tbs size %d\n", len(tbs)) + + fieldDict := make(map[string][]string) + fieldTs := time.Now().Add(time.Hour * -1000) for _, e := range tbs { tbname := e.tbname - columns := e.columns stbname := e.stbname - fields := make([]string, 0, columns) - { - subsql := "describe " + srcdbname + "." + stbname - - subRows, err := db.Query(subsql) - if err != nil { - checkErr(err, subsql) - } - - count := 0 - for subRows.Next() { - var field string - var ty string - var len int32 - var note string - subRows.Scan(&field, &ty, &len, ¬e) - - // ignore time and tag col - if count != 0 && strings.Compare(note, "TAG") != 0 { - // skip first and skip tag col - fields = append(fields, field) - } - count = count + 1 - } - defer subRows.Close() + field, ok := fieldDict[stbname] + if !ok { + field = GetStableField(db, srcdbname, stbname) + fieldDict[stbname] = field } - for _, f := range fields { - count := 0 - { - checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", dbname, metatable, srcdbname, tbname, f) - checkRow, err := db.Query(checkSql) + + for _, f := range field { + insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) { + sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", metaDB, metaTable, srcDB, srcTable, srcCol) + row, err := db.Query(sql) if err != nil { - checkErr(err, checkSql) + checkErr(err, sql) } - for checkRow.Next() { + count := 0 + for row.Next() { count = count + 1 - break - } - if count != 0 { - continue + if count >= 1 { + row.Close() + break + } } - defer checkRow.Close() - } - if count == 0 { - sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", dbname, metatable, fieldTs.UnixMilli(), srcdbname, tbname, f, 2, 2) - _, err := db.Exec(sql) - if err != nil { - checkErr(err, sql) + if count == 0 { + sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold) + _, err := db.Exec(sql) + if err != nil { + checkErr(err, sql) + } } } + insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2) fieldTs = fieldTs.Add(time.Millisecond * 2) } key := fmt.Sprintf("%s_%s", srcdbname, stbname) - _, ok := tskinfo.subtask[key] + _, ok = tskinfo.subtask[key] if !ok { tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg} } @@ -287,133 +305,64 @@ func subTaskStart(d *demo) { } } d.wg.Done() - } + func (d *demo) Init() { d.heap = make(MinHeap, 0, 200) heap.Init(&d.heap) d.metaDict = make(map[string]*schema) - fieldTs := time.Now().Add(time.Hour * -1000) + tbs := GetSubTableInfo(d.db, d.srcdbname, d.suptabname) + fields := GetStableField(d.db, d.srcdbname, d.suptabname) - sql := "show " + d.srcdbname + ".tables" - tbs := make([]tableInfo, 0, 512) - rows, _ := d.db.Query(sql) - for rows.Next() { - var ( - tbname string - createTime string - columns int - stbname string = "" - uid int64 - tid int64 - vgId int32 - ) - err := rows.Scan(&tbname, &createTime, &columns, &stbname, &uid, &tid, &vgId) + lastRowDict := func(db *sql.DB, srcDB, stbname string) map[string]time.Time { + result := make(map[string]time.Time) + sql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", srcDB, stbname) + row, err := d.db.Query(sql) if err != nil { checkErr(err, sql) } - if strings.Compare(stbname, d.suptabname) == 0 { - tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) + for row.Next() { + var ts time.Time + var tbname string + row.Scan(&ts, &tbname) + result[tbname] = ts } - - } - rows.Close() + row.Close() + return result + }(d.db, d.srcdbname, d.suptabname) for _, e := range tbs { tbname := e.tbname - columns := e.columns - stbname := e.stbname - - fields := make([]string, 0, columns) - // sub sql - { - subsql := "describe " + d.srcdbname + "." + stbname - - subRows, err := d.db.Query(subsql) - if err != nil { - checkErr(err, subsql) - } - - count := 0 - for subRows.Next() { - var field string - var ty string - var len int32 - var note string - subRows.Scan(&field, &ty, &len, ¬e) - - // ignore time and tag col - if count != 0 && strings.Compare(note, "TAG") != 0 { - // skip first and skip tag col - fields = append(fields, field) - } - count = count + 1 - - } - defer subRows.Close() - } - - lastTime := time.Now() - { - subsql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", d.srcdbname, stbname) - subRows, err := d.db.Query(subsql) - if err != nil { - checkErr(err, subsql) - } - for subRows.Next() { - var tbname string - subRows.Scan(&lastTime, &tbname) - } - subRows.Close() + lastTime, ok := lastRowDict[tbname] + if !ok { + lastTime = time.Now() } for i, f := range fields { - col := fmt.Sprintf("%s %s %s", stbname, tbname, f) - count := 0 - { - - var ( - ts time.Time - dbname string - tablename string - colname string - checkinterval int - threshold int - ) - - checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f) - checkRow, err := d.db.Query(checkSql) - if err != nil { - checkErr(err, checkSql) - } - - for checkRow.Next() { - _ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold) - d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)} - - count = count + 1 - } - if count != 0 { - continue - } - defer checkRow.Close() + col := fmt.Sprintf("%s %s", tbname, f) + var ( + ts time.Time + dbname string + tablename string + colname string + checkinterval int + threshold int + ) + checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f) + checkRow, err := d.db.Query(checkSql) + if err != nil { + checkErr(err, checkSql) } - if count == 0 { - sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, fieldTs.UnixMilli(), d.dbname, tbname, f, d.dInterval, d.dThreshold) - _, err := d.db.Exec(sql) - if err != nil { - checkErr(err, sql) - } - - d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: d.dInterval, threshold: d.dThreshold} - + for checkRow.Next() { + _ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold) + d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)} } - fieldTs = fieldTs.Add(time.Millisecond * 2) - + defer checkRow.Close() } } + now := time.Now() for k, v := range d.metaDict { durtion := fmt.Sprintf("%ds", v.interval) @@ -426,12 +375,13 @@ func (d *demo) Init() { } type ValueRows struct { - rows []interface{} - ts time.Time + column []interface{} + ts time.Time + tbname string } -func (d *demo) Update(stbname string, tbname string, col string, interval int32, threshold int32) { - key := fmt.Sprintf("%s %s %s", stbname, tbname, col) +func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int32) { + key := fmt.Sprintf("%s %s", tbname, col) sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col) rows, _ := d.db.Query(sql) fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql) @@ -461,7 +411,6 @@ func (d *demo) Update(stbname string, tbname string, col string, interval int32, schemadata.interval = interval schemadata.threshold = threshold } - defer rows.Close() } @@ -479,72 +428,84 @@ func (d *demo) NextTimout() int32 { } } - cacheTs := make(map[string]*ValueRows) - ts := time.Now() - - for _, e := range colArray { - //fmt.Println("key : ", e) - elem := d.metaDict[e] - var stbName string - var colName string - var tabName string - fmt.Sscanf(e, "%s %s %s", &stbName, &tabName, &colName) - - cacheKey := fmt.Sprintf("%s__%s", d.dbname, stbName) + lastRowGroup, colIdx := func(db *sql.DB, srcDB, stbname string) (map[string]*ValueRows, map[string]int) { + result := make(map[string]*ValueRows) + colIdx := make(map[string]int) - v, ok := cacheTs[cacheKey] - if ok { - ts = v.ts - v, err := v.rows[elem.idx].(driver.Valuer).Value() - if err != nil || v == nil { - } - } else { - sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", d.srcdbname, stbName) - rows, err := d.db.Query(sql) - if err != nil { - checkErr(err, sql) - } + sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", srcDB, stbname) + row, err := db.Query(sql) + if err != nil { + checkErr(err, sql) + } + tt, err := row.ColumnTypes() + types := make([]reflect.Type, len(tt)) + for i, tp := range tt { + st := tp.ScanType() + types[i] = st + } + columns, _ := row.Columns() - tt, err := rows.ColumnTypes() - types := make([]reflect.Type, len(tt)) - for i, tp := range tt { - st := tp.ScanType() - types[i] = st - } + for row.Next() { values := make([]interface{}, len(tt)) for i := range values { values[i] = reflect.New(types[i]).Interface() } + row.Scan(values...) - for rows.Next() { - rows.Scan(values...) - } + ts, _ := values[0].(driver.Valuer).Value() + tts, _ := ts.(time.Time) - v, err := values[0].(driver.Valuer).Value() - if err != nil { - checkErr(err, "invalid timestamp") - } + tbname, _ := values[len(tt)-1].(driver.Valuer).Value() + ttbname, _ := tbname.(string) - cvttime, is := v.(time.Time) + result[ttbname] = &ValueRows{column: values, ts: tts, tbname: ttbname} + } - if is { - cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime} - ts = cvttime - } else { - cacheTs[cacheKey] = &ValueRows{rows: values, ts: ts} - ts = ts + row.Close() + + for i, v := range columns { + colIdx[v] = i + } + return result, colIdx + }(d.db, d.srcdbname, d.suptabname) + + for _, e := range colArray { + elem := d.metaDict[e] + var colName string + var tabName string + fmt.Sscanf(e, "%s %s", &tabName, &colName) + + ts, update := func(rowGroup map[string]*ValueRows, colIdx map[string]int, tabName, colName string) (time.Time, bool) { + var ts time.Time + update := false + + field := fmt.Sprintf("last_row(%s)", colName) + idx, ok1 := colIdx[field] + row, ok2 := rowGroup[tabName] + if ok1 && ok2 { + if row != nil { + v, _ := row.column[idx].(driver.Valuer).Value() + if v != nil { + ts = row.ts + update = true + } + } } + return ts, update + }(lastRowGroup, colIdx, tabName, colName) - rows.Close() + if !update { + ts = elem.timestamp } - exceptTableName := fmt.Sprintf("%s_%s_%s", stbName, tabName, colName) + //fmt.Printf("Get time tbname: %s, colname:%s, current %v, lasttime %v\n", tabName, colName, ts.UnixMilli(), elem.timestamp.UnixMilli()) + exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName) var dura time.Duration = ts.Sub(elem.timestamp) cost := int32(dura.Seconds()) if cost == 0 { elem.timestamp = ts - sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) + sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) fmt.Printf("INSERT SQL: %s\n", sql) _, err := d.db.Exec(sql) if err != nil { @@ -552,10 +513,10 @@ func (d *demo) NextTimout() int32 { } } else { elem.timestamp = ts + if cost > elem.threshold { - sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) + sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) fmt.Printf("INSERT SQL: %s\n", sql) - _, err := d.db.Exec(sql) if err != nil { checkErr(err, sql) -- GitLab