未验证 提交 bcc32617 编写于 作者: F fangpanpan 提交者: GitHub

Merge pull request #1033 from taosdata/feature/importSampleData

add statistic and save option
## 样例数据导入
该工具可以根据用户提供的 `json``csv` 格式样例数据文件快速导入 `TDengine`,目前仅能在Linux上运行。
该工具可以根据用户提供的 `json``csv` 格式样例数据文件快速导入 `TDengine`,目前仅能在 Linux 上运行。
为了体验写入和查询性能,可以对样例数据进行横向、纵向扩展。横向扩展是指将一个表(监测点)的数据克隆到多张表,纵向扩展是指将样例数据中的一段时间范围内的数据在时间轴上复制。该工具还支持历史数据导入至当前时间后持续导入,这样可以测试插入和查询并行进行的场景,以模拟真实环境。
......@@ -83,11 +83,19 @@ go build -o bin/taosimport app/main.go
* -vnum int
需要将样例数据进行纵向扩展的次数,如果设置为 0 代表将历史数据导入至当前时间后持续按照指定间隔导入。默认为 1000,表示将样例数据在时间轴上纵向复制1000次。
需要将样例数据进行纵向扩展的次数,如果设置为 0 代表将历史数据导入至当前时间后持续按照指定间隔导入。默认为 1000,表示将样例数据在时间轴上纵向复制1000 次。
* -delay int
当 vnum 设置为 0 时持续导入的时间间隔,默认 3000 ms。
当 vnum 设置为 0 时持续导入的时间间隔,默认为所有场景中最小记录间隔时间的一半,单位 ms。
* -tick int
打印统计信息的时间间隔,默认 2000 ms。
* -save int
是否保存统计信息到 tdengine 的 statistic 表中,1 是,0 否, 默认 0。
* -auto int
......
......@@ -16,7 +16,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
_ "github.com/taosdata/TDengine/src/connector/go/src/taosSql"
......@@ -28,6 +27,7 @@ const (
MILLISECOND = "millisecond"
DEFAULT_STARTTIME int64 = -1
DEFAULT_INTERVAL int64 = 1*1000
DEFAULT_DELAY int64 = -1
JSON_FORMAT = "json"
CSV_FORMAT = "csv"
......@@ -37,6 +37,7 @@ const (
DRIVER_NAME = "taosSql"
STARTTIME_LAYOUT = "2006-01-02 15:04:05.000"
INSERT_PREFIX = "insert into "
STATISTIC_TABLE = "statistic"
)
var (
......@@ -67,8 +68,13 @@ var (
scaleTableMap = make(map[string]*scaleTableInfo)
totalSuccessRows int64
successRows []int64
lastStaticTime time.Time
lastTotalRows int64
timeTicker *time.Ticker
delay int64 // default 10 milliseconds
tick int64
save int
)
type superTableConfig struct {
......@@ -149,6 +155,8 @@ func main() {
importConfig := dataimport.LoadConfig(cfg)
var caseMinumInterval int64 = -1
for _, userCase := range strings.Split(cases, ",") {
caseConfig, ok := importConfig.UserCases[userCase]
......@@ -182,10 +190,23 @@ func main() {
superTableConfigMap[caseConfig.Stname].avgInterval = avgInterval
superTableConfigMap[caseConfig.Stname].cycleTime = cycleTime
if caseMinumInterval == -1 || caseMinumInterval > avgInterval {
caseMinumInterval = avgInterval
}
startStr := time.Unix(0, start*int64(time.Millisecond)).Format(STARTTIME_LAYOUT)
log.Printf("case [%s] startTime %s(%d), average dataInterval %d ms, cycleTime %d ms.\n", userCase, startStr, start, avgInterval, cycleTime)
}
if DEFAULT_DELAY == delay {
// default delay
delay = caseMinumInterval / 2
if delay < 1 {
delay = 1
}
log.Printf("actual delay is %d ms.", delay)
}
superTableNum := len(superTableConfigMap)
if superTableNum == 0 {
log.Fatalln("no valid file, exited")
......@@ -214,6 +235,8 @@ func main() {
start = time.Now()
successRows = make([]int64, thread)
startIndex, endIndex := 0, filePerThread
for i := 0; i < thread; i++ {
// start thread
......@@ -222,25 +245,76 @@ func main() {
}
wg.Add(1)
go insertData(i, startIndex, endIndex, &wg)
go insertData(i, startIndex, endIndex, &wg, successRows)
startIndex, endIndex = endIndex, endIndex+filePerThread
}
lastStaticTime = time.Now()
timeTicker = time.NewTicker(time.Millisecond * time.Duration(tick))
go staticSpeed()
wg.Wait()
usedTime := time.Since(start)
log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", totalSuccessRows, usedTime/1e6, totalSuccessRows * 1e9 / int64(usedTime))
total := getTotalRows(successRows)
log.Printf("finished insert %d rows, used %d ms, speed %d rows/s", total, usedTime/1e6, total * 1e9 / int64(usedTime))
if vnum == 0 {
// continue waiting for insert data
wait := make(chan string)
v := <- wait
log.Printf("program receive %s, exited.\n", v)
}else{
timeTicker.Stop()
}
}
func staticSpeed(){
connection := getConnection()
defer connection.Close()
if save == 1 {
connection.Exec("use " + db)
_, err := connection.Exec("create table if not exists " + STATISTIC_TABLE +"(ts timestamp, speed int)")
if err != nil {
log.Fatalf("create %s Table error: %s\n", STATISTIC_TABLE, err)
}
}
for {
<-timeTicker.C
currentTime := time.Now()
usedTime := currentTime.UnixNano() - lastStaticTime.UnixNano()
total := getTotalRows(successRows)
currentSuccessRows := total - lastTotalRows
speed := currentSuccessRows * 1e9 / int64(usedTime)
log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed)
if save == 1 {
insertSql := fmt.Sprintf("insert into %s values(%d, %d)", STATISTIC_TABLE, currentTime.UnixNano()/1e6, speed)
connection.Exec(insertSql)
}
lastStaticTime = currentTime
lastTotalRows = total
}
}
func getTotalRows(successRows []int64) int64{
var total int64 = 0
for j := 0; j < len(successRows); j++ {
total += successRows[j]
}
return total
}
func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64){
if auto == 1 {
// use auto generate data time
......@@ -275,6 +349,15 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval i
return
}
func createStatisticTable(){
connection := getConnection()
defer connection.Close()
_, err := connection.Exec("create table if not exist " + db + "."+ STATISTIC_TABLE +"(ts timestamp, speed int)")
if err != nil {
log.Fatalf("createStatisticTable error: %s\n", err)
}
}
func createSubTable(subTableMaps map[string]*dataRows) {
......@@ -478,24 +561,25 @@ func getSubTableName(subTableValue string, superTableName string) string {
}
func insertData(threadIndex, start, end int, wg *sync.WaitGroup) {
func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
connection := getConnection()
defer connection.Close()
defer wg.Done()
connection.Exec("use " + db) // use db
log.Printf("thread-%d start insert into [%d, %d) subtables.\n", threadIndex, start, end)
num := 0
subTables := scaleTableNames[start:end]
for {
log.Printf("thread-%d start insert into [%d, %d) subtables.\n", threadIndex, start, end)
threadStartTime := time.Now()
var currSuccessRows int64
var appendRows int
var lastTableName string
var successRows int64
var rows []tableRows
buffers := bytes.Buffer{}
buffers.WriteString(INSERT_PREFIX)
subTables := scaleTableNames[start:end]
for _, tableName := range subTables {
subTableInfo := subTableMap[scaleTableMap[tableName].subTableName]
......@@ -507,7 +591,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup) {
if vnum == 0 {
// need continue generate data
tableEndTime = time.Now().UnixNano()/1e6
}else {
}else {
tableEndTime = tableStartTime + superTableConf.cycleTime * int64(vnum) - superTableConf.avgInterval
}
......@@ -521,17 +605,43 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup) {
currentTime := getPrimaryKey(currentRow[subTableInfo.config.Timestamp]) + loopNum * superTableConf.cycleTime + tableStartTime
if currentTime <= tableEndTime {
// append
row := buildRow(tableName, currentTime, subTableInfo, currentRow)
rows = append(rows, row)
if lastTableName != tableName {
buffers.WriteString(tableName)
buffers.WriteString(" values")
}
lastTableName = tableName
buffers.WriteString("(")
buffers.WriteString(fmt.Sprintf("%v", currentTime))
buffers.WriteString(",")
// fieldNum := len(subTableInfo.config.Fields)
for _,field := range subTableInfo.config.Fields {
buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
buffers.WriteString(",")
// if( i != fieldNum -1){
// }
}
buffers.Truncate(buffers.Len()-1)
buffers.WriteString(") ")
appendRows++
insertRows++
if len(rows) == batch {
if appendRows == batch {
// executebatch
insertSql := buildSql(rows)
insertSql := buffers.String()
affectedRows := executeBatchInsert(insertSql, connection)
successRows = atomic.AddInt64(&successRows, affectedRows)
rows = []tableRows{}
successRows[threadIndex] += affectedRows
currSuccessRows += affectedRows
buffers.Reset()
buffers.WriteString(INSERT_PREFIX)
lastTableName = ""
appendRows = 0
}
}else {
// finished insert current table
......@@ -543,31 +653,37 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup) {
}
left := len(rows)
if left > 0 {
// left := len(rows)
if appendRows > 0 {
// executebatch
insertSql := buildSql(rows)
insertSql := buffers.String()
affectedRows := executeBatchInsert(insertSql, connection)
successRows = atomic.AddInt64(&successRows, affectedRows)
successRows[threadIndex] += affectedRows
currSuccessRows += affectedRows
buffers.Reset()
}
atomic.AddInt64(&totalSuccessRows, successRows)
log.Printf("thread-%d finished insert %d rows, used %d ms.", threadIndex, successRows, time.Since(threadStartTime)/1e6)
// log.Printf("thread-%d finished insert %d rows, used %d ms.", threadIndex, currSuccessRows, time.Since(threadStartTime)/1e6)
if vnum != 0 {
// thread finished insert data
// log.Printf("thread-%d exit\n", threadIndex)
break
}
if(num == 0){
wg.Done() // finished insert history data
wg.Done() //finished insert history data
num++
}
num++
// need continue insert data
// log.Printf("thread-%d start to sleep %d ms.", threadIndex, delay)
time.Sleep(time.Duration(delay) * time.Millisecond)
if currSuccessRows == 0 {
// log.Printf("thread-%d start to sleep %d ms.", threadIndex, delay)
time.Sleep(time.Duration(delay) * time.Millisecond)
}
// need continue insert data
}
}
......@@ -638,6 +754,7 @@ func executeBatchInsert(insertSql string, connection *sql.DB) int64 {
affected = 0
}
return affected
// return 0
}
func getFieldValue(fieldValue interface{}) string {
......@@ -917,7 +1034,9 @@ func parseArg() {
flag.StringVar(&cases, "cases", "sensor_info", "usecase for dataset to be imported. Multiple choices can be separated by comma, for example, -cases sensor_info,camera_detection.")
flag.IntVar(&hnum, "hnum", 100, "magnification factor of the sample tables. For example, if hnum is 100 and in the sample data there are 10 tables, then 10x100=1000 tables will be created in the database.")
flag.IntVar(&vnum, "vnum", 1000, "copies of the sample records in each table. If set to 0,this program will never stop simulating and importing data even if the timestamp has passed current time.")
flag.Int64Var(&delay, "delay", 3 * 1000, "the delay millisecond to continue generate data when vnum set 0.")
flag.Int64Var(&delay, "delay", DEFAULT_DELAY, "the delay time interval(millisecond) to continue generating data when vnum set 0.")
flag.Int64Var(&tick, "tick", 2000, "the tick time interval(millisecond) to print statistic info.")
flag.IntVar(&save, "save", 0, "whether to save the statistical info into 'statistic' table. 0 is disabled and 1 is enabled.")
flag.IntVar(&thread, "thread", 10, "number of threads to import data.")
flag.IntVar(&batch, "batch", 100, "rows of records in one import batch.")
flag.IntVar(&auto, "auto", 0, "whether to use the starttime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.")
......@@ -940,12 +1059,14 @@ func printArg() {
fmt.Println("-cases:", cases)
fmt.Println("-hnum:", hnum)
fmt.Println("-vnum:", vnum)
fmt.Println("-delay:", delay)
fmt.Println("-tick:", tick)
fmt.Println("-save:", save)
fmt.Println("-thread:", thread)
fmt.Println("-batch:", batch)
fmt.Println("-auto:", auto)
fmt.Println("-start:", starttimestr)
fmt.Println("-interval:", interval)
fmt.Println("-delay:", delay)
fmt.Println("-host:", host)
fmt.Println("-port", port)
fmt.Println("-user", user)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册