main.go 29.4 KB
Newer Older
X
xieyinglin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
package main

import (
	"bufio"
	"bytes"
	"database/sql"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

张金富 已提交
19
	dataImport "github.com/taosdata/TDengine/importSampleData/import"
dengyihao's avatar
TD-935  
dengyihao 已提交
20

21
	_ "github.com/taosdata/driver-go/taosSql"
X
xieyinglin 已提交
22 23 24
)

const (
	// TIMESTAMP is the column type the primary key field must have.
	TIMESTAMP = "timestamp"

	// DATETIME and MILLISECOND name the two encodings the primary-key time
	// field of the sample data may use: a formatted date-time string or a
	// millisecond number.
	DATETIME    = "datetime"
	MILLISECOND = "millisecond"

	DefaultStartTime int64 = -1       // value of startTime when no start time was given
	DefaultInterval  int64 = 1 * 1000 // interval between imported records in ms; only takes effect with auto=1, otherwise the interval is derived from the sample data
	DefaultDelay     int64 = -1       // compared with delay in main to decide whether the delay has to be derived

	// DefaultStatisticTable is the default name of the table that receives
	// the statistics when save is 1.
	DefaultStatisticTable = "statistic"

	// JsonFormat and CsvFormat are the supported sample data file formats.
	JsonFormat = "json"
	CsvFormat  = "csv"

	SuperTablePrefix = "s_" // prefix of super table names
	SubTablePrefix   = "t_" // prefix of sub table names

	DriverName      = "taosSql"
	StartTimeLayout = "2006-01-02 15:04:05.000"
	InsertPrefix    = "insert into "
)

var (
张金富 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
	cfg          string // 导入配置文件路径,包含样例数据文件相关描述及对应 TDengine 配置信息。默认使用 config/cfg.toml
	cases        string // 需要导入的场景名称,该名称可从 -cfg 指定的配置文件中 [usecase] 查看,可同时导入多个场景,中间使用逗号分隔,如:sensor_info,camera_detection,默认为 sensor_info
	hnum         int    // 需要将样例数据进行横向扩展的倍数,假设原有样例数据包含 1 张子表 t_0 数据,指定 hnum 为 2 时会根据原有表名创建 t、t_1 两张子表。默认为 100。
	vnum         int    // 需要将样例数据进行纵向扩展的次数,如果设置为 0 代表将历史数据导入至当前时间后持续按照指定间隔导入。默认为 1000,表示将样例数据在时间轴上纵向复制1000 次
	thread       int    // 执行导入数据的线程数目,默认为 10
	batch        int    // 执行导入数据时的批量大小,默认为 100。批量是指一次写操作时,包含多少条记录
	auto         int    // 是否自动生成样例数据中的主键时间戳,1 是,0 否, 默认 0
	startTimeStr string // 导入的记录开始时间,格式为 "yyyy-MM-dd HH:mm:ss.SSS",不设置会使用样例数据中最小时间,设置后会忽略样例数据中的主键时间,会按照指定的 start 进行导入。如果 auto 为 1,则必须设置 start,默认为空
	interval     int64  // 导入的记录时间间隔,该设置只会在指定 auto=1 之后生效,否则会根据样例数据自动计算间隔时间。单位为毫秒,默认 1000
	host         string // 导入的 TDengine 服务器 IP,默认为 127.0.0.1
	port         int    // 导入的 TDengine 服务器端口,默认为 6030
	user         string // 导入的 TDengine 用户名,默认为 root
	password     string // 导入的 TDengine 用户密码,默认为 taosdata
	dropdb       int    // 导入数据之前是否删除数据库,1 是,0 否, 默认 0
	db           string // 导入的 TDengine 数据库名称,默认为 test_yyyyMMdd
	dbparam      string // 当指定的数据库不存在时,自动创建数据库时可选项配置参数,如 days 10 cache 16000 ablocks 4,默认为空
X
xieyinglin 已提交
68 69

	dataSourceName string
dengyihao's avatar
TD-935  
dengyihao 已提交
70
	startTime      int64
X
xieyinglin 已提交
71

dengyihao's avatar
TD-935  
dengyihao 已提交
72 73 74
	superTableConfigMap = make(map[string]*superTableConfig)
	subTableMap         = make(map[string]*dataRows)
	scaleTableNames     []string
X
xieyinglin 已提交
75 76 77

	scaleTableMap = make(map[string]*scaleTableInfo)

dengyihao's avatar
TD-935  
dengyihao 已提交
78
	successRows    []int64
X
xieyinglin 已提交
79
	lastStaticTime time.Time
dengyihao's avatar
TD-935  
dengyihao 已提交
80 81
	lastTotalRows  int64
	timeTicker     *time.Ticker
张金富 已提交
82 83 84 85
	delay          int64  // 当 vnum 设置为 0 时持续导入的时间间隔,默认为所有场景中最小记录间隔时间的一半,单位 ms。
	tick           int64  // 打印统计信息的时间间隔,默认 2000 ms。
	save           int    // 是否保存统计信息到 tdengine 的 statistic 表中,1 是,0 否, 默认 0。
	saveTable      string // 当 save 为 1 时保存统计信息的表名, 默认 statistic。
X
xieyinglin 已提交
86 87 88
)

type superTableConfig struct {
dengyihao's avatar
TD-935  
dengyihao 已提交
89 90 91
	startTime   int64
	endTime     int64
	cycleTime   int64
X
xieyinglin 已提交
92
	avgInterval int64
张金富 已提交
93
	config      dataImport.CaseConfig
X
xieyinglin 已提交
94 95 96 97
}

// scaleTableInfo tracks the import progress of one scaled sub table.
type scaleTableInfo struct {
	scaleTableName string
	subTableName   string // name of the sample sub table whose rows are replayed
	insertRows     int64  // number of rows generated for this table so far
}

张金富 已提交
102 103 104 105
//type tableRows struct {
//	tableName string // tableName
//	value     string // values(...)
//}
X
xieyinglin 已提交
106 107

type dataRows struct {
dengyihao's avatar
TD-935  
dengyihao 已提交
108
	rows   []map[string]interface{}
张金富 已提交
109
	config dataImport.CaseConfig
X
xieyinglin 已提交
110 111 112 113 114 115 116
}

// Len reports how many records the set holds.
func (rows dataRows) Len() int {
	records := rows.rows
	return len(records)
}

func (rows dataRows) Less(i, j int) bool {
张金富 已提交
117 118 119
	iTime := getPrimaryKey(rows.rows[i][rows.config.Timestamp])
	jTime := getPrimaryKey(rows.rows[j][rows.config.Timestamp])
	return iTime < jTime
X
xieyinglin 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132
}

// Swap exchanges the records at positions i and j.
func (rows dataRows) Swap(i, j int) {
	records := rows.rows
	records[i], records[j] = records[j], records[i]
}

// getPrimaryKey returns value as an int64 timestamp. Anything that is not
// stored as int64 (including nil) yields 0.
func getPrimaryKey(value interface{}) int64 {
	if ms, isInt64 := value.(int64); isInt64 {
		return ms
	}
	return 0
}

func init() {
张金富 已提交
133
	parseArg() // parse argument
X
xieyinglin 已提交
134 135

	if db == "" {
张金富 已提交
136
		// 导入的 TDengine 数据库名称,默认为 test_yyyyMMdd
dengyihao's avatar
TD-935  
dengyihao 已提交
137
		db = fmt.Sprintf("test_%s", time.Now().Format("20060102"))
X
xieyinglin 已提交
138 139
	}

张金富 已提交
140
	if auto == 1 && len(startTimeStr) == 0 {
X
xieyinglin 已提交
141 142 143
		log.Fatalf("startTime must be set when auto is 1, the format is \"yyyy-MM-dd HH:mm:ss.SSS\" ")
	}

张金富 已提交
144 145
	if len(startTimeStr) != 0 {
		t, err := time.ParseInLocation(StartTimeLayout, strings.TrimSpace(startTimeStr), time.Local)
X
xieyinglin 已提交
146
		if err != nil {
张金富 已提交
147
			log.Fatalf("param startTime %s error, %s\n", startTimeStr, err)
X
xieyinglin 已提交
148 149 150
		}

		startTime = t.UnixNano() / 1e6 // as millisecond
dengyihao's avatar
TD-935  
dengyihao 已提交
151
	} else {
张金富 已提交
152
		startTime = DefaultStartTime
X
xieyinglin 已提交
153 154 155 156 157 158 159 160 161 162 163
	}

	dataSourceName = fmt.Sprintf("%s:%s@/tcp(%s:%d)/", user, password, host, port)

	printArg()

	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
}

func main() {

张金富 已提交
164
	importConfig := dataImport.LoadConfig(cfg)
X
xieyinglin 已提交
165

张金富 已提交
166
	var caseMinInterval int64 = -1
X
xieyinglin 已提交
167

X
xieyinglin 已提交
168 169 170 171 172 173 174 175 176 177
	for _, userCase := range strings.Split(cases, ",") {
		caseConfig, ok := importConfig.UserCases[userCase]

		if !ok {
			log.Println("not exist case: ", userCase)
			continue
		}

		checkUserCaseConfig(userCase, &caseConfig)

张金富 已提交
178
		// read file as map array
X
xieyinglin 已提交
179 180 181 182 183 184 185 186
		fileRows := readFile(caseConfig)
		log.Printf("case [%s] sample data file contains %d rows.\n", userCase, len(fileRows.rows))

		if len(fileRows.rows) == 0 {
			log.Printf("there is no valid line in file %s\n", caseConfig.FilePath)
			continue
		}

张金富 已提交
187
		_, exists := superTableConfigMap[caseConfig.StName]
X
xieyinglin 已提交
188
		if !exists {
张金富 已提交
189
			superTableConfigMap[caseConfig.StName] = &superTableConfig{config: caseConfig}
X
xieyinglin 已提交
190
		} else {
张金富 已提交
191
			log.Fatalf("the stname of case %s already exist.\n", caseConfig.StName)
X
xieyinglin 已提交
192 193 194 195 196
		}

		var start, cycleTime, avgInterval int64 = getSuperTableTimeConfig(fileRows)

		// set super table's startTime, cycleTime and avgInterval
张金富 已提交
197 198 199
		superTableConfigMap[caseConfig.StName].startTime = start
		superTableConfigMap[caseConfig.StName].cycleTime = cycleTime
		superTableConfigMap[caseConfig.StName].avgInterval = avgInterval
X
xieyinglin 已提交
200

张金富 已提交
201 202
		if caseMinInterval == -1 || caseMinInterval > avgInterval {
			caseMinInterval = avgInterval
X
xieyinglin 已提交
203 204
		}

张金富 已提交
205
		startStr := time.Unix(0, start*int64(time.Millisecond)).Format(StartTimeLayout)
X
xieyinglin 已提交
206 207 208
		log.Printf("case [%s] startTime %s(%d), average dataInterval %d ms, cycleTime %d ms.\n", userCase, startStr, start, avgInterval, cycleTime)
	}

张金富 已提交
209
	if DefaultDelay == delay {
X
xieyinglin 已提交
210
		// default delay
张金富 已提交
211
		delay = caseMinInterval / 2
X
xieyinglin 已提交
212
		if delay < 1 {
dengyihao's avatar
TD-935  
dengyihao 已提交
213
			delay = 1
X
xieyinglin 已提交
214 215 216 217
		}
		log.Printf("actual delay is %d ms.", delay)
	}

X
xieyinglin 已提交
218 219 220 221 222 223 224 225 226 227
	superTableNum := len(superTableConfigMap)
	if superTableNum == 0 {
		log.Fatalln("no valid file, exited")
	}

	start := time.Now()
	// create super table
	createSuperTable(superTableConfigMap)
	log.Printf("create %d superTable ,used %d ms.\n", superTableNum, time.Since(start)/1e6)

张金富 已提交
228
	// create sub table
X
xieyinglin 已提交
229 230 231 232 233 234 235 236 237 238 239 240 241
	start = time.Now()
	createSubTable(subTableMap)
	log.Printf("create %d times of %d subtable ,all %d tables, used %d ms.\n", hnum, len(subTableMap), len(scaleTableMap), time.Since(start)/1e6)

	subTableNum := len(scaleTableMap)

	if subTableNum < thread {
		thread = subTableNum
	}

	filePerThread := subTableNum / thread
	leftFileNum := subTableNum % thread

dengyihao's avatar
TD-935  
dengyihao 已提交
242
	var wg sync.WaitGroup
X
xieyinglin 已提交
243 244 245

	start = time.Now()

X
xieyinglin 已提交
246 247
	successRows = make([]int64, thread)

X
xieyinglin 已提交
248 249 250 251 252 253 254 255
	startIndex, endIndex := 0, filePerThread
	for i := 0; i < thread; i++ {
		// start thread
		if i < leftFileNum {
			endIndex++
		}
		wg.Add(1)

X
xieyinglin 已提交
256
		go insertData(i, startIndex, endIndex, &wg, successRows)
X
xieyinglin 已提交
257 258 259
		startIndex, endIndex = endIndex, endIndex+filePerThread
	}

X
xieyinglin 已提交
260 261 262
	lastStaticTime = time.Now()
	timeTicker = time.NewTicker(time.Millisecond * time.Duration(tick))
	go staticSpeed()
X
xieyinglin 已提交
263 264
	wg.Wait()

dengyihao's avatar
TD-935  
dengyihao 已提交
265
	usedTime := time.Since(start)
X
xieyinglin 已提交
266

X
xieyinglin 已提交
267 268
	total := getTotalRows(successRows)

dengyihao's avatar
TD-935  
dengyihao 已提交
269
	log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total*1e3/usedTime.Milliseconds())
X
xieyinglin 已提交
270 271 272

	if vnum == 0 {
		// continue waiting for insert data
dengyihao's avatar
TD-935  
dengyihao 已提交
273 274
		wait := make(chan string)
		v := <-wait
X
xieyinglin 已提交
275
		log.Printf("program receive %s, exited.\n", v)
dengyihao's avatar
TD-935  
dengyihao 已提交
276
	} else {
X
xieyinglin 已提交
277
		timeTicker.Stop()
X
xieyinglin 已提交
278 279 280 281
	}

}

dengyihao's avatar
TD-935  
dengyihao 已提交
282
func staticSpeed() {
X
xieyinglin 已提交
283 284 285 286 287

	connection := getConnection()
	defer connection.Close()

	if save == 1 {
张金富 已提交
288
		_, _ = connection.Exec("use " + db)
dengyihao's avatar
TD-935  
dengyihao 已提交
289
		_, err := connection.Exec("create table if not exists " + saveTable + "(ts timestamp, speed int)")
X
xieyinglin 已提交
290
		if err != nil {
X
xieyinglin 已提交
291
			log.Fatalf("create %s Table error: %s\n", saveTable, err)
X
xieyinglin 已提交
292 293 294 295 296
		}
	}

	for {
		<-timeTicker.C
dengyihao's avatar
TD-935  
dengyihao 已提交
297

X
xieyinglin 已提交
298 299
		currentTime := time.Now()
		usedTime := currentTime.UnixNano() - lastStaticTime.UnixNano()
dengyihao's avatar
TD-935  
dengyihao 已提交
300

X
xieyinglin 已提交
301 302
		total := getTotalRows(successRows)
		currentSuccessRows := total - lastTotalRows
dengyihao's avatar
TD-935  
dengyihao 已提交
303

张金富 已提交
304
		speed := currentSuccessRows * 1e9 / usedTime
X
xieyinglin 已提交
305 306 307
		log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed)

		if save == 1 {
X
xieyinglin 已提交
308
			insertSql := fmt.Sprintf("insert into %s values(%d, %d)", saveTable, currentTime.UnixNano()/1e6, speed)
张金富 已提交
309
			_, _ = connection.Exec(insertSql)
X
xieyinglin 已提交
310
		}
dengyihao's avatar
TD-935  
dengyihao 已提交
311

X
xieyinglin 已提交
312 313 314 315 316 317
		lastStaticTime = currentTime
		lastTotalRows = total
	}

}

dengyihao's avatar
TD-935  
dengyihao 已提交
318
// getTotalRows returns the sum of all per-thread row counters; an empty or
// nil slice yields 0.
func getTotalRows(successRows []int64) int64 {
	var total int64
	for _, rows := range successRows {
		total += rows
	}
	return total
}

dengyihao's avatar
TD-935  
dengyihao 已提交
326
func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64) {
X
xieyinglin 已提交
327 328 329 330 331
	if auto == 1 {
		// use auto generate data time
		start = startTime
		avgInterval = interval
		maxTableRows := normalizationDataWithSameInterval(fileRows, avgInterval)
dengyihao's avatar
TD-935  
dengyihao 已提交
332
		cycleTime = maxTableRows*avgInterval + avgInterval
X
xieyinglin 已提交
333 334 335 336

	} else {

		// use the sample data primary timestamp
张金富 已提交
337
		sort.Sort(fileRows) // sort the file data by the primaryKey
X
xieyinglin 已提交
338 339 340 341
		minTime := getPrimaryKey(fileRows.rows[0][fileRows.config.Timestamp])
		maxTime := getPrimaryKey(fileRows.rows[len(fileRows.rows)-1][fileRows.config.Timestamp])

		start = minTime // default startTime use the minTime
张金富 已提交
342 343
		// 设置了start时间的话 按照start来
		if DefaultStartTime != startTime {
X
xieyinglin 已提交
344 345 346 347 348 349 350
			start = startTime
		}

		tableNum := normalizationData(fileRows, minTime)

		if minTime == maxTime {
			avgInterval = interval
dengyihao's avatar
TD-935  
dengyihao 已提交
351 352
			cycleTime = tableNum*avgInterval + avgInterval
		} else {
X
xieyinglin 已提交
353 354 355
			avgInterval = (maxTime - minTime) / int64(len(fileRows.rows)) * tableNum
			cycleTime = maxTime - minTime + avgInterval
		}
dengyihao's avatar
TD-935  
dengyihao 已提交
356

X
xieyinglin 已提交
357 358 359 360 361 362 363 364 365
	}
	return
}

func createSubTable(subTableMaps map[string]*dataRows) {

	connection := getConnection()
	defer connection.Close()

张金富 已提交
366
	_, _ = connection.Exec("use " + db)
X
xieyinglin 已提交
367 368

	createTablePrefix := "create table if not exists "
张金富 已提交
369
	var buffer bytes.Buffer
X
xieyinglin 已提交
370 371
	for subTableName := range subTableMaps {

张金富 已提交
372 373
		superTableName := getSuperTableName(subTableMaps[subTableName].config.StName)
		firstRowValues := subTableMaps[subTableName].rows[0] // the first rows values as tags
X
xieyinglin 已提交
374

张金富 已提交
375
		// create table t using superTable tags(...);
X
xieyinglin 已提交
376 377 378 379
		for i := 0; i < hnum; i++ {
			tableName := getScaleSubTableName(subTableName, i)

			scaleTableMap[tableName] = &scaleTableInfo{
dengyihao's avatar
TD-935  
dengyihao 已提交
380 381
				subTableName: subTableName,
				insertRows:   0,
X
xieyinglin 已提交
382 383 384
			}
			scaleTableNames = append(scaleTableNames, tableName)

张金富 已提交
385 386 387 388 389
			buffer.WriteString(createTablePrefix)
			buffer.WriteString(tableName)
			buffer.WriteString(" using ")
			buffer.WriteString(superTableName)
			buffer.WriteString(" tags(")
dengyihao's avatar
TD-935  
dengyihao 已提交
390
			for _, tag := range subTableMaps[subTableName].config.Tags {
张金富 已提交
391 392 393
				tagValue := fmt.Sprintf("%v", firstRowValues[strings.ToLower(tag.Name)])
				buffer.WriteString("'" + tagValue + "'")
				buffer.WriteString(",")
X
xieyinglin 已提交
394
			}
张金富 已提交
395 396
			buffer.Truncate(buffer.Len() - 1)
			buffer.WriteString(")")
X
xieyinglin 已提交
397

张金富 已提交
398 399
			createTableSql := buffer.String()
			buffer.Reset()
X
xieyinglin 已提交
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420

			//log.Printf("create table: %s\n", createTableSql)
			_, err := connection.Exec(createTableSql)
			if err != nil {
				log.Fatalf("create table error: %s\n", err)
			}
		}
	}
}

func createSuperTable(superTableConfigMap map[string]*superTableConfig) {

	connection := getConnection()
	defer connection.Close()

	if dropdb == 1 {
		dropDbSql := "drop database if exists " + db
		_, err := connection.Exec(dropDbSql) // drop database if exists
		if err != nil {
			log.Fatalf("drop database error: %s\n", err)
		}
张金富 已提交
421
		log.Printf("dropdb: %s\n", dropDbSql)
X
xieyinglin 已提交
422 423 424 425 426 427 428 429 430 431
	}

	createDbSql := "create database if not exists " + db + " " + dbparam

	_, err := connection.Exec(createDbSql) // create database if not exists
	if err != nil {
		log.Fatalf("create database error: %s\n", err)
	}
	log.Printf("createDb: %s\n", createDbSql)

张金富 已提交
432
	_, _ = connection.Exec("use " + db)
X
xieyinglin 已提交
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451

	prefix := "create table if not exists "
	var buffer bytes.Buffer
	//CREATE TABLE <stable_name> (<field_name> TIMESTAMP, field_name1 field_type,…) TAGS(tag_name tag_type, …)
	for key := range superTableConfigMap {

		buffer.WriteString(prefix)
		buffer.WriteString(getSuperTableName(key))
		buffer.WriteString("(")

		superTableConf := superTableConfigMap[key]

		buffer.WriteString(superTableConf.config.Timestamp)
		buffer.WriteString(" timestamp, ")

		for _, field := range superTableConf.config.Fields {
			buffer.WriteString(field.Name + " " + field.Type + ",")
		}

dengyihao's avatar
TD-935  
dengyihao 已提交
452
		buffer.Truncate(buffer.Len() - 1)
X
xieyinglin 已提交
453 454 455 456 457 458
		buffer.WriteString(") tags( ")

		for _, tag := range superTableConf.config.Tags {
			buffer.WriteString(tag.Name + " " + tag.Type + ",")
		}

dengyihao's avatar
TD-935  
dengyihao 已提交
459
		buffer.Truncate(buffer.Len() - 1)
X
xieyinglin 已提交
460 461 462 463 464
		buffer.WriteString(")")

		createSql := buffer.String()
		buffer.Reset()

张金富 已提交
465
		//log.Printf("superTable: %s\n", createSql)
X
xieyinglin 已提交
466 467 468 469 470 471 472 473
		_, err = connection.Exec(createSql)
		if err != nil {
			log.Fatalf("create supertable error: %s\n", err)
		}
	}

}

张金富 已提交
474 475
// getScaleSubTableName returns the name of the hNum-th scaled copy of a sub
// table: copy 0 keeps the original name, every other copy gets "_<hNum>"
// appended.
func getScaleSubTableName(subTableName string, hNum int) string {
	if hNum == 0 {
		return subTableName
	}
	return fmt.Sprintf("%s_%d", subTableName, hNum)
}

张金富 已提交
481 482
func getSuperTableName(stName string) string {
	return SuperTablePrefix + stName
X
xieyinglin 已提交
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
}

/**
* normalizationData , and return the num of subTables
 */
func normalizationData(fileRows dataRows, minTime int64) int64 {

	var tableNum int64 = 0
	for _, row := range fileRows.rows {
		// get subTableName
		tableValue := getSubTableNameValue(row[fileRows.config.SubTableName])
		if len(tableValue) == 0 {
			continue
		}

		row[fileRows.config.Timestamp] = getPrimaryKey(row[fileRows.config.Timestamp]) - minTime

张金富 已提交
500
		subTableName := getSubTableName(tableValue, fileRows.config.StName)
X
xieyinglin 已提交
501 502 503 504

		value, ok := subTableMap[subTableName]
		if !ok {
			subTableMap[subTableName] = &dataRows{
dengyihao's avatar
TD-935  
dengyihao 已提交
505 506
				rows:   []map[string]interface{}{row},
				config: fileRows.config,
X
xieyinglin 已提交
507 508 509
			}

			tableNum++
dengyihao's avatar
TD-935  
dengyihao 已提交
510
		} else {
X
xieyinglin 已提交
511 512 513 514 515 516 517
			value.rows = append(value.rows, row)
		}
	}
	return tableNum
}

// return the maximum table rows
dengyihao's avatar
TD-935  
dengyihao 已提交
518
func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64 {
X
xieyinglin 已提交
519
	// subTableMap
dengyihao's avatar
TD-935  
dengyihao 已提交
520
	currSubTableMap := make(map[string]*dataRows)
X
xieyinglin 已提交
521 522 523 524 525 526 527
	for _, row := range fileRows.rows {
		// get subTableName
		tableValue := getSubTableNameValue(row[fileRows.config.SubTableName])
		if len(tableValue) == 0 {
			continue
		}

张金富 已提交
528
		subTableName := getSubTableName(tableValue, fileRows.config.StName)
X
xieyinglin 已提交
529 530 531 532 533

		value, ok := currSubTableMap[subTableName]
		if !ok {
			row[fileRows.config.Timestamp] = 0
			currSubTableMap[subTableName] = &dataRows{
dengyihao's avatar
TD-935  
dengyihao 已提交
534 535
				rows:   []map[string]interface{}{row},
				config: fileRows.config,
X
xieyinglin 已提交
536
			}
dengyihao's avatar
TD-935  
dengyihao 已提交
537
		} else {
X
xieyinglin 已提交
538 539 540 541 542 543
			row[fileRows.config.Timestamp] = int64(len(value.rows)) * avgInterval
			value.rows = append(value.rows, row)
		}

	}

张金富 已提交
544
	var maxRows, tableRows = 0, 0
dengyihao's avatar
TD-935  
dengyihao 已提交
545
	for tableName := range currSubTableMap {
X
xieyinglin 已提交
546 547 548 549 550 551 552 553 554 555
		tableRows = len(currSubTableMap[tableName].rows)
		subTableMap[tableName] = currSubTableMap[tableName] // add to global subTableMap
		if tableRows > maxRows {
			maxRows = tableRows
		}
	}

	return int64(maxRows)
}

dengyihao's avatar
TD-935  
dengyihao 已提交
556
func getSubTableName(subTableValue string, superTableName string) string {
张金富 已提交
557
	return SubTablePrefix + subTableValue + "_" + superTableName
X
xieyinglin 已提交
558 559
}

dengyihao's avatar
TD-935  
dengyihao 已提交
560
func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
X
xieyinglin 已提交
561 562 563 564
	connection := getConnection()
	defer connection.Close()
	defer wg.Done()

张金富 已提交
565
	_, _ = connection.Exec("use " + db) // use db
X
xieyinglin 已提交
566

X
xieyinglin 已提交
567 568
	log.Printf("thread-%d start insert into [%d, %d) subtables.\n", threadIndex, start, end)

X
xieyinglin 已提交
569
	num := 0
X
xieyinglin 已提交
570
	subTables := scaleTableNames[start:end]
张金富 已提交
571
	var buffer bytes.Buffer
X
xieyinglin 已提交
572
	for {
X
xieyinglin 已提交
573 574 575
		var currSuccessRows int64
		var appendRows int
		var lastTableName string
X
xieyinglin 已提交
576

张金富 已提交
577
		buffer.WriteString(InsertPrefix)
X
xieyinglin 已提交
578 579 580 581 582

		for _, tableName := range subTables {

			subTableInfo := subTableMap[scaleTableMap[tableName].subTableName]
			subTableRows := int64(len(subTableInfo.rows))
张金富 已提交
583
			superTableConf := superTableConfigMap[subTableInfo.config.StName]
X
xieyinglin 已提交
584 585 586 587 588

			tableStartTime := superTableConf.startTime
			var tableEndTime int64
			if vnum == 0 {
				// need continue generate data
dengyihao's avatar
TD-935  
dengyihao 已提交
589 590 591
				tableEndTime = time.Now().UnixNano() / 1e6
			} else {
				tableEndTime = tableStartTime + superTableConf.cycleTime*int64(vnum) - superTableConf.avgInterval
X
xieyinglin 已提交
592 593 594 595 596 597 598 599 600
			}

			insertRows := scaleTableMap[tableName].insertRows

			for {
				loopNum := insertRows / subTableRows
				rowIndex := insertRows % subTableRows
				currentRow := subTableInfo.rows[rowIndex]

dengyihao's avatar
TD-935  
dengyihao 已提交
601
				currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum*superTableConf.cycleTime + tableStartTime
X
xieyinglin 已提交
602 603
				if currentTime <= tableEndTime {
					// append
dengyihao's avatar
TD-935  
dengyihao 已提交
604

X
xieyinglin 已提交
605
					if lastTableName != tableName {
张金富 已提交
606 607
						buffer.WriteString(tableName)
						buffer.WriteString(" values")
X
xieyinglin 已提交
608 609 610
					}
					lastTableName = tableName

张金富 已提交
611 612 613
					buffer.WriteString("(")
					buffer.WriteString(fmt.Sprintf("%v", currentTime))
					buffer.WriteString(",")
dengyihao's avatar
TD-935  
dengyihao 已提交
614 615

					for _, field := range subTableInfo.config.Fields {
张金富 已提交
616 617
						buffer.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
						buffer.WriteString(",")
X
xieyinglin 已提交
618
					}
X
xieyinglin 已提交
619

张金富 已提交
620 621
					buffer.Truncate(buffer.Len() - 1)
					buffer.WriteString(") ")
X
xieyinglin 已提交
622 623

					appendRows++
X
xieyinglin 已提交
624
					insertRows++
dengyihao's avatar
TD-935  
dengyihao 已提交
625
					if appendRows == batch {
张金富 已提交
626 627
						// executeBatch
						insertSql := buffer.String()
X
xieyinglin 已提交
628 629
						affectedRows := executeBatchInsert(insertSql, connection)

X
xieyinglin 已提交
630 631 632
						successRows[threadIndex] += affectedRows
						currSuccessRows += affectedRows

张金富 已提交
633 634
						buffer.Reset()
						buffer.WriteString(InsertPrefix)
X
xieyinglin 已提交
635 636
						lastTableName = ""
						appendRows = 0
X
xieyinglin 已提交
637
					}
dengyihao's avatar
TD-935  
dengyihao 已提交
638
				} else {
X
xieyinglin 已提交
639 640 641 642 643 644 645 646
					// finished insert current table
					break
				}
			}

			scaleTableMap[tableName].insertRows = insertRows

		}
dengyihao's avatar
TD-935  
dengyihao 已提交
647

X
xieyinglin 已提交
648
		// left := len(rows)
dengyihao's avatar
TD-935  
dengyihao 已提交
649
		if appendRows > 0 {
张金富 已提交
650 651
			// executeBatch
			insertSql := buffer.String()
X
xieyinglin 已提交
652
			affectedRows := executeBatchInsert(insertSql, connection)
dengyihao's avatar
TD-935  
dengyihao 已提交
653

X
xieyinglin 已提交
654 655 656
			successRows[threadIndex] += affectedRows
			currSuccessRows += affectedRows

张金富 已提交
657
			buffer.Reset()
X
xieyinglin 已提交
658 659
		}

X
xieyinglin 已提交
660
		// log.Printf("thread-%d finished insert %d rows, used %d ms.", threadIndex, currSuccessRows, time.Since(threadStartTime)/1e6)
X
xieyinglin 已提交
661 662 663

		if vnum != 0 {
			// thread finished insert data
X
xieyinglin 已提交
664
			// log.Printf("thread-%d exit\n", threadIndex)
X
xieyinglin 已提交
665 666 667
			break
		}

dengyihao's avatar
TD-935  
dengyihao 已提交
668
		if num == 0 {
X
xieyinglin 已提交
669 670
			wg.Done() //finished insert history data
			num++
X
xieyinglin 已提交
671 672
		}

X
xieyinglin 已提交
673 674 675 676
		if currSuccessRows == 0 {
			// log.Printf("thread-%d start to sleep %d ms.", threadIndex, delay)
			time.Sleep(time.Duration(delay) * time.Millisecond)
		}
X
xieyinglin 已提交
677

X
xieyinglin 已提交
678
		// need continue insert data
X
xieyinglin 已提交
679 680 681 682 683
	}

}

// executeBatchInsert runs insertSql on connection and returns the number of
// affected rows. A failed statement is logged and counts as 0 rows; a
// negative or unavailable row count is reported as 0 as well.
func executeBatchInsert(insertSql string, connection *sql.DB) int64 {
	result, err := connection.Exec(insertSql)
	if err != nil {
		log.Printf("execute insertSql %s error, %s\n", insertSql, err)
		return 0
	}

	// The error is deliberately ignored: affected is then checked below and
	// clamped to 0 if it is negative.
	affected, _ := result.RowsAffected()
	if affected < 0 {
		affected = 0
	}
	return affected
}

// getFieldValue renders a field value in its default format wrapped in
// single quotes, ready to be placed in an insert statement.
func getFieldValue(fieldValue interface{}) string {
	const quote = "'"
	return quote + fmt.Sprint(fieldValue) + quote
}

dengyihao's avatar
TD-935  
dengyihao 已提交
700
func getConnection() *sql.DB {
张金富 已提交
701
	db, err := sql.Open(DriverName, dataSourceName)
X
xieyinglin 已提交
702 703 704 705 706 707 708 709 710 711
	if err != nil {
		panic(err)
	}
	return db
}

// getSubTableNameValue renders the sub table name value of a row in its
// default format.
func getSubTableNameValue(suffix interface{}) string {
	name := fmt.Sprint(suffix)
	return name
}

张金富 已提交
712
func readFile(config dataImport.CaseConfig) dataRows {
X
xieyinglin 已提交
713
	fileFormat := strings.ToLower(config.Format)
张金富 已提交
714
	if fileFormat == JsonFormat {
X
xieyinglin 已提交
715
		return readJSONFile(config)
张金富 已提交
716
	} else if fileFormat == CsvFormat {
X
xieyinglin 已提交
717 718 719 720 721 722 723
		return readCSVFile(config)
	}

	log.Printf("the file %s is not supported yet\n", config.FilePath)
	return dataRows{}
}

张金富 已提交
724
func readCSVFile(config dataImport.CaseConfig) dataRows {
X
xieyinglin 已提交
725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743
	var rows dataRows
	f, err := os.Open(config.FilePath)
	if err != nil {
		log.Printf("Error: %s, %s\n", config.FilePath, err)
		return rows
	}
	defer f.Close()

	r := bufio.NewReader(f)

	//read the first line as title
	lineBytes, _, err := r.ReadLine()
	if err == io.EOF {
		log.Printf("the file %s is empty\n", config.FilePath)
		return rows
	}
	line := strings.ToLower(string(lineBytes))
	titles := strings.Split(line, config.Separator)
	if len(titles) < 3 {
张金富 已提交
744
		// need suffix、 primaryKey and at least one other field
X
xieyinglin 已提交
745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778
		log.Printf("the first line of file %s should be title row, and at least 3 field.\n", config.FilePath)
		return rows
	}

	rows.config = config

	var lineNum = 0
	for {
		// read data row
		lineBytes, _, err = r.ReadLine()
		lineNum++
		if err == io.EOF {
			break
		}
		// fmt.Println(line)
		rowData := strings.Split(string(lineBytes), config.Separator)

		dataMap := make(map[string]interface{})
		for i, title := range titles {
			title = strings.TrimSpace(title)
			if i < len(rowData) {
				dataMap[title] = strings.TrimSpace(rowData[i])
			} else {
				dataMap[title] = ""
			}
		}

		// if the suffix valid
		if !existMapKeyAndNotEmpty(config.Timestamp, dataMap) {
			log.Printf("the Timestamp[%s] of line %d is empty, will filtered.\n", config.Timestamp, lineNum)
			continue
		}

		// if the primary key valid
张金富 已提交
779
		primaryKeyValue := getPrimaryKeyMilliSec(config.Timestamp, config.TimestampType, config.TimestampTypeFormat, dataMap)
X
xieyinglin 已提交
780 781 782 783 784 785 786 787 788 789 790 791
		if primaryKeyValue == -1 {
			log.Printf("the Timestamp[%s] of line %d is not valid, will filtered.\n", config.Timestamp, lineNum)
			continue
		}

		dataMap[config.Timestamp] = primaryKeyValue

		rows.rows = append(rows.rows, dataMap)
	}
	return rows
}

张金富 已提交
792
func readJSONFile(config dataImport.CaseConfig) dataRows {
X
xieyinglin 已提交
793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829

	var rows dataRows
	f, err := os.Open(config.FilePath)
	if err != nil {
		log.Printf("Error: %s, %s\n", config.FilePath, err)
		return rows
	}
	defer f.Close()

	r := bufio.NewReader(f)
	//log.Printf("file size %d\n", r.Size())

	rows.config = config
	var lineNum = 0
	for {
		lineBytes, _, err := r.ReadLine()
		lineNum++
		if err == io.EOF {
			break
		}

		line := make(map[string]interface{})
		err = json.Unmarshal(lineBytes, &line)

		if err != nil {
			log.Printf("line [%d] of file %s parse error, reason:  %s\n", lineNum, config.FilePath, err)
			continue
		}

		// transfer the key to lowercase
		lowerMapKey(line)

		if !existMapKeyAndNotEmpty(config.SubTableName, line) {
			log.Printf("the SubTableName[%s] of line %d is empty, will filtered.\n", config.SubTableName, lineNum)
			continue
		}

张金富 已提交
830
		primaryKeyValue := getPrimaryKeyMilliSec(config.Timestamp, config.TimestampType, config.TimestampTypeFormat, line)
X
xieyinglin 已提交
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846
		if primaryKeyValue == -1 {
			log.Printf("the Timestamp[%s] of line %d is not valid, will filtered.\n", config.Timestamp, lineNum)
			continue
		}

		line[config.Timestamp] = primaryKeyValue

		rows.rows = append(rows.rows, line)
	}

	return rows
}

/**
* get primary key as millisecond , otherwise return -1
 */
张金富 已提交
847
func getPrimaryKeyMilliSec(key string, valueType string, valueFormat string, line map[string]interface{}) int64 {
X
xieyinglin 已提交
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876
	if !existMapKeyAndNotEmpty(key, line) {
		return -1
	}
	if DATETIME == valueType {
		// transfer the datetime to milliseconds
		return parseMillisecond(line[key], valueFormat)
	}

	value, err := strconv.ParseInt(fmt.Sprintf("%v", line[key]), 10, 64)
	// as millisecond num
	if err != nil {
		return -1
	}
	return value
}

// parseMillisecond parses str, which must be a string, as a local date-time
// in the given layout and returns it as milliseconds since the Unix epoch.
// Surrounding white space is ignored. It returns -1 when str is not a string
// or does not match the layout; a parse failure is logged.
func parseMillisecond(str interface{}, layout string) int64 {
	value, ok := str.(string)
	if !ok {
		return -1
	}

	t, err := time.ParseInLocation(layout, strings.TrimSpace(value), time.Local)
	if err != nil {
		log.Println(err)
		return -1
	}
	return t.UnixNano() / 1e6
}

// lowerMapKey rewrites maps in place so that every key is lower case; each
// value is stored again under the lower-cased form of its key.
func lowerMapKey(maps map[string]interface{}) {
	names := make([]string, 0, len(maps))
	for name := range maps {
		names = append(names, name)
	}

	for _, name := range names {
		entry := maps[name]
		delete(maps, name)
		maps[strings.ToLower(name)] = entry
	}
}

// existMapKeyAndNotEmpty reports whether key is present in maps with a
// non-empty value.
//
// A missing key, a nil value and an empty string all count as empty. Any other
// value, including non-string values such as numbers, counts as non-empty.
func existMapKeyAndNotEmpty(key string, maps map[string]interface{}) bool {
	value, ok := maps[key]
	if !ok || value == nil {
		return false
	}

	if str, isString := value.(string); isString {
		return len(str) > 0
	}
	return true
}

张金富 已提交
902
func checkUserCaseConfig(caseName string, caseConfig *dataImport.CaseConfig) {
X
xieyinglin 已提交
903

张金富 已提交
904
	if len(caseConfig.StName) == 0 {
X
xieyinglin 已提交
905 906 907
		log.Fatalf("the stname of case %s can't be empty\n", caseName)
	}

张金富 已提交
908
	caseConfig.StName = strings.ToLower(caseConfig.StName)
X
xieyinglin 已提交
909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935

	if len(caseConfig.Tags) == 0 {
		log.Fatalf("the tags of case %s can't be empty\n", caseName)
	}

	if len(caseConfig.Fields) == 0 {
		log.Fatalf("the fields of case %s can't be empty\n", caseName)
	}

	if len(caseConfig.SubTableName) == 0 {
		log.Fatalf("the suffix of case %s can't be empty\n", caseName)
	}

	caseConfig.SubTableName = strings.ToLower(caseConfig.SubTableName)

	caseConfig.Timestamp = strings.ToLower(caseConfig.Timestamp)

	var timestampExist = false
	for i, field := range caseConfig.Fields {
		if strings.EqualFold(field.Name, caseConfig.Timestamp) {
			if strings.ToLower(field.Type) != TIMESTAMP {
				log.Fatalf("case %s's primaryKey %s field type is %s, it must be timestamp\n", caseName, caseConfig.Timestamp, field.Type)
			}
			timestampExist = true
			if i < len(caseConfig.Fields)-1 {
				// delete middle item,  a = a[:i+copy(a[i:], a[i+1:])]
				caseConfig.Fields = caseConfig.Fields[:i+copy(caseConfig.Fields[i:], caseConfig.Fields[i+1:])]
dengyihao's avatar
TD-935  
dengyihao 已提交
936
			} else {
X
xieyinglin 已提交
937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959
				// delete the last item
				caseConfig.Fields = caseConfig.Fields[:len(caseConfig.Fields)-1]
			}
			break
		}
	}

	if !timestampExist {
		log.Fatalf("case %s primaryKey %s is not exist in fields\n", caseName, caseConfig.Timestamp)
	}

	caseConfig.TimestampType = strings.ToLower(caseConfig.TimestampType)
	if caseConfig.TimestampType != MILLISECOND && caseConfig.TimestampType != DATETIME {
		log.Fatalf("case %s's timestampType %s error, only can be timestamp or datetime\n", caseName, caseConfig.TimestampType)
	}

	if caseConfig.TimestampType == DATETIME && len(caseConfig.TimestampTypeFormat) == 0 {
		log.Fatalf("case %s's timestampTypeFormat %s can't be empty when timestampType is datetime\n", caseName, caseConfig.TimestampTypeFormat)
	}

}

func parseArg() {
张金富 已提交
960 961
	flag.StringVar(&cfg, "cfg", "config/cfg.toml", "configuration file which describes useCase and data format.")
	flag.StringVar(&cases, "cases", "sensor_info", "useCase for dataset to be imported. Multiple choices can be separated by comma, for example, -cases sensor_info,camera_detection.")
X
xieyinglin 已提交
962 963
	flag.IntVar(&hnum, "hnum", 100, "magnification factor of the sample tables. For example, if hnum is 100 and in the sample data there are 10 tables, then 10x100=1000 tables will be created in the database.")
	flag.IntVar(&vnum, "vnum", 1000, "copies of the sample records in each table. If set to 0,this program will never stop simulating and importing data even if the timestamp has passed current time.")
张金富 已提交
964
	flag.Int64Var(&delay, "delay", DefaultDelay, "the delay time interval(millisecond) to continue generating data when vnum set 0.")
X
xieyinglin 已提交
965 966
	flag.Int64Var(&tick, "tick", 2000, "the tick time interval(millisecond) to print statistic info.")
	flag.IntVar(&save, "save", 0, "whether to save the statistical info into 'statistic' table. 0 is disabled and 1 is enabled.")
张金富 已提交
967
	flag.StringVar(&saveTable, "savetb", DefaultStatisticTable, "the table to save 'statistic' info when save set 1.")
X
xieyinglin 已提交
968 969
	flag.IntVar(&thread, "thread", 10, "number of threads to import data.")
	flag.IntVar(&batch, "batch", 100, "rows of records in one import batch.")
张金富 已提交
970 971 972
	flag.IntVar(&auto, "auto", 0, "whether to use the startTime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.")
	flag.StringVar(&startTimeStr, "start", "", "the starting timestamp of simulated data, in the format of yyyy-MM-dd HH:mm:ss.SSS. If not specified, the earliest timestamp in the sample data will be set as the startTime.")
	flag.Int64Var(&interval, "interval", DefaultInterval, "time interval between two consecutive records, in the unit of millisecond. Only valid when auto is 1.")
X
xieyinglin 已提交
973 974 975 976
	flag.StringVar(&host, "host", "127.0.0.1", "tdengine server ip.")
	flag.IntVar(&port, "port", 6030, "tdengine server port.")
	flag.StringVar(&user, "user", "root", "user name to login into the database.")
	flag.StringVar(&password, "password", "taosdata", "the import tdengine user password")
张金富 已提交
977
	flag.IntVar(&dropdb, "dropdb", 0, "whether to drop the existing database. 1 is yes and 0 otherwise.")
X
xieyinglin 已提交
978 979 980 981 982 983
	flag.StringVar(&db, "db", "", "name of the database to store data.")
	flag.StringVar(&dbparam, "dbparam", "", "database configurations when it is created.")

	flag.Parse()
}

dengyihao's avatar
TD-935  
dengyihao 已提交
984
func printArg() {
X
xieyinglin 已提交
985 986 987 988 989
	fmt.Println("used param: ")
	fmt.Println("-cfg: ", cfg)
	fmt.Println("-cases:", cases)
	fmt.Println("-hnum:", hnum)
	fmt.Println("-vnum:", vnum)
X
xieyinglin 已提交
990 991 992
	fmt.Println("-delay:", delay)
	fmt.Println("-tick:", tick)
	fmt.Println("-save:", save)
X
xieyinglin 已提交
993
	fmt.Println("-savetb:", saveTable)
X
xieyinglin 已提交
994 995 996
	fmt.Println("-thread:", thread)
	fmt.Println("-batch:", batch)
	fmt.Println("-auto:", auto)
张金富 已提交
997
	fmt.Println("-start:", startTimeStr)
X
xieyinglin 已提交
998 999 1000 1001 1002 1003 1004 1005 1006
	fmt.Println("-interval:", interval)
	fmt.Println("-host:", host)
	fmt.Println("-port", port)
	fmt.Println("-user", user)
	fmt.Println("-password", password)
	fmt.Println("-dropdb", dropdb)
	fmt.Println("-db", db)
	fmt.Println("-dbparam", dbparam)
}