/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package main

import (
	"database/sql"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"runtime"
	"strconv"
	"sync"
	"time"

	_ "github.com/taosdata/driver-go/taosSql"

	//"golang.org/x/sys/unix"
)

const (
	// maxLocationSize is the number of entries in locations. It is derived
	// from the array itself so that locations[i%maxLocationSize] can never
	// select an uninitialised (empty) entry.
	maxLocationSize = len(locations)
	//maxSqlBufSize   = 65480
)

// locations is the pool of location tag values assigned to child tables.
var locations = [...]string{
	"Beijing", "Shanghai", "Guangzhou", "Shenzhen",
	"HangZhou", "Tianjin", "Wuhan", "Changsha",
	"Nanjing", "Xian"}

// config holds every setting of the demo: the values taken from the
// command line plus a few fixed or derived ones filled in by init.
type config struct {
	hostName             string // -h: host of the TDengine server
	serverPort           int    // -p: TCP/IP port of the TDengine server
	user                 string // -u: user name for the connection
	password             string // -P: password for the connection
	dbName               string // -d: destination database
	supTblName           string // super table name (set in init, no flag)
	tablePrefix          string // -m: prefix of the child table names
	numOftables          int    // -t: number of child tables
	numOfRecordsPerTable int    // -n: number of records per table
	numOfRecordsPerReq   int    // -r: number of records per insert request
	numOfThreads         int    // -T: number of worker goroutines
	startTimestamp       string // -s: start timestamp, "2006-01-02 15:04:05" layout
	startTs              int64  // startTimestamp as milliseconds since the Unix epoch

	keep int // value of the "keep" option in "create database"
	days int // value of the "days" option in "create database"
}

// Package-level state shared by every step of the demo.
var (
	// configPara holds the parsed command-line settings.
	configPara config
	// taosDriverName is the database/sql driver name passed to sql.Open.
	taosDriverName = "taosSql"
	// url is the data source name passed to sql.Open; it is assigned in main.
	url string
)

func init() {
67
	flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.")
H
Hui Li 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
	flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
	flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
	flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
	flag.StringVar(&configPara.dbName, "d", "test", "Destination database.")
	flag.StringVar(&configPara.tablePrefix, "m", "d", "Table prefix name.")
	flag.IntVar(&configPara.numOftables, "t", 2, "The number of tables.")
	flag.IntVar(&configPara.numOfRecordsPerTable, "n", 10, "The number of records per table.")
	flag.IntVar(&configPara.numOfRecordsPerReq, "r", 3, "The number of records per request.")
	flag.IntVar(&configPara.numOfThreads, "T", 1, "The number of threads.")
	flag.StringVar(&configPara.startTimestamp, "s", "2020-10-01 08:00:00", "The start timestamp for one table.")
	flag.Parse()

	configPara.keep = 365 * 20
	configPara.days = 30
	configPara.supTblName = "meters"

	startTs, err := time.ParseInLocation("2006-01-02 15:04:05", configPara.startTimestamp, time.Local)
85 86
	if err == nil {
		configPara.startTs = startTs.UnixNano() / 1e6
H
Hui Li 已提交
87 88 89 90 91
	}
}

func printAllArgs() {
	fmt.Printf("\n============= args parse result: =============\n")
92
	fmt.Printf("hostName:             %v\n", configPara.hostName)
H
Hui Li 已提交
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
	fmt.Printf("serverPort:           %v\n", configPara.serverPort)
	fmt.Printf("usr:                  %v\n", configPara.user)
	fmt.Printf("password:             %v\n", configPara.password)
	fmt.Printf("dbName:               %v\n", configPara.dbName)
	fmt.Printf("tablePrefix:          %v\n", configPara.tablePrefix)
	fmt.Printf("numOftables:          %v\n", configPara.numOftables)
	fmt.Printf("numOfRecordsPerTable: %v\n", configPara.numOfRecordsPerTable)
	fmt.Printf("numOfRecordsPerReq:   %v\n", configPara.numOfRecordsPerReq)
	fmt.Printf("numOfThreads:         %v\n", configPara.numOfThreads)
	fmt.Printf("startTimestamp:       %v[%v]\n", configPara.startTimestamp, configPara.startTs)
	fmt.Printf("================================================\n")
}

func main() {
	printAllArgs()
	fmt.Printf("Please press enter key to continue....\n")
109
	_, _ = fmt.Scanln()
H
Hui Li 已提交
110 111

	url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
112
	//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
H
Hui Li 已提交
113 114 115 116 117 118 119
	// open connect to taos server
	//db, err := sql.Open(taosDriverName, url)
	//if err != nil {
	//  fmt.Println("Open database error: %s\n", err)
	//  os.Exit(1)
	//}
	//defer db.Close()
120
	rand.Seed(time.Now().Unix())
H
Hui Li 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142

	createDatabase(configPara.dbName, configPara.supTblName)
	fmt.Printf("======== create database success! ========\n\n")

	//create_table(db, stblName)
	multiThreadCreateTable(configPara.numOfThreads, configPara.numOftables, configPara.dbName, configPara.tablePrefix)
	fmt.Printf("======== create super table and child tables success! ========\n\n")

	//insert_data(db, demot)
	multiThreadInsertData(configPara.numOfThreads, configPara.numOftables, configPara.dbName, configPara.tablePrefix)
	fmt.Printf("======== insert data into child tables success! ========\n\n")

	//select_data(db, demot)
	selectTest(configPara.dbName, configPara.tablePrefix, configPara.supTblName)
	fmt.Printf("======== select data success!  ========\n\n")

	fmt.Printf("======== end demo ========\n")
}

func createDatabase(dbName string, supTblName string) {
	db, err := sql.Open(taosDriverName, url)
	if err != nil {
143
		fmt.Printf("Open database error: %s\n", err)
H
Hui Li 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
		os.Exit(1)
	}
	defer db.Close()

	// drop database if exists
	sqlStr := "drop database if exists " + dbName
	_, err = db.Exec(sqlStr)
	checkErr(err, sqlStr)

	time.Sleep(time.Second)

	// create database
	sqlStr = "create database " + dbName + " keep " + strconv.Itoa(configPara.keep) + " days " + strconv.Itoa(configPara.days)
	_, err = db.Exec(sqlStr)
	checkErr(err, sqlStr)

	// use database
	//sqlStr = "use " + dbName
	//_, err = db.Exec(sqlStr)
	//checkErr(err, sqlStr)

	sqlStr = "create table if not exists " + dbName + "." + supTblName + " (ts timestamp, current float, voltage int, phase float) tags(location binary(64), groupId int);"
	_, err = db.Exec(sqlStr)
	checkErr(err, sqlStr)
}

func multiThreadCreateTable(threads int, nTables int, dbName string, tablePrefix string) {
H
Hui Li 已提交
171 172
	st := time.Now().UnixNano()

173 174
	if threads < 1 {
		threads = 1
H
Hui Li 已提交
175 176
	}

177 178 179 180
	a := nTables / threads
	if a < 1 {
		threads = nTables
		a = 1
H
Hui Li 已提交
181 182
	}

183
	b := nTables % threads
H
Hui Li 已提交
184

185
	last := 0
H
Hui Li 已提交
186 187 188 189
	endTblId := 0
	wg := sync.WaitGroup{}
	for i := 0; i < threads; i++ {
		startTblId := last
190
		if i < b {
H
Hui Li 已提交
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
			endTblId = last + a
		} else {
			endTblId = last + a - 1
		}
		last = endTblId + 1
		wg.Add(1)
		go createTable(dbName, tablePrefix, startTblId, endTblId, &wg)
	}
	wg.Wait()

	et := time.Now().UnixNano()
	fmt.Printf("create tables spent duration: %6.6fs\n", (float32(et-st))/1e9)
}

func createTable(dbName string, childTblPrefix string, startTblId int, endTblId int, wg *sync.WaitGroup) {
	//fmt.Printf("subThread[%d]: create table from %d to %d \n", unix.Gettid(), startTblId, endTblId)
	// windows.GetCurrentThreadId()

	db, err := sql.Open(taosDriverName, url)
	if err != nil {
211
		fmt.Printf("Open database error: %s\n", err)
H
Hui Li 已提交
212 213 214 215
		os.Exit(1)
	}
	defer db.Close()

216 217 218 219 220 221 222 223
	for i := startTblId; i <= endTblId; i++ {
		sqlStr := "create table if not exists " + dbName + "." + childTblPrefix + strconv.Itoa(i) + " using " + dbName + ".meters tags('" + locations[i%maxLocationSize] + "', " + strconv.Itoa(i) + ");"
		//fmt.Printf("sqlStr:               %v\n", sqlStr)
		_, err = db.Exec(sqlStr)
		checkErr(err, sqlStr)
	}
	wg.Done()
	runtime.Goexit()
H
Hui Li 已提交
224 225 226
}

// generateRowData returns one row for an insert statement in the form
// "( ts, current, voltage, phase ) " (note the trailing space).
//
// ts is written as given. voltage is a random integer in [0, 1000), current
// is 200 plus a random fraction and phase is a random fraction; both floats
// are printed with six decimals.
func generateRowData(ts int64) string {
	voltage := rand.Int() % 1000
	current := 200 + rand.Float32()
	phase := rand.Float32()

	currentStr := strconv.FormatFloat(float64(current), 'f', 6, 64)
	phaseStr := strconv.FormatFloat(float64(phase), 'f', 6, 64)

	return "( " + strconv.FormatInt(ts, 10) + ", " + currentStr + ", " + strconv.Itoa(voltage) + ", " + phaseStr + " ) "
}
func insertData(dbName string, childTblPrefix string, startTblId int, endTblId int, wg *sync.WaitGroup) {
	//fmt.Printf("subThread[%d]: insert data to table from %d to %d \n", unix.Gettid(), startTblId, endTblId)
	// windows.GetCurrentThreadId()

	db, err := sql.Open(taosDriverName, url)
	if err != nil {
240
		fmt.Printf("Open database error: %s\n", err)
H
Hui Li 已提交
241 242 243 244
		os.Exit(1)
	}
	defer db.Close()

245
	tmpTs := configPara.startTs
H
Hui Li 已提交
246
	//rand.New(rand.NewSource(time.Now().UnixNano()))
247
	for tID := startTblId; tID <= endTblId; tID++ {
H
Hui Li 已提交
248 249 250 251 252 253 254
		totalNum := 0
		for {
			sqlStr := "insert into " + dbName + "." + childTblPrefix + strconv.Itoa(tID) + " values "
			currRowNum := 0
			for {
				tmpTs += 1000
				valuesOfRow := generateRowData(tmpTs)
255 256
				currRowNum += 1
				totalNum += 1
H
Hui Li 已提交
257 258 259

				sqlStr = fmt.Sprintf("%s %s", sqlStr, valuesOfRow)

260 261
				if currRowNum >= configPara.numOfRecordsPerReq || totalNum >= configPara.numOfRecordsPerTable {
					break
H
Hui Li 已提交
262 263 264 265 266 267 268 269 270
				}
			}

			res, err := db.Exec(sqlStr)
			checkErr(err, sqlStr)

			count, err := res.RowsAffected()
			checkErr(err, "rows affected")

271 272
			if count != int64(currRowNum) {
				fmt.Printf("insert data, expect affected:%d, actual:%d\n", currRowNum, count)
H
Hui Li 已提交
273 274 275
				os.Exit(1)
			}

276
			if totalNum >= configPara.numOfRecordsPerTable {
H
Hui Li 已提交
277 278 279 280 281 282 283 284
				break
			}
		}
	}

	wg.Done()
	runtime.Goexit()
}

func multiThreadInsertData(threads int, nTables int, dbName string, tablePrefix string) {
H
Hui Li 已提交
287 288
	st := time.Now().UnixNano()

289 290
	if threads < 1 {
		threads = 1
H
Hui Li 已提交
291 292
	}

293 294 295 296
	a := nTables / threads
	if a < 1 {
		threads = nTables
		a = 1
H
Hui Li 已提交
297 298
	}

299
	b := nTables % threads
H
Hui Li 已提交
300

301
	last := 0
H
Hui Li 已提交
302 303 304 305
	endTblId := 0
	wg := sync.WaitGroup{}
	for i := 0; i < threads; i++ {
		startTblId := last
306
		if i < b {
H
Hui Li 已提交
307 308 309 310 311 312
			endTblId = last + a
		} else {
			endTblId = last + a - 1
		}
		last = endTblId + 1
		wg.Add(1)
313
		go insertData(dbName, tablePrefix, startTblId, endTblId, &wg)
H
Hui Li 已提交
314 315 316 317 318 319
	}
	wg.Wait()

	et := time.Now().UnixNano()
	fmt.Printf("insert data spent duration: %6.6fs\n", (float32(et-st))/1e9)
}

func selectTest(dbName string, tbPrefix string, supTblName string) {
H
Hui Li 已提交
322 323
	db, err := sql.Open(taosDriverName, url)
	if err != nil {
324
		fmt.Printf("Open database error: %s\n", err)
H
Hui Li 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
		os.Exit(1)
	}
	defer db.Close()

	// select sql 1
	limit := 3
	offset := 0
	sqlStr := "select * from " + dbName + "." + supTblName + " limit " + strconv.Itoa(limit) + " offset " + strconv.Itoa(offset)
	rows, err := db.Query(sqlStr)
	checkErr(err, sqlStr)

	defer rows.Close()
	fmt.Printf("query sql: %s\n", sqlStr)
	for rows.Next() {
		var (
340 341 342 343 344 345
			ts       string
			current  float32
			voltage  int
			phase    float32
			location string
			groupid  int
H
Hui Li 已提交
346 347 348 349 350 351 352 353 354 355 356 357 358 359
		)
		err := rows.Scan(&ts, &current, &voltage, &phase, &location, &groupid)
		if err != nil {
			checkErr(err, "rows scan fail")
		}

		fmt.Printf("ts:%s\t current:%f\t voltage:%d\t phase:%f\t location:%s\t groupid:%d\n", ts, current, voltage, phase, location, groupid)
	}
	// check iteration error
	if rows.Err() != nil {
		checkErr(err, "rows next iteration error")
	}

	// select sql 2
360
	sqlStr = "select avg(voltage), min(voltage), max(voltage) from " + dbName + "." + tbPrefix + strconv.Itoa(rand.Int()%configPara.numOftables)
H
Hui Li 已提交
361 362 363 364 365 366 367
	rows, err = db.Query(sqlStr)
	checkErr(err, sqlStr)

	defer rows.Close()
	fmt.Printf("\nquery sql: %s\n", sqlStr)
	for rows.Next() {
		var (
368 369 370
			voltageAvg float32
			voltageMin int
			voltageMax int
H
Hui Li 已提交
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
		)
		err := rows.Scan(&voltageAvg, &voltageMin, &voltageMax)
		if err != nil {
			checkErr(err, "rows scan fail")
		}

		fmt.Printf("avg(voltage):%f\t min(voltage):%d\t max(voltage):%d\n", voltageAvg, voltageMin, voltageMax)
	}
	// check iteration error
	if rows.Err() != nil {
		checkErr(err, "rows next iteration error")
	}

	// select sql 3
	sqlStr = "select last(*) from " + dbName + "." + supTblName
	rows, err = db.Query(sqlStr)
	checkErr(err, sqlStr)

	defer rows.Close()
	fmt.Printf("\nquery sql: %s\n", sqlStr)
	for rows.Next() {
		var (
393 394 395 396
			lastTs      string
			lastCurrent float32
			lastVoltage int
			lastPhase   float32
H
Hui Li 已提交
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
		)
		err := rows.Scan(&lastTs, &lastCurrent, &lastVoltage, &lastPhase)
		if err != nil {
			checkErr(err, "rows scan fail")
		}

		fmt.Printf("last(ts):%s\t last(current):%f\t last(voltage):%d\t last(phase):%f\n", lastTs, lastCurrent, lastVoltage, lastPhase)
	}
	// check iteration error
	if rows.Err() != nil {
		checkErr(err, "rows next iteration error")
	}
}
// checkErr is a no-op for a nil err. Otherwise it prints prompt on its own
// line to stdout and panics with err.
func checkErr(err error, prompt string) {
	if err == nil {
		return
	}
	fmt.Println(prompt)
	panic(err)
}