提交 4d619b3b 编写于 作者: dengyihao's avatar dengyihao

fix create table error

上级 c628a6e1
package main package main
import ( import (
"container/heap"
"database/sql" "database/sql"
"database/sql/driver" "database/sql/driver"
"flag" "flag"
"fmt" "fmt"
"reflect"
"strconv"
"strings"
"sync" "sync"
"time" "time"
"strings"
"container/heap"
"strconv"
"reflect"
_ "github.com/taosdata/driver-go/v2/taosSql" _ "github.com/taosdata/driver-go/v2/taosSql"
) )
type heapElem struct { type heapElem struct {
timeout int64 timeout int64
colName string colName string
} }
type MinHeap []heapElem type MinHeap []heapElem
type Column struct { type Column struct {
} }
func (h MinHeap) Len() int { func (h MinHeap) Len() int {
...@@ -37,16 +32,16 @@ func (h MinHeap) Len() int { ...@@ -37,16 +32,16 @@ func (h MinHeap) Len() int {
func (h MinHeap) Less(i, j int) bool { func (h MinHeap) Less(i, j int) bool {
res := h[i].timeout - h[j].timeout res := h[i].timeout - h[j].timeout
if res < 0 { if res < 0 {
return true; return true
} else if res > 0 { } else if res > 0 {
return false; return false
} }
cmp := strings.Compare(h[i].colName, h[j].colName) cmp := strings.Compare(h[i].colName, h[j].colName)
if (cmp <= 0) { if cmp <= 0 {
return true; return true
} else { } else {
return false; return false
} }
} }
...@@ -69,20 +64,20 @@ func (h *MinHeap) Top() heapElem { ...@@ -69,20 +64,20 @@ func (h *MinHeap) Top() heapElem {
return (*h)[0] return (*h)[0]
} }
func (h *MinHeap) Pop() interface{}{ func (h *MinHeap) Pop() interface{} {
res := (*h)[len(*h)-1] res := (*h)[len(*h)-1]
*h = (*h)[:len(*h)-1] *h = (*h)[:len(*h)-1]
return res return res
} }
type config struct { type config struct {
hostName string hostName string
serverPort int serverPort int
user string user string
password string password string
dbName string dbName string
srcdbName string srcdbName string
supTblName string supTblName string
} }
var configPara config var configPara config
...@@ -97,7 +92,7 @@ func init() { ...@@ -97,7 +92,7 @@ func init() {
flag.StringVar(&configPara.dbName, "d", "test1", "check database.") flag.StringVar(&configPara.dbName, "d", "test1", "check database.")
flag.StringVar(&configPara.srcdbName, "s", "test", "Destination database.") flag.StringVar(&configPara.srcdbName, "s", "test", "Destination database.")
flag.Parse() flag.Parse()
} }
func checkErr(err error, prompt string) { func checkErr(err error, prompt string) {
...@@ -107,15 +102,13 @@ func checkErr(err error, prompt string) { ...@@ -107,15 +102,13 @@ func checkErr(err error, prompt string) {
} }
} }
type schema struct { type schema struct {
idx int idx int
numOfField int numOfField int
timestamp time.Time timestamp time.Time
colName string colName string
interval int32 interval int32
threshold int32 threshold int32
} }
type demo struct { type demo struct {
db *sql.DB db *sql.DB
...@@ -129,13 +122,13 @@ type demo struct { ...@@ -129,13 +122,13 @@ type demo struct {
dInterval int32 dInterval int32
dThreshold int32 dThreshold int32
suptabname string suptabname string
metaDict map[string]*schema metaDict map[string]*schema
heap MinHeap heap MinHeap
timer *time.Timer timer *time.Timer
wg *sync.WaitGroup wg *sync.WaitGroup
} }
/*** /***
...@@ -148,11 +141,10 @@ type demo struct { ...@@ -148,11 +141,10 @@ type demo struct {
***/ ***/
type taskInfo struct { type taskInfo struct {
wg *sync.WaitGroup wg *sync.WaitGroup
subtask map[string] *demo subtask map[string]*demo
} }
type tableInfo struct { type tableInfo struct {
tbname string tbname string
createTime string createTime string
...@@ -163,7 +155,6 @@ type tableInfo struct { ...@@ -163,7 +155,6 @@ type tableInfo struct {
vgId int32 vgId int32
} }
func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) { func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) {
{ {
sql := fmt.Sprintf("create database if not exists %s update 2", dbname) sql := fmt.Sprintf("create database if not exists %s update 2", dbname)
...@@ -216,7 +207,7 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp ...@@ -216,7 +207,7 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp
fieldTs := time.Now().Add(time.Hour * -1000) fieldTs := time.Now().Add(time.Hour * -1000)
for _, e := range tbs { for _, e := range tbs {
tbname := e.tbname tbname := e.tbname
columns := e.columns columns := e.columns
stbname := e.stbname stbname := e.stbname
...@@ -237,7 +228,7 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp ...@@ -237,7 +228,7 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp
var note string var note string
subRows.Scan(&field, &ty, &len, &note) subRows.Scan(&field, &ty, &len, &note)
// ignore time and tag col // ignore time and tag col
if count != 0 && strings.Compare(note, "TAG") != 0 { if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col // skip first and skip tag col
fields = append(fields, field) fields = append(fields, field)
...@@ -258,7 +249,7 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp ...@@ -258,7 +249,7 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp
for checkRow.Next() { for checkRow.Next() {
count = count + 1 count = count + 1
break break
} }
if count != 0 { if count != 0 {
continue continue
...@@ -266,37 +257,37 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp ...@@ -266,37 +257,37 @@ func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exp
defer checkRow.Close() defer checkRow.Close()
} }
if count == 0 { if count == 0 {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", dbname, metatable, fieldTs.UnixMilli(), dbname, tbname, f, 2, 2) sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", dbname, metatable, fieldTs.UnixMilli(), srcdbname, tbname, f, 2, 2)
_, err := db.Exec(sql) _, err := db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
} }
fieldTs = fieldTs.Add(time.Millisecond * 2) fieldTs = fieldTs.Add(time.Millisecond * 2)
} }
key := fmt.Sprintf("%s_%s", srcdbname, stbname) key := fmt.Sprintf("%s_%s", srcdbname, stbname)
_, ok := tskinfo.subtask[key] _, ok := tskinfo.subtask[key]
if !ok { if !ok {
tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg} tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg}
} }
} }
} }
func subTaskStart(d *demo) { func subTaskStart(d *demo) {
d.Init() d.Init()
for { for {
select { select {
case <-d.timer.C: case <-d.timer.C:
timeout := d.NextTimout() timeout := d.NextTimout()
fmt.Printf("stbname %s, timeout %d\n", d.suptabname, timeout) fmt.Printf("stbname %s, timeout %d\n", d.suptabname, timeout)
d.timer.Reset(time.Second * time.Duration(timeout)) d.timer.Reset(time.Second * time.Duration(timeout))
} }
} }
d.wg.Done() d.wg.Done()
} }
func (d *demo) Init() { func (d *demo) Init() {
d.heap = make(MinHeap, 0, 200) d.heap = make(MinHeap, 0, 200)
...@@ -322,14 +313,13 @@ func (d *demo) Init() { ...@@ -322,14 +313,13 @@ func (d *demo) Init() {
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
} }
if (strings.Compare(stbname, d.suptabname) == 0) { if strings.Compare(stbname, d.suptabname) == 0 {
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId})
} }
} }
rows.Close() rows.Close()
for _, e := range tbs { for _, e := range tbs {
tbname := e.tbname tbname := e.tbname
columns := e.columns columns := e.columns
...@@ -353,7 +343,7 @@ func (d *demo) Init() { ...@@ -353,7 +343,7 @@ func (d *demo) Init() {
var note string var note string
subRows.Scan(&field, &ty, &len, &note) subRows.Scan(&field, &ty, &len, &note)
// ignore time and tag col // ignore time and tag col
if count != 0 && strings.Compare(note, "TAG") != 0 { if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col // skip first and skip tag col
fields = append(fields, field) fields = append(fields, field)
...@@ -555,7 +545,7 @@ func (d *demo) NextTimout() int32 { ...@@ -555,7 +545,7 @@ func (d *demo) NextTimout() int32 {
if cost == 0 { if cost == 0 {
elem.timestamp = ts elem.timestamp = ts
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql) fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
if err != nil { if err != nil {
checkErr(err, sql) checkErr(err, sql)
...@@ -564,7 +554,7 @@ func (d *demo) NextTimout() int32 { ...@@ -564,7 +554,7 @@ func (d *demo) NextTimout() int32 {
elem.timestamp = ts elem.timestamp = ts
if cost > elem.threshold { if cost > elem.threshold {
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.dbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql) fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql) _, err := d.db.Exec(sql)
if err != nil { if err != nil {
...@@ -611,12 +601,12 @@ func main() { ...@@ -611,12 +601,12 @@ func main() {
checkErr(err, "failed to connect db") checkErr(err, "failed to connect db")
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
info := &taskInfo{subtask : make(map[string] *demo), wg: &wg} info := &taskInfo{subtask: make(map[string]*demo), wg: &wg}
taskInit(db, configPara.dbName, configPara.srcdbName, "metatable", "exptable", info) taskInit(db, configPara.dbName, configPara.srcdbName, "metatable", "exptable", info)
for _, v := range info.subtask { for _, v := range info.subtask {
wg.Add(1) wg.Add(1)
go subTaskStart(v) go subTaskStart(v)
} }
wg.Wait() wg.Wait()
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册