demo.go 14.2 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3
package main

import (
dengyihao's avatar
dengyihao 已提交
4
	"container/heap"
dengyihao's avatar
dengyihao 已提交
5 6 7 8
	"database/sql"
	"database/sql/driver"
	"flag"
	"fmt"
dengyihao's avatar
dengyihao 已提交
9 10 11
	"reflect"
	"strconv"
	"strings"
dengyihao's avatar
dengyihao 已提交
12 13
	"sync"
	"time"
dengyihao's avatar
dengyihao 已提交
14

dengyihao's avatar
dengyihao 已提交
15 16 17 18
	_ "github.com/taosdata/driver-go/v2/taosSql"
)

// heapElem is one scheduled check. timeout is compared against
// time.Now().Unix() by the scheduler, and colName is the metaDict key
// ("<table> <column>") of the column to check.
type heapElem struct {
	timeout int64
	colName string
}

// MinHeap is a container/heap compatible min-heap of heapElem, ordered by
// timeout and then by colName. *MinHeap implements heap.Interface.
type MinHeap []heapElem

// Column is an empty placeholder type; nothing in this file uses it.
type Column struct {
}

// Len returns the number of queued elements.
func (h MinHeap) Len() int {
	return len(h)
}

// Less reports whether element i must be popped before element j: the
// earlier timeout wins and ties are broken by colName.
//
// The ordering is strict (Less(i, i) is false and two equal elements are not
// "less" than each other), as sort.Interface/heap.Interface require. Values
// are compared directly rather than by subtraction so extreme timeouts cannot
// overflow.
func (h MinHeap) Less(i, j int) bool {
	if h[i].timeout != h[j].timeout {
		return h[i].timeout < h[j].timeout
	}
	return h[i].colName < h[j].colName
}

// Swap exchanges the elements at positions i and j.
func (h *MinHeap) Swap(i, j int) {
	(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}

// Push appends x, which must be a heapElem, to the underlying slice.
// Use heap.Push to keep the heap invariant.
func (h *MinHeap) Push(x interface{}) {
	*h = append(*h, x.(heapElem))
}

// Empty reports whether the heap holds no elements.
func (h *MinHeap) Empty() bool {
	return len(*h) == 0
}

// Top returns the root element without removing it. It panics on an empty
// heap, so callers check Empty first.
func (h *MinHeap) Top() heapElem {
	return (*h)[0]
}

// Pop removes and returns the last element of the underlying slice.
// Use heap.Pop to remove the minimum element.
func (h *MinHeap) Pop() interface{} {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// config holds the command-line settings of the program.
type config struct {
	hostName   string // TDengine server host (-h)
	serverPort int    // TDengine server port (-p)
	user       string // user name (-u)
	password   string // password (-P)
	dbName     string // database that receives the metatable and exception tables (-d)
	srcdbName  string // database whose tables are monitored (-s)
	supTblName string // only printed by printAllArgs; no flag sets it in this file
}

var (
	// configPara receives the parsed command-line flags.
	configPara config
	// taosDriverName is the database/sql driver name passed to sql.Open.
	taosDriverName = "taosSql"
	// url is the data source name assembled in main.
	url string
)

func init() {
	flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.")
	flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
	flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
	flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
dengyihao's avatar
dengyihao 已提交
92 93
	flag.StringVar(&configPara.dbName, "d", "test1", "check  database.")
	flag.StringVar(&configPara.srcdbName, "s", "test", "Destination database.")
dengyihao's avatar
dengyihao 已提交
94
	flag.Parse()
dengyihao's avatar
dengyihao 已提交
95

dengyihao's avatar
dengyihao 已提交
96 97 98 99 100 101 102 103 104 105 106 107
}

// checkErr is the program's fail-fast error handler: a nil err is a no-op,
// otherwise prompt is written to stdout on its own line and the function
// panics with err.
func checkErr(err error, prompt string) {
	if err == nil {
		return
	}
	fmt.Println(prompt)
	panic(err)
}

// schema is the monitoring state kept for one column of one sub table.
type schema struct {
	idx        int       // position of the column in the stable's field list
	numOfField int       // number of fields in that list
	timestamp  time.Time // time of the last observed row; updated on every check
	colName    string    // metaDict key, "<table> <column>"
	interval   int32     // seconds between two checks
	threshold  int32     // gap in seconds above which an exception row is written
}
type demo struct {
	db *sql.DB

	dbname      string
	srcdbname   string
	metaTable   string
	exceptTable string

	dStartTs   int64
	dInterval  int32
	dThreshold int32

dengyihao's avatar
dengyihao 已提交
125
	suptabname string
dengyihao's avatar
dengyihao 已提交
126

dengyihao's avatar
dengyihao 已提交
127
	metaDict map[string]*schema
dengyihao's avatar
dengyihao 已提交
128 129
	heap     MinHeap
	timer    *time.Timer
dengyihao's avatar
dengyihao 已提交
130

dengyihao's avatar
dengyihao 已提交
131
	wg *sync.WaitGroup
dengyihao's avatar
dengyihao 已提交
132 133 134 135 136 137 138 139 140 141 142
}

/***

|ts   |colName     |interval |threshold|
|now  |stbx.tx.colx|2        |5        |
|now+1|stbx.tx.colx|2        |5        |
|now+2|stbx.tx.colx|2        |5        |

***/

dengyihao's avatar
dengyihao 已提交
143
type taskInfo struct {
dengyihao's avatar
dengyihao 已提交
144 145
	wg      *sync.WaitGroup
	subtask map[string]*demo
dengyihao's avatar
dengyihao 已提交
146 147
}

dengyihao's avatar
dengyihao 已提交
148
// tableInfo is one row of the "show <db>.tables" result as scanned by
// GetSubTableInfo.
type tableInfo struct {
	tbname     string
	createTime string
	columns    int
	stbname    string // owning super table; empty for a normal table
	uid        int64
	tid        int64
	vgId       int32
}
dengyihao's avatar
dengyihao 已提交
157

dengyihao's avatar
dengyihao 已提交
158 159 160
func GetSubTableInfo(db *sql.DB, dbname, stbname string) []tableInfo {
	tbs := make([]tableInfo, 0, 512)
	sql := "show " + dbname + ".tables"
dengyihao's avatar
dengyihao 已提交
161

dengyihao's avatar
dengyihao 已提交
162 163
	row, err := db.Query(sql)
	if err != nil {
dengyihao's avatar
dengyihao 已提交
164 165 166
		checkErr(err, sql)
	}

dengyihao's avatar
dengyihao 已提交
167
	for row.Next() {
dengyihao's avatar
dengyihao 已提交
168 169 170 171
		var (
			tbname     string
			createTime string
			columns    int
dengyihao's avatar
dengyihao 已提交
172
			stb        string
dengyihao's avatar
dengyihao 已提交
173 174 175 176
			uid        int64
			tid        int64
			vgId       int32
		)
dengyihao's avatar
dengyihao 已提交
177
		err := row.Scan(&tbname, &createTime, &columns, &stb, &uid, &tid, &vgId)
dengyihao's avatar
dengyihao 已提交
178 179 180
		if err != nil {
			checkErr(err, sql)
		}
dengyihao's avatar
dengyihao 已提交
181

dengyihao's avatar
dengyihao 已提交
182 183 184 185 186 187
		if len(stbname) == 0 {
			// skip normal table
			if len(stb) == 0 || strings.Compare(stb, "") == 0 {
				continue
			}
			tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stb, uid: uid, tid: tid, vgId: vgId})
dengyihao's avatar
dengyihao 已提交
188 189 190
			continue
		}

dengyihao's avatar
dengyihao 已提交
191 192
		if strings.Compare(stb, stbname) == 0 {
			tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId})
dengyihao's avatar
dengyihao 已提交
193
		}
dengyihao's avatar
dengyihao 已提交
194 195 196
	}
	row.Close()
	return tbs
dengyihao's avatar
dengyihao 已提交
197

dengyihao's avatar
dengyihao 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
}
func GetStableField(db *sql.DB, dbname, stbname string) []string {
	result := make([]string, 0, 10)
	sql := "describe " + dbname + "." + stbname
	row, err := db.Query(sql)
	if err != nil {
		checkErr(err, sql)
	}
	count := 0
	for row.Next() {
		var field string
		var ty string
		var tlen int32
		var note string
		row.Scan(&field, &ty, &tlen, &note)

		// ignore time and tag col
		if count != 0 && strings.Compare(note, "TAG") != 0 {
			// skip first and skip tag col
			result = append(result, field)
		}
		count = count + 1
dengyihao's avatar
dengyihao 已提交
220
	}
dengyihao's avatar
dengyihao 已提交
221 222 223
	row.Close()
	return result
}
dengyihao's avatar
dengyihao 已提交
224

dengyihao's avatar
dengyihao 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) {
	{
		sql := fmt.Sprintf("create database if not exists %s update 2", dbname)
		_, err := db.Exec(sql)
		checkErr(err, sql)
	}
	{
		sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp,  offline int) tags(tablename binary(128))", dbname, exptable)
		_, err := db.Exec(sql)
		checkErr(err, sql)
	}

	{
		sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp,  dbname binary(64), tablename binary(64), colName binary(128),  checkInterval int, threshold  int)", dbname, metatable)
		_, err := db.Exec(sql)
		checkErr(err, sql)
	}

	tbs := GetSubTableInfo(db, srcdbname, "")
	fmt.Printf("tbs size %d\n", len(tbs))

	fieldDict := make(map[string][]string)
dengyihao's avatar
dengyihao 已提交
247

dengyihao's avatar
dengyihao 已提交
248
	fieldTs := time.Now().Add(time.Hour * -1000)
dengyihao's avatar
dengyihao 已提交
249
	for _, e := range tbs {
dengyihao's avatar
dengyihao 已提交
250 251
		tbname := e.tbname
		stbname := e.stbname
dengyihao's avatar
dengyihao 已提交
252

dengyihao's avatar
dengyihao 已提交
253 254 255 256
		field, ok := fieldDict[stbname]
		if !ok {
			field = GetStableField(db, srcdbname, stbname)
			fieldDict[stbname] = field
dengyihao's avatar
dengyihao 已提交
257
		}
dengyihao's avatar
dengyihao 已提交
258 259 260 261 262

		for _, f := range field {
			insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) {
				sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", metaDB, metaTable, srcDB, srcTable, srcCol)
				row, err := db.Query(sql)
dengyihao's avatar
dengyihao 已提交
263
				if err != nil {
dengyihao's avatar
dengyihao 已提交
264
					checkErr(err, sql)
dengyihao's avatar
dengyihao 已提交
265
				}
dengyihao's avatar
dengyihao 已提交
266

dengyihao's avatar
dengyihao 已提交
267 268
				count := 0
				for row.Next() {
dengyihao's avatar
dengyihao 已提交
269
					count = count + 1
dengyihao's avatar
dengyihao 已提交
270 271 272 273
					if count >= 1 {
						row.Close()
						break
					}
dengyihao's avatar
dengyihao 已提交
274 275
				}

dengyihao's avatar
dengyihao 已提交
276 277 278 279 280 281
				if count == 0 {
					sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold)
					_, err := db.Exec(sql)
					if err != nil {
						checkErr(err, sql)
					}
dengyihao's avatar
dengyihao 已提交
282
				}
dengyihao's avatar
dengyihao 已提交
283
			}
dengyihao's avatar
dengyihao 已提交
284
			insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2)
dengyihao's avatar
dengyihao 已提交
285
			fieldTs = fieldTs.Add(time.Millisecond * 2)
dengyihao's avatar
dengyihao 已提交
286
		}
dengyihao's avatar
dengyihao 已提交
287

dengyihao's avatar
dengyihao 已提交
288
		key := fmt.Sprintf("%s_%s", srcdbname, stbname)
dengyihao's avatar
dengyihao 已提交
289
		_, ok = tskinfo.subtask[key]
dengyihao's avatar
dengyihao 已提交
290 291 292 293 294
		if !ok {
			tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg}
		}
	}
}
dengyihao's avatar
dengyihao 已提交
295 296

func subTaskStart(d *demo) {
dengyihao's avatar
dengyihao 已提交
297

dengyihao's avatar
dengyihao 已提交
298 299
	d.Init()
	for {
dengyihao's avatar
dengyihao 已提交
300 301 302 303 304 305
		select {
		case <-d.timer.C:
			timeout := d.NextTimout()
			fmt.Printf("stbname %s, timeout %d\n", d.suptabname, timeout)
			d.timer.Reset(time.Second * time.Duration(timeout))
		}
dengyihao's avatar
dengyihao 已提交
306
	}
dengyihao's avatar
dengyihao 已提交
307
	d.wg.Done()
dengyihao's avatar
dengyihao 已提交
308
}
dengyihao's avatar
dengyihao 已提交
309

dengyihao's avatar
dengyihao 已提交
310 311 312 313 314
func (d *demo) Init() {
	d.heap = make(MinHeap, 0, 200)
	heap.Init(&d.heap)
	d.metaDict = make(map[string]*schema)

dengyihao's avatar
dengyihao 已提交
315 316
	tbs := GetSubTableInfo(d.db, d.srcdbname, d.suptabname)
	fields := GetStableField(d.db, d.srcdbname, d.suptabname)
dengyihao's avatar
dengyihao 已提交
317

dengyihao's avatar
dengyihao 已提交
318 319 320 321
	lastRowDict := func(db *sql.DB, srcDB, stbname string) map[string]time.Time {
		result := make(map[string]time.Time)
		sql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", srcDB, stbname)
		row, err := d.db.Query(sql)
dengyihao's avatar
dengyihao 已提交
322 323
		if err != nil {
			checkErr(err, sql)
dengyihao's avatar
dengyihao 已提交
324
		}
dengyihao's avatar
dengyihao 已提交
325 326 327 328 329
		for row.Next() {
			var ts time.Time
			var tbname string
			row.Scan(&ts, &tbname)
			result[tbname] = ts
dengyihao's avatar
dengyihao 已提交
330
		}
dengyihao's avatar
dengyihao 已提交
331 332 333
		row.Close()
		return result
	}(d.db, d.srcdbname, d.suptabname)
dengyihao's avatar
dengyihao 已提交
334 335 336

	for _, e := range tbs {
		tbname := e.tbname
dengyihao's avatar
dengyihao 已提交
337 338 339
		lastTime, ok := lastRowDict[tbname]
		if !ok {
			lastTime = time.Now()
dengyihao's avatar
dengyihao 已提交
340
		}
dengyihao's avatar
dengyihao 已提交
341 342

		for i, f := range fields {
dengyihao's avatar
dengyihao 已提交
343 344 345 346 347 348 349 350 351 352 353 354 355
			col := fmt.Sprintf("%s %s", tbname, f)
			var (
				ts            time.Time
				dbname        string
				tablename     string
				colname       string
				checkinterval int
				threshold     int
			)
			checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f)
			checkRow, err := d.db.Query(checkSql)
			if err != nil {
				checkErr(err, checkSql)
dengyihao's avatar
dengyihao 已提交
356 357
			}

dengyihao's avatar
dengyihao 已提交
358 359 360
			for checkRow.Next() {
				_ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold)
				d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)}
dengyihao's avatar
dengyihao 已提交
361
			}
dengyihao's avatar
dengyihao 已提交
362
			defer checkRow.Close()
dengyihao's avatar
dengyihao 已提交
363 364
		}
	}
dengyihao's avatar
dengyihao 已提交
365

dengyihao's avatar
dengyihao 已提交
366
	now := time.Now()
dengyihao's avatar
dengyihao 已提交
367
	for k, v := range d.metaDict {
dengyihao's avatar
dengyihao 已提交
368 369 370 371
		durtion := fmt.Sprintf("%ds", v.interval)
		s, _ := time.ParseDuration(durtion)
		now.Add(s)
		heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k})
dengyihao's avatar
dengyihao 已提交
372 373
	}

dengyihao's avatar
dengyihao 已提交
374
	d.timer = time.NewTimer(time.Second * 1)
dengyihao's avatar
dengyihao 已提交
375 376 377
}

// ValueRows is the last row of one sub table as returned by a
// "select last_row(*) ... group by tbname" query.
type ValueRows struct {
	column []interface{} // scan destinations, one per result column
	ts     time.Time     // value of the first result column
	tbname string        // value of the last result column
}
dengyihao's avatar
dengyihao 已提交
382

dengyihao's avatar
dengyihao 已提交
383 384
func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int32) {
	key := fmt.Sprintf("%s %s", tbname, col)
dengyihao's avatar
dengyihao 已提交
385
	sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col)
dengyihao's avatar
dengyihao 已提交
386 387 388 389
	rows, _ := d.db.Query(sql)
	fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql)
	for rows.Next() {
		var (
dengyihao's avatar
dengyihao 已提交
390 391 392 393
			ts     time.Time
			dbname string
			tbname string
			col    string
dengyihao's avatar
dengyihao 已提交
394 395 396 397
			inter  int32
			thresh int32
		)

dengyihao's avatar
dengyihao 已提交
398 399 400 401 402 403 404
		err := rows.Scan(&ts, &dbname, &tbname, &col, &inter, &thresh)
		if interval != inter || threshold != thresh {
			sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, ts.UnixMilli(), d.dbname, tbname, col, interval, threshold)
			_, err = d.db.Exec(sql)
			if err != nil {
				checkErr(err, sql)
			}
dengyihao's avatar
dengyihao 已提交
405 406
		}

dengyihao's avatar
dengyihao 已提交
407 408 409
	}

	schemadata := d.metaDict[key]
dengyihao's avatar
dengyihao 已提交
410 411 412 413
	if schemadata != nil {
		schemadata.interval = interval
		schemadata.threshold = threshold
	}
dengyihao's avatar
dengyihao 已提交
414
	defer rows.Close()
dengyihao's avatar
dengyihao 已提交
415 416 417
}

func (d *demo) NextTimout() int32 {
dengyihao's avatar
dengyihao 已提交
418
	now := time.Now().Unix()
dengyihao's avatar
dengyihao 已提交
419 420 421
	colArray := make([]string, 0, 10)

	for !d.heap.Empty() {
dengyihao's avatar
dengyihao 已提交
422
		elem := d.heap.Top()
dengyihao's avatar
dengyihao 已提交
423 424 425 426 427 428 429 430
		if elem.timeout <= now {
			colArray = append(colArray, elem.colName)
			heap.Pop(&d.heap)
		} else {
			break
		}
	}

dengyihao's avatar
dengyihao 已提交
431 432 433
	lastRowGroup, colIdx := func(db *sql.DB, srcDB, stbname string) (map[string]*ValueRows, map[string]int) {
		result := make(map[string]*ValueRows)
		colIdx := make(map[string]int)
dengyihao's avatar
dengyihao 已提交
434

dengyihao's avatar
dengyihao 已提交
435 436 437 438 439 440 441 442 443 444 445 446
		sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", srcDB, stbname)
		row, err := db.Query(sql)
		if err != nil {
			checkErr(err, sql)
		}
		tt, err := row.ColumnTypes()
		types := make([]reflect.Type, len(tt))
		for i, tp := range tt {
			st := tp.ScanType()
			types[i] = st
		}
		columns, _ := row.Columns()
dengyihao's avatar
dengyihao 已提交
447

dengyihao's avatar
dengyihao 已提交
448
		for row.Next() {
dengyihao's avatar
dengyihao 已提交
449
			values := make([]interface{}, len(tt))
dengyihao's avatar
dengyihao 已提交
450 451 452
			for i := range values {
				values[i] = reflect.New(types[i]).Interface()
			}
dengyihao's avatar
dengyihao 已提交
453
			row.Scan(values...)
dengyihao's avatar
dengyihao 已提交
454

dengyihao's avatar
dengyihao 已提交
455 456
			ts, _ := values[0].(driver.Valuer).Value()
			tts, _ := ts.(time.Time)
dengyihao's avatar
dengyihao 已提交
457

dengyihao's avatar
dengyihao 已提交
458 459
			tbname, _ := values[len(tt)-1].(driver.Valuer).Value()
			ttbname, _ := tbname.(string)
dengyihao's avatar
dengyihao 已提交
460

dengyihao's avatar
dengyihao 已提交
461 462
			result[ttbname] = &ValueRows{column: values, ts: tts, tbname: ttbname}
		}
dengyihao's avatar
dengyihao 已提交
463

dengyihao's avatar
dengyihao 已提交
464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
		row.Close()

		for i, v := range columns {
			colIdx[v] = i
		}
		return result, colIdx
	}(d.db, d.srcdbname, d.suptabname)

	for _, e := range colArray {
		elem := d.metaDict[e]
		var colName string
		var tabName string
		fmt.Sscanf(e, "%s %s", &tabName, &colName)

		ts, update := func(rowGroup map[string]*ValueRows, colIdx map[string]int, tabName, colName string) (time.Time, bool) {
			var ts time.Time
			update := false

			field := fmt.Sprintf("last_row(%s)", colName)
			idx, ok1 := colIdx[field]
			row, ok2 := rowGroup[tabName]
			if ok1 && ok2 {
				if row != nil {
					v, _ := row.column[idx].(driver.Valuer).Value()
					if v != nil {
						ts = row.ts
						update = true
					}
				}
dengyihao's avatar
dengyihao 已提交
493
			}
dengyihao's avatar
dengyihao 已提交
494 495
			return ts, update
		}(lastRowGroup, colIdx, tabName, colName)
dengyihao's avatar
dengyihao 已提交
496

dengyihao's avatar
dengyihao 已提交
497 498
		if !update {
			ts = elem.timestamp
dengyihao's avatar
dengyihao 已提交
499
		}
dengyihao's avatar
dengyihao 已提交
500

dengyihao's avatar
dengyihao 已提交
501 502
		//fmt.Printf("Get time tbname: %s, colname:%s, current %v, lasttime %v\n", tabName, colName, ts.UnixMilli(), elem.timestamp.UnixMilli())
		exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName)
dengyihao's avatar
dengyihao 已提交
503 504 505 506 507

		var dura time.Duration = ts.Sub(elem.timestamp)
		cost := int32(dura.Seconds())
		if cost == 0 {
			elem.timestamp = ts
dengyihao's avatar
dengyihao 已提交
508
			sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
dengyihao's avatar
dengyihao 已提交
509
			fmt.Printf("INSERT SQL: %s\n", sql)
dengyihao's avatar
dengyihao 已提交
510
			_, err := d.db.Exec(sql)
dengyihao's avatar
dengyihao 已提交
511 512 513
			if err != nil {
				checkErr(err, sql)
			}
dengyihao's avatar
dengyihao 已提交
514 515
		} else {
			elem.timestamp = ts
dengyihao's avatar
dengyihao 已提交
516

dengyihao's avatar
dengyihao 已提交
517
			if cost > elem.threshold {
dengyihao's avatar
dengyihao 已提交
518
				sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
dengyihao's avatar
dengyihao 已提交
519
				fmt.Printf("INSERT SQL: %s\n", sql)
dengyihao's avatar
dengyihao 已提交
520 521 522 523 524
				_, err := d.db.Exec(sql)
				if err != nil {
					checkErr(err, sql)
				}
			} else {
dengyihao's avatar
dengyihao 已提交
525
				//fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold)
dengyihao's avatar
dengyihao 已提交
526 527 528
			}
		}
		heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e})
dengyihao's avatar
dengyihao 已提交
529 530 531 532 533 534 535
	}

	if !d.heap.Empty() {
		elem := d.heap.Top()
		timeout := elem.timeout - now
		if timeout < 1 {
			timeout = 1
dengyihao's avatar
dengyihao 已提交
536 537
		}
		return int32(timeout)
dengyihao's avatar
dengyihao 已提交
538
	}
dengyihao's avatar
dengyihao 已提交
539 540 541
	return 1

}
dengyihao's avatar
dengyihao 已提交
542 543 544 545 546 547 548 549

func printAllArgs() {
	fmt.Printf("\n============= args parse result: =============\n")
	fmt.Printf("hostName:             %v\n", configPara.hostName)
	fmt.Printf("serverPort:           %v\n", configPara.serverPort)
	fmt.Printf("usr:                  %v\n", configPara.user)
	fmt.Printf("password:             %v\n", configPara.password)
	fmt.Printf("dbName:               %v\n", configPara.dbName)
dengyihao's avatar
dengyihao 已提交
550
	fmt.Printf("srcDbName:            %v\n", configPara.srcdbName)
dengyihao's avatar
dengyihao 已提交
551 552
	fmt.Printf("stbNme:               %v\n", configPara.supTblName)
	fmt.Printf("================================================\n")
dengyihao's avatar
dengyihao 已提交
553
}
dengyihao's avatar
dengyihao 已提交
554 555 556

func main() {

dengyihao's avatar
dengyihao 已提交
557
	printAllArgs()
dengyihao's avatar
dengyihao 已提交
558
	url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
dengyihao's avatar
dengyihao 已提交
559

dengyihao's avatar
dengyihao 已提交
560 561
	db, err := sql.Open(taosDriverName, url)
	if err != nil {
dengyihao's avatar
dengyihao 已提交
562
		checkErr(err, "failed to connect db")
dengyihao's avatar
dengyihao 已提交
563 564
	}
	wg := sync.WaitGroup{}
dengyihao's avatar
dengyihao 已提交
565 566 567 568 569 570 571 572
	info := &taskInfo{subtask: make(map[string]*demo), wg: &wg}

	taskInit(db, configPara.dbName, configPara.srcdbName, "metatable", "exptable", info)
	for _, v := range info.subtask {
		wg.Add(1)
		go subTaskStart(v)
	}
	wg.Wait()
dengyihao's avatar
dengyihao 已提交
573
}