From 93bd2740e644ffd28df705ca1836042ed3202750 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Jul 2020 20:07:15 +0000 Subject: [PATCH] TD-935 --- importSampleData/app/main.go | 248 +++++++++++++++++------------------ 1 file changed, 122 insertions(+), 126 deletions(-) diff --git a/importSampleData/app/main.go b/importSampleData/app/main.go index 6996047026..61de6e740c 100644 --- a/importSampleData/app/main.go +++ b/importSampleData/app/main.go @@ -7,7 +7,6 @@ import ( "encoding/json" "flag" "fmt" - "github.com/taosdata/TDengine/importSampleData/import" "hash/crc32" "io" "log" @@ -18,88 +17,89 @@ import ( "sync" "time" + dataimport "github.com/taosdata/TDengine/importSampleData/import" + _ "github.com/taosdata/driver-go/taosSql" ) const ( - TIMESTAMP = "timestamp" - DATETIME = "datetime" - MILLISECOND = "millisecond" - DEFAULT_STARTTIME int64 = -1 - DEFAULT_INTERVAL int64 = 1*1000 - DEFAULT_DELAY int64 = -1 - DEFAULT_STATISTIC_TABLE = "statistic" - - JSON_FORMAT = "json" - CSV_FORMAT = "csv" + TIMESTAMP = "timestamp" + DATETIME = "datetime" + MILLISECOND = "millisecond" + DEFAULT_STARTTIME int64 = -1 + DEFAULT_INTERVAL int64 = 1 * 1000 + DEFAULT_DELAY int64 = -1 + DEFAULT_STATISTIC_TABLE = "statistic" + + JSON_FORMAT = "json" + CSV_FORMAT = "csv" SUPERTABLE_PREFIX = "s_" - SUBTABLE_PREFIX = "t_" + SUBTABLE_PREFIX = "t_" - DRIVER_NAME = "taosSql" + DRIVER_NAME = "taosSql" STARTTIME_LAYOUT = "2006-01-02 15:04:05.000" - INSERT_PREFIX = "insert into " + INSERT_PREFIX = "insert into " ) var ( - - cfg string - cases string - hnum int - vnum int - thread int - batch int - auto int - starttimestr string - interval int64 - host string - port int - user string - password string - dropdb int - db string - dbparam string + cfg string + cases string + hnum int + vnum int + thread int + batch int + auto int + starttimestr string + interval int64 + host string + port int + user string + password string + dropdb int + db string + dbparam string dataSourceName string - startTime int64 + startTime int64 - superTableConfigMap = make(map[string]*superTableConfig) - subTableMap = make(map[string]*dataRows) - scaleTableNames []string + superTableConfigMap = make(map[string]*superTableConfig) + subTableMap = make(map[string]*dataRows) + scaleTableNames []string scaleTableMap = make(map[string]*scaleTableInfo) - successRows []int64 + successRows []int64 lastStaticTime time.Time - lastTotalRows int64 - timeTicker *time.Ticker - delay int64 // default 10 milliseconds - tick int64 - save int - saveTable string + lastTotalRows int64 + timeTicker *time.Ticker + delay int64 // default 10 milliseconds + tick int64 + save int + saveTable string ) type superTableConfig struct { - startTime int64 - endTime int64 - cycleTime int64 + startTime int64 + endTime int64 + cycleTime int64 avgInterval int64 - config dataimport.CaseConfig + config dataimport.CaseConfig } type scaleTableInfo struct { scaleTableName string - subTableName string - insertRows int64 + subTableName string + insertRows int64 } type tableRows struct { - tableName string // tableName - value string // values(...) + tableName string // tableName + value string // values(...) } type dataRows struct { - rows []map[string]interface{} - config dataimport.CaseConfig + rows []map[string]interface{} + config dataimport.CaseConfig } func (rows dataRows) Len() int { @@ -127,10 +127,10 @@ func init() { if db == "" { //db = "go" - db = fmt.Sprintf("test_%s",time.Now().Format("20060102")) + db = fmt.Sprintf("test_%s", time.Now().Format("20060102")) } - if auto == 1 && len(starttimestr) == 0 { + if auto == 1 && len(starttimestr) == 0 { log.Fatalf("startTime must be set when auto is 1, the format is \"yyyy-MM-dd HH:mm:ss.SSS\" ") } @@ -141,7 +141,7 @@ func init() { } startTime = t.UnixNano() / 1e6 // as millisecond - }else{ + } else { startTime = DEFAULT_STARTTIME } @@ -179,7 +179,7 @@ func main() { _, exists := superTableConfigMap[caseConfig.Stname] if !exists { - superTableConfigMap[caseConfig.Stname] = &superTableConfig{config:caseConfig} + superTableConfigMap[caseConfig.Stname] = &superTableConfig{config: caseConfig} } else { log.Fatalf("the stname of case %s already exist.\n", caseConfig.Stname) } @@ -201,9 +201,9 @@ func main() { if DEFAULT_DELAY == delay { // default delay - delay = caseMinumInterval / 2 + delay = caseMinumInterval / 2 if delay < 1 { - delay = 1 + delay = 1 } log.Printf("actual delay is %d ms.", delay) } @@ -232,7 +232,7 @@ func main() { filePerThread := subTableNum / thread leftFileNum := subTableNum % thread - var wg sync.WaitGroup + var wg sync.WaitGroup start = time.Now() @@ -255,31 +255,31 @@ func main() { go staticSpeed() wg.Wait() - usedTime := time.Since(start) + usedTime := time.Since(start) total := getTotalRows(successRows) - log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total * 1e9 / int64(usedTime)) + log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total*1e3/usedTime.Milliseconds()) if vnum == 0 { // continue waiting for insert data - wait := make(chan string) - v := <- wait + wait := make(chan string) + v := <-wait log.Printf("program receive %s, exited.\n", v) - }else{ + } else { timeTicker.Stop() } } -func staticSpeed(){ +func staticSpeed() { connection := getConnection() defer connection.Close() if save == 1 { connection.Exec("use " + db) - _, err := connection.Exec("create table if not exists " + saveTable +"(ts timestamp, speed int)") + _, err := connection.Exec("create table if not exists " + saveTable + "(ts timestamp, speed int)") if err != nil { log.Fatalf("create %s Table error: %s\n", saveTable, err) } @@ -287,13 +287,13 @@ func staticSpeed(){ for { <-timeTicker.C - + currentTime := time.Now() usedTime := currentTime.UnixNano() - lastStaticTime.UnixNano() - + total := getTotalRows(successRows) currentSuccessRows := total - lastTotalRows - + speed := currentSuccessRows * 1e9 / int64(usedTime) log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed) @@ -301,14 +301,14 @@ func staticSpeed(){ insertSql := fmt.Sprintf("insert into %s values(%d, %d)", saveTable, currentTime.UnixNano()/1e6, speed) connection.Exec(insertSql) } - + lastStaticTime = currentTime lastTotalRows = total } } -func getTotalRows(successRows []int64) int64{ +func getTotalRows(successRows []int64) int64 { var total int64 = 0 for j := 0; j < len(successRows); j++ { total += successRows[j] @@ -316,18 +316,18 @@ func getTotalRows(successRows []int64) int64{ return total } -func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64){ +func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64) { if auto == 1 { // use auto generate data time start = startTime avgInterval = interval maxTableRows := normalizationDataWithSameInterval(fileRows, avgInterval) - cycleTime = maxTableRows * avgInterval + avgInterval + cycleTime = maxTableRows*avgInterval + avgInterval } else { // use the sample data primary timestamp - sort.Sort(fileRows)// sort the file data by the primarykey + sort.Sort(fileRows) // sort the file data by the primarykey minTime := getPrimaryKey(fileRows.rows[0][fileRows.config.Timestamp]) maxTime := getPrimaryKey(fileRows.rows[len(fileRows.rows)-1][fileRows.config.Timestamp]) @@ -340,21 +340,21 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i if minTime == maxTime { avgInterval = interval - cycleTime = tableNum * avgInterval + avgInterval - }else{ + cycleTime = tableNum*avgInterval + avgInterval + } else { avgInterval = (maxTime - minTime) / int64(len(fileRows.rows)) * tableNum cycleTime = maxTime - minTime + avgInterval } - + } return } -func createStatisticTable(){ +func createStatisticTable() { connection := getConnection() defer connection.Close() - _, err := connection.Exec("create table if not exist " + db + "."+ saveTable +"(ts timestamp, speed int)") + _, err := connection.Exec("create table if not exist " + db + "." + saveTable + "(ts timestamp, speed int)") if err != nil { log.Fatalf("createStatisticTable error: %s\n", err) } @@ -379,8 +379,8 @@ func createSubTable(subTableMaps map[string]*dataRows) { tableName := getScaleSubTableName(subTableName, i) scaleTableMap[tableName] = &scaleTableInfo{ - subTableName: subTableName, - insertRows: 0, + subTableName: subTableName, + insertRows: 0, } scaleTableNames = append(scaleTableNames, tableName) @@ -389,12 +389,12 @@ func createSubTable(subTableMaps map[string]*dataRows) { buffers.WriteString(" using ") buffers.WriteString(superTableName) buffers.WriteString(" tags(") - for _, tag := range subTableMaps[subTableName].config.Tags{ + for _, tag := range subTableMaps[subTableName].config.Tags { tagValue := fmt.Sprintf("%v", tagValues[strings.ToLower(tag.Name)]) buffers.WriteString("'" + tagValue + "'") buffers.WriteString(",") } - buffers.Truncate(buffers.Len()-1) + buffers.Truncate(buffers.Len() - 1) buffers.WriteString(")") createTableSql := buffers.String() @@ -451,14 +451,14 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) { buffer.WriteString(field.Name + " " + field.Type + ",") } - buffer.Truncate(buffer.Len()-1) + buffer.Truncate(buffer.Len() - 1) buffer.WriteString(") tags( ") for _, tag := range superTableConf.config.Tags { buffer.WriteString(tag.Name + " " + tag.Type + ",") } - buffer.Truncate(buffer.Len()-1) + buffer.Truncate(buffer.Len() - 1) buffer.WriteString(")") createSql := buffer.String() @@ -475,16 +475,15 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) { func getScaleSubTableName(subTableName string, hnum int) string { if hnum == 0 { - return subTableName + return subTableName } - return fmt.Sprintf( "%s_%d", subTableName, hnum) + return fmt.Sprintf("%s_%d", subTableName, hnum) } func getSuperTableName(stname string) string { return SUPERTABLE_PREFIX + stname } - /** * normalizationData , and return the num of subTables */ @@ -505,12 +504,12 @@ func normalizationData(fileRows dataRows, minTime int64) int64 { value, ok := subTableMap[subTableName] if !ok { subTableMap[subTableName] = &dataRows{ - rows: []map[string]interface{}{row}, - config: fileRows.config, + rows: []map[string]interface{}{row}, + config: fileRows.config, } tableNum++ - }else{ + } else { value.rows = append(value.rows, row) } } @@ -518,9 +517,9 @@ func normalizationData(fileRows dataRows, minTime int64) int64 { } // return the maximum table rows -func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64{ +func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64 { // subTableMap - currSubTableMap := make(map[string]*dataRows) + currSubTableMap := make(map[string]*dataRows) for _, row := range fileRows.rows { // get subTableName tableValue := getSubTableNameValue(row[fileRows.config.SubTableName]) @@ -534,10 +533,10 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int if !ok { row[fileRows.config.Timestamp] = 0 currSubTableMap[subTableName] = &dataRows{ - rows: []map[string]interface{}{row}, - config: fileRows.config, + rows: []map[string]interface{}{row}, + config: fileRows.config, } - }else{ + } else { row[fileRows.config.Timestamp] = int64(len(value.rows)) * avgInterval value.rows = append(value.rows, row) } @@ -545,7 +544,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int } var maxRows, tableRows int = 0, 0 - for tableName := range currSubTableMap{ + for tableName := range currSubTableMap { tableRows = len(currSubTableMap[tableName].rows) subTableMap[tableName] = currSubTableMap[tableName] // add to global subTableMap if tableRows > maxRows { @@ -556,13 +555,11 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int return int64(maxRows) } - -func getSubTableName(subTableValue string, superTableName string) string { +func getSubTableName(subTableValue string, superTableName string) string { return SUBTABLE_PREFIX + subTableValue + "_" + superTableName } - -func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) { +func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) { connection := getConnection() defer connection.Close() defer wg.Done() @@ -591,9 +588,9 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] var tableEndTime int64 if vnum == 0 { // need continue generate data - tableEndTime = time.Now().UnixNano()/1e6 - }else { - tableEndTime = tableStartTime + superTableConf.cycleTime * int64(vnum) - superTableConf.avgInterval + tableEndTime = time.Now().UnixNano() / 1e6 + } else { + tableEndTime = tableStartTime + superTableConf.cycleTime*int64(vnum) - superTableConf.avgInterval } insertRows := scaleTableMap[tableName].insertRows @@ -603,10 +600,10 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] rowIndex := insertRows % subTableRows currentRow := subTableInfo.rows[rowIndex] - currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum * superTableConf.cycleTime + tableStartTime + currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum*superTableConf.cycleTime + tableStartTime if currentTime <= tableEndTime { // append - + if lastTableName != tableName { buffers.WriteString(tableName) buffers.WriteString(" values") @@ -616,22 +613,22 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] buffers.WriteString("(") buffers.WriteString(fmt.Sprintf("%v", currentTime)) buffers.WriteString(",") - + // fieldNum := len(subTableInfo.config.Fields) - for _,field := range subTableInfo.config.Fields { + for _, field := range subTableInfo.config.Fields { buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)])) buffers.WriteString(",") // if( i != fieldNum -1){ - + // } } - buffers.Truncate(buffers.Len()-1) + buffers.Truncate(buffers.Len() - 1) buffers.WriteString(") ") appendRows++ insertRows++ - if appendRows == batch { + if appendRows == batch { // executebatch insertSql := buffers.String() connection.Exec("use " + db) @@ -645,7 +642,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] lastTableName = "" appendRows = 0 } - }else { + } else { // finished insert current table break } @@ -654,14 +651,14 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] scaleTableMap[tableName].insertRows = insertRows } - + // left := len(rows) - if appendRows > 0 { + if appendRows > 0 { // executebatch insertSql := buffers.String() connection.Exec("use " + db) affectedRows := executeBatchInsert(insertSql, connection) - + successRows[threadIndex] += affectedRows currSuccessRows += affectedRows @@ -676,7 +673,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] break } - if(num == 0){ + if num == 0 { wg.Done() //finished insert history data num++ } @@ -691,7 +688,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] } -func buildSql(rows []tableRows) string{ +func buildSql(rows []tableRows) string { var lastTableName string @@ -709,7 +706,7 @@ func buildSql(rows []tableRows) string{ if lastTableName == row.tableName { buffers.WriteString(row.value) - }else { + } else { buffers.WriteString(" ") buffers.WriteString(row.tableName) buffers.WriteString(" values") @@ -722,7 +719,7 @@ func buildSql(rows []tableRows) string{ return inserSql } -func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, currentRow map[string]interface{}) tableRows{ +func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, currentRow map[string]interface{}) tableRows { tableRows := tableRows{tableName: tableName} @@ -732,12 +729,12 @@ func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, curre buffers.WriteString(fmt.Sprintf("%v", currentTime)) buffers.WriteString(",") - for _,field := range subTableInfo.config.Fields { + for _, field := range subTableInfo.config.Fields { buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)])) buffers.WriteString(",") } - buffers.Truncate(buffers.Len()-1) + buffers.Truncate(buffers.Len() - 1) buffers.WriteString(")") insertSql := buffers.String() @@ -764,7 +761,7 @@ func getFieldValue(fieldValue interface{}) string { return fmt.Sprintf("'%v'", fieldValue) } -func getConnection() *sql.DB{ +func getConnection() *sql.DB { db, err := sql.Open(DRIVER_NAME, dataSourceName) if err != nil { panic(err) @@ -772,7 +769,6 @@ func getConnection() *sql.DB{ return db } - func getSubTableNameValue(suffix interface{}) string { return fmt.Sprintf("%v", suffix) } @@ -950,7 +946,7 @@ func parseMillisecond(str interface{}, layout string) int64 { log.Println(err) return -1 } - return t.UnixNano()/1e6 + return t.UnixNano() / 1e6 } // lowerMapKey transfer all the map key to lowercase @@ -1009,7 +1005,7 @@ func checkUserCaseConfig(caseName string, caseConfig *dataimport.CaseConfig) { if i < len(caseConfig.Fields)-1 { // delete middle item, a = a[:i+copy(a[i:], a[i+1:])] caseConfig.Fields = caseConfig.Fields[:i+copy(caseConfig.Fields[i:], caseConfig.Fields[i+1:])] - }else { + } else { // delete the last item caseConfig.Fields = caseConfig.Fields[:len(caseConfig.Fields)-1] } @@ -1057,7 +1053,7 @@ func parseArg() { flag.Parse() } -func printArg() { +func printArg() { fmt.Println("used param: ") fmt.Println("-cfg: ", cfg) fmt.Println("-cases:", cases) -- GitLab