未验证 提交 199e61ec 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 3317 taosdemo interlace (#5477)

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

check offset 0.

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

fix sample file import bug.

* [TD-3316] <fix>: add test case for limit and offset. fix sample data issue.

* [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file.

* [TD-3317] <feature>: make taosdemo support interlace mode.

json parameter rows_per_tbl support.

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode insertion.

refactor.

* [TD-3317] <feature>: support interlace mode insertion.

change json file.

* [TD-3317] <feature>: support interlace mode insertion.

fix multithread create table regression.

* [TD-3317] <feature>: support interlace mode insertion.

working but not perfect.

* [TD-3317] <feature>: support interlace mode insertion.

rename lowaTest with taosdemoTestWithJson

* [TD-3317] <feature>: support interlace mode insertion.

perfect
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 3f312f8b
...@@ -9,8 +9,7 @@ ...@@ -9,8 +9,7 @@
"thread_count_create_tbl": 4, "thread_count_create_tbl": 4,
"result_file": "./insert_res.txt", "result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no", "confirm_parameter_prompt": "no",
"insert_interval": 5000, "insert_interval": 1000,
"rows_per_tbl": 50,
"num_of_records_per_req": 100, "num_of_records_per_req": 100,
"max_sql_len": 1024000, "max_sql_len": 1024000,
"databases": [{ "databases": [{
...@@ -35,12 +34,12 @@ ...@@ -35,12 +34,12 @@
"super_tables": [{ "super_tables": [{
"name": "stb", "name": "stb",
"child_table_exists":"no", "child_table_exists":"no",
"childtable_count": 10, "childtable_count": 100,
"childtable_prefix": "stb_", "childtable_prefix": "stb_",
"auto_create_table": "no", "auto_create_table": "no",
"data_source": "rand", "data_source": "rand",
"insert_mode": "taosc", "insert_mode": "taosc",
"insert_rows": 200, "insert_rows": 1000,
"multi_thread_write_one_tbl": "no", "multi_thread_write_one_tbl": "no",
"rows_per_tbl": 20, "rows_per_tbl": 20,
"max_sql_len": 1024000, "max_sql_len": 1024000,
...@@ -51,8 +50,8 @@ ...@@ -51,8 +50,8 @@
"sample_format": "csv", "sample_format": "csv",
"sample_file": "./sample.csv", "sample_file": "./sample.csv",
"tags_file": "", "tags_file": "",
"columns": [{"type": "INT"}, {"type": "DOUBLE", "count":10}, {"type": "BINARY", "len": 16, "count":3}, {"type": "BINARY", "len": 32, "count":6}], "columns": [{"type": "INT"}],
"tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":5}] "tags": [{"type": "TINYINT", "count":1}]
}] }]
}] }]
} }
...@@ -4444,110 +4444,102 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4444,110 +4444,102 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t startTime = pThreadInfo->start_time; int64_t startTime = pThreadInfo->start_time;
int batchPerTblTimes; int batchPerTblTimes;
int prevBatchPerTbl, lastBatchPerTbl; int batchPerTbl;
if (pThreadInfo->ntables == 1) { if ((rowsPerTbl > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes = 1;
lastBatchPerTbl = rowsPerTbl;
prevBatchPerTbl = rowsPerTbl;
} else if (rowsPerTbl > 0) {
batchPerTblTimes = g_args.num_of_RPR / rowsPerTbl; batchPerTblTimes = g_args.num_of_RPR / rowsPerTbl;
lastBatchPerTbl = g_args.num_of_RPR % rowsPerTbl; batchPerTbl = rowsPerTbl;
if (lastBatchPerTbl > 0)
batchPerTblTimes += 1;
else
lastBatchPerTbl = rowsPerTbl;
prevBatchPerTbl = rowsPerTbl;
} else { } else {
batchPerTblTimes = 1; batchPerTblTimes = 1;
prevBatchPerTbl = g_args.num_of_RPR; batchPerTbl = g_args.num_of_RPR;
lastBatchPerTbl = g_args.num_of_RPR;
} }
int generatedRecPerTbl = 0;
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if (insert_interval) { if (insert_interval) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
} }
// generate data // generate data
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer; char *pstr = buffer;
int recGenerated = 0; int recGenerated = 0;
for (int i = 0; i < batchPerTblTimes; i ++) { for (int i = 0; i < batchPerTblTimes; i ++) {
if (insertMode == INTERLACE_INSERT_MODE) { getTableName(tableName, pThreadInfo, tableSeq);
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
}
}
getTableName(tableName, pThreadInfo, tableSeq);
int headLen; int headLen;
if (i == 0) { if (i == 0) {
headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, pstr); headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, pstr);
} else { } else {
headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values", headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} }
// generate data buffer // generate data buffer
verbosePrint("%s() LN%d i=%d buffer:\n%s\n", verbosePrint("%s() LN%d i=%d buffer:\n%s\n",
__func__, __LINE__, i, buffer); __func__, __LINE__, i, buffer);
pstr += headLen; pstr += headLen;
int dataLen = 0; int dataLen = 0;
int batchPerTbl;
if (i == batchPerTblTimes - 1) {
batchPerTbl = lastBatchPerTbl;
} else {
batchPerTbl = prevBatchPerTbl;
}
verbosePrint("%s() LN%d batchPerTbl = %d\n", printf("%s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
__func__, __LINE__, batchPerTbl); __func__, __LINE__, i, batchPerTblTimes, batchPerTbl);
int numOfRecGenerated = generateDataTail( int numOfRecGenerated = generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo, tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, insertRows, 0, batchPerTbl, pstr, insertRows, 0,
startTime + pThreadInfo->totalInsertRows * superTblInfo->timeStampStep, startTime + pThreadInfo->totalInsertRows * superTblInfo->timeStampStep,
&(pThreadInfo->samplePos), &dataLen); &(pThreadInfo->samplePos), &dataLen);
verbosePrint("%s() LN%d numOfRecGenerated= %d\n", verbosePrint("%s() LN%d numOfRecGenerated= %d\n",
__func__, __LINE__, numOfRecGenerated); __func__, __LINE__, numOfRecGenerated);
pstr += dataLen; pstr += dataLen;
recGenerated += numOfRecGenerated; recGenerated += numOfRecGenerated;
tableSeq ++; tableSeq ++;
if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
generatedRecPerTbl += numOfRecGenerated;
}
} }
verbosePrint("%s() LN%d buffer:\n%s\n",
__func__, __LINE__, buffer);
pThreadInfo->totalInsertRows += recGenerated;
int affectedRows = execInsert(pThreadInfo, buffer, recGenerated); int remainRows = insertRows - generatedRecPerTbl;
if (affectedRows < 0) if (batchPerTbl > remainRows)
batchPerTbl = remainRows;
if ((g_args.num_of_RPR - recGenerated) < batchPerTbl)
break;
}
pThreadInfo->totalInsertRows += recGenerated;
printf("%s() LN%d recGenerated=%d totalInsertRows=%"PRId64" buffer:\n%s\n",
__func__, __LINE__, recGenerated,
pThreadInfo->totalInsertRows, buffer);
int affectedRows = execInsert(pThreadInfo, buffer, recGenerated);
if (affectedRows < 0)
goto free_and_statistics_interlace; goto free_and_statistics_interlace;
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
endTs = taosGetTimestampUs(); endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs; int64_t delay = endTs - startTs;
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++; pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay; pThreadInfo->totalDelay += delay;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
pThreadInfo->threadID, pThreadInfo->threadID,
pThreadInfo->totalInsertRows, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows); pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
if (insert_interval) { if (insert_interval) {
et = taosGetTimestampUs(); et = taosGetTimestampUs();
if (insert_interval > ((et - st)/1000) ) { if (insert_interval > ((et - st)/1000) ) {
...@@ -4556,7 +4548,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4556,7 +4548,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
__func__, __LINE__, sleep_time); __func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms taosMsleep(sleep_time); // ms
} }
} }
} }
free_and_statistics_interlace: free_and_statistics_interlace:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册