未验证 提交 243a3d16 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 4068 taosdemo stmt (#6178)

* merge with develop branch.

change query/tests/CMakeLists.txt to allow unused function and variable.

* refactor data generating.

* refactor.

* refactor.

* refactor.

* refactor.

* refactor
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 d05fa5c8
...@@ -61,6 +61,8 @@ extern char configDir[]; ...@@ -61,6 +61,8 @@ extern char configDir[];
#define QUERY_JSON_NAME "query.json" #define QUERY_JSON_NAME "query.json"
#define SUBSCRIBE_JSON_NAME "subscribe.json" #define SUBSCRIBE_JSON_NAME "subscribe.json"
#define STR_INSERT_INTO "INSERT INTO "
enum TEST_MODE { enum TEST_MODE {
INSERT_TEST, // 0 INSERT_TEST, // 0
QUERY_TEST, // 1 QUERY_TEST, // 1
...@@ -5093,7 +5095,8 @@ static int generateStbSQLHead( ...@@ -5093,7 +5095,8 @@ static int generateStbSQLHead(
return len; return len;
} }
static int64_t generateInterlaceDataBuffer( static int64_t generateStbInterlaceData(
SSuperTable *superTblInfo,
char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes, char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes,
uint64_t tableSeq, uint64_t tableSeq,
threadInfo *pThreadInfo, char *buffer, threadInfo *pThreadInfo, char *buffer,
...@@ -5103,7 +5106,6 @@ static int64_t generateInterlaceDataBuffer( ...@@ -5103,7 +5106,6 @@ static int64_t generateInterlaceDataBuffer(
{ {
assert(buffer); assert(buffer);
char *pstr = buffer; char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int headLen = generateStbSQLHead( int headLen = generateStbSQLHead(
superTblInfo, superTblInfo,
...@@ -5126,24 +5128,58 @@ static int64_t generateInterlaceDataBuffer( ...@@ -5126,24 +5128,58 @@ static int64_t generateInterlaceDataBuffer(
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl); i, batchPerTblTimes, batchPerTbl);
int64_t k;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision); startTime = taosGetTimestamp(pThreadInfo->time_precision);
} }
k = generateStbDataTail( int64_t k = generateStbDataTail(
superTblInfo, superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime, startTime,
&(pThreadInfo->samplePos), &dataLen); &(pThreadInfo->samplePos), &dataLen);
if (k == batchPerTbl) {
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else { } else {
startTime = 1500000000000; debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n",
k = generateDataTailWithoutStb( __func__, __LINE__, k, batchPerTbl);
pstr -= headLen;
pstr[0] = '\0';
k = 0;
}
return k;
}
static int64_t generateInterlaceDataWithoutStb(
char *tableName, uint64_t batchPerTbl,
uint64_t tableSeq,
char *dbName, char *buffer,
int64_t insertRows,
uint64_t *pRemainderBufLen)
{
assert(buffer);
char *pstr = buffer;
int headLen = generateSQLHeadWithoutStb(
tableName, dbName,
pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen;
*pRemainderBufLen -= headLen;
int64_t dataLen = 0;
int64_t startTime = 1500000000000;
int64_t k = generateDataTailWithoutStb(
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime, startTime,
/* &(pThreadInfo->samplePos), */&dataLen); &dataLen);
}
if (k == batchPerTbl) { if (k == batchPerTbl) {
pstr += dataLen; pstr += dataLen;
...@@ -5159,33 +5195,24 @@ static int64_t generateInterlaceDataBuffer( ...@@ -5159,33 +5195,24 @@ static int64_t generateInterlaceDataBuffer(
return k; return k;
} }
static int64_t generateProgressiveDataBuffer( static int64_t generateStbProgressiveData(
SSuperTable *superTblInfo,
char *tableName, char *tableName,
int64_t tableSeq, int64_t tableSeq,
threadInfo *pThreadInfo, char *buffer, char *dbName, char *buffer,
int64_t insertRows, int64_t insertRows,
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, uint64_t startFrom, int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen) int64_t *pRemainderBufLen)
{ {
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
assert(buffer != NULL); assert(buffer != NULL);
char *pstr = buffer; char *pstr = buffer;
memset(buffer, 0, *pRemainderBufLen); memset(buffer, 0, *pRemainderBufLen);
int64_t headLen; int64_t headLen = generateStbSQLHead(
if (superTblInfo) {
headLen = generateStbSQLHead(
superTblInfo, superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, tableName, tableSeq, dbName,
buffer, *pRemainderBufLen);
} else {
headLen = generateSQLHeadWithoutStb(
tableName, pThreadInfo->db_name,
buffer, *pRemainderBufLen); buffer, *pRemainderBufLen);
}
if (headLen <= 0) { if (headLen <= 0) {
return 0; return 0;
...@@ -5194,22 +5221,43 @@ static int64_t generateProgressiveDataBuffer( ...@@ -5194,22 +5221,43 @@ static int64_t generateProgressiveDataBuffer(
*pRemainderBufLen -= headLen; *pRemainderBufLen -= headLen;
int64_t dataLen; int64_t dataLen;
int64_t k;
if (superTblInfo) { return generateStbDataTail(superTblInfo,
k = generateStbDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen, g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, startFrom, insertRows, startFrom,
startTime, startTime,
pSamplePos, &dataLen); pSamplePos, &dataLen);
} else { }
k = generateDataTailWithoutStb(
static int64_t generateProgressiveDataWithoutStb(
char *tableName,
int64_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen)
{
assert(buffer != NULL);
char *pstr = buffer;
memset(buffer, 0, *pRemainderBufLen);
int64_t headLen = generateSQLHeadWithoutStb(
tableName, pThreadInfo->db_name,
buffer, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen;
*pRemainderBufLen -= headLen;
int64_t dataLen;
return generateDataTailWithoutStb(
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
startTime, startTime,
/*pSamplePos, */&dataLen); /*pSamplePos, */&dataLen);
}
return k;
} }
static void printStatPerThread(threadInfo *pThreadInfo) static void printStatPerThread(threadInfo *pThreadInfo)
...@@ -5228,6 +5276,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5228,6 +5276,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t insertRows; int64_t insertRows;
uint64_t interlaceRows; uint64_t interlaceRows;
uint64_t maxSqlLen;
int64_t nTimeStampStep;
uint64_t insert_interval;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
...@@ -5240,26 +5291,38 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5240,26 +5291,38 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} else { } else {
interlaceRows = superTblInfo->interlaceRows; interlaceRows = superTblInfo->interlaceRows;
} }
maxSqlLen = superTblInfo->maxSqlLen;
nTimeStampStep = superTblInfo->timeStampStep;
insert_interval = superTblInfo->insertInterval;
} else { } else {
insertRows = g_args.num_of_DPT; insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows; interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = DEFAULT_TIMESTAMP_STEP;
insert_interval = g_args.insert_interval;
} }
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
if (interlaceRows > insertRows) if (interlaceRows > insertRows)
interlaceRows = insertRows; interlaceRows = insertRows;
if (interlaceRows > g_args.num_of_RPR) if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR; interlaceRows = g_args.num_of_RPR;
int progOrInterlace; uint64_t batchPerTbl = interlaceRows;
uint64_t batchPerTblTimes;
if (interlaceRows > 0) { if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
progOrInterlace= INTERLACE_INSERT_MODE; batchPerTblTimes =
g_args.num_of_RPR / interlaceRows;
} else { } else {
progOrInterlace = PROGRESSIVE_INSERT_MODE; batchPerTblTimes = 1;
} }
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
pThreadInfo->buffer = calloc(maxSqlLen, 1); pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) { if (NULL == pThreadInfo->buffer) {
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
...@@ -5267,16 +5330,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5267,16 +5330,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
return NULL; return NULL;
} }
char tableName[TSDB_TABLE_NAME_LEN];
pThreadInfo->totalInsertRows = 0; pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0; pThreadInfo->totalAffectedRows = 0;
int64_t nTimeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
uint64_t insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0; uint64_t st = 0;
uint64_t et = UINT64_MAX; uint64_t et = UINT64_MAX;
...@@ -5285,31 +5341,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5285,31 +5341,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t endTs; uint64_t endTs;
uint64_t tableSeq = pThreadInfo->start_table_from; uint64_t tableSeq = pThreadInfo->start_table_from;
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
int64_t startTime = pThreadInfo->start_time; int64_t startTime = pThreadInfo->start_time;
uint64_t batchPerTbl = interlaceRows;
uint64_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
g_args.num_of_RPR / interlaceRows;
} else {
batchPerTblTimes = 1;
}
uint64_t generatedRecPerTbl = 0; uint64_t generatedRecPerTbl = 0;
bool flagSleep = true; bool flagSleep = true;
uint64_t sleepTimeTotal = 0; uint64_t sleepTimeTotal = 0;
char *strInsertInto = "insert into ";
int nInsertBufLen = strlen(strInsertInto);
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) { if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
...@@ -5321,13 +5358,16 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5321,13 +5358,16 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
char *pstr = pThreadInfo->buffer; char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto); int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len; pstr += len;
remainderBufLen -= len; remainderBufLen -= len;
uint64_t recOfBatch = 0; uint64_t recOfBatch = 0;
for (uint64_t i = 0; i < batchPerTblTimes; i ++) { for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq); getTableName(tableName, pThreadInfo, tableSeq);
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
...@@ -5337,13 +5377,25 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5337,13 +5377,25 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} }
uint64_t oldRemainderLen = remainderBufLen; uint64_t oldRemainderLen = remainderBufLen;
int64_t generated = generateInterlaceDataBuffer(
int64_t generated;
if (superTblInfo) {
generated = generateStbInterlaceData(
superTblInfo,
tableName, batchPerTbl, i, batchPerTblTimes, tableName, batchPerTbl, i, batchPerTblTimes,
tableSeq, tableSeq,
pThreadInfo, pstr, pThreadInfo, pstr,
insertRows, insertRows,
startTime, startTime,
&remainderBufLen); &remainderBufLen);
} else {
generated = generateInterlaceDataWithoutStb(
tableName, batchPerTbl,
tableSeq,
pThreadInfo->db_name, pstr,
insertRows,
&remainderBufLen);
}
debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, generated); pThreadInfo->threadID, __func__, __LINE__, generated);
...@@ -5364,7 +5416,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5364,7 +5416,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch); batchPerTbl, recOfBatch);
if (progOrInterlace == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table // turn to first table
tableSeq = pThreadInfo->start_table_from; tableSeq = pThreadInfo->start_table_from;
...@@ -5384,7 +5435,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5384,7 +5435,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR) if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
break; break;
} }
}
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
...@@ -5511,18 +5561,28 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5511,18 +5561,28 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t remainderBufLen = maxSqlLen; int64_t remainderBufLen = maxSqlLen;
char *pstr = pThreadInfo->buffer; char *pstr = pThreadInfo->buffer;
int nInsertBufLen = strlen("insert into ");
int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into "); int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len; pstr += len;
remainderBufLen -= len; remainderBufLen -= len;
int64_t generated = generateProgressiveDataBuffer( int64_t generated;
if (superTblInfo) {
generated = generateStbProgressiveData(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr, insertRows,
i, start_time,
&(pThreadInfo->samplePos),
&remainderBufLen);
} else {
generated = generateProgressiveDataWithoutStb(
tableName, tableSeq, pThreadInfo, pstr, insertRows, tableName, tableSeq, pThreadInfo, pstr, insertRows,
i, start_time, i, start_time,
&(pThreadInfo->samplePos), &(pThreadInfo->samplePos),
&remainderBufLen); &remainderBufLen);
}
if (generated > 0) if (generated > 0)
i += generated; i += generated;
else else
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册