diff --git a/importSampleData/README.md b/importSampleData/README.md
index f97a31bc3150a7e904576f2b70f5bb2f28bd72cb..0678676d4e85d568068dfa138904baf7a8ef03e5 100644
--- a/importSampleData/README.md
+++ b/importSampleData/README.md
@@ -1,6 +1,6 @@
 ## Sample Data Import
 
-This tool can quickly import sample data files in `json` or `csv` format supplied by the user into `TDengine`. Currently it can only be run on Linux.
+This tool can quickly import user-supplied sample data files in `json` or `csv` format into `TDengine`. Currently it runs on Linux only.
 
 To exercise write and query performance, the sample data can be scaled out horizontally and vertically. Horizontal scaling means cloning one table's (monitoring point's) data into multiple tables; vertical scaling means replicating the data within a time range of the sample data along the time axis. The tool also supports continuing the import at a fixed interval after the historical data has reached the current time, which makes it possible to test inserts and queries running in parallel, simulating a real environment.
 
@@ -83,11 +83,19 @@ go build -o bin/taosimport app/main.go
 * -vnum int
 
-  The number of times to replicate the sample data vertically. If set to 0, the historical data is imported up to the current time and the import then continues at the specified interval. Defaults to 1000, i.e. the sample data is copied 1000 times along the time axis.
+  The number of times to replicate the sample data vertically. If set to 0, the historical data is imported up to the current time and the import then continues at the specified interval. Defaults to 1000, i.e. the sample data is replicated 1000 times along the time axis.
 
 * -delay int
 
-  The time interval for the continuous import when vnum is set to 0; default 3000 ms.
+  The time interval for the continuous import when vnum is set to 0; defaults to half of the smallest record interval among all cases, in ms.
+
+* -tick int
+
+  The time interval at which statistics are printed; default 2000 ms.
+
+* -save int
+
+  Whether to save statistics into the statistic table in TDengine: 1 yes, 0 no; default 0.
 
 * -auto int
diff --git a/importSampleData/app/main.go b/importSampleData/app/main.go
index 04cef9fda9739efa6d03855162e586c5d1a61c51..087b7bb7614e8a03da0ce9fae0c5693340314236 100644
--- a/importSampleData/app/main.go
+++ b/importSampleData/app/main.go
@@ -16,7 +16,6 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	_ "github.com/taosdata/TDengine/src/connector/go/src/taosSql"
@@ -28,6 +27,7 @@ const (
 	MILLISECOND             = "millisecond"
 	DEFAULT_STARTTIME int64 = -1
 	DEFAULT_INTERVAL  int64 = 1*1000
+	DEFAULT_DELAY     int64 = -1 // sentinel: -delay was not set on the command line
 
 	JSON_FORMAT = "json"
 	CSV_FORMAT  = "csv"
@@ -37,6 +37,7 @@
 	DRIVER_NAME      = "taosSql"
 	STARTTIME_LAYOUT = "2006-01-02 15:04:05.000"
 	INSERT_PREFIX    = "insert into "
+	STATISTIC_TABLE  = "statistic"
 )
 
 var (
@@ -67,8 +68,13 @@ var (
 
 	scaleTableMap = make(map[string]*scaleTableInfo)
 
-	totalSuccessRows int64
+	successRows    []int64 // per-thread success counters, one slot per insert thread
+	lastStaticTime time.Time
+	lastTotalRows  int64
+	timeTicker     *time.Ticker
 	delay int64 // default 10 milliseconds
+	tick  int64
+	save  int
 )
 
 type superTableConfig struct {
@@ -149,6 +155,8 @@ func main() {
 
 	importConfig := dataimport.LoadConfig(cfg)
 
+	var caseMinInterval int64 = -1 // smallest average record interval seen across cases
+
 	for _, userCase := range strings.Split(cases, ",") {
 
 		caseConfig, ok := importConfig.UserCases[userCase]
@@ -182,10 +190,23 @@ func main() {
 		superTableConfigMap[caseConfig.Stname].avgInterval = avgInterval
 		superTableConfigMap[caseConfig.Stname].cycleTime = cycleTime
 
+		if caseMinInterval == -1 || caseMinInterval > avgInterval {
+			caseMinInterval = avgInterval
+		}
+
 		startStr := time.Unix(0, start*int64(time.Millisecond)).Format(STARTTIME_LAYOUT)
 		log.Printf("case [%s] startTime %s(%d), average dataInterval %d ms, cycleTime %d ms.\n", userCase, startStr, start, avgInterval, cycleTime)
 	}
 
+	if delay == DEFAULT_DELAY {
+		// -delay was not supplied: default to half the smallest record interval
+		delay = caseMinInterval / 2
+		if delay < 1 {
+			delay = 1
+		}
+		log.Printf("actual delay is %d ms.", delay)
+	}
+
 	superTableNum := len(superTableConfigMap)
 	if superTableNum == 0 {
 		log.Fatalln("no valid file, exited")
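Read in isolation, the defaulting rule above is: half the smallest average record interval across all cases, never below 1 ms, unless the user passed `-delay` explicitly. A minimal standalone sketch of that rule (the `defaultDelay` helper and its demo values are illustrative, not part of the patch):

```go
package main

import "fmt"

// defaultDelay mirrors the new rule: when -delay is left at its sentinel
// (-1, DEFAULT_DELAY in the patch), use half of the smallest average record
// interval across all cases, clamped to at least 1 ms.
func defaultDelay(delay, caseMinInterval int64) int64 {
	if delay != -1 {
		return delay // the user passed -delay explicitly
	}
	if half := caseMinInterval / 2; half >= 1 {
		return half
	}
	return 1
}

func main() {
	fmt.Println(defaultDelay(-1, 1000)) // 500: half the smallest interval
	fmt.Println(defaultDelay(-1, 1))    // 1: clamped to the 1 ms floor
	fmt.Println(defaultDelay(250, 1000)) // 250: an explicit -delay wins
}
```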
log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", totalSuccessRows, usedTime/1e6, totalSuccessRows * 1e9 / int64(usedTime)) + total := getTotalRows(successRows) + + log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total * 1e9 / int64(usedTime)) if vnum == 0 { // continue waiting for insert data wait := make(chan string) v := <- wait log.Printf("program receive %s, exited.\n", v) + }else{ + timeTicker.Stop() } } +func staticSpeed(){ + + connection := getConnection() + defer connection.Close() + + if save == 1 { + connection.Exec("use " + db) + _, err := connection.Exec("create table if not exists " + STATISTIC_TABLE +"(ts timestamp, speed int)") + if err != nil { + log.Fatalf("create %s Table error: %s\n", STATISTIC_TABLE, err) + } + } + + for { + <-timeTicker.C + + currentTime := time.Now() + usedTime := currentTime.UnixNano() - lastStaticTime.UnixNano() + + total := getTotalRows(successRows) + currentSuccessRows := total - lastTotalRows + + speed := currentSuccessRows * 1e9 / int64(usedTime) + log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed) + + if save == 1 { + insertSql := fmt.Sprintf("insert into %s values(%d, %d)", STATISTIC_TABLE, currentTime.UnixNano()/1e6, speed) + connection.Exec(insertSql) + } + + lastStaticTime = currentTime + lastTotalRows = total + } + +} + +func getTotalRows(successRows []int64) int64{ + var total int64 = 0 + for j := 0; j < len(successRows); j++ { + total += successRows[j] + } + return total +} + func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64){ if auto == 1 { // use auto generate data time @@ -275,6 +349,15 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i return } +func createStatisticTable(){ + connection := getConnection() + defer connection.Close() + + _, err := connection.Exec("create table if not exist " + db + "."+ STATISTIC_TABLE +"(ts timestamp, speed int)") + if err != nil { + log.Fatalf("createStatisticTable error: %s\n", err) + } +} func createSubTable(subTableMaps map[string]*dataRows) { @@ -478,24 +561,25 @@ func getSubTableName(subTableValue string, superTableName string) string { } -func insertData(threadIndex, start, end int, wg *sync.WaitGroup) { +func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) { connection := getConnection() defer connection.Close() defer wg.Done() connection.Exec("use " + db) // use db + log.Printf("thread-%d start insert into [%d, %d) subtables.\n", threadIndex, start, end) + num := 0 + subTables := scaleTableNames[start:end] for { - log.Printf("thread-%d start insert into [%d, %d) subtables.\n", threadIndex, start, end) - - threadStartTime := time.Now() + var currSuccessRows int64 + var appendRows int + var lastTableName string - var successRows int64 - - var rows []tableRows + buffers := bytes.Buffer{} + buffers.WriteString(INSERT_PREFIX) - subTables := scaleTableNames[start:end] for _, tableName := range subTables { subTableInfo := subTableMap[scaleTableMap[tableName].subTableName] @@ -507,7 +591,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup) { if vnum == 0 { // need continue generate data tableEndTime = time.Now().UnixNano()/1e6 - }else { + }else { tableEndTime = tableStartTime + superTableConf.cycleTime * int64(vnum) - superTableConf.avgInterval } @@ -521,17 +605,43 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup) { currentTime := 
@@ -275,6 +349,15 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i
 	return
 }
 
+// createStatisticTable creates the table that staticSpeed writes into.
+func createStatisticTable() {
+	connection := getConnection()
+	defer connection.Close()
+
+	_, err := connection.Exec("create table if not exists " + db + "." + STATISTIC_TABLE + "(ts timestamp, speed int)")
+	if err != nil {
+		log.Fatalf("createStatisticTable error: %s\n", err)
+	}
+}
 
 func createSubTable(subTableMaps map[string]*dataRows) {
@@ -478,24 +561,25 @@ func getSubTableName(subTableValue string, superTableName string) string {
 }
 
-func insertData(threadIndex, start, end int, wg *sync.WaitGroup) {
+func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
 	connection := getConnection()
 	defer connection.Close()
 	defer wg.Done()
 
 	connection.Exec("use " + db) // use db
 
+	log.Printf("thread-%d start insert into [%d, %d) subtables.\n", threadIndex, start, end)
+
 	num := 0
+	subTables := scaleTableNames[start:end]
 	for {
-		log.Printf("thread-%d start insert into [%d, %d) subtables.\n", threadIndex, start, end)
-
-		threadStartTime := time.Now()
-
-		var successRows int64
-
-		var rows []tableRows
+		var currSuccessRows int64
+		var appendRows int
+		var lastTableName string
 
-		subTables := scaleTableNames[start:end]
+		// build the batched insert statement in a buffer instead of a row slice
+		buffers := bytes.Buffer{}
+		buffers.WriteString(INSERT_PREFIX)
+
 		for _, tableName := range subTables {
 
 			subTableInfo := subTableMap[scaleTableMap[tableName].subTableName]
@@ -507,7 +591,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup) {
 
 			if vnum == 0 { // need continue generate data
 				tableEndTime = time.Now().UnixNano()/1e6
-			}else {
+			} else {
 				tableEndTime = tableStartTime + superTableConf.cycleTime * int64(vnum) - superTableConf.avgInterval
 			}
@@ -521,17 +605,43 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup) {
 				currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum * superTableConf.cycleTime + tableStartTime
 				if currentTime <= tableEndTime {
 					// append
-					row := buildRow(tableName, currentTime, subTableInfo, currentRow)
-					rows = append(rows, row)
+
+					if lastTableName != tableName {
+						// first row for this table: emit "<table> values" once
+						buffers.WriteString(tableName)
+						buffers.WriteString(" values")
+					}
+					lastTableName = tableName
+
+					buffers.WriteString("(")
+					buffers.WriteString(fmt.Sprintf("%v", currentTime))
+					buffers.WriteString(",")
+
+					for _, field := range subTableInfo.config.Fields {
+						buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
+						buffers.WriteString(",")
+					}
+					buffers.Truncate(buffers.Len() - 1) // drop the trailing comma
+					buffers.WriteString(") ")
+
+					appendRows++
 					insertRows++
-					if len(rows) == batch {
+					if appendRows == batch {
 						// executebatch
-						insertSql := buildSql(rows)
+						insertSql := buffers.String()
 						affectedRows := executeBatchInsert(insertSql, connection)
-						successRows = atomic.AddInt64(&successRows, affectedRows)
-						rows = []tableRows{}
+
+						successRows[threadIndex] += affectedRows
+						currSuccessRows += affectedRows
+
+						buffers.Reset()
+						buffers.WriteString(INSERT_PREFIX)
+						lastTableName = ""
+						appendRows = 0
 					}
 				}else {
 					// finished insert current table
@@ -543,31 +653,37 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup) {
 			}
 		}
 
-		left := len(rows)
-		if left > 0 {
+		if appendRows > 0 {
 			// executebatch
-			insertSql := buildSql(rows)
+			insertSql := buffers.String()
 			affectedRows := executeBatchInsert(insertSql, connection)
-			successRows = atomic.AddInt64(&successRows, affectedRows)
+
+			successRows[threadIndex] += affectedRows
+			currSuccessRows += affectedRows
+
+			buffers.Reset()
 		}
 
-		atomic.AddInt64(&totalSuccessRows, successRows)
-		log.Printf("thread-%d finished insert %d rows, used %d ms.", threadIndex, successRows, time.Since(threadStartTime)/1e6)
-
 		if vnum != 0 {
 			// thread finished insert data
 			break
 		}
 
 		if(num == 0){
 			wg.Done() // finished insert history data
+			num++
 		}
-		num++
 
-		// need continue insert data
-		// log.Printf("thread-%d start to sleep %d ms.", threadIndex, delay)
-		time.Sleep(time.Duration(delay) * time.Millisecond)
+		if currSuccessRows == 0 {
+			// no new rows were generated in this pass; wait before scanning again
+			time.Sleep(time.Duration(delay) * time.Millisecond)
+		}
 	}
 }
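Instead of collecting `tableRows` values and rendering them with `buildSql`, the rewritten loop appends directly to a `bytes.Buffer`: each table name and the `values` keyword are written once, followed by one `(...)` group per row. A self-contained sketch of the statement layout this produces (table names and values invented for illustration):

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	rows := []struct {
		table string
		ts    int64
		value string
	}{
		{"sensor_0", 1563249600000, "10.1"},
		{"sensor_0", 1563249601000, "10.2"}, // same table: only a new value group
		{"sensor_1", 1563249600000, "20.5"},
	}

	var buf bytes.Buffer
	buf.WriteString("insert into ") // INSERT_PREFIX in the patch
	lastTable := ""
	for _, r := range rows {
		if r.table != lastTable {
			buf.WriteString(r.table + " values")
			lastTable = r.table
		}
		fmt.Fprintf(&buf, "(%d,%s) ", r.ts, r.value)
	}
	fmt.Println(buf.String())
	// prints roughly:
	// insert into sensor_0 values(1563249600000,10.1) (1563249601000,10.2) sensor_1 values(1563249600000,20.5)
}
```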
If set to 0, this program will never stop simulating and importing data even if the timestamp has passed current time.")
-	flag.Int64Var(&delay, "delay", 3 * 1000, "the delay millisecond to continue generate data when vnum set 0.")
+	flag.Int64Var(&delay, "delay", DEFAULT_DELAY, "the delay time interval (in milliseconds) between passes when vnum is set to 0.")
+	flag.Int64Var(&tick, "tick", 2000, "the tick time interval (in milliseconds) at which to print statistics.")
+	flag.IntVar(&save, "save", 0, "whether to save the statistical info into the 'statistic' table. 0 is disabled and 1 is enabled.")
 	flag.IntVar(&thread, "thread", 10, "number of threads to import data.")
 	flag.IntVar(&batch, "batch", 100, "rows of records in one import batch.")
 	flag.IntVar(&auto, "auto", 0, "whether to use the starttime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.")
@@ -940,12 +1059,14 @@ func printArg() {
 	fmt.Println("-cases:", cases)
 	fmt.Println("-hnum:", hnum)
 	fmt.Println("-vnum:", vnum)
+	fmt.Println("-delay:", delay)
+	fmt.Println("-tick:", tick)
+	fmt.Println("-save:", save)
 	fmt.Println("-thread:", thread)
 	fmt.Println("-batch:", batch)
 	fmt.Println("-auto:", auto)
 	fmt.Println("-start:", starttimestr)
 	fmt.Println("-interval:", interval)
-	fmt.Println("-delay:", delay)
 	fmt.Println("-host:", host)
 	fmt.Println("-port", port)
 	fmt.Println("-user", user)
diff --git a/importSampleData/bin/taosimport b/importSampleData/bin/taosimport
index 32c20fa7355750305d28e4c0b3d4280eb7754db1..1cb3c12926ec6657190471ea590e79f4a6b191b6 100755
Binary files a/importSampleData/bin/taosimport and b/importSampleData/bin/taosimport differ
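With `-save 1`, each tick's throughput is also written into the `statistic` table of the target database, so a finished run can be inspected afterwards. A sketch of reading it back through the same connector; the DSN shape, address, credentials, and database name `test_sample` are assumptions to adapt to your own settings:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/taosdata/TDengine/src/connector/go/src/taosSql"
)

func main() {
	// DSN assumed to follow the connector's "user:pass@/tcp(host:port)/db" form.
	db, err := sql.Open("taosSql", "root:taosdata@/tcp(127.0.0.1:6030)/test_sample")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	rows, err := db.Query("select ts, speed from statistic")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var ts interface{} // scanned generically: the driver's timestamp type is not assumed
		var speed int64
		if err := rows.Scan(&ts, &speed); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%v -> %d rows/s\n", ts, speed)
	}
}
```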