未验证 提交 f57c72fe 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18311 from taosdata/enh/addDemo

enh: add demo
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"reflect" "reflect"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
...@@ -223,6 +224,11 @@ func GetStableField(db *sql.DB, dbname, stbname string) []string { ...@@ -223,6 +224,11 @@ func GetStableField(db *sql.DB, dbname, stbname string) []string {
} }
func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) { func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) {
{
sql := fmt.Sprintf("drop database if exists %s", dbname)
_, err := db.Exec(sql)
checkErr(err, sql)
}
{ {
sql := fmt.Sprintf("create database if not exists %s update 2", dbname) sql := fmt.Sprintf("create database if not exists %s update 2", dbname)
_, err := db.Exec(sql) _, err := db.Exec(sql)
...@@ -258,37 +264,20 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp ...@@ -258,37 +264,20 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp
for _, f := range field { for _, f := range field {
insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) { insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) {
sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", metaDB, metaTable, srcDB, srcTable, srcCol) sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold)
row, err := db.Query(sql) _, err := db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
count := 0
for row.Next() {
count = count + 1
if count >= 1 {
row.Close()
break
}
}
if count == 0 {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold)
_, err := db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
}
} }
insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2) insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2*3)
fieldTs = fieldTs.Add(time.Millisecond * 2) fieldTs = fieldTs.Add(time.Millisecond * 2)
} }
key := fmt.Sprintf("%s_%s", srcdbname, stbname) key := fmt.Sprintf("%s_%s", srcdbname, stbname)
_, ok = tskinfo.subtask[key] _, ok = tskinfo.subtask[key]
if !ok { if !ok {
tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg} tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg, dInterval: 2, dThreshold: 2 * 3}
} }
} }
} }
...@@ -341,26 +330,18 @@ func (d *demo) Init() { ...@@ -341,26 +330,18 @@ func (d *demo) Init() {
for i, f := range fields { for i, f := range fields {
col := fmt.Sprintf("%s %s", tbname, f) col := fmt.Sprintf("%s %s", tbname, f)
var ( d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(d.dInterval), threshold: d.dThreshold}
ts time.Time {
dbname string expRecordTab := fmt.Sprintf("%s_%s_%s", d.suptabname, tbname, f)
tablename string sql := fmt.Sprintf("create table %s.%s using %s.%s tags(\"%s\")", d.dbname, expRecordTab, d.dbname, d.exceptTable, expRecordTab)
colname string {
checkinterval int fmt.Printf("create TAB: %s\n", sql)
threshold int _, err := d.db.Exec(sql)
) checkErr(err, sql)
checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f) }
checkRow, err := d.db.Query(checkSql)
if err != nil {
checkErr(err, checkSql)
}
for checkRow.Next() {
_ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold)
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)}
} }
defer checkRow.Close()
} }
} }
now := time.Now() now := time.Now()
...@@ -414,6 +395,17 @@ func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int ...@@ -414,6 +395,17 @@ func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int
defer rows.Close() defer rows.Close()
} }
type ExceptSQL struct {
lastTime time.Time
exceptTable string
cost int32
col string
tab string
elem *schema
key string
}
func (d *demo) NextTimout() int32 { func (d *demo) NextTimout() int32 {
now := time.Now().Unix() now := time.Now().Unix()
colArray := make([]string, 0, 10) colArray := make([]string, 0, 10)
...@@ -469,6 +461,7 @@ func (d *demo) NextTimout() int32 { ...@@ -469,6 +461,7 @@ func (d *demo) NextTimout() int32 {
return result, colIdx return result, colIdx
}(d.db, d.srcdbname, d.suptabname) }(d.db, d.srcdbname, d.suptabname)
expSql := make([]*ExceptSQL, 0, len(colArray))
for _, e := range colArray { for _, e := range colArray {
elem := d.metaDict[e] elem := d.metaDict[e]
var colName string var colName string
...@@ -497,37 +490,44 @@ func (d *demo) NextTimout() int32 { ...@@ -497,37 +490,44 @@ func (d *demo) NextTimout() int32 {
if !update { if !update {
ts = elem.timestamp ts = elem.timestamp
} }
//fmt.Printf("Get time tbname: %s, colname:%s, current %v, lasttime %v\n", tabName, colName, ts.UnixMilli(), elem.timestamp.UnixMilli())
exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName) exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName)
var dura time.Duration = ts.Sub(elem.timestamp) var dura time.Duration = ts.Sub(elem.timestamp)
cost := int32(dura.Seconds()) cost := int32(dura.Seconds())
if cost == 0 { if cost == 0 {
elem.timestamp = ts elem.timestamp = ts
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql) expSql = append(expSql, &ExceptSQL{exceptTable: exceptTableName, cost: cost, lastTime: ts, col: colName, tab: tabName, elem: elem, key: e})
_, err := d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
} else { } else {
elem.timestamp = ts elem.timestamp = ts
if cost > elem.threshold { if cost > elem.threshold {
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) expSql = append(expSql, &ExceptSQL{exceptTable: exceptTableName, cost: cost, lastTime: ts, col: colName, tab: tabName, elem: elem, key: e})
fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
} else {
//fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold)
} }
} }
heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e}) heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e})
} }
var info strings.Builder
info.Grow(64 * len(expSql))
for i, v := range expSql {
if i == 0 {
s := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\",\"%s\", %v, %d )", d.dbname, v.exceptTable, time.Now().UnixMilli(), d.srcdbname, v.tab, v.col, v.lastTime.UnixMilli(), int(time.Now().Sub(v.elem.timestamp).Seconds()))
info.WriteString(s)
info.WriteString(" ")
} else {
s := fmt.Sprintf("%s.%s values(%v, \"%s\", \"%s\",\"%s\", %v, %d )", d.dbname, v.exceptTable, time.Now().UnixMilli(), d.srcdbname, v.tab, v.col, v.lastTime.UnixMilli(), int(time.Now().Sub(v.elem.timestamp).Seconds()))
info.WriteString(s)
info.WriteString(" ")
}
}
if len(expSql) != 0 {
sql := info.String()
fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql)
checkErr(err, sql)
}
if !d.heap.Empty() { if !d.heap.Empty() {
elem := d.heap.Top() elem := d.heap.Top()
timeout := elem.timeout - now timeout := elem.timeout - now
...@@ -555,6 +555,11 @@ func printAllArgs() { ...@@ -555,6 +555,11 @@ func printAllArgs() {
func main() { func main() {
printAllArgs() printAllArgs()
cpuNum := runtime.NumCPU()
fmt.Println("cpu核心数:", cpuNum)
runtime.GOMAXPROCS(cpuNum)
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/" url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
db, err := sql.Open(taosDriverName, url) db, err := sql.Open(taosDriverName, url)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册