提交 93bd2740 编写于 作者: dengyihao's avatar dengyihao

TD-935

上级 f8818e5b
......@@ -7,7 +7,6 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/taosdata/TDengine/importSampleData/import"
"hash/crc32"
"io"
"log"
......@@ -18,88 +17,89 @@ import (
"sync"
"time"
dataimport "github.com/taosdata/TDengine/importSampleData/import"
_ "github.com/taosdata/driver-go/taosSql"
)
const (
TIMESTAMP = "timestamp"
DATETIME = "datetime"
MILLISECOND = "millisecond"
DEFAULT_STARTTIME int64 = -1
DEFAULT_INTERVAL int64 = 1*1000
DEFAULT_DELAY int64 = -1
DEFAULT_STATISTIC_TABLE = "statistic"
JSON_FORMAT = "json"
CSV_FORMAT = "csv"
TIMESTAMP = "timestamp"
DATETIME = "datetime"
MILLISECOND = "millisecond"
DEFAULT_STARTTIME int64 = -1
DEFAULT_INTERVAL int64 = 1 * 1000
DEFAULT_DELAY int64 = -1
DEFAULT_STATISTIC_TABLE = "statistic"
JSON_FORMAT = "json"
CSV_FORMAT = "csv"
SUPERTABLE_PREFIX = "s_"
SUBTABLE_PREFIX = "t_"
SUBTABLE_PREFIX = "t_"
DRIVER_NAME = "taosSql"
DRIVER_NAME = "taosSql"
STARTTIME_LAYOUT = "2006-01-02 15:04:05.000"
INSERT_PREFIX = "insert into "
INSERT_PREFIX = "insert into "
)
var (
cfg string
cases string
hnum int
vnum int
thread int
batch int
auto int
starttimestr string
interval int64
host string
port int
user string
password string
dropdb int
db string
dbparam string
cfg string
cases string
hnum int
vnum int
thread int
batch int
auto int
starttimestr string
interval int64
host string
port int
user string
password string
dropdb int
db string
dbparam string
dataSourceName string
startTime int64
startTime int64
superTableConfigMap = make(map[string]*superTableConfig)
subTableMap = make(map[string]*dataRows)
scaleTableNames []string
superTableConfigMap = make(map[string]*superTableConfig)
subTableMap = make(map[string]*dataRows)
scaleTableNames []string
scaleTableMap = make(map[string]*scaleTableInfo)
successRows []int64
successRows []int64
lastStaticTime time.Time
lastTotalRows int64
timeTicker *time.Ticker
delay int64 // default 10 milliseconds
tick int64
save int
saveTable string
lastTotalRows int64
timeTicker *time.Ticker
delay int64 // default 10 milliseconds
tick int64
save int
saveTable string
)
type superTableConfig struct {
startTime int64
endTime int64
cycleTime int64
startTime int64
endTime int64
cycleTime int64
avgInterval int64
config dataimport.CaseConfig
config dataimport.CaseConfig
}
type scaleTableInfo struct {
scaleTableName string
subTableName string
insertRows int64
subTableName string
insertRows int64
}
type tableRows struct {
tableName string // tableName
value string // values(...)
tableName string // tableName
value string // values(...)
}
type dataRows struct {
rows []map[string]interface{}
config dataimport.CaseConfig
rows []map[string]interface{}
config dataimport.CaseConfig
}
func (rows dataRows) Len() int {
......@@ -127,10 +127,10 @@ func init() {
if db == "" {
//db = "go"
db = fmt.Sprintf("test_%s",time.Now().Format("20060102"))
db = fmt.Sprintf("test_%s", time.Now().Format("20060102"))
}
if auto == 1 && len(starttimestr) == 0 {
if auto == 1 && len(starttimestr) == 0 {
log.Fatalf("startTime must be set when auto is 1, the format is \"yyyy-MM-dd HH:mm:ss.SSS\" ")
}
......@@ -141,7 +141,7 @@ func init() {
}
startTime = t.UnixNano() / 1e6 // as millisecond
}else{
} else {
startTime = DEFAULT_STARTTIME
}
......@@ -179,7 +179,7 @@ func main() {
_, exists := superTableConfigMap[caseConfig.Stname]
if !exists {
superTableConfigMap[caseConfig.Stname] = &superTableConfig{config:caseConfig}
superTableConfigMap[caseConfig.Stname] = &superTableConfig{config: caseConfig}
} else {
log.Fatalf("the stname of case %s already exist.\n", caseConfig.Stname)
}
......@@ -201,9 +201,9 @@ func main() {
if DEFAULT_DELAY == delay {
// default delay
delay = caseMinumInterval / 2
delay = caseMinumInterval / 2
if delay < 1 {
delay = 1
delay = 1
}
log.Printf("actual delay is %d ms.", delay)
}
......@@ -232,7 +232,7 @@ func main() {
filePerThread := subTableNum / thread
leftFileNum := subTableNum % thread
var wg sync.WaitGroup
var wg sync.WaitGroup
start = time.Now()
......@@ -255,31 +255,31 @@ func main() {
go staticSpeed()
wg.Wait()
usedTime := time.Since(start)
usedTime := time.Since(start)
total := getTotalRows(successRows)
log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total * 1e9 / int64(usedTime))
log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total*1e3/usedTime.Milliseconds())
if vnum == 0 {
// continue waiting for insert data
wait := make(chan string)
v := <- wait
wait := make(chan string)
v := <-wait
log.Printf("program receive %s, exited.\n", v)
}else{
} else {
timeTicker.Stop()
}
}
func staticSpeed(){
func staticSpeed() {
connection := getConnection()
defer connection.Close()
if save == 1 {
connection.Exec("use " + db)
_, err := connection.Exec("create table if not exists " + saveTable +"(ts timestamp, speed int)")
_, err := connection.Exec("create table if not exists " + saveTable + "(ts timestamp, speed int)")
if err != nil {
log.Fatalf("create %s Table error: %s\n", saveTable, err)
}
......@@ -287,13 +287,13 @@ func staticSpeed(){
for {
<-timeTicker.C
currentTime := time.Now()
usedTime := currentTime.UnixNano() - lastStaticTime.UnixNano()
total := getTotalRows(successRows)
currentSuccessRows := total - lastTotalRows
speed := currentSuccessRows * 1e9 / int64(usedTime)
log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed)
......@@ -301,14 +301,14 @@ func staticSpeed(){
insertSql := fmt.Sprintf("insert into %s values(%d, %d)", saveTable, currentTime.UnixNano()/1e6, speed)
connection.Exec(insertSql)
}
lastStaticTime = currentTime
lastTotalRows = total
}
}
func getTotalRows(successRows []int64) int64{
func getTotalRows(successRows []int64) int64 {
var total int64 = 0
for j := 0; j < len(successRows); j++ {
total += successRows[j]
......@@ -316,18 +316,18 @@ func getTotalRows(successRows []int64) int64{
return total
}
func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64){
func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64) {
if auto == 1 {
// use auto generate data time
start = startTime
avgInterval = interval
maxTableRows := normalizationDataWithSameInterval(fileRows, avgInterval)
cycleTime = maxTableRows * avgInterval + avgInterval
cycleTime = maxTableRows*avgInterval + avgInterval
} else {
// use the sample data primary timestamp
sort.Sort(fileRows)// sort the file data by the primarykey
sort.Sort(fileRows) // sort the file data by the primarykey
minTime := getPrimaryKey(fileRows.rows[0][fileRows.config.Timestamp])
maxTime := getPrimaryKey(fileRows.rows[len(fileRows.rows)-1][fileRows.config.Timestamp])
......@@ -340,21 +340,21 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i
if minTime == maxTime {
avgInterval = interval
cycleTime = tableNum * avgInterval + avgInterval
}else{
cycleTime = tableNum*avgInterval + avgInterval
} else {
avgInterval = (maxTime - minTime) / int64(len(fileRows.rows)) * tableNum
cycleTime = maxTime - minTime + avgInterval
}
}
return
}
func createStatisticTable(){
func createStatisticTable() {
connection := getConnection()
defer connection.Close()
_, err := connection.Exec("create table if not exist " + db + "."+ saveTable +"(ts timestamp, speed int)")
_, err := connection.Exec("create table if not exist " + db + "." + saveTable + "(ts timestamp, speed int)")
if err != nil {
log.Fatalf("createStatisticTable error: %s\n", err)
}
......@@ -379,8 +379,8 @@ func createSubTable(subTableMaps map[string]*dataRows) {
tableName := getScaleSubTableName(subTableName, i)
scaleTableMap[tableName] = &scaleTableInfo{
subTableName: subTableName,
insertRows: 0,
subTableName: subTableName,
insertRows: 0,
}
scaleTableNames = append(scaleTableNames, tableName)
......@@ -389,12 +389,12 @@ func createSubTable(subTableMaps map[string]*dataRows) {
buffers.WriteString(" using ")
buffers.WriteString(superTableName)
buffers.WriteString(" tags(")
for _, tag := range subTableMaps[subTableName].config.Tags{
for _, tag := range subTableMaps[subTableName].config.Tags {
tagValue := fmt.Sprintf("%v", tagValues[strings.ToLower(tag.Name)])
buffers.WriteString("'" + tagValue + "'")
buffers.WriteString(",")
}
buffers.Truncate(buffers.Len()-1)
buffers.Truncate(buffers.Len() - 1)
buffers.WriteString(")")
createTableSql := buffers.String()
......@@ -451,14 +451,14 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) {
buffer.WriteString(field.Name + " " + field.Type + ",")
}
buffer.Truncate(buffer.Len()-1)
buffer.Truncate(buffer.Len() - 1)
buffer.WriteString(") tags( ")
for _, tag := range superTableConf.config.Tags {
buffer.WriteString(tag.Name + " " + tag.Type + ",")
}
buffer.Truncate(buffer.Len()-1)
buffer.Truncate(buffer.Len() - 1)
buffer.WriteString(")")
createSql := buffer.String()
......@@ -475,16 +475,15 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) {
func getScaleSubTableName(subTableName string, hnum int) string {
if hnum == 0 {
return subTableName
return subTableName
}
return fmt.Sprintf( "%s_%d", subTableName, hnum)
return fmt.Sprintf("%s_%d", subTableName, hnum)
}
func getSuperTableName(stname string) string {
return SUPERTABLE_PREFIX + stname
}
/**
* normalizationData , and return the num of subTables
*/
......@@ -505,12 +504,12 @@ func normalizationData(fileRows dataRows, minTime int64) int64 {
value, ok := subTableMap[subTableName]
if !ok {
subTableMap[subTableName] = &dataRows{
rows: []map[string]interface{}{row},
config: fileRows.config,
rows: []map[string]interface{}{row},
config: fileRows.config,
}
tableNum++
}else{
} else {
value.rows = append(value.rows, row)
}
}
......@@ -518,9 +517,9 @@ func normalizationData(fileRows dataRows, minTime int64) int64 {
}
// return the maximum table rows
func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64{
func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64 {
// subTableMap
currSubTableMap := make(map[string]*dataRows)
currSubTableMap := make(map[string]*dataRows)
for _, row := range fileRows.rows {
// get subTableName
tableValue := getSubTableNameValue(row[fileRows.config.SubTableName])
......@@ -534,10 +533,10 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int
if !ok {
row[fileRows.config.Timestamp] = 0
currSubTableMap[subTableName] = &dataRows{
rows: []map[string]interface{}{row},
config: fileRows.config,
rows: []map[string]interface{}{row},
config: fileRows.config,
}
}else{
} else {
row[fileRows.config.Timestamp] = int64(len(value.rows)) * avgInterval
value.rows = append(value.rows, row)
}
......@@ -545,7 +544,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int
}
var maxRows, tableRows int = 0, 0
for tableName := range currSubTableMap{
for tableName := range currSubTableMap {
tableRows = len(currSubTableMap[tableName].rows)
subTableMap[tableName] = currSubTableMap[tableName] // add to global subTableMap
if tableRows > maxRows {
......@@ -556,13 +555,11 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int
return int64(maxRows)
}
func getSubTableName(subTableValue string, superTableName string) string {
func getSubTableName(subTableValue string, superTableName string) string {
return SUBTABLE_PREFIX + subTableValue + "_" + superTableName
}
func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
connection := getConnection()
defer connection.Close()
defer wg.Done()
......@@ -591,9 +588,9 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
var tableEndTime int64
if vnum == 0 {
// need continue generate data
tableEndTime = time.Now().UnixNano()/1e6
}else {
tableEndTime = tableStartTime + superTableConf.cycleTime * int64(vnum) - superTableConf.avgInterval
tableEndTime = time.Now().UnixNano() / 1e6
} else {
tableEndTime = tableStartTime + superTableConf.cycleTime*int64(vnum) - superTableConf.avgInterval
}
insertRows := scaleTableMap[tableName].insertRows
......@@ -603,10 +600,10 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
rowIndex := insertRows % subTableRows
currentRow := subTableInfo.rows[rowIndex]
currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum * superTableConf.cycleTime + tableStartTime
currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum*superTableConf.cycleTime + tableStartTime
if currentTime <= tableEndTime {
// append
if lastTableName != tableName {
buffers.WriteString(tableName)
buffers.WriteString(" values")
......@@ -616,22 +613,22 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
buffers.WriteString("(")
buffers.WriteString(fmt.Sprintf("%v", currentTime))
buffers.WriteString(",")
// fieldNum := len(subTableInfo.config.Fields)
for _,field := range subTableInfo.config.Fields {
for _, field := range subTableInfo.config.Fields {
buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
buffers.WriteString(",")
// if( i != fieldNum -1){
// }
}
buffers.Truncate(buffers.Len()-1)
buffers.Truncate(buffers.Len() - 1)
buffers.WriteString(") ")
appendRows++
insertRows++
if appendRows == batch {
if appendRows == batch {
// executebatch
insertSql := buffers.String()
connection.Exec("use " + db)
......@@ -645,7 +642,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
lastTableName = ""
appendRows = 0
}
}else {
} else {
// finished insert current table
break
}
......@@ -654,14 +651,14 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
scaleTableMap[tableName].insertRows = insertRows
}
// left := len(rows)
if appendRows > 0 {
if appendRows > 0 {
// executebatch
insertSql := buffers.String()
connection.Exec("use " + db)
affectedRows := executeBatchInsert(insertSql, connection)
successRows[threadIndex] += affectedRows
currSuccessRows += affectedRows
......@@ -676,7 +673,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
break
}
if(num == 0){
if num == 0 {
wg.Done() //finished insert history data
num++
}
......@@ -691,7 +688,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
}
func buildSql(rows []tableRows) string{
func buildSql(rows []tableRows) string {
var lastTableName string
......@@ -709,7 +706,7 @@ func buildSql(rows []tableRows) string{
if lastTableName == row.tableName {
buffers.WriteString(row.value)
}else {
} else {
buffers.WriteString(" ")
buffers.WriteString(row.tableName)
buffers.WriteString(" values")
......@@ -722,7 +719,7 @@ func buildSql(rows []tableRows) string{
return inserSql
}
func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, currentRow map[string]interface{}) tableRows{
func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, currentRow map[string]interface{}) tableRows {
tableRows := tableRows{tableName: tableName}
......@@ -732,12 +729,12 @@ func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, curre
buffers.WriteString(fmt.Sprintf("%v", currentTime))
buffers.WriteString(",")
for _,field := range subTableInfo.config.Fields {
for _, field := range subTableInfo.config.Fields {
buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
buffers.WriteString(",")
}
buffers.Truncate(buffers.Len()-1)
buffers.Truncate(buffers.Len() - 1)
buffers.WriteString(")")
insertSql := buffers.String()
......@@ -764,7 +761,7 @@ func getFieldValue(fieldValue interface{}) string {
return fmt.Sprintf("'%v'", fieldValue)
}
func getConnection() *sql.DB{
func getConnection() *sql.DB {
db, err := sql.Open(DRIVER_NAME, dataSourceName)
if err != nil {
panic(err)
......@@ -772,7 +769,6 @@ func getConnection() *sql.DB{
return db
}
func getSubTableNameValue(suffix interface{}) string {
return fmt.Sprintf("%v", suffix)
}
......@@ -950,7 +946,7 @@ func parseMillisecond(str interface{}, layout string) int64 {
log.Println(err)
return -1
}
return t.UnixNano()/1e6
return t.UnixNano() / 1e6
}
// lowerMapKey transfer all the map key to lowercase
......@@ -1009,7 +1005,7 @@ func checkUserCaseConfig(caseName string, caseConfig *dataimport.CaseConfig) {
if i < len(caseConfig.Fields)-1 {
// delete middle item, a = a[:i+copy(a[i:], a[i+1:])]
caseConfig.Fields = caseConfig.Fields[:i+copy(caseConfig.Fields[i:], caseConfig.Fields[i+1:])]
}else {
} else {
// delete the last item
caseConfig.Fields = caseConfig.Fields[:len(caseConfig.Fields)-1]
}
......@@ -1057,7 +1053,7 @@ func parseArg() {
flag.Parse()
}
func printArg() {
func printArg() {
fmt.Println("used param: ")
fmt.Println("-cfg: ", cfg)
fmt.Println("-cases:", cases)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册