提交 0814a6a4 编写于 作者: dengyihao's avatar dengyihao

add check demo

上级 d9175054
package main package main
import ( import (
"container/heap"
"database/sql" "database/sql"
"database/sql/driver" "database/sql/driver"
"flag" "flag"
"fmt" "fmt"
"reflect"
"strconv"
"strings"
"sync" "sync"
"time" "time"
"strings"
"container/heap"
"strconv"
"reflect"
_ "github.com/taosdata/driver-go/v2/taosSql" _ "github.com/taosdata/driver-go/v2/taosSql"
) )
type heapElem struct { type heapElem struct {
timeout int64 timeout int64
colName string colName string
...@@ -25,9 +22,7 @@ type heapElem struct { ...@@ -25,9 +22,7 @@ type heapElem struct {
type MinHeap []heapElem type MinHeap []heapElem
type Column struct { type Column struct {
} }
func (h MinHeap) Len() int { func (h MinHeap) Len() int {
...@@ -37,16 +32,16 @@ func (h MinHeap) Len() int { ...@@ -37,16 +32,16 @@ func (h MinHeap) Len() int {
func (h MinHeap) Less(i, j int) bool { func (h MinHeap) Less(i, j int) bool {
res := h[i].timeout - h[j].timeout res := h[i].timeout - h[j].timeout
if res < 0 { if res < 0 {
return true; return true
} else if res > 0 { } else if res > 0 {
return false; return false
} }
cmp := strings.Compare(h[i].colName, h[j].colName) cmp := strings.Compare(h[i].colName, h[j].colName)
if (cmp <= 0) { if cmp <= 0 {
return true; return true
} else { } else {
return false; return false
} }
} }
...@@ -69,7 +64,7 @@ func (h *MinHeap) Top() heapElem { ...@@ -69,7 +64,7 @@ func (h *MinHeap) Top() heapElem {
return (*h)[0] return (*h)[0]
} }
func (h *MinHeap) Pop() interface{}{ func (h *MinHeap) Pop() interface{} {
res := (*h)[len(*h)-1] res := (*h)[len(*h)-1]
*h = (*h)[:len(*h)-1] *h = (*h)[:len(*h)-1]
return res return res
...@@ -107,7 +102,6 @@ func checkErr(err error, prompt string) { ...@@ -107,7 +102,6 @@ func checkErr(err error, prompt string) {
} }
} }
type schema struct { type schema struct {
idx int idx int
numOfField int numOfField int
...@@ -115,7 +109,6 @@ type schema struct { ...@@ -115,7 +109,6 @@ type schema struct {
colName string colName string
interval int32 interval int32
threshold int32 threshold int32
} }
type demo struct { type demo struct {
...@@ -153,6 +146,7 @@ type tableInfo struct { ...@@ -153,6 +146,7 @@ type tableInfo struct {
tid int64 tid int64
vgId int32 vgId int32
} }
func (d *demo) Init(wg *sync.WaitGroup) { func (d *demo) Init(wg *sync.WaitGroup) {
d.heap = make(MinHeap, 0, 200) d.heap = make(MinHeap, 0, 200)
heap.Init(&d.heap) heap.Init(&d.heap)
...@@ -170,7 +164,6 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -170,7 +164,6 @@ func (d *demo) Init(wg *sync.WaitGroup) {
checkErr(err, sql) checkErr(err, sql)
} }
{ {
sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", d.dbname, d.metaTable) sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", d.dbname, d.metaTable)
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
...@@ -209,9 +202,8 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -209,9 +202,8 @@ func (d *demo) Init(wg *sync.WaitGroup) {
//tid := e.tid //tid := e.tid
//vgId := vgId //vgId := vgId
// ignore exceptable name // ignore exceptable name
if strings.Compare(strings.ToLower(tbname), strings.ToLower(d.metaTable)) == 0 || strings.Compare(tbname, d.exceptTable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(d.exceptTable)) == true{ if strings.Compare(strings.ToLower(tbname), strings.ToLower(d.metaTable)) == 0 || strings.Compare(tbname, d.exceptTable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(d.exceptTable)) == true {
continue continue
} }
...@@ -220,7 +212,6 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -220,7 +212,6 @@ func (d *demo) Init(wg *sync.WaitGroup) {
continue continue
} }
fields := make([]string, 0, columns) fields := make([]string, 0, columns)
// sub sql // sub sql
{ {
...@@ -231,7 +222,7 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -231,7 +222,7 @@ func (d *demo) Init(wg *sync.WaitGroup) {
checkErr(err, subsql) checkErr(err, subsql)
} }
count := 0; count := 0
for subRows.Next() { for subRows.Next() {
var field string var field string
var ty string var ty string
...@@ -239,7 +230,7 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -239,7 +230,7 @@ func (d *demo) Init(wg *sync.WaitGroup) {
var note string var note string
subRows.Scan(&field, &ty, &len, &note) subRows.Scan(&field, &ty, &len, &note)
if count != 0 && strings.Compare(note, "TAG") != 0{ if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col // skip first and skip tag col
fields = append(fields, field) fields = append(fields, field)
} }
...@@ -263,12 +254,9 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -263,12 +254,9 @@ func (d *demo) Init(wg *sync.WaitGroup) {
subRows.Close() subRows.Close()
} }
for i, f := range fields { for i, f := range fields {
col := fmt.Sprintf("%s %s %s", stbname, tbname, f) col := fmt.Sprintf("%s %s %s", stbname, tbname, f)
count := 0; count := 0
{ {
var ( var (
...@@ -288,9 +276,9 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -288,9 +276,9 @@ func (d *demo) Init(wg *sync.WaitGroup) {
for checkRow.Next() { for checkRow.Next() {
_ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold) _ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold)
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName : col, interval: int32(checkinterval), threshold : int32(threshold)} d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)}
count = count + 1; count = count + 1
} }
if count != 0 { if count != 0 {
continue continue
...@@ -305,15 +293,14 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -305,15 +293,14 @@ func (d *demo) Init(wg *sync.WaitGroup) {
checkErr(err, sql) checkErr(err, sql)
} }
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName : col, interval: d.dInterval, threshold : d.dThreshold} d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: d.dInterval, threshold: d.dThreshold}
} }
fieldTs = fieldTs.Add(time.Millisecond * 2) fieldTs = fieldTs.Add(time.Millisecond * 2)
} }
} }
now := time.Now(); now := time.Now()
for k, v := range d.metaDict { for k, v := range d.metaDict {
durtion := fmt.Sprintf("%ds", v.interval) durtion := fmt.Sprintf("%ds", v.interval)
s, _ := time.ParseDuration(durtion) s, _ := time.ParseDuration(durtion)
...@@ -321,7 +308,6 @@ func (d *demo) Init(wg *sync.WaitGroup) { ...@@ -321,7 +308,6 @@ func (d *demo) Init(wg *sync.WaitGroup) {
heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k}) heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k})
} }
d.timer = time.NewTimer(time.Second * 1) d.timer = time.NewTimer(time.Second * 1)
wg.Done() wg.Done()
} }
...@@ -380,9 +366,7 @@ func (d *demo) NextTimout() int32 { ...@@ -380,9 +366,7 @@ func (d *demo) NextTimout() int32 {
} }
} }
cacheTs := make(map[string]*ValueRows)
cacheTs := make(map[string] *ValueRows)
ts := time.Now() ts := time.Now()
for _, e := range colArray { for _, e := range colArray {
...@@ -430,7 +414,6 @@ func (d *demo) NextTimout() int32 { ...@@ -430,7 +414,6 @@ func (d *demo) NextTimout() int32 {
cvttime, is := v.(time.Time) cvttime, is := v.(time.Time)
if is { if is {
fmt.Println("yes ") fmt.Println("yes ")
cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime} cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime}
...@@ -447,21 +430,21 @@ func (d *demo) NextTimout() int32 { ...@@ -447,21 +430,21 @@ func (d *demo) NextTimout() int32 {
exceptTableName := fmt.Sprintf("%s_%s_%s", stbName, tabName, colName) exceptTableName := fmt.Sprintf("%s_%s_%s", stbName, tabName, colName)
var dura time.Duration = ts.Sub(elem.timestamp); var dura time.Duration = ts.Sub(elem.timestamp)
cost := int32(dura.Seconds()) cost := int32(dura.Seconds())
if (cost == 0) { if cost == 0 {
elem.timestamp = ts; elem.timestamp = ts
fmt.Printf("A dura %d, threshold %d insert\n", cost, elem.threshold) fmt.Printf("A dura %d, threshold %d insert\n", cost, elem.threshold)
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname,exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
} else { } else {
elem.timestamp = ts; elem.timestamp = ts
if (cost > elem.threshold) { if cost > elem.threshold {
fmt.Printf("B dura %d, threshold %d insert \n", cost, elem.threshold) fmt.Printf("B dura %d, threshold %d insert \n", cost, elem.threshold)
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname,exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
if err != nil { if err != nil {
...@@ -471,7 +454,7 @@ func (d *demo) NextTimout() int32 { ...@@ -471,7 +454,7 @@ func (d *demo) NextTimout() int32 {
fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold) fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold)
} }
} }
heap.Push(&d.heap, heapElem{timeout : int64(elem.interval) + now, colName : e}) heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e})
} }
if !d.heap.Empty() { if !d.heap.Empty() {
...@@ -480,9 +463,9 @@ func (d *demo) NextTimout() int32 { ...@@ -480,9 +463,9 @@ func (d *demo) NextTimout() int32 {
if timeout < 1 { if timeout < 1 {
timeout = 1 timeout = 1
} }
return int32(timeout); return int32(timeout)
} }
return 1; return 1
} }
...@@ -496,7 +479,7 @@ func printAllArgs() { ...@@ -496,7 +479,7 @@ func printAllArgs() {
fmt.Printf("srcDbName: %v\n", configPara.srcdbname) fmt.Printf("srcDbName: %v\n", configPara.srcdbname)
fmt.Printf("stbNme: %v\n", configPara.supTblName) fmt.Printf("stbNme: %v\n", configPara.supTblName)
fmt.Printf("================================================\n") fmt.Printf("================================================\n")
} }
func main() { func main() {
...@@ -508,11 +491,10 @@ func main() { ...@@ -508,11 +491,10 @@ func main() {
checkErr(err, "failed to connect db") checkErr(err, "failed to connect db")
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
demo := &demo{db: db, dbname: configPara.dbName, srcdbname: configPara.srcdbname, metaTable: "metaTable", exceptTable: "exceptTable", dStartTs : 0, dInterval : 2, dThreshold : 2} demo := &demo{db: db, dbname: configPara.dbName, srcdbname: configPara.srcdbname, metaTable: "metaTable", exceptTable: "exceptTable", dStartTs: 0, dInterval: 2, dThreshold: 2}
wg.Add(1) wg.Add(1)
go demo.Init(&wg) go demo.Init(&wg)
wg.Wait() wg.Wait()
demo.Update("st1", "t1", "f", 10, 20) demo.Update("st1", "t1", "f", 10, 20)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册