提交 c628a6e1 编写于 作者: dengyihao's avatar dengyihao

update demo

上级 0814a6a4
package main package main
import ( import (
"container/heap"
"database/sql" "database/sql"
"database/sql/driver" "database/sql/driver"
"flag" "flag"
"fmt" "fmt"
"reflect"
"strconv"
"strings"
"sync" "sync"
"time" "time"
"strings"
"container/heap"
"strconv"
"reflect"
_ "github.com/taosdata/driver-go/v2/taosSql" _ "github.com/taosdata/driver-go/v2/taosSql"
) )
type heapElem struct { type heapElem struct {
timeout int64 timeout int64
colName string colName string
...@@ -22,7 +25,9 @@ type heapElem struct { ...@@ -22,7 +25,9 @@ type heapElem struct {
type MinHeap []heapElem type MinHeap []heapElem
type Column struct { type Column struct {
} }
func (h MinHeap) Len() int { func (h MinHeap) Len() int {
...@@ -32,16 +37,16 @@ func (h MinHeap) Len() int { ...@@ -32,16 +37,16 @@ func (h MinHeap) Len() int {
func (h MinHeap) Less(i, j int) bool { func (h MinHeap) Less(i, j int) bool {
res := h[i].timeout - h[j].timeout res := h[i].timeout - h[j].timeout
if res < 0 { if res < 0 {
return true return true;
} else if res > 0 { } else if res > 0 {
return false return false;
} }
cmp := strings.Compare(h[i].colName, h[j].colName) cmp := strings.Compare(h[i].colName, h[j].colName)
if cmp <= 0 { if (cmp <= 0) {
return true return true;
} else { } else {
return false return false;
} }
} }
...@@ -64,7 +69,7 @@ func (h *MinHeap) Top() heapElem { ...@@ -64,7 +69,7 @@ func (h *MinHeap) Top() heapElem {
return (*h)[0] return (*h)[0]
} }
func (h *MinHeap) Pop() interface{} { func (h *MinHeap) Pop() interface{}{
res := (*h)[len(*h)-1] res := (*h)[len(*h)-1]
*h = (*h)[:len(*h)-1] *h = (*h)[:len(*h)-1]
return res return res
...@@ -76,7 +81,7 @@ type config struct { ...@@ -76,7 +81,7 @@ type config struct {
user string user string
password string password string
dbName string dbName string
srcdbname string srcdbName string
supTblName string supTblName string
} }
...@@ -89,8 +94,8 @@ func init() { ...@@ -89,8 +94,8 @@ func init() {
flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.") flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.") flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.") flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
flag.StringVar(&configPara.dbName, "d", "test1", "Destination database.") flag.StringVar(&configPara.dbName, "d", "test1", "check database.")
flag.StringVar(&configPara.srcdbname, "s", "test", "Destination database.") flag.StringVar(&configPara.srcdbName, "s", "test", "Destination database.")
flag.Parse() flag.Parse()
} }
...@@ -102,6 +107,7 @@ func checkErr(err error, prompt string) { ...@@ -102,6 +107,7 @@ func checkErr(err error, prompt string) {
} }
} }
type schema struct { type schema struct {
idx int idx int
numOfField int numOfField int
...@@ -109,8 +115,8 @@ type schema struct { ...@@ -109,8 +115,8 @@ type schema struct {
colName string colName string
interval int32 interval int32
threshold int32 threshold int32
}
}
type demo struct { type demo struct {
db *sql.DB db *sql.DB
...@@ -123,9 +129,13 @@ type demo struct { ...@@ -123,9 +129,13 @@ type demo struct {
dInterval int32 dInterval int32
dThreshold int32 dThreshold int32
suptabname string
metaDict map[string]*schema metaDict map[string]*schema
heap MinHeap heap MinHeap
timer *time.Timer timer *time.Timer
wg *sync.WaitGroup
} }
/*** /***
...@@ -137,6 +147,12 @@ type demo struct { ...@@ -137,6 +147,12 @@ type demo struct {
***/ ***/
type taskInfo struct {
wg *sync.WaitGroup
subtask map[string] *demo
}
type tableInfo struct { type tableInfo struct {
tbname string tbname string
createTime string createTime string
...@@ -147,34 +163,28 @@ type tableInfo struct { ...@@ -147,34 +163,28 @@ type tableInfo struct {
vgId int32 vgId int32
} }
func (d *demo) Init(wg *sync.WaitGroup) {
d.heap = make(MinHeap, 0, 200)
heap.Init(&d.heap)
d.metaDict = make(map[string]*schema)
func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) {
{ {
sql := fmt.Sprintf("create database if not exists %s update 2", d.dbname) sql := fmt.Sprintf("create database if not exists %s update 2", dbname)
_, err := d.db.Exec(sql) _, err := db.Exec(sql)
checkErr(err, sql) checkErr(err, sql)
} }
{ {
sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", d.dbname, d.exceptTable) sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable)
_, err := d.db.Exec(sql) _, err := db.Exec(sql)
checkErr(err, sql) checkErr(err, sql)
} }
{ {
sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", d.dbname, d.metaTable) sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable)
_, err := d.db.Exec(sql) _, err := db.Exec(sql)
checkErr(err, sql) checkErr(err, sql)
} }
fieldTs := time.Now().Add(time.Hour * -1000) sql := "show " + srcdbname + ".tables"
sql := "show " + d.srcdbname + ".tables"
tbs := make([]tableInfo, 0, 512) tbs := make([]tableInfo, 0, 512)
rows, _ := d.db.Query(sql) rows, _ := db.Query(sql)
for rows.Next() { for rows.Next() {
var ( var (
tbname string tbname string
...@@ -189,28 +199,141 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -189,28 +199,141 @@ func (d *demo) Init(wg *sync.WaitGroup) {
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
// ignore exceptable name
if strings.Compare(strings.ToLower(tbname), strings.ToLower(metatable)) == 0 || strings.Compare(tbname, exptable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(exptable)) == true {
continue
}
// ignore normal table
if len(stbname) == 0 || strings.Compare(stbname, "") == 0 {
continue
}
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId})
} }
rows.Close() rows.Close()
fieldTs := time.Now().Add(time.Hour * -1000)
for _, e := range tbs { for _, e := range tbs {
tbname := e.tbname tbname := e.tbname
//createTime := e.createTime
columns := e.columns columns := e.columns
stbname := e.stbname stbname := e.stbname
//uid := e.uid fields := make([]string, 0, columns)
//tid := e.tid {
//vgId := vgId subsql := "describe " + srcdbname + "." + stbname
// ignore exceptable name subRows, err := db.Query(subsql)
if strings.Compare(strings.ToLower(tbname), strings.ToLower(d.metaTable)) == 0 || strings.Compare(tbname, d.exceptTable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(d.exceptTable)) == true { if err != nil {
continue checkErr(err, subsql)
} }
// ignore normal table count := 0
if len(stbname) == 0 || strings.Compare(stbname, "") == 0 { for subRows.Next() {
var field string
var ty string
var len int32
var note string
subRows.Scan(&field, &ty, &len, &note)
// ignore time and tag col
if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col
fields = append(fields, field)
}
count = count + 1
}
defer subRows.Close()
}
for _, f := range fields {
count := 0
{
checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", dbname, metatable, dbname, tbname, f)
checkRow, err := db.Query(checkSql)
if err != nil {
checkErr(err, checkSql)
}
for checkRow.Next() {
count = count + 1
break
}
if count != 0 {
continue continue
} }
defer checkRow.Close()
}
if count == 0 {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", dbname, metatable, fieldTs.UnixMilli(), dbname, tbname, f, 2, 2)
_, err := db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
}
fieldTs = fieldTs.Add(time.Millisecond * 2)
}
key := fmt.Sprintf("%s_%s", srcdbname, stbname)
_, ok := tskinfo.subtask[key]
if !ok {
tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg}
}
}
}
func subTaskStart(d *demo) {
d.Init()
for {
select {
case <-d.timer.C:
timeout := d.NextTimout()
fmt.Printf("stbname %s, timeout %d\n", d.suptabname, timeout)
d.timer.Reset(time.Second * time.Duration(timeout))
}
}
d.wg.Done()
}
func (d *demo) Init() {
d.heap = make(MinHeap, 0, 200)
heap.Init(&d.heap)
d.metaDict = make(map[string]*schema)
fieldTs := time.Now().Add(time.Hour * -1000)
sql := "show " + d.srcdbname + ".tables"
tbs := make([]tableInfo, 0, 512)
rows, _ := d.db.Query(sql)
for rows.Next() {
var (
tbname string
createTime string
columns int
stbname string = ""
uid int64
tid int64
vgId int32
)
err := rows.Scan(&tbname, &createTime, &columns, &stbname, &uid, &tid, &vgId)
if err != nil {
checkErr(err, sql)
}
if (strings.Compare(stbname, d.suptabname) == 0) {
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId})
}
}
rows.Close()
for _, e := range tbs {
tbname := e.tbname
columns := e.columns
stbname := e.stbname
fields := make([]string, 0, columns) fields := make([]string, 0, columns)
// sub sql // sub sql
...@@ -230,6 +353,7 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -230,6 +353,7 @@ func (d *demo) Init(wg *sync.WaitGroup) {
var note string var note string
subRows.Scan(&field, &ty, &len, &note) subRows.Scan(&field, &ty, &len, &note)
// ignore time and tag col
if count != 0 && strings.Compare(note, "TAG") != 0 { if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col // skip first and skip tag col
fields = append(fields, field) fields = append(fields, field)
...@@ -309,7 +433,6 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -309,7 +433,6 @@ func (d *demo) Init(wg *sync.WaitGroup) {
} }
d.timer = time.NewTimer(time.Second * 1) d.timer = time.NewTimer(time.Second * 1)
wg.Done()
} }
type ValueRows struct { type ValueRows struct {
...@@ -415,16 +538,13 @@ func (d *demo) NextTimout() int32 { ...@@ -415,16 +538,13 @@ func (d *demo) NextTimout() int32 {
cvttime, is := v.(time.Time) cvttime, is := v.(time.Time)
if is { if is {
fmt.Println("yes ")
cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime} cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime}
ts = cvttime ts = cvttime
} else { } else {
fmt.Println("no")
cacheTs[cacheKey] = &ValueRows{rows: values, ts: ts} cacheTs[cacheKey] = &ValueRows{rows: values, ts: ts}
ts = ts ts = ts
} }
fmt.Printf("time %v \n", ts.UnixMilli())
rows.Close() rows.Close()
} }
...@@ -434,8 +554,8 @@ func (d *demo) NextTimout() int32 { ...@@ -434,8 +554,8 @@ func (d *demo) NextTimout() int32 {
cost := int32(dura.Seconds()) cost := int32(dura.Seconds())
if cost == 0 { if cost == 0 {
elem.timestamp = ts elem.timestamp = ts
fmt.Printf("A dura %d, threshold %d insert\n", cost, elem.threshold)
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
...@@ -443,15 +563,15 @@ func (d *demo) NextTimout() int32 { ...@@ -443,15 +563,15 @@ func (d *demo) NextTimout() int32 {
} else { } else {
elem.timestamp = ts elem.timestamp = ts
if cost > elem.threshold { if cost > elem.threshold {
fmt.Printf("B dura %d, threshold %d insert \n", cost, elem.threshold)
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
} else { } else {
fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold) //fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold)
} }
} }
heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e}) heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e})
...@@ -476,7 +596,7 @@ func printAllArgs() { ...@@ -476,7 +596,7 @@ func printAllArgs() {
fmt.Printf("usr: %v\n", configPara.user) fmt.Printf("usr: %v\n", configPara.user)
fmt.Printf("password: %v\n", configPara.password) fmt.Printf("password: %v\n", configPara.password)
fmt.Printf("dbName: %v\n", configPara.dbName) fmt.Printf("dbName: %v\n", configPara.dbName)
fmt.Printf("srcDbName: %v\n", configPara.srcdbname) fmt.Printf("srcDbName: %v\n", configPara.srcdbName)
fmt.Printf("stbNme: %v\n", configPara.supTblName) fmt.Printf("stbNme: %v\n", configPara.supTblName)
fmt.Printf("================================================\n") fmt.Printf("================================================\n")
} }
...@@ -491,22 +611,12 @@ func main() { ...@@ -491,22 +611,12 @@ func main() {
checkErr(err, "failed to connect db") checkErr(err, "failed to connect db")
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
demo := &demo{db: db, dbname: configPara.dbName, srcdbname: configPara.srcdbname, metaTable: "metaTable", exceptTable: "exceptTable", dStartTs: 0, dInterval: 2, dThreshold: 2} info := &taskInfo{subtask : make(map[string] *demo), wg: &wg}
wg.Add(1)
go demo.Init(&wg)
wg.Wait() taskInit(db, configPara.dbName, configPara.srcdbName, "metatable", "exptable", info)
for _, v := range info.subtask {
demo.Update("st1", "t1", "f", 10, 20) wg.Add(1)
go subTaskStart(v)
fmt.Println("time start")
for {
select {
case <-demo.timer.C:
timeout := demo.NextTimout()
fmt.Printf("timeout %d\n", timeout)
demo.timer.Reset(time.Second * time.Duration(timeout))
}
} }
wg.Wait()
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册