未验证 提交 8f8f072a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2694 from taosdata/hotfix/TD-935

Hotfix/td 935
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"github.com/taosdata/TDengine/importSampleData/import"
"hash/crc32" "hash/crc32"
"io" "io"
"log" "log"
...@@ -18,6 +17,8 @@ import ( ...@@ -18,6 +17,8 @@ import (
"sync" "sync"
"time" "time"
dataimport "github.com/taosdata/TDengine/importSampleData/import"
_ "github.com/taosdata/driver-go/taosSql" _ "github.com/taosdata/driver-go/taosSql"
) )
...@@ -26,7 +27,7 @@ const ( ...@@ -26,7 +27,7 @@ const (
DATETIME = "datetime" DATETIME = "datetime"
MILLISECOND = "millisecond" MILLISECOND = "millisecond"
DEFAULT_STARTTIME int64 = -1 DEFAULT_STARTTIME int64 = -1
DEFAULT_INTERVAL int64 = 1*1000 DEFAULT_INTERVAL int64 = 1 * 1000
DEFAULT_DELAY int64 = -1 DEFAULT_DELAY int64 = -1
DEFAULT_STATISTIC_TABLE = "statistic" DEFAULT_STATISTIC_TABLE = "statistic"
...@@ -41,7 +42,6 @@ const ( ...@@ -41,7 +42,6 @@ const (
) )
var ( var (
cfg string cfg string
cases string cases string
hnum int hnum int
...@@ -127,7 +127,7 @@ func init() { ...@@ -127,7 +127,7 @@ func init() {
if db == "" { if db == "" {
//db = "go" //db = "go"
db = fmt.Sprintf("test_%s",time.Now().Format("20060102")) db = fmt.Sprintf("test_%s", time.Now().Format("20060102"))
} }
if auto == 1 && len(starttimestr) == 0 { if auto == 1 && len(starttimestr) == 0 {
...@@ -141,7 +141,7 @@ func init() { ...@@ -141,7 +141,7 @@ func init() {
} }
startTime = t.UnixNano() / 1e6 // as millisecond startTime = t.UnixNano() / 1e6 // as millisecond
}else{ } else {
startTime = DEFAULT_STARTTIME startTime = DEFAULT_STARTTIME
} }
...@@ -179,7 +179,7 @@ func main() { ...@@ -179,7 +179,7 @@ func main() {
_, exists := superTableConfigMap[caseConfig.Stname] _, exists := superTableConfigMap[caseConfig.Stname]
if !exists { if !exists {
superTableConfigMap[caseConfig.Stname] = &superTableConfig{config:caseConfig} superTableConfigMap[caseConfig.Stname] = &superTableConfig{config: caseConfig}
} else { } else {
log.Fatalf("the stname of case %s already exist.\n", caseConfig.Stname) log.Fatalf("the stname of case %s already exist.\n", caseConfig.Stname)
} }
...@@ -259,27 +259,27 @@ func main() { ...@@ -259,27 +259,27 @@ func main() {
total := getTotalRows(successRows) total := getTotalRows(successRows)
log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total * 1e9 / int64(usedTime)) log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total*1e3/usedTime.Milliseconds())
if vnum == 0 { if vnum == 0 {
// continue waiting for insert data // continue waiting for insert data
wait := make(chan string) wait := make(chan string)
v := <- wait v := <-wait
log.Printf("program receive %s, exited.\n", v) log.Printf("program receive %s, exited.\n", v)
}else{ } else {
timeTicker.Stop() timeTicker.Stop()
} }
} }
func staticSpeed(){ func staticSpeed() {
connection := getConnection() connection := getConnection()
defer connection.Close() defer connection.Close()
if save == 1 { if save == 1 {
connection.Exec("use " + db) connection.Exec("use " + db)
_, err := connection.Exec("create table if not exists " + saveTable +"(ts timestamp, speed int)") _, err := connection.Exec("create table if not exists " + saveTable + "(ts timestamp, speed int)")
if err != nil { if err != nil {
log.Fatalf("create %s Table error: %s\n", saveTable, err) log.Fatalf("create %s Table error: %s\n", saveTable, err)
} }
...@@ -308,7 +308,7 @@ func staticSpeed(){ ...@@ -308,7 +308,7 @@ func staticSpeed(){
} }
func getTotalRows(successRows []int64) int64{ func getTotalRows(successRows []int64) int64 {
var total int64 = 0 var total int64 = 0
for j := 0; j < len(successRows); j++ { for j := 0; j < len(successRows); j++ {
total += successRows[j] total += successRows[j]
...@@ -316,18 +316,18 @@ func getTotalRows(successRows []int64) int64{ ...@@ -316,18 +316,18 @@ func getTotalRows(successRows []int64) int64{
return total return total
} }
func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64){ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64) {
if auto == 1 { if auto == 1 {
// use auto generate data time // use auto generate data time
start = startTime start = startTime
avgInterval = interval avgInterval = interval
maxTableRows := normalizationDataWithSameInterval(fileRows, avgInterval) maxTableRows := normalizationDataWithSameInterval(fileRows, avgInterval)
cycleTime = maxTableRows * avgInterval + avgInterval cycleTime = maxTableRows*avgInterval + avgInterval
} else { } else {
// use the sample data primary timestamp // use the sample data primary timestamp
sort.Sort(fileRows)// sort the file data by the primarykey sort.Sort(fileRows) // sort the file data by the primarykey
minTime := getPrimaryKey(fileRows.rows[0][fileRows.config.Timestamp]) minTime := getPrimaryKey(fileRows.rows[0][fileRows.config.Timestamp])
maxTime := getPrimaryKey(fileRows.rows[len(fileRows.rows)-1][fileRows.config.Timestamp]) maxTime := getPrimaryKey(fileRows.rows[len(fileRows.rows)-1][fileRows.config.Timestamp])
...@@ -340,8 +340,8 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i ...@@ -340,8 +340,8 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i
if minTime == maxTime { if minTime == maxTime {
avgInterval = interval avgInterval = interval
cycleTime = tableNum * avgInterval + avgInterval cycleTime = tableNum*avgInterval + avgInterval
}else{ } else {
avgInterval = (maxTime - minTime) / int64(len(fileRows.rows)) * tableNum avgInterval = (maxTime - minTime) / int64(len(fileRows.rows)) * tableNum
cycleTime = maxTime - minTime + avgInterval cycleTime = maxTime - minTime + avgInterval
} }
...@@ -350,11 +350,11 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i ...@@ -350,11 +350,11 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i
return return
} }
func createStatisticTable(){ func createStatisticTable() {
connection := getConnection() connection := getConnection()
defer connection.Close() defer connection.Close()
_, err := connection.Exec("create table if not exist " + db + "."+ saveTable +"(ts timestamp, speed int)") _, err := connection.Exec("create table if not exist " + db + "." + saveTable + "(ts timestamp, speed int)")
if err != nil { if err != nil {
log.Fatalf("createStatisticTable error: %s\n", err) log.Fatalf("createStatisticTable error: %s\n", err)
} }
...@@ -389,12 +389,12 @@ func createSubTable(subTableMaps map[string]*dataRows) { ...@@ -389,12 +389,12 @@ func createSubTable(subTableMaps map[string]*dataRows) {
buffers.WriteString(" using ") buffers.WriteString(" using ")
buffers.WriteString(superTableName) buffers.WriteString(superTableName)
buffers.WriteString(" tags(") buffers.WriteString(" tags(")
for _, tag := range subTableMaps[subTableName].config.Tags{ for _, tag := range subTableMaps[subTableName].config.Tags {
tagValue := fmt.Sprintf("%v", tagValues[strings.ToLower(tag.Name)]) tagValue := fmt.Sprintf("%v", tagValues[strings.ToLower(tag.Name)])
buffers.WriteString("'" + tagValue + "'") buffers.WriteString("'" + tagValue + "'")
buffers.WriteString(",") buffers.WriteString(",")
} }
buffers.Truncate(buffers.Len()-1) buffers.Truncate(buffers.Len() - 1)
buffers.WriteString(")") buffers.WriteString(")")
createTableSql := buffers.String() createTableSql := buffers.String()
...@@ -451,14 +451,14 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) { ...@@ -451,14 +451,14 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) {
buffer.WriteString(field.Name + " " + field.Type + ",") buffer.WriteString(field.Name + " " + field.Type + ",")
} }
buffer.Truncate(buffer.Len()-1) buffer.Truncate(buffer.Len() - 1)
buffer.WriteString(") tags( ") buffer.WriteString(") tags( ")
for _, tag := range superTableConf.config.Tags { for _, tag := range superTableConf.config.Tags {
buffer.WriteString(tag.Name + " " + tag.Type + ",") buffer.WriteString(tag.Name + " " + tag.Type + ",")
} }
buffer.Truncate(buffer.Len()-1) buffer.Truncate(buffer.Len() - 1)
buffer.WriteString(")") buffer.WriteString(")")
createSql := buffer.String() createSql := buffer.String()
...@@ -477,14 +477,13 @@ func getScaleSubTableName(subTableName string, hnum int) string { ...@@ -477,14 +477,13 @@ func getScaleSubTableName(subTableName string, hnum int) string {
if hnum == 0 { if hnum == 0 {
return subTableName return subTableName
} }
return fmt.Sprintf( "%s_%d", subTableName, hnum) return fmt.Sprintf("%s_%d", subTableName, hnum)
} }
func getSuperTableName(stname string) string { func getSuperTableName(stname string) string {
return SUPERTABLE_PREFIX + stname return SUPERTABLE_PREFIX + stname
} }
/** /**
* normalizationData , and return the num of subTables * normalizationData , and return the num of subTables
*/ */
...@@ -510,7 +509,7 @@ func normalizationData(fileRows dataRows, minTime int64) int64 { ...@@ -510,7 +509,7 @@ func normalizationData(fileRows dataRows, minTime int64) int64 {
} }
tableNum++ tableNum++
}else{ } else {
value.rows = append(value.rows, row) value.rows = append(value.rows, row)
} }
} }
...@@ -518,7 +517,7 @@ func normalizationData(fileRows dataRows, minTime int64) int64 { ...@@ -518,7 +517,7 @@ func normalizationData(fileRows dataRows, minTime int64) int64 {
} }
// return the maximum table rows // return the maximum table rows
func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64{ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64 {
// subTableMap // subTableMap
currSubTableMap := make(map[string]*dataRows) currSubTableMap := make(map[string]*dataRows)
for _, row := range fileRows.rows { for _, row := range fileRows.rows {
...@@ -537,7 +536,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int ...@@ -537,7 +536,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int
rows: []map[string]interface{}{row}, rows: []map[string]interface{}{row},
config: fileRows.config, config: fileRows.config,
} }
}else{ } else {
row[fileRows.config.Timestamp] = int64(len(value.rows)) * avgInterval row[fileRows.config.Timestamp] = int64(len(value.rows)) * avgInterval
value.rows = append(value.rows, row) value.rows = append(value.rows, row)
} }
...@@ -545,7 +544,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int ...@@ -545,7 +544,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int
} }
var maxRows, tableRows int = 0, 0 var maxRows, tableRows int = 0, 0
for tableName := range currSubTableMap{ for tableName := range currSubTableMap {
tableRows = len(currSubTableMap[tableName].rows) tableRows = len(currSubTableMap[tableName].rows)
subTableMap[tableName] = currSubTableMap[tableName] // add to global subTableMap subTableMap[tableName] = currSubTableMap[tableName] // add to global subTableMap
if tableRows > maxRows { if tableRows > maxRows {
...@@ -556,12 +555,10 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int ...@@ -556,12 +555,10 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int
return int64(maxRows) return int64(maxRows)
} }
func getSubTableName(subTableValue string, superTableName string) string { func getSubTableName(subTableValue string, superTableName string) string {
return SUBTABLE_PREFIX + subTableValue + "_" + superTableName return SUBTABLE_PREFIX + subTableValue + "_" + superTableName
} }
func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) { func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
connection := getConnection() connection := getConnection()
defer connection.Close() defer connection.Close()
...@@ -591,9 +588,9 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -591,9 +588,9 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
var tableEndTime int64 var tableEndTime int64
if vnum == 0 { if vnum == 0 {
// need continue generate data // need continue generate data
tableEndTime = time.Now().UnixNano()/1e6 tableEndTime = time.Now().UnixNano() / 1e6
}else { } else {
tableEndTime = tableStartTime + superTableConf.cycleTime * int64(vnum) - superTableConf.avgInterval tableEndTime = tableStartTime + superTableConf.cycleTime*int64(vnum) - superTableConf.avgInterval
} }
insertRows := scaleTableMap[tableName].insertRows insertRows := scaleTableMap[tableName].insertRows
...@@ -603,7 +600,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -603,7 +600,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
rowIndex := insertRows % subTableRows rowIndex := insertRows % subTableRows
currentRow := subTableInfo.rows[rowIndex] currentRow := subTableInfo.rows[rowIndex]
currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum * superTableConf.cycleTime + tableStartTime currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum*superTableConf.cycleTime + tableStartTime
if currentTime <= tableEndTime { if currentTime <= tableEndTime {
// append // append
...@@ -618,7 +615,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -618,7 +615,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
buffers.WriteString(",") buffers.WriteString(",")
// fieldNum := len(subTableInfo.config.Fields) // fieldNum := len(subTableInfo.config.Fields)
for _,field := range subTableInfo.config.Fields { for _, field := range subTableInfo.config.Fields {
buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)])) buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
buffers.WriteString(",") buffers.WriteString(",")
// if( i != fieldNum -1){ // if( i != fieldNum -1){
...@@ -626,7 +623,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -626,7 +623,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
// } // }
} }
buffers.Truncate(buffers.Len()-1) buffers.Truncate(buffers.Len() - 1)
buffers.WriteString(") ") buffers.WriteString(") ")
appendRows++ appendRows++
...@@ -645,7 +642,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -645,7 +642,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
lastTableName = "" lastTableName = ""
appendRows = 0 appendRows = 0
} }
}else { } else {
// finished insert current table // finished insert current table
break break
} }
...@@ -676,7 +673,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -676,7 +673,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
break break
} }
if(num == 0){ if num == 0 {
wg.Done() //finished insert history data wg.Done() //finished insert history data
num++ num++
} }
...@@ -691,7 +688,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -691,7 +688,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
} }
func buildSql(rows []tableRows) string{ func buildSql(rows []tableRows) string {
var lastTableName string var lastTableName string
...@@ -709,7 +706,7 @@ func buildSql(rows []tableRows) string{ ...@@ -709,7 +706,7 @@ func buildSql(rows []tableRows) string{
if lastTableName == row.tableName { if lastTableName == row.tableName {
buffers.WriteString(row.value) buffers.WriteString(row.value)
}else { } else {
buffers.WriteString(" ") buffers.WriteString(" ")
buffers.WriteString(row.tableName) buffers.WriteString(row.tableName)
buffers.WriteString(" values") buffers.WriteString(" values")
...@@ -722,7 +719,7 @@ func buildSql(rows []tableRows) string{ ...@@ -722,7 +719,7 @@ func buildSql(rows []tableRows) string{
return inserSql return inserSql
} }
func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, currentRow map[string]interface{}) tableRows{ func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, currentRow map[string]interface{}) tableRows {
tableRows := tableRows{tableName: tableName} tableRows := tableRows{tableName: tableName}
...@@ -732,12 +729,12 @@ func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, curre ...@@ -732,12 +729,12 @@ func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, curre
buffers.WriteString(fmt.Sprintf("%v", currentTime)) buffers.WriteString(fmt.Sprintf("%v", currentTime))
buffers.WriteString(",") buffers.WriteString(",")
for _,field := range subTableInfo.config.Fields { for _, field := range subTableInfo.config.Fields {
buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)])) buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
buffers.WriteString(",") buffers.WriteString(",")
} }
buffers.Truncate(buffers.Len()-1) buffers.Truncate(buffers.Len() - 1)
buffers.WriteString(")") buffers.WriteString(")")
insertSql := buffers.String() insertSql := buffers.String()
...@@ -764,7 +761,7 @@ func getFieldValue(fieldValue interface{}) string { ...@@ -764,7 +761,7 @@ func getFieldValue(fieldValue interface{}) string {
return fmt.Sprintf("'%v'", fieldValue) return fmt.Sprintf("'%v'", fieldValue)
} }
func getConnection() *sql.DB{ func getConnection() *sql.DB {
db, err := sql.Open(DRIVER_NAME, dataSourceName) db, err := sql.Open(DRIVER_NAME, dataSourceName)
if err != nil { if err != nil {
panic(err) panic(err)
...@@ -772,7 +769,6 @@ func getConnection() *sql.DB{ ...@@ -772,7 +769,6 @@ func getConnection() *sql.DB{
return db return db
} }
func getSubTableNameValue(suffix interface{}) string { func getSubTableNameValue(suffix interface{}) string {
return fmt.Sprintf("%v", suffix) return fmt.Sprintf("%v", suffix)
} }
...@@ -950,7 +946,7 @@ func parseMillisecond(str interface{}, layout string) int64 { ...@@ -950,7 +946,7 @@ func parseMillisecond(str interface{}, layout string) int64 {
log.Println(err) log.Println(err)
return -1 return -1
} }
return t.UnixNano()/1e6 return t.UnixNano() / 1e6
} }
// lowerMapKey transfer all the map key to lowercase // lowerMapKey transfer all the map key to lowercase
...@@ -1009,7 +1005,7 @@ func checkUserCaseConfig(caseName string, caseConfig *dataimport.CaseConfig) { ...@@ -1009,7 +1005,7 @@ func checkUserCaseConfig(caseName string, caseConfig *dataimport.CaseConfig) {
if i < len(caseConfig.Fields)-1 { if i < len(caseConfig.Fields)-1 {
// delete middle item, a = a[:i+copy(a[i:], a[i+1:])] // delete middle item, a = a[:i+copy(a[i:], a[i+1:])]
caseConfig.Fields = caseConfig.Fields[:i+copy(caseConfig.Fields[i:], caseConfig.Fields[i+1:])] caseConfig.Fields = caseConfig.Fields[:i+copy(caseConfig.Fields[i:], caseConfig.Fields[i+1:])]
}else { } else {
// delete the last item // delete the last item
caseConfig.Fields = caseConfig.Fields[:len(caseConfig.Fields)-1] caseConfig.Fields = caseConfig.Fields[:len(caseConfig.Fields)-1]
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册