diff --git a/examples/go/demo.go b/examples/go/demo.go index f13380585b46af098ba783595f89f72091ad41a8..99a299964c1140ff68d761f0215ca7de911637ab 100644 --- a/examples/go/demo.go +++ b/examples/go/demo.go @@ -1,33 +1,28 @@ package main import ( + "container/heap" "database/sql" "database/sql/driver" "flag" "fmt" + "reflect" + "strconv" + "strings" "sync" "time" - "strings" - "container/heap" - "strconv" - "reflect" - - + _ "github.com/taosdata/driver-go/v2/taosSql" ) - - type heapElem struct { - timeout int64 - colName string + timeout int64 + colName string } -type MinHeap []heapElem - +type MinHeap []heapElem type Column struct { - } func (h MinHeap) Len() int { @@ -37,16 +32,16 @@ func (h MinHeap) Len() int { func (h MinHeap) Less(i, j int) bool { res := h[i].timeout - h[j].timeout if res < 0 { - return true; + return true } else if res > 0 { - return false; + return false } - - cmp := strings.Compare(h[i].colName, h[j].colName) - if (cmp <= 0) { - return true; + + cmp := strings.Compare(h[i].colName, h[j].colName) + if cmp <= 0 { + return true } else { - return false; + return false } } @@ -69,20 +64,20 @@ func (h *MinHeap) Top() heapElem { return (*h)[0] } -func (h *MinHeap) Pop() interface{}{ +func (h *MinHeap) Pop() interface{} { res := (*h)[len(*h)-1] *h = (*h)[:len(*h)-1] return res } type config struct { - hostName string - serverPort int - user string - password string - dbName string - srcdbname string - supTblName string + hostName string + serverPort int + user string + password string + dbName string + srcdbname string + supTblName string } var configPara config @@ -97,7 +92,7 @@ func init() { flag.StringVar(&configPara.dbName, "d", "test1", "Destination database.") flag.StringVar(&configPara.srcdbname, "s", "test", "Destination database.") flag.Parse() - + } func checkErr(err error, prompt string) { @@ -107,15 +102,13 @@ func checkErr(err error, prompt string) { } } - type schema struct { idx int numOfField int - timestamp time.Time - colName string - interval int32 - threshold int32 - + timestamp time.Time + colName string + interval int32 + threshold int32 } type demo struct { @@ -131,8 +124,8 @@ type demo struct { dThreshold int32 metaDict map[string]*schema - heap MinHeap - timer *time.Timer + heap MinHeap + timer *time.Timer } /*** @@ -145,14 +138,15 @@ type demo struct { ***/ type tableInfo struct { - tbname string - createTime string - columns int - stbname string - uid int64 - tid int64 + tbname string + createTime string + columns int + stbname string + uid int64 + tid int64 vgId int32 } + func (d *demo) Init(wg *sync.WaitGroup) { d.heap = make(MinHeap, 0, 200) heap.Init(&d.heap) @@ -170,19 +164,18 @@ func (d *demo) Init(wg *sync.WaitGroup) { checkErr(err, sql) } - { sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", d.dbname, d.metaTable) _, err := d.db.Exec(sql) checkErr(err, sql) } - fieldTs := time.Now().Add(time.Hour * -1000) + fieldTs := time.Now().Add(time.Hour * -1000) sql := "show " + d.srcdbname + ".tables" - tbs := make([]tableInfo, 0, 512) + tbs := make([]tableInfo, 0, 512) rows, _ := d.db.Query(sql) - for rows.Next() { + for rows.Next() { var ( tbname string createTime string @@ -193,65 +186,63 @@ func (d *demo) Init(wg *sync.WaitGroup) { vgId int32 ) err := rows.Scan(&tbname, &createTime, &columns, &stbname, &uid, &tid, &vgId) - if err != nil { - checkErr(err, sql) - } - tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) - } - rows.Close() - - for _, e := range tbs { - tbname := e.tbname - //createTime := e.createTime - columns := e.columns - stbname := e.stbname - //uid := e.uid - //tid := e.tid - //vgId := vgId - - + if err != nil { + checkErr(err, sql) + } + tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) + } + rows.Close() + + for _, e := range tbs { + tbname := e.tbname + //createTime := e.createTime + columns := e.columns + stbname := e.stbname + //uid := e.uid + //tid := e.tid + //vgId := vgId + // ignore exceptable name - if strings.Compare(strings.ToLower(tbname), strings.ToLower(d.metaTable)) == 0 || strings.Compare(tbname, d.exceptTable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(d.exceptTable)) == true{ - continue - } + if strings.Compare(strings.ToLower(tbname), strings.ToLower(d.metaTable)) == 0 || strings.Compare(tbname, d.exceptTable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(d.exceptTable)) == true { + continue + } - // ignore normal table + // ignore normal table if len(stbname) == 0 || strings.Compare(stbname, "") == 0 { continue } - fields := make([]string, 0, columns) // sub sql { subsql := "describe " + d.srcdbname + "." + stbname subRows, err := d.db.Query(subsql) - if err != nil { + if err != nil { checkErr(err, subsql) } - count := 0; + count := 0 for subRows.Next() { var field string - var ty string - var len int32 - var note string + var ty string + var len int32 + var note string subRows.Scan(&field, &ty, &len, ¬e) - if count != 0 && strings.Compare(note, "TAG") != 0{ - // skip first and skip tag col - fields = append(fields, field) + if count != 0 && strings.Compare(note, "TAG") != 0 { + // skip first and skip tag col + fields = append(fields, field) } count = count + 1 - + } defer subRows.Close() } - lastTime := time.Now() + lastTime := time.Now() { - subsql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", d.srcdbname, stbname) + subsql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", d.srcdbname, stbname) subRows, err := d.db.Query(subsql) if err != nil { checkErr(err, subsql) @@ -259,38 +250,35 @@ func (d *demo) Init(wg *sync.WaitGroup) { for subRows.Next() { var tbname string subRows.Scan(&lastTime, &tbname) - } + } subRows.Close() - } - - + } - for i, f := range fields { col := fmt.Sprintf("%s %s %s", stbname, tbname, f) - count := 0; + count := 0 { - var ( - ts time.Time - dbname string - tablename string - colname string - checkinterval int - threshold int - ) - - checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, f) - checkRow, err := d.db.Query(checkSql) - if err != nil { - checkErr(err, checkSql) - } - - for checkRow.Next() { - _ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold) - d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName : col, interval: int32(checkinterval), threshold : int32(threshold)} - - count = count + 1; + var ( + ts time.Time + dbname string + tablename string + colname string + checkinterval int + threshold int + ) + + checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, f) + checkRow, err := d.db.Query(checkSql) + if err != nil { + checkErr(err, checkSql) + } + + for checkRow.Next() { + _ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold) + d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)} + + count = count + 1 } if count != 0 { continue @@ -299,79 +287,77 @@ func (d *demo) Init(wg *sync.WaitGroup) { } if count == 0 { - sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, fieldTs.UnixMilli(), d.dbname, tbname, f, d.dInterval, d.dThreshold) + sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, fieldTs.UnixMilli(), d.dbname, tbname, f, d.dInterval, d.dThreshold) _, err := d.db.Exec(sql) if err != nil { checkErr(err, sql) } - d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName : col, interval: d.dInterval, threshold : d.dThreshold} + d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: d.dInterval, threshold: d.dThreshold} - } + } fieldTs = fieldTs.Add(time.Millisecond * 2) - } } - now := time.Now(); + now := time.Now() for k, v := range d.metaDict { - durtion := fmt.Sprintf("%ds", v.interval) - s, _ := time.ParseDuration(durtion) - now.Add(s) - heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k}) + durtion := fmt.Sprintf("%ds", v.interval) + s, _ := time.ParseDuration(durtion) + now.Add(s) + heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k}) } - - d.timer = time.NewTimer(time.Second * 1) + d.timer = time.NewTimer(time.Second * 1) wg.Done() } type ValueRows struct { rows []interface{} - ts time.Time -} + ts time.Time +} func (d *demo) Update(stbname string, tbname string, col string, interval int32, threshold int32) { - key := fmt.Sprintf("%s %s %s", stbname, tbname, col) - sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col) + key := fmt.Sprintf("%s %s %s", stbname, tbname, col) + sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col) rows, _ := d.db.Query(sql) fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql) for rows.Next() { var ( - ts time.Time - dbname string - tbname string - col string + ts time.Time + dbname string + tbname string + col string inter int32 thresh int32 ) - err := rows.Scan(&ts, &dbname, &tbname, &col, &inter, &thresh) - if interval != inter || threshold != thresh { - sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, ts.UnixMilli(), d.dbname, tbname, col, interval, threshold) - _, err = d.db.Exec(sql) - if err != nil { - checkErr(err, sql) + err := rows.Scan(&ts, &dbname, &tbname, &col, &inter, &thresh) + if interval != inter || threshold != thresh { + sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, ts.UnixMilli(), d.dbname, tbname, col, interval, threshold) + _, err = d.db.Exec(sql) + if err != nil { + checkErr(err, sql) + } } - } - - } - schemadata := d.metaDict[key] + } + + schemadata := d.metaDict[key] if schemadata != nil { schemadata.interval = interval schemadata.threshold = threshold } - defer rows.Close() + defer rows.Close() } func (d *demo) NextTimout() int32 { - now := time.Now().Unix() + now := time.Now().Unix() colArray := make([]string, 0, 10) for !d.heap.Empty() { - elem := d.heap.Top() + elem := d.heap.Top() if elem.timeout <= now { colArray = append(colArray, elem.colName) heap.Pop(&d.heap) @@ -380,49 +366,47 @@ func (d *demo) NextTimout() int32 { } } - - - cacheTs := make(map[string] *ValueRows) - ts := time.Now() + cacheTs := make(map[string]*ValueRows) + ts := time.Now() for _, e := range colArray { //fmt.Println("key : ", e) elem := d.metaDict[e] var stbName string - var colName string + var colName string var tabName string fmt.Sscanf(e, "%s %s %s", &stbName, &tabName, &colName) - + cacheKey := fmt.Sprintf("%s__%s", d.dbname, stbName) v, ok := cacheTs[cacheKey] if ok { - ts = v.ts + ts = v.ts v, err := v.rows[elem.idx].(driver.Valuer).Value() if err != nil || v == nil { - } + } } else { - sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", d.srcdbname, stbName) - rows, err := d.db.Query(sql) + sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", d.srcdbname, stbName) + rows, err := d.db.Query(sql) if err != nil { checkErr(err, sql) } - + tt, err := rows.ColumnTypes() - types := make([]reflect.Type, len(tt)) + types := make([]reflect.Type, len(tt)) for i, tp := range tt { st := tp.ScanType() types[i] = st } - values := make([]interface{}, len(tt)) + values := make([]interface{}, len(tt)) for i := range values { values[i] = reflect.New(types[i]).Interface() } - + for rows.Next() { rows.Scan(values...) } - + v, err := values[0].(driver.Valuer).Value() if err != nil { checkErr(err, "invalid timestamp") @@ -430,48 +414,47 @@ func (d *demo) NextTimout() int32 { cvttime, is := v.(time.Time) - if is { - fmt.Println("yes ") - cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime} + fmt.Println("yes ") + cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime} ts = cvttime } else { - fmt.Println("no") - cacheTs[cacheKey] = &ValueRows{rows: values, ts: ts} + fmt.Println("no") + cacheTs[cacheKey] = &ValueRows{rows: values, ts: ts} ts = ts } fmt.Printf("time %v \n", ts.UnixMilli()) rows.Close() } - - exceptTableName := fmt.Sprintf("%s_%s_%s", stbName, tabName, colName) - - var dura time.Duration = ts.Sub(elem.timestamp); - cost := int32(dura.Seconds()) - if (cost == 0) { - elem.timestamp = ts; - fmt.Printf("A dura %d, threshold %d insert\n", cost, elem.threshold) - sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname,exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) - _, err := d.db.Exec(sql) + + exceptTableName := fmt.Sprintf("%s_%s_%s", stbName, tabName, colName) + + var dura time.Duration = ts.Sub(elem.timestamp) + cost := int32(dura.Seconds()) + if cost == 0 { + elem.timestamp = ts + fmt.Printf("A dura %d, threshold %d insert\n", cost, elem.threshold) + sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) + _, err := d.db.Exec(sql) if err != nil { checkErr(err, sql) } - } else { - elem.timestamp = ts; - if (cost > elem.threshold) { - fmt.Printf("B dura %d, threshold %d insert \n", cost, elem.threshold) - sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname,exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) - - _, err := d.db.Exec(sql) - if err != nil { - checkErr(err, sql) - } - } else { - fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold) - } - } - heap.Push(&d.heap, heapElem{timeout : int64(elem.interval) + now, colName : e}) + } else { + elem.timestamp = ts + if cost > elem.threshold { + fmt.Printf("B dura %d, threshold %d insert \n", cost, elem.threshold) + sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) + + _, err := d.db.Exec(sql) + if err != nil { + checkErr(err, sql) + } + } else { + fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold) + } + } + heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e}) } if !d.heap.Empty() { @@ -479,12 +462,12 @@ func (d *demo) NextTimout() int32 { timeout := elem.timeout - now if timeout < 1 { timeout = 1 - } - return int32(timeout); + } + return int32(timeout) } - return 1; - -} + return 1 + +} func printAllArgs() { fmt.Printf("\n============= args parse result: =============\n") @@ -493,35 +476,34 @@ func printAllArgs() { fmt.Printf("usr: %v\n", configPara.user) fmt.Printf("password: %v\n", configPara.password) fmt.Printf("dbName: %v\n", configPara.dbName) - fmt.Printf("srcDbName: %v\n", configPara.srcdbname) + fmt.Printf("srcDbName: %v\n", configPara.srcdbname) fmt.Printf("stbNme: %v\n", configPara.supTblName) fmt.Printf("================================================\n") - } +} func main() { - printAllArgs() + printAllArgs() url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/" - + db, err := sql.Open(taosDriverName, url) if err != nil { - checkErr(err, "failed to connect db") + checkErr(err, "failed to connect db") } wg := sync.WaitGroup{} - demo := &demo{db: db, dbname: configPara.dbName, srcdbname: configPara.srcdbname, metaTable: "metaTable", exceptTable: "exceptTable", dStartTs : 0, dInterval : 2, dThreshold : 2} + demo := &demo{db: db, dbname: configPara.dbName, srcdbname: configPara.srcdbname, metaTable: "metaTable", exceptTable: "exceptTable", dStartTs: 0, dInterval: 2, dThreshold: 2} wg.Add(1) go demo.Init(&wg) - wg.Wait() demo.Update("st1", "t1", "f", 10, 20) - + fmt.Println("time start") for { select { case <-demo.timer.C: - timeout := demo.NextTimout() + timeout := demo.NextTimout() fmt.Printf("timeout %d\n", timeout) demo.timer.Reset(time.Second * time.Duration(timeout)) }