提交 2e6403fc 编写于 作者: dengyihao's avatar dengyihao

change example

上级 d9624955
...@@ -223,6 +223,11 @@ func GetStableField(db *sql.DB, dbname, stbname string) []string { ...@@ -223,6 +223,11 @@ func GetStableField(db *sql.DB, dbname, stbname string) []string {
} }
func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) { func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) {
{
sql := fmt.Sprintf("drop database if exists %s", dbname)
_, err := db.Exec(sql)
checkErr(err, sql)
}
{ {
sql := fmt.Sprintf("create database if not exists %s update 2", dbname) sql := fmt.Sprintf("create database if not exists %s update 2", dbname)
_, err := db.Exec(sql) _, err := db.Exec(sql)
...@@ -258,37 +263,20 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp ...@@ -258,37 +263,20 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp
for _, f := range field { for _, f := range field {
insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) { insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) {
sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", metaDB, metaTable, srcDB, srcTable, srcCol)
row, err := db.Query(sql)
if err != nil {
checkErr(err, sql)
}
count := 0
for row.Next() {
count = count + 1
if count >= 1 {
row.Close()
break
}
}
if count == 0 {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold) sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold)
_, err := db.Exec(sql) _, err := db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
} }
} insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2*3)
insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2)
fieldTs = fieldTs.Add(time.Millisecond * 2) fieldTs = fieldTs.Add(time.Millisecond * 2)
} }
key := fmt.Sprintf("%s_%s", srcdbname, stbname) key := fmt.Sprintf("%s_%s", srcdbname, stbname)
_, ok = tskinfo.subtask[key] _, ok = tskinfo.subtask[key]
if !ok { if !ok {
tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg} tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg, dInterval: 2, dThreshold: 2 * 3}
} }
} }
} }
...@@ -341,26 +329,18 @@ func (d *demo) Init() { ...@@ -341,26 +329,18 @@ func (d *demo) Init() {
for i, f := range fields { for i, f := range fields {
col := fmt.Sprintf("%s %s", tbname, f) col := fmt.Sprintf("%s %s", tbname, f)
var ( d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(d.dInterval), threshold: d.dThreshold}
ts time.Time {
dbname string expRecordTab := fmt.Sprintf("%s_%s_%s", d.suptabname, tbname, f)
tablename string sql := fmt.Sprintf("create table %s.%s using %s.%s tags(\"%s\")", d.dbname, expRecordTab, d.dbname, d.exceptTable, expRecordTab)
colname string {
checkinterval int fmt.Printf("create TAB: %s\n", sql)
threshold int _, err := d.db.Exec(sql)
) checkErr(err, sql)
checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f)
checkRow, err := d.db.Query(checkSql)
if err != nil {
checkErr(err, checkSql)
} }
for checkRow.Next() {
_ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold)
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)}
} }
defer checkRow.Close()
} }
} }
now := time.Now() now := time.Now()
...@@ -414,6 +394,17 @@ func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int ...@@ -414,6 +394,17 @@ func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int
defer rows.Close() defer rows.Close()
} }
type ExceptSQL struct {
lastTime time.Time
exceptTable string
cost int32
col string
tab string
elem *schema
key string
}
func (d *demo) NextTimout() int32 { func (d *demo) NextTimout() int32 {
now := time.Now().Unix() now := time.Now().Unix()
colArray := make([]string, 0, 10) colArray := make([]string, 0, 10)
...@@ -469,6 +460,7 @@ func (d *demo) NextTimout() int32 { ...@@ -469,6 +460,7 @@ func (d *demo) NextTimout() int32 {
return result, colIdx return result, colIdx
}(d.db, d.srcdbname, d.suptabname) }(d.db, d.srcdbname, d.suptabname)
expSql := make([]*ExceptSQL, 0, len(colArray))
for _, e := range colArray { for _, e := range colArray {
elem := d.metaDict[e] elem := d.metaDict[e]
var colName string var colName string
...@@ -497,35 +489,42 @@ func (d *demo) NextTimout() int32 { ...@@ -497,35 +489,42 @@ func (d *demo) NextTimout() int32 {
if !update { if !update {
ts = elem.timestamp ts = elem.timestamp
} }
//fmt.Printf("Get time tbname: %s, colname:%s, current %v, lasttime %v\n", tabName, colName, ts.UnixMilli(), elem.timestamp.UnixMilli())
exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName) exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName)
var dura time.Duration = ts.Sub(elem.timestamp) var dura time.Duration = ts.Sub(elem.timestamp)
cost := int32(dura.Seconds()) cost := int32(dura.Seconds())
if cost == 0 { if cost == 0 {
elem.timestamp = ts elem.timestamp = ts
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql) expSql = append(expSql, &ExceptSQL{exceptTable: exceptTableName, cost: cost, lastTime: ts, col: colName, tab: tabName, elem: elem, key: e})
_, err := d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
} else { } else {
elem.timestamp = ts elem.timestamp = ts
if cost > elem.threshold { if cost > elem.threshold {
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) expSql = append(expSql, &ExceptSQL{exceptTable: exceptTableName, cost: cost, lastTime: ts, col: colName, tab: tabName, elem: elem, key: e})
fmt.Printf("INSERT SQL: %s\n", sql) }
_, err := d.db.Exec(sql) }
if err != nil { heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e})
checkErr(err, sql)
} }
var info strings.Builder
info.Grow(64 * len(expSql))
for i, v := range expSql {
if i == 0 {
s := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\",\"%s\", %v, %d )", d.dbname, v.exceptTable, time.Now().UnixMilli(), d.srcdbname, v.tab, v.col, v.lastTime.UnixMilli(), int(time.Now().Sub(v.elem.timestamp).Seconds()))
info.WriteString(s)
info.WriteString(" ")
} else { } else {
//fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold) s := fmt.Sprintf("%s.%s values(%v, \"%s\", \"%s\",\"%s\", %v, %d )", d.dbname, v.exceptTable, time.Now().UnixMilli(), d.srcdbname, v.tab, v.col, v.lastTime.UnixMilli(), int(time.Now().Sub(v.elem.timestamp).Seconds()))
info.WriteString(s)
info.WriteString(" ")
} }
} }
heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e}) if len(expSql) != 0 {
sql := info.String()
fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql)
checkErr(err, sql)
} }
if !d.heap.Empty() { if !d.heap.Empty() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册