未验证 提交 5ed8e2a2 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-5302]<fix>:taosdemo stmt interlace. (#6880)

* [TD-5302]<fix>:taosdemo stmt interlace.

* fix the logic interlace rows compare insert rows.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 901b91d2
...@@ -106,9 +106,9 @@ enum TEST_MODE { ...@@ -106,9 +106,9 @@ enum TEST_MODE {
typedef enum CREATE_SUB_TALBE_MOD_EN { typedef enum CREATE_SUB_TALBE_MOD_EN {
PRE_CREATE_SUBTBL, PRE_CREATE_SUBTBL,
AUTO_CREATE_SUBTBL, AUTO_CREATE_SUBTBL,
NO_CREATE_SUBTBL NO_CREATE_SUBTBL
} CREATE_SUB_TALBE_MOD_EN; } CREATE_SUB_TALBE_MOD_EN;
typedef enum TALBE_EXISTS_EN { typedef enum TALBE_EXISTS_EN {
...@@ -637,7 +637,7 @@ static FILE * g_fpOfInsertResult = NULL; ...@@ -637,7 +637,7 @@ static FILE * g_fpOfInsertResult = NULL;
#define performancePrint(fmt, ...) \ #define performancePrint(fmt, ...) \
do { if (g_args.performance_print) \ do { if (g_args.performance_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) fprintf(stderr, "PERF: "fmt, __VA_ARGS__); } while(0)
#define errorPrint(fmt, ...) \ #define errorPrint(fmt, ...) \
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0) do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
...@@ -3183,8 +3183,10 @@ static void createChildTables() { ...@@ -3183,8 +3183,10 @@ static void createChildTables() {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
// with super table // with super table
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) if ((AUTO_CREATE_SUBTBL
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { == g_Dbs.db[i].superTbls[j].autoCreateTable)
|| (TBL_ALREADY_EXISTS
== g_Dbs.db[i].superTbls[j].childTblExists)) {
continue; continue;
} }
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
...@@ -4156,6 +4158,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -4156,6 +4158,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
*/ */
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
if (insertRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) { if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) {
if (stbInterlaceRows->valueint < 0) { if (stbInterlaceRows->valueint < 0) {
...@@ -4164,15 +4182,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -4164,15 +4182,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint; g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
// rows per table need be less than insert batch
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { if (g_Dbs.db[i].superTbls[j].interlaceRows > g_Dbs.db[i].superTbls[j].insertRows) {
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u\n\n", printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > insert_rows %"PRId64"\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
g_args.num_of_RPR); g_Dbs.db[i].superTbls[j].insertRows);
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", printf(" interlace rows value will be set to insert_rows %"PRId64"\n\n",
g_args.num_of_RPR); g_Dbs.db[i].superTbls[j].insertRows);
prompt(); prompt();
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR; g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows;
} }
} else if (!stbInterlaceRows) { } else if (!stbInterlaceRows) {
g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
...@@ -4209,22 +4227,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -4209,22 +4227,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
if (insertRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval"); cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval");
if (insertInterval && insertInterval->type == cJSON_Number) { if (insertInterval && insertInterval->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint; g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint;
...@@ -5143,12 +5145,18 @@ static int32_t generateDataTailWithoutStb( ...@@ -5143,12 +5145,18 @@ static int32_t generateDataTailWithoutStb(
char **data_type = g_args.datatype; char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary; int lenOfBinary = g_args.len_of_binary;
retLen = generateData(data, data_type, if (g_args.disorderRatio) {
startTime + getTSRandTail( retLen = generateData(data, data_type,
(int64_t) DEFAULT_TIMESTAMP_STEP, k, startTime + getTSRandTail(
g_args.disorderRatio, (int64_t) DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRange), g_args.disorderRatio,
lenOfBinary); g_args.disorderRange),
lenOfBinary);
} else {
retLen = generateData(data, data_type,
startTime + (int64_t) (DEFAULT_TIMESTAMP_STEP* k),
lenOfBinary);
}
if (len > remainderBufLen) if (len > remainderBufLen)
break; break;
...@@ -5200,14 +5208,14 @@ static int32_t generateStbDataTail( ...@@ -5200,14 +5208,14 @@ static int32_t generateStbDataTail(
bool tsRand; bool tsRand;
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true; tsRand = true;
} else { } else {
tsRand = false; tsRand = false;
} }
verbosePrint("%s() LN%d batch=%u buflen=%"PRId64"\n", verbosePrint("%s() LN%d batch=%u buflen=%"PRId64"\n",
__func__, __LINE__, batch, remainderBufLen); __func__, __LINE__, batch, remainderBufLen);
int32_t k = 0; int32_t k;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE); memset(data, 0, MAX_DATA_SIZE);
...@@ -5656,10 +5664,15 @@ static int32_t prepareStmtWithoutStb( ...@@ -5656,10 +5664,15 @@ static int32_t prepareStmtWithoutStb(
bind_ts = (int64_t *)ptr; bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
*bind_ts = startTime + getTSRandTail(
(int64_t)DEFAULT_TIMESTAMP_STEP, k, if (g_args.disorderRatio) {
g_args.disorderRatio, *bind_ts = startTime + getTSRandTail(
g_args.disorderRange); (int64_t)DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange);
} else {
*bind_ts = startTime + (int64_t)(DEFAULT_TIMESTAMP_STEP * k);
}
bind->buffer_length = sizeof(int64_t); bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts; bind->buffer = bind_ts;
bind->length = &bind->buffer_length; bind->length = &bind->buffer_length;
...@@ -5744,7 +5757,7 @@ static int32_t prepareStbStmt( ...@@ -5744,7 +5757,7 @@ static int32_t prepareStbStmt(
bind_ts = (int64_t *)ptr; bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (sourceRand) { if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail( *bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, k, stbInfo->timeStampStep, k,
stbInfo->disorderRatio, stbInfo->disorderRatio,
...@@ -5811,6 +5824,7 @@ static int32_t prepareStbStmt( ...@@ -5811,6 +5824,7 @@ static int32_t prepareStbStmt(
if (!sourceRand) { if (!sourceRand) {
(*pSamplePos) ++; (*pSamplePos) ++;
} }
if (recordFrom >= insertRows) { if (recordFrom >= insertRows) {
break; break;
} }
...@@ -5820,6 +5834,43 @@ static int32_t prepareStbStmt( ...@@ -5820,6 +5834,43 @@ static int32_t prepareStbStmt(
free(bindArray); free(bindArray);
return k; return k;
} }
static int32_t prepareStbStmtInterlace(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo,
stmt,
tableName,
g_args.num_of_RPR,
insertRows, 0, startTime,
pSamplePos);
}
static int32_t prepareStbStmtProgressive(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo,
stmt,
tableName,
g_args.num_of_RPR,
insertRows, recordFrom, startTime,
pSamplePos);
}
#endif #endif
static int32_t generateStbProgressiveData( static int32_t generateStbProgressiveData(
...@@ -5992,7 +6043,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5992,7 +6043,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint32_t recOfBatch = 0; uint32_t recOfBatch = 0;
for (uint32_t i = 0; i < batchPerTblTimes; i ++) { for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq); getTableName(tableName, pThreadInfo, tableSeq);
...@@ -6009,7 +6060,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6009,7 +6060,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (superTblInfo) { if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) { if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt( generated = prepareStbStmtInterlace(
superTblInfo, superTblInfo,
pThreadInfo->stmt, pThreadInfo->stmt,
tableName, tableName,
...@@ -6238,7 +6289,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6238,7 +6289,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (superTblInfo) { if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) { if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt( generated = prepareStbStmtProgressive(
superTblInfo, superTblInfo,
pThreadInfo->stmt, pThreadInfo->stmt,
tableName, tableName,
...@@ -6668,13 +6719,24 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -6668,13 +6719,24 @@ static void startMultiThreadInsertData(int threads, char* db_name,
char buffer[3000]; char buffer[3000];
char *pstr = buffer; char *pstr = buffer;
pstr += sprintf(pstr, "INSERT INTO ? values(?");
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1); tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
for (int col = 0; col < columnCount; col ++) { for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?"); pstr += sprintf(pstr, ",?");
} }
pstr += sprintf(pstr, ")"); pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer);
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0); int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
if (ret != 0){ if (ret != 0){
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册