未验证 提交 fd5cebdf 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 3733 for develop (#5768)

* [TD-3733]<fix>: taosdemo buffer processing refactor.

* [TD-3733]<fix>: taosdemo generate SQL head buffer process.

* [TD-3733]<fix>: taosdemo generate sql head buffer processing.

fix compile issue for windows.

* [TD-3733]<fix>: taosdemo generate sql head buffer processing.

fix max_sql_len. generate data buffer refactor.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 0913d455
...@@ -2949,9 +2949,6 @@ static int readSampleFromCsvFileToMem( ...@@ -2949,9 +2949,6 @@ static int readSampleFromCsvFileToMem(
continue; continue;
} }
verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", (long)readLen,
superTblInfo->lenOfOneRow, getRows);
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen); line, readLen);
getRows++; getRows++;
...@@ -3214,9 +3211,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3214,9 +3211,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// rows per table need be less than insert batch // rows per table need be less than insert batch
if (g_args.interlace_rows > g_args.num_of_RPR) { if (g_args.interlace_rows > g_args.num_of_RPR) {
printf("NOTICE: interlace rows value %d > num_of_records_per_request %d\n\n", printf("NOTICE: interlace rows value %d > num_of_records_per_req %d\n\n",
g_args.interlace_rows, g_args.num_of_RPR); g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_request %d\n\n", printf(" interlace rows value will be set to num_of_records_per_req %d\n\n",
g_args.num_of_RPR); g_args.num_of_RPR);
printf(" press Enter key to continue or Ctrl-C to stop."); printf(" press Enter key to continue or Ctrl-C to stop.");
(void)getchar(); (void)getchar();
...@@ -3527,6 +3524,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3527,6 +3524,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
/*
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) { if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint; g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
...@@ -3536,6 +3534,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3536,6 +3534,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, batch_create_tbl_num not found\n"); printf("ERROR: failed to read json, batch_create_tbl_num not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
*/
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists if (childTblExists
...@@ -3683,14 +3682,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3683,14 +3682,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
int32_t len = maxSqlLen->valueint; int32_t len = maxSqlLen->valueint;
if (len > TSDB_MAX_ALLOWED_SQL_LEN) { if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
len = TSDB_MAX_ALLOWED_SQL_LEN; len = TSDB_MAX_ALLOWED_SQL_LEN;
} else if (len < TSDB_MAX_SQL_LEN) { } else if (len < 5) {
len = TSDB_MAX_SQL_LEN; len = 5;
} }
g_Dbs.db[i].superTbls[j].maxSqlLen = len; g_Dbs.db[i].superTbls[j].maxSqlLen = len;
} else if (!maxSqlLen) { } else if (!maxSqlLen) {
g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len; g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
} else { } else {
printf("ERROR: failed to read json, maxSqlLen not found\n"); errorPrint("%s() LN%d, failed to read json, maxSqlLen input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3716,9 +3716,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3716,9 +3716,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint; g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint;
// rows per table need be less than insert batch // rows per table need be less than insert batch
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %d > num_of_records_per_request %d\n\n", printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %d > num_of_records_per_req %d\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR); i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_request %d\n\n", printf(" interlace rows value will be set to num_of_records_per_req %d\n\n",
g_args.num_of_RPR); g_args.num_of_RPR);
printf(" press Enter key to continue or Ctrl-C to stop."); printf(" press Enter key to continue or Ctrl-C to stop.");
(void)getchar(); (void)getchar();
...@@ -4512,13 +4512,15 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq) ...@@ -4512,13 +4512,15 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
} }
} }
static int generateDataTail(char *tableName, int32_t tableSeq, static int generateDataTail(
threadInfo* pThreadInfo, SSuperTable* superTblInfo, SSuperTable* superTblInfo,
int batch, char* buffer, int remainderBufLen, int64_t insertRows, int batch, char* buffer, int remainderBufLen, int64_t insertRows,
int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) { int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) {
int len = 0; int len = 0;
int ncols_per_record = 1; // count first col ts int ncols_per_record = 1; // count first col ts
char *pstr = buffer;
if (superTblInfo == NULL) { if (superTblInfo == NULL) {
int datatypeSeq = 0; int datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) { while(g_args.datatype[datatypeSeq]) {
...@@ -4547,15 +4549,14 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4547,15 +4549,14 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
pSamplePos); pSamplePos);
} else if (0 == strncasecmp(superTblInfo->dataSource, } else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) { "rand", strlen("rand"))) {
int rand_num = taosRandom() % 100;
int randTail; int randTail = superTblInfo->timeStampStep * k;
if (0 != superTblInfo->disorderRatio if (superTblInfo->disorderRatio > 0) {
&& rand_num < superTblInfo->disorderRatio) { int rand_num = taosRandom() % 100;
randTail = (superTblInfo->timeStampStep * k if(rand_num < superTblInfo->disorderRatio) {
+ (taosRandom() % superTblInfo->disorderRange + 1)) * (-1); randTail = (randTail + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %d\n", randTail); debugPrint("rand data generated, back %d\n", randTail);
} else { }
randTail = superTblInfo->timeStampStep * k;
} }
uint64_t d = startTime uint64_t d = startTime
...@@ -4570,7 +4571,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4570,7 +4571,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
break; break;
} }
buffer += snprintf(buffer, retLen + 1, "%s", data); pstr += snprintf(pstr , retLen + 1, "%s", data);
k++; k++;
len += retLen; len += retLen;
remainderBufLen -= retLen; remainderBufLen -= retLen;
...@@ -4598,7 +4599,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4598,7 +4599,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
if (len > remainderBufLen) if (len > remainderBufLen)
break; break;
buffer += sprintf(buffer, " %s", data); pstr += sprintf(pstr, " %s", data);
k++; k++;
len += retLen; len += retLen;
remainderBufLen -= retLen; remainderBufLen -= retLen;
...@@ -4619,9 +4620,14 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4619,9 +4620,14 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
} }
static int generateSQLHead(char *tableName, int32_t tableSeq, static int generateSQLHead(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer) threadInfo* pThreadInfo, SSuperTable* superTblInfo,
char *buffer, int remainderBufLen)
{ {
int len; int len;
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
char headBuf[HEAD_BUFF_LEN];
if (superTblInfo) { if (superTblInfo) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
...@@ -4638,8 +4644,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, ...@@ -4638,8 +4644,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
return -1; return -1;
} }
len = snprintf(buffer, len = snprintf(
superTblInfo->maxSqlLen, headBuf,
HEAD_BUFF_LEN,
"insert into %s.%s using %s.%s tags %s values", "insert into %s.%s using %s.%s tags %s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName, tableName,
...@@ -4648,30 +4655,93 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, ...@@ -4648,30 +4655,93 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
tagsValBuf); tagsValBuf);
tmfree(tagsValBuf); tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
len = snprintf(buffer, len = snprintf(
superTblInfo->maxSqlLen, headBuf,
HEAD_BUFF_LEN,
"insert into %s.%s values", "insert into %s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} else { } else {
len = snprintf(buffer, len = snprintf(
superTblInfo->maxSqlLen, headBuf,
HEAD_BUFF_LEN,
"insert into %s.%s values", "insert into %s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} }
} else { } else {
len = snprintf(buffer, len = snprintf(
g_args.max_sql_len, headBuf,
HEAD_BUFF_LEN,
"insert into %s.%s values", "insert into %s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} }
if (len > remainderBufLen)
return -1;
tstrncpy(buffer, headBuf, len + 1);
return len; return len;
} }
static int generateProgressiveDataBuffer(char *pTblName, static int generateInterlaceDataBuffer(
char *tableName, int batchPerTbl, int i, int batchPerTblTimes,
int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
int64_t startTime,
int *pRemainderBufLen)
{
char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
// generate data buffer
verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen;
*pRemainderBufLen -= headLen;
int dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int k = generateDataTail(
superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&(pThreadInfo->samplePos), &dataLen);
if (k > 0) {
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
pstr -= headLen;
pstr[0] = '\0';
}
return k;
}
static int generateProgressiveDataBuffer(
char *tableName,
int32_t tableSeq, int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer, threadInfo *pThreadInfo, char *buffer,
int64_t insertRows, int64_t insertRows,
...@@ -4691,6 +4761,7 @@ static int generateProgressiveDataBuffer(char *pTblName, ...@@ -4691,6 +4761,7 @@ static int generateProgressiveDataBuffer(char *pTblName,
assert(buffer != NULL); assert(buffer != NULL);
int k = 0;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int remainderBufLen = maxSqlLen; int remainderBufLen = maxSqlLen;
...@@ -4698,14 +4769,17 @@ static int generateProgressiveDataBuffer(char *pTblName, ...@@ -4698,14 +4769,17 @@ static int generateProgressiveDataBuffer(char *pTblName,
char *pstr = buffer; char *pstr = buffer;
int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo, int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo,
buffer); buffer, remainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen; pstr += headLen;
remainderBufLen -= headLen; remainderBufLen -= headLen;
int k;
int dataLen; int dataLen;
k = generateDataTail(pTblName, tableSeq, pThreadInfo, superTblInfo, k = generateDataTail(superTblInfo,
g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom, g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom,
startTime, startTime,
pSamplePos, &dataLen); pSamplePos, &dataLen);
...@@ -4722,8 +4796,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4722,8 +4796,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows; int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
if (interlaceRows > insertRows) if (interlaceRows > g_args.num_of_RPR)
interlaceRows = insertRows; interlaceRows = g_args.num_of_RPR;
int insertMode; int insertMode;
...@@ -4788,8 +4862,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4788,8 +4862,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
bool flagSleep = true; bool flagSleep = true;
int sleepTimeTotal = 0; int sleepTimeTotal = 0;
int remainderBufLen;
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) { if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
...@@ -4797,7 +4869,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4797,7 +4869,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} }
// generate data // generate data
memset(buffer, 0, maxSqlLen); memset(buffer, 0, maxSqlLen);
remainderBufLen = maxSqlLen; int remainderBufLen = maxSqlLen;
char *pstr = buffer; char *pstr = buffer;
int recOfBatch = 0; int recOfBatch = 0;
...@@ -4811,50 +4883,23 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4811,50 +4883,23 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
return NULL; return NULL;
} }
int headLen; int generated = generateInterlaceDataBuffer(
if (i == 0) { tableName, batchPerTbl, i, batchPerTblTimes,
headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, tableSeq,
superTblInfo, pstr); pThreadInfo, pstr,
} else { insertRows,
headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values",
pThreadInfo->db_name,
tableName);
}
// generate data buffer
verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen;
remainderBufLen -= headLen;
int dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int generated = generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, remainderBufLen, insertRows, 0,
startTime, startTime,
&(pThreadInfo->samplePos), &dataLen); &remainderBufLen);
if (generated < 0) { if (generated < 0) {
debugPrint("[%d] %s() LN%d, generated data is %d\n", debugPrint("[%d] %s() LN%d, generated data is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated); pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_and_statistics_interlace; goto free_and_statistics_interlace;
} else if (generated == 0) {
break;
} }
pstr += dataLen;
remainderBufLen -= dataLen;
tableSeq ++;
recOfBatch += batchPerTbl; recOfBatch += batchPerTbl;
// startTime += batchPerTbl * superTblInfo->timeStampStep; // startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo->totalInsertRows += batchPerTbl; pThreadInfo->totalInsertRows += batchPerTbl;
...@@ -4862,7 +4907,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4862,7 +4907,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch); batchPerTbl, recOfBatch);
tableSeq ++;
if (insertMode == INTERLACE_INSERT_MODE) { if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table // turn to first table
...@@ -5071,11 +5115,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5071,11 +5115,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
*/ */
} // num_of_DPT } // num_of_DPT
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo && if (g_args.verbose_print) {
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
(0 == strncasecmp( (0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) { superTblInfo->dataSource, "sample", strlen("sample")))) {
printf("%s() LN%d samplePos=%d\n", verbosePrint("%s() LN%d samplePos=%d\n",
__func__, __LINE__, pThreadInfo->samplePos); __func__, __LINE__, pThreadInfo->samplePos);
}
} }
} // tableSeq } // tableSeq
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册