未验证 提交 76db6804 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 3317 taosdemo interlace (#5773)

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

check offset 0.

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

fix sample file import bug.

* [TD-3316] <fix>: add test case for limit and offset. fix sample data issue.

* [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file.

* [TD-3317] <feature>: make taosdemo support interlace mode.

json parameter rows_per_tbl support.

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode insertion.

refactor.

* [TD-3317] <feature>: support interlace mode insertion.

change json file.

* [TD-3317] <feature>: support interlace mode insertion.

fix multithread create table regression.

* [TD-3317] <feature>: support interlace mode insertion.

working but not perfect.

* [TD-3317] <feature>: support interlace mode insertion.

rename lowaTest with taosdemoTestWithJson

* [TD-3317] <feature>: support interlace mode insertion.

perfect

* [TD-3317] <feature>: support interlace mode insertion.

cleanup.

* [TD-3317] <feature>: support interlace mode insertion.

adjust algorithm of loop times.

* [TD-3317] <feature>: support interlace mode insertion.

fix delay time bug.

* [TD-3317] <feature>: support interlace mode insertion.

fix progressive timestamp bug.

* [TD-3317] <feature>: support interlace mode insertion.

add an option for performance print.

* [TD-3317] <feature>: support interlace mode insertion.

change json test case with less table for acceleration.

* [TD-3317] <feature>: support interlace mode insertion.

change progressive mode timestamp step and testcase.

* [TD-3197] <fix>: fix taosdemo coverity scan issues.

* [TD-3197] <fix>: fix taosdemo coverity scan issue.

fix subscribeTest pids uninitialized.

* [TD-3317] <feature>: support interlace mode insertion.

add time shift for no sleep time.

* [TD-3317] <feature>: support interlace insert.

rework timestamp.

* [TD-3317] <feature>: support interlace mode insertion.

change rows_per_tbl to interlace_rows.

* [TD-3317] <feature>: taosdemo suppoert interlace mode.

remove trailing spaces.

* [TD-3317] <feature>: taosdemo support interlace insertion.

prompt if interlace > num_of_records_per_req

* fill insert-into early to buffer.

* fix buffer overflow issue.

* change rows_per_tbl to interlace_rows to align with taosdemo.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 fd5cebdf
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
when in some thread query return error, thread don't exit, but return, otherwise coredump in other thread. when in some thread query return error, thread don't exit, but return, otherwise coredump in other thread.
*/ */
#include <stdint.h>
#define _GNU_SOURCE #define _GNU_SOURCE
#define CURL_STATICLIB #define CURL_STATICLIB
...@@ -3242,7 +3243,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3242,7 +3243,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (numRecPerReq && numRecPerReq->type == cJSON_Number) { if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
g_args.num_of_RPR = numRecPerReq->valueint; g_args.num_of_RPR = numRecPerReq->valueint;
} else if (!numRecPerReq) { } else if (!numRecPerReq) {
g_args.num_of_RPR = 0xffff; g_args.num_of_RPR = INT32_MAX;
} else { } else {
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
__func__, __LINE__); __func__, __LINE__);
...@@ -4647,7 +4648,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, ...@@ -4647,7 +4648,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"insert into %s.%s using %s.%s tags %s values", "%s.%s using %s.%s tags %s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName, tableName,
pThreadInfo->db_name, pThreadInfo->db_name,
...@@ -4658,14 +4659,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, ...@@ -4658,14 +4659,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"insert into %s.%s values", "%s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} else { } else {
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"insert into %s.%s values", "%s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} }
...@@ -4673,7 +4674,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, ...@@ -4673,7 +4674,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"insert into %s.%s values", "%s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} }
...@@ -4694,6 +4695,7 @@ static int generateInterlaceDataBuffer( ...@@ -4694,6 +4695,7 @@ static int generateInterlaceDataBuffer(
int64_t startTime, int64_t startTime,
int *pRemainderBufLen) int *pRemainderBufLen)
{ {
assert(buffer);
char *pstr = buffer; char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
...@@ -4723,18 +4725,20 @@ static int generateInterlaceDataBuffer( ...@@ -4723,18 +4725,20 @@ static int generateInterlaceDataBuffer(
} else { } else {
startTime = 1500000000000; startTime = 1500000000000;
} }
int k = generateDataTail( int k = generateDataTail(
superTblInfo, superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime, startTime,
&(pThreadInfo->samplePos), &dataLen); &(pThreadInfo->samplePos), &dataLen);
if (k > 0) { if (k == batchPerTbl) {
pstr += dataLen; pstr += dataLen;
*pRemainderBufLen -= dataLen; *pRemainderBufLen -= dataLen;
} else { } else {
pstr -= headLen; pstr -= headLen;
pstr[0] = '\0'; pstr[0] = '\0';
k = 0;
} }
return k; return k;
...@@ -4745,7 +4749,8 @@ static int generateProgressiveDataBuffer( ...@@ -4745,7 +4749,8 @@ static int generateProgressiveDataBuffer(
int32_t tableSeq, int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer, threadInfo *pThreadInfo, char *buffer,
int64_t insertRows, int64_t insertRows,
int64_t startFrom, int64_t startTime, int *pSamplePos) int64_t startFrom, int64_t startTime, int *pSamplePos,
int *pRemainderBufLen)
{ {
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
...@@ -4760,27 +4765,24 @@ static int generateProgressiveDataBuffer( ...@@ -4760,27 +4765,24 @@ static int generateProgressiveDataBuffer(
} }
assert(buffer != NULL); assert(buffer != NULL);
char *pstr = buffer;
int k = 0; int k = 0;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int remainderBufLen = maxSqlLen;
memset(buffer, 0, maxSqlLen);
char *pstr = buffer; memset(buffer, 0, *pRemainderBufLen);
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo,
buffer, remainderBufLen); buffer, *pRemainderBufLen);
if (headLen <= 0) { if (headLen <= 0) {
return 0; return 0;
} }
pstr += headLen; pstr += headLen;
remainderBufLen -= headLen; *pRemainderBufLen -= headLen;
int dataLen; int dataLen;
k = generateDataTail(superTblInfo, k = generateDataTail(superTblInfo,
g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom, g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
startTime, startTime,
pSamplePos, &dataLen); pSamplePos, &dataLen);
...@@ -4842,18 +4844,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4842,18 +4844,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t startTime = pThreadInfo->start_time; int64_t startTime = pThreadInfo->start_time;
int batchPerTblTimes;
int batchPerTbl;
assert(pThreadInfo->ntables > 0); assert(pThreadInfo->ntables > 0);
if (interlaceRows > g_args.num_of_RPR) if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR; interlaceRows = g_args.num_of_RPR;
batchPerTbl = interlaceRows; int batchPerTbl = interlaceRows;
int batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes = batchPerTblTimes =
(g_args.num_of_RPR / (interlaceRows * pThreadInfo->ntables)) + 1; g_args.num_of_RPR / interlaceRows;
} else { } else {
batchPerTblTimes = 1; batchPerTblTimes = 1;
} }
...@@ -4862,6 +4863,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4862,6 +4863,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
bool flagSleep = true; bool flagSleep = true;
int sleepTimeTotal = 0; int sleepTimeTotal = 0;
char *strInsertInto = "insert into ";
int nInsertBufLen = strlen(strInsertInto);
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) { if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
...@@ -4872,6 +4876,11 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4872,6 +4876,11 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int remainderBufLen = maxSqlLen; int remainderBufLen = maxSqlLen;
char *pstr = buffer; char *pstr = buffer;
int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto);
pstr += len;
remainderBufLen -= len;
int recOfBatch = 0; int recOfBatch = 0;
for (int i = 0; i < batchPerTblTimes; i ++) { for (int i = 0; i < batchPerTblTimes; i ++) {
...@@ -4883,6 +4892,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4883,6 +4892,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
return NULL; return NULL;
} }
int oldRemainderLen = remainderBufLen;
int generated = generateInterlaceDataBuffer( int generated = generateInterlaceDataBuffer(
tableName, batchPerTbl, i, batchPerTblTimes, tableName, batchPerTbl, i, batchPerTblTimes,
tableSeq, tableSeq,
...@@ -4901,6 +4911,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4901,6 +4911,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
tableSeq ++; tableSeq ++;
recOfBatch += batchPerTbl; recOfBatch += batchPerTbl;
pstr += (oldRemainderLen - remainderBufLen);
// startTime += batchPerTbl * superTblInfo->timeStampStep; // startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo->totalInsertRows += batchPerTbl; pThreadInfo->totalInsertRows += batchPerTbl;
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
...@@ -5012,11 +5023,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5012,11 +5023,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) { if (NULL == buffer) {
errorPrint( "Failed to alloc %d Bytes, reason:%s\n", errorPrint( "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, maxSqlLen,
strerror(errno)); strerror(errno));
return NULL; return NULL;
} }
...@@ -5059,10 +5071,20 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5059,10 +5071,20 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
__func__, __LINE__, __func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName); pThreadInfo->threadID, tableSeq, tableName);
int remainderBufLen = maxSqlLen;
char *pstr = buffer;
int nInsertBufLen = strlen("insert into ");
int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into ");
pstr += len;
remainderBufLen -= len;
int generated = generateProgressiveDataBuffer( int generated = generateProgressiveDataBuffer(
tableName, tableSeq, pThreadInfo, buffer, insertRows, tableName, tableSeq, pThreadInfo, pstr, insertRows,
i, start_time, i, start_time,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos),
&remainderBufLen);
if (generated > 0) if (generated > 0)
i += generated; i += generated;
else else
......
...@@ -51,7 +51,7 @@ class taosdemoPerformace: ...@@ -51,7 +51,7 @@ class taosdemoPerformace:
"insert_rows": 100000, "insert_rows": 100000,
"multi_thread_write_one_tbl": "no", "multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0, "number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 100, "interlace_rows": 100,
"max_sql_len": 1024000, "max_sql_len": 1024000,
"disorder_ratio": 0, "disorder_ratio": 0,
"disorder_range": 1000, "disorder_range": 1000,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册