From c628a6e1427612ab770450b89b9f3c0f2bac9baa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Nov 2022 09:33:05 +0800 Subject: [PATCH] update demo --- examples/go/demo.go | 276 +++++++++++++++++++++++++++++++------------- 1 file changed, 193 insertions(+), 83 deletions(-) diff --git a/examples/go/demo.go b/examples/go/demo.go index 99a299964c..70c6a49797 100644 --- a/examples/go/demo.go +++ b/examples/go/demo.go @@ -1,28 +1,33 @@ package main import ( - "container/heap" "database/sql" "database/sql/driver" "flag" "fmt" - "reflect" - "strconv" - "strings" "sync" "time" - + "strings" + "container/heap" + "strconv" + "reflect" + + _ "github.com/taosdata/driver-go/v2/taosSql" ) + + type heapElem struct { - timeout int64 - colName string + timeout int64 + colName string } -type MinHeap []heapElem +type MinHeap []heapElem + type Column struct { + } func (h MinHeap) Len() int { @@ -32,16 +37,16 @@ func (h MinHeap) Len() int { func (h MinHeap) Less(i, j int) bool { res := h[i].timeout - h[j].timeout if res < 0 { - return true + return true; } else if res > 0 { - return false + return false; } - - cmp := strings.Compare(h[i].colName, h[j].colName) - if cmp <= 0 { - return true + + cmp := strings.Compare(h[i].colName, h[j].colName) + if (cmp <= 0) { + return true; } else { - return false + return false; } } @@ -64,20 +69,20 @@ func (h *MinHeap) Top() heapElem { return (*h)[0] } -func (h *MinHeap) Pop() interface{} { +func (h *MinHeap) Pop() interface{}{ res := (*h)[len(*h)-1] *h = (*h)[:len(*h)-1] return res } type config struct { - hostName string - serverPort int - user string - password string - dbName string - srcdbname string - supTblName string + hostName string + serverPort int + user string + password string + dbName string + srcdbName string + supTblName string } var configPara config @@ -89,10 +94,10 @@ func init() { flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.") flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.") flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.") - flag.StringVar(&configPara.dbName, "d", "test1", "Destination database.") - flag.StringVar(&configPara.srcdbname, "s", "test", "Destination database.") + flag.StringVar(&configPara.dbName, "d", "test1", "check database.") + flag.StringVar(&configPara.srcdbName, "s", "test", "Destination database.") flag.Parse() - + } func checkErr(err error, prompt string) { @@ -102,15 +107,16 @@ func checkErr(err error, prompt string) { } } + type schema struct { idx int numOfField int - timestamp time.Time - colName string - interval int32 - threshold int32 + timestamp time.Time + colName string + interval int32 + threshold int32 + } - type demo struct { db *sql.DB @@ -123,9 +129,13 @@ type demo struct { dInterval int32 dThreshold int32 + suptabname string + metaDict map[string]*schema heap MinHeap timer *time.Timer + + wg *sync.WaitGroup } /*** @@ -137,6 +147,12 @@ type demo struct { ***/ +type taskInfo struct { + wg *sync.WaitGroup + subtask map[string] *demo +} + + type tableInfo struct { tbname string createTime string @@ -147,34 +163,28 @@ type tableInfo struct { vgId int32 } -func (d *demo) Init(wg *sync.WaitGroup) { - d.heap = make(MinHeap, 0, 200) - heap.Init(&d.heap) - d.metaDict = make(map[string]*schema) +func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) { { - sql := fmt.Sprintf("create database if not exists %s update 2", d.dbname) - _, err := d.db.Exec(sql) + sql := fmt.Sprintf("create database if not exists %s update 2", dbname) + _, err := db.Exec(sql) checkErr(err, sql) } - { - sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", d.dbname, d.exceptTable) - _, err := d.db.Exec(sql) + sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable) + _, err := db.Exec(sql) checkErr(err, sql) } { - sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", d.dbname, d.metaTable) - _, err := d.db.Exec(sql) + sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable) + _, err := db.Exec(sql) checkErr(err, sql) } - fieldTs := time.Now().Add(time.Hour * -1000) - - sql := "show " + d.srcdbname + ".tables" + sql := "show " + srcdbname + ".tables" tbs := make([]tableInfo, 0, 512) - rows, _ := d.db.Query(sql) + rows, _ := db.Query(sql) for rows.Next() { var ( tbname string @@ -189,28 +199,141 @@ func (d *demo) Init(wg *sync.WaitGroup) { if err != nil { checkErr(err, sql) } + + // ignore exceptable name + if strings.Compare(strings.ToLower(tbname), strings.ToLower(metatable)) == 0 || strings.Compare(tbname, exptable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(exptable)) == true { + continue + } + + // ignore normal table + if len(stbname) == 0 || strings.Compare(stbname, "") == 0 { + continue + } + tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) } rows.Close() - for _, e := range tbs { + fieldTs := time.Now().Add(time.Hour * -1000) + + for _, e := range tbs { tbname := e.tbname - //createTime := e.createTime columns := e.columns stbname := e.stbname - //uid := e.uid - //tid := e.tid - //vgId := vgId + fields := make([]string, 0, columns) + { + subsql := "describe " + srcdbname + "." + stbname - // ignore exceptable name - if strings.Compare(strings.ToLower(tbname), strings.ToLower(d.metaTable)) == 0 || strings.Compare(tbname, d.exceptTable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(d.exceptTable)) == true { - continue + subRows, err := db.Query(subsql) + if err != nil { + checkErr(err, subsql) + } + + count := 0 + for subRows.Next() { + var field string + var ty string + var len int32 + var note string + subRows.Scan(&field, &ty, &len, ¬e) + + // ignore time and tag col + if count != 0 && strings.Compare(note, "TAG") != 0 { + // skip first and skip tag col + fields = append(fields, field) + } + count = count + 1 + + } + defer subRows.Close() } + for _, f := range fields { + count := 0 + { + checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", dbname, metatable, dbname, tbname, f) + checkRow, err := db.Query(checkSql) + if err != nil { + checkErr(err, checkSql) + } - // ignore normal table - if len(stbname) == 0 || strings.Compare(stbname, "") == 0 { - continue + for checkRow.Next() { + count = count + 1 + break + } + if count != 0 { + continue + } + defer checkRow.Close() + } + + if count == 0 { + sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", dbname, metatable, fieldTs.UnixMilli(), dbname, tbname, f, 2, 2) + _, err := db.Exec(sql) + if err != nil { + checkErr(err, sql) + } + } + fieldTs = fieldTs.Add(time.Millisecond * 2) + } + + key := fmt.Sprintf("%s_%s", srcdbname, stbname) + _, ok := tskinfo.subtask[key] + if !ok { + tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg} + } + } +} + +func subTaskStart(d *demo) { + + d.Init() + for { + select { + case <-d.timer.C: + timeout := d.NextTimout() + fmt.Printf("stbname %s, timeout %d\n", d.suptabname, timeout) + d.timer.Reset(time.Second * time.Duration(timeout)) + } + } + d.wg.Done() + +} +func (d *demo) Init() { + d.heap = make(MinHeap, 0, 200) + heap.Init(&d.heap) + d.metaDict = make(map[string]*schema) + + fieldTs := time.Now().Add(time.Hour * -1000) + + sql := "show " + d.srcdbname + ".tables" + tbs := make([]tableInfo, 0, 512) + rows, _ := d.db.Query(sql) + for rows.Next() { + var ( + tbname string + createTime string + columns int + stbname string = "" + uid int64 + tid int64 + vgId int32 + ) + err := rows.Scan(&tbname, &createTime, &columns, &stbname, &uid, &tid, &vgId) + if err != nil { + checkErr(err, sql) } + if (strings.Compare(stbname, d.suptabname) == 0) { + tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) + } + + } + rows.Close() + + + for _, e := range tbs { + tbname := e.tbname + columns := e.columns + stbname := e.stbname fields := make([]string, 0, columns) // sub sql @@ -230,6 +353,7 @@ func (d *demo) Init(wg *sync.WaitGroup) { var note string subRows.Scan(&field, &ty, &len, ¬e) + // ignore time and tag col if count != 0 && strings.Compare(note, "TAG") != 0 { // skip first and skip tag col fields = append(fields, field) @@ -309,7 +433,6 @@ func (d *demo) Init(wg *sync.WaitGroup) { } d.timer = time.NewTimer(time.Second * 1) - wg.Done() } type ValueRows struct { @@ -415,16 +538,13 @@ func (d *demo) NextTimout() int32 { cvttime, is := v.(time.Time) if is { - fmt.Println("yes ") cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime} ts = cvttime } else { - fmt.Println("no") cacheTs[cacheKey] = &ValueRows{rows: values, ts: ts} ts = ts } - fmt.Printf("time %v \n", ts.UnixMilli()) rows.Close() } @@ -434,8 +554,8 @@ func (d *demo) NextTimout() int32 { cost := int32(dura.Seconds()) if cost == 0 { elem.timestamp = ts - fmt.Printf("A dura %d, threshold %d insert\n", cost, elem.threshold) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) + fmt.Printf("INSERT SQL: %s\n", sql) _, err := d.db.Exec(sql) if err != nil { checkErr(err, sql) @@ -443,15 +563,15 @@ func (d *demo) NextTimout() int32 { } else { elem.timestamp = ts if cost > elem.threshold { - fmt.Printf("B dura %d, threshold %d insert \n", cost, elem.threshold) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) + fmt.Printf("INSERT SQL: %s\n", sql) _, err := d.db.Exec(sql) if err != nil { checkErr(err, sql) } } else { - fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold) + //fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold) } } heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e}) @@ -476,7 +596,7 @@ func printAllArgs() { fmt.Printf("usr: %v\n", configPara.user) fmt.Printf("password: %v\n", configPara.password) fmt.Printf("dbName: %v\n", configPara.dbName) - fmt.Printf("srcDbName: %v\n", configPara.srcdbname) + fmt.Printf("srcDbName: %v\n", configPara.srcdbName) fmt.Printf("stbNme: %v\n", configPara.supTblName) fmt.Printf("================================================\n") } @@ -491,22 +611,12 @@ func main() { checkErr(err, "failed to connect db") } wg := sync.WaitGroup{} - demo := &demo{db: db, dbname: configPara.dbName, srcdbname: configPara.srcdbname, metaTable: "metaTable", exceptTable: "exceptTable", dStartTs: 0, dInterval: 2, dThreshold: 2} - wg.Add(1) - go demo.Init(&wg) - - wg.Wait() - - demo.Update("st1", "t1", "f", 10, 20) - - fmt.Println("time start") - for { - select { - case <-demo.timer.C: - timeout := demo.NextTimout() - fmt.Printf("timeout %d\n", timeout) - demo.timer.Reset(time.Second * time.Duration(timeout)) - } - } - + info := &taskInfo{subtask : make(map[string] *demo), wg: &wg} + + taskInit(db, configPara.dbName, configPara.srcdbName, "metatable", "exptable", info) + for _, v := range info.subtask { + wg.Add(1) + go subTaskStart(v) + } + wg.Wait() } -- GitLab