提交 d9175054 编写于 作者: dengyihao's avatar dengyihao

add check demo

上级 f994c52a
package main
import (
"database/sql"
"database/sql/driver"
"flag"
"fmt"
"sync"
"time"
"strings"
"container/heap"
"strconv"
"reflect"
_ "github.com/taosdata/driver-go/v2/taosSql"
)
type heapElem struct {
timeout int64
colName string
}
type MinHeap []heapElem
type Column struct {
}
func (h MinHeap) Len() int {
return len(h)
}
func (h MinHeap) Less(i, j int) bool {
res := h[i].timeout - h[j].timeout
if res < 0 {
return true;
} else if res > 0 {
return false;
}
cmp := strings.Compare(h[i].colName, h[j].colName)
if (cmp <= 0) {
return true;
} else {
return false;
}
}
func (h *MinHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}
func (h *MinHeap) Push(x interface{}) {
*h = append(*h, x.(heapElem))
}
func (h *MinHeap) Empty() bool {
if len(*h) == 0 {
return true
}
return false
}
func (h *MinHeap) Top() heapElem {
return (*h)[0]
}
func (h *MinHeap) Pop() interface{}{
res := (*h)[len(*h)-1]
*h = (*h)[:len(*h)-1]
return res
}
type config struct {
hostName string
serverPort int
user string
password string
dbName string
srcdbname string
supTblName string
}
var configPara config
var taosDriverName = "taosSql"
var url string
func init() {
flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.")
flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
flag.StringVar(&configPara.dbName, "d", "test1", "Destination database.")
flag.StringVar(&configPara.srcdbname, "s", "test", "Destination database.")
flag.Parse()
}
func checkErr(err error, prompt string) {
if err != nil {
fmt.Printf("%s\n", prompt)
panic(err)
}
}
type schema struct {
idx int
numOfField int
timestamp time.Time
colName string
interval int32
threshold int32
}
type demo struct {
db *sql.DB
dbname string
srcdbname string
metaTable string
exceptTable string
dStartTs int64
dInterval int32
dThreshold int32
metaDict map[string]*schema
heap MinHeap
timer *time.Timer
}
/***
|ts |colName |interval |threshold|
|now |stbx.tx.colx|2 |5 |
|now+1|stbx.tx.colx|2 |5 |
|now+2|stbx.tx.colx|2 |5 |
***/
type tableInfo struct {
tbname string
createTime string
columns int
stbname string
uid int64
tid int64
vgId int32
}
func (d *demo) Init(wg *sync.WaitGroup) {
d.heap = make(MinHeap, 0, 200)
heap.Init(&d.heap)
d.metaDict = make(map[string]*schema)
{
sql := fmt.Sprintf("create database if not exists %s update 2", d.dbname)
_, err := d.db.Exec(sql)
checkErr(err, sql)
}
{
sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", d.dbname, d.exceptTable)
_, err := d.db.Exec(sql)
checkErr(err, sql)
}
{
sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", d.dbname, d.metaTable)
_, err := d.db.Exec(sql)
checkErr(err, sql)
}
fieldTs := time.Now().Add(time.Hour * -1000)
sql := "show " + d.srcdbname + ".tables"
tbs := make([]tableInfo, 0, 512)
rows, _ := d.db.Query(sql)
for rows.Next() {
var (
tbname string
createTime string
columns int
stbname string = ""
uid int64
tid int64
vgId int32
)
err := rows.Scan(&tbname, &createTime, &columns, &stbname, &uid, &tid, &vgId)
if err != nil {
checkErr(err, sql)
}
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId})
}
rows.Close()
for _, e := range tbs {
tbname := e.tbname
//createTime := e.createTime
columns := e.columns
stbname := e.stbname
//uid := e.uid
//tid := e.tid
//vgId := vgId
// ignore exceptable name
if strings.Compare(strings.ToLower(tbname), strings.ToLower(d.metaTable)) == 0 || strings.Compare(tbname, d.exceptTable) == 0 || strings.HasPrefix(strings.ToLower(stbname), strings.ToLower(d.exceptTable)) == true{
continue
}
// ignore normal table
if len(stbname) == 0 || strings.Compare(stbname, "") == 0 {
continue
}
fields := make([]string, 0, columns)
// sub sql
{
subsql := "describe " + d.srcdbname + "." + stbname
subRows, err := d.db.Query(subsql)
if err != nil {
checkErr(err, subsql)
}
count := 0;
for subRows.Next() {
var field string
var ty string
var len int32
var note string
subRows.Scan(&field, &ty, &len, &note)
if count != 0 && strings.Compare(note, "TAG") != 0{
// skip first and skip tag col
fields = append(fields, field)
}
count = count + 1
}
defer subRows.Close()
}
lastTime := time.Now()
{
subsql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", d.srcdbname, stbname)
subRows, err := d.db.Query(subsql)
if err != nil {
checkErr(err, subsql)
}
for subRows.Next() {
var tbname string
subRows.Scan(&lastTime, &tbname)
}
subRows.Close()
}
for i, f := range fields {
col := fmt.Sprintf("%s %s %s", stbname, tbname, f)
count := 0;
{
var (
ts time.Time
dbname string
tablename string
colname string
checkinterval int
threshold int
)
checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, f)
checkRow, err := d.db.Query(checkSql)
if err != nil {
checkErr(err, checkSql)
}
for checkRow.Next() {
_ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold)
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName : col, interval: int32(checkinterval), threshold : int32(threshold)}
count = count + 1;
}
if count != 0 {
continue
}
defer checkRow.Close()
}
if count == 0 {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, fieldTs.UnixMilli(), d.dbname, tbname, f, d.dInterval, d.dThreshold)
_, err := d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName : col, interval: d.dInterval, threshold : d.dThreshold}
}
fieldTs = fieldTs.Add(time.Millisecond * 2)
}
}
now := time.Now();
for k, v := range d.metaDict {
durtion := fmt.Sprintf("%ds", v.interval)
s, _ := time.ParseDuration(durtion)
now.Add(s)
heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k})
}
d.timer = time.NewTimer(time.Second * 1)
wg.Done()
}
type ValueRows struct {
rows []interface{}
ts time.Time
}
func (d *demo) Update(stbname string, tbname string, col string, interval int32, threshold int32) {
key := fmt.Sprintf("%s %s %s", stbname, tbname, col)
sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col)
rows, _ := d.db.Query(sql)
fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql)
for rows.Next() {
var (
ts time.Time
dbname string
tbname string
col string
inter int32
thresh int32
)
err := rows.Scan(&ts, &dbname, &tbname, &col, &inter, &thresh)
if interval != inter || threshold != thresh {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, ts.UnixMilli(), d.dbname, tbname, col, interval, threshold)
_, err = d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
}
}
schemadata := d.metaDict[key]
if schemadata != nil {
schemadata.interval = interval
schemadata.threshold = threshold
}
defer rows.Close()
}
func (d *demo) NextTimout() int32 {
now := time.Now().Unix()
colArray := make([]string, 0, 10)
for !d.heap.Empty() {
elem := d.heap.Top()
if elem.timeout <= now {
colArray = append(colArray, elem.colName)
heap.Pop(&d.heap)
} else {
break
}
}
cacheTs := make(map[string] *ValueRows)
ts := time.Now()
for _, e := range colArray {
//fmt.Println("key : ", e)
elem := d.metaDict[e]
var stbName string
var colName string
var tabName string
fmt.Sscanf(e, "%s %s %s", &stbName, &tabName, &colName)
cacheKey := fmt.Sprintf("%s__%s", d.dbname, stbName)
v, ok := cacheTs[cacheKey]
if ok {
ts = v.ts
v, err := v.rows[elem.idx].(driver.Valuer).Value()
if err != nil || v == nil {
}
} else {
sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", d.srcdbname, stbName)
rows, err := d.db.Query(sql)
if err != nil {
checkErr(err, sql)
}
tt, err := rows.ColumnTypes()
types := make([]reflect.Type, len(tt))
for i, tp := range tt {
st := tp.ScanType()
types[i] = st
}
values := make([]interface{}, len(tt))
for i := range values {
values[i] = reflect.New(types[i]).Interface()
}
for rows.Next() {
rows.Scan(values...)
}
v, err := values[0].(driver.Valuer).Value()
if err != nil {
checkErr(err, "invalid timestamp")
}
cvttime, is := v.(time.Time)
if is {
fmt.Println("yes ")
cacheTs[cacheKey] = &ValueRows{rows: values, ts: cvttime}
ts = cvttime
} else {
fmt.Println("no")
cacheTs[cacheKey] = &ValueRows{rows: values, ts: ts}
ts = ts
}
fmt.Printf("time %v \n", ts.UnixMilli())
rows.Close()
}
exceptTableName := fmt.Sprintf("%s_%s_%s", stbName, tabName, colName)
var dura time.Duration = ts.Sub(elem.timestamp);
cost := int32(dura.Seconds())
if (cost == 0) {
elem.timestamp = ts;
fmt.Printf("A dura %d, threshold %d insert\n", cost, elem.threshold)
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname,exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
_, err := d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
} else {
elem.timestamp = ts;
if (cost > elem.threshold) {
fmt.Printf("B dura %d, threshold %d insert \n", cost, elem.threshold)
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname,exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
_, err := d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
} else {
fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold)
}
}
heap.Push(&d.heap, heapElem{timeout : int64(elem.interval) + now, colName : e})
}
if !d.heap.Empty() {
elem := d.heap.Top()
timeout := elem.timeout - now
if timeout < 1 {
timeout = 1
}
return int32(timeout);
}
return 1;
}
func printAllArgs() {
fmt.Printf("\n============= args parse result: =============\n")
fmt.Printf("hostName: %v\n", configPara.hostName)
fmt.Printf("serverPort: %v\n", configPara.serverPort)
fmt.Printf("usr: %v\n", configPara.user)
fmt.Printf("password: %v\n", configPara.password)
fmt.Printf("dbName: %v\n", configPara.dbName)
fmt.Printf("srcDbName: %v\n", configPara.srcdbname)
fmt.Printf("stbNme: %v\n", configPara.supTblName)
fmt.Printf("================================================\n")
}
func main() {
printAllArgs()
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
db, err := sql.Open(taosDriverName, url)
if err != nil {
checkErr(err, "failed to connect db")
}
wg := sync.WaitGroup{}
demo := &demo{db: db, dbname: configPara.dbName, srcdbname: configPara.srcdbname, metaTable: "metaTable", exceptTable: "exceptTable", dStartTs : 0, dInterval : 2, dThreshold : 2}
wg.Add(1)
go demo.Init(&wg)
wg.Wait()
demo.Update("st1", "t1", "f", 10, 20)
fmt.Println("time start")
for {
select {
case <-demo.timer.C:
timeout := demo.NextTimout()
fmt.Printf("timeout %d\n", timeout)
demo.timer.Reset(time.Second * time.Duration(timeout))
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册