未验证 提交 5704a04d 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-5872]<fix>: taosdemo stmt improve. (#7251)

* [TD-5872]<fix>: taosdemo stmt improve.

* refactor stmt functions.

* [TD-5872]<fix>: taosdemo stmt csv perf improve.

* rand func back to early impl.

* fix windows/mac compile error.

* fix empty tag sample.

* [TD-5873]<test>add stmt’performance taosdemo testcase
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
Co-authored-by: haoranc's avatartomchon <haoran920c@163.com>
上级 29141d71
...@@ -295,6 +295,9 @@ typedef struct SSuperTable_S { ...@@ -295,6 +295,9 @@ typedef struct SSuperTable_S {
uint64_t lenOfTagOfOneRow; uint64_t lenOfTagOfOneRow;
char* sampleDataBuf; char* sampleDataBuf;
#if STMT_IFACE_ENABLED == 1
char* sampleBindArray;
#endif
//int sampleRowCount; //int sampleRowCount;
//int sampleUsePos; //int sampleUsePos;
...@@ -441,6 +444,7 @@ typedef struct SQueryMetaInfo_S { ...@@ -441,6 +444,7 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S { typedef struct SThreadInfo_S {
TAOS * taos; TAOS * taos;
TAOS_STMT *stmt; TAOS_STMT *stmt;
int64_t *bind_ts;
int threadID; int threadID;
char db_name[TSDB_DB_NAME_LEN]; char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision; uint32_t time_precision;
...@@ -454,7 +458,7 @@ typedef struct SThreadInfo_S { ...@@ -454,7 +458,7 @@ typedef struct SThreadInfo_S {
int64_t start_time; int64_t start_time;
char* cols; char* cols;
bool use_metric; bool use_metric;
SSuperTable* superTblInfo; SSuperTable* stbInfo;
char *buffer; // sql cmd buffer char *buffer; // sql cmd buffer
// for async insert // for async insert
...@@ -674,7 +678,7 @@ static FILE * g_fpOfInsertResult = NULL; ...@@ -674,7 +678,7 @@ static FILE * g_fpOfInsertResult = NULL;
/////////////////////////////////////////////////// ///////////////////////////////////////////////////
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); } static void ERROR_EXIT(const char *msg) { errorPrint("%s", msg); exit(-1); }
#ifndef TAOSDEMO_COMMIT_SHA1 #ifndef TAOSDEMO_COMMIT_SHA1
#define TAOSDEMO_COMMIT_SHA1 "unknown" #define TAOSDEMO_COMMIT_SHA1 "unknown"
...@@ -1136,8 +1140,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -1136,8 +1140,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
if (0 == columnCount) { if (0 == columnCount) {
perror("data type error!"); ERROR_EXIT("data type error!");
exit(-1);
} }
g_args.num_of_CPR = columnCount; g_args.num_of_CPR = columnCount;
...@@ -2425,14 +2428,14 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -2425,14 +2428,14 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
#endif #endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
free(request_buf); free(request_buf);
ERROR_EXIT("ERROR opening socket"); ERROR_EXIT("opening socket");
} }
int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr)); int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) { if (retConn < 0) {
free(request_buf); free(request_buf);
ERROR_EXIT("ERROR connecting"); ERROR_EXIT("connecting");
} }
memset(base64_buf, 0, INPUT_BUF_LEN); memset(base64_buf, 0, INPUT_BUF_LEN);
...@@ -2465,7 +2468,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -2465,7 +2468,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
auth, strlen(sqlstr), sqlstr); auth, strlen(sqlstr), sqlstr);
if (r >= req_buf_len) { if (r >= req_buf_len) {
free(request_buf); free(request_buf);
ERROR_EXIT("ERROR too long request"); ERROR_EXIT("too long request");
} }
verbosePrint("%s() LN%d: Request:\n%s\n", __func__, __LINE__, request_buf); verbosePrint("%s() LN%d: Request:\n%s\n", __func__, __LINE__, request_buf);
...@@ -2478,7 +2481,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -2478,7 +2481,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
bytes = write(sockfd, request_buf + sent, req_str_len - sent); bytes = write(sockfd, request_buf + sent, req_str_len - sent);
#endif #endif
if (bytes < 0) if (bytes < 0)
ERROR_EXIT("ERROR writing message to socket"); ERROR_EXIT("writing message to socket");
if (bytes == 0) if (bytes == 0)
break; break;
sent+=bytes; sent+=bytes;
...@@ -2495,7 +2498,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -2495,7 +2498,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
#endif #endif
if (bytes < 0) { if (bytes < 0) {
free(request_buf); free(request_buf);
ERROR_EXIT("ERROR reading response from socket"); ERROR_EXIT("reading response from socket");
} }
if (bytes == 0) if (bytes == 0)
break; break;
...@@ -2504,7 +2507,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -2504,7 +2507,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
if (received == resp_len) { if (received == resp_len) {
free(request_buf); free(request_buf);
ERROR_EXIT("ERROR storing complete response from socket"); ERROR_EXIT("storing complete response from socket");
} }
response_buf[RESP_BUF_LEN - 1] = '\0'; response_buf[RESP_BUF_LEN - 1] = '\0';
...@@ -2667,8 +2670,8 @@ static int calcRowLen(SSuperTable* superTbls) { ...@@ -2667,8 +2670,8 @@ static int calcRowLen(SSuperTable* superTbls) {
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) { } else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
lenOfOneRow += TIMESTAMP_BUFF_LEN; lenOfOneRow += TIMESTAMP_BUFF_LEN;
} else { } else {
printf("get error data type : %s\n", dataType); errorPrint("get error data type : %s\n", dataType);
exit(-1); exit(EXIT_FAILURE);
} }
} }
...@@ -2698,8 +2701,8 @@ static int calcRowLen(SSuperTable* superTbls) { ...@@ -2698,8 +2701,8 @@ static int calcRowLen(SSuperTable* superTbls) {
} else if (strcasecmp(dataType, "DOUBLE") == 0) { } else if (strcasecmp(dataType, "DOUBLE") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + DOUBLE_BUFF_LEN; lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + DOUBLE_BUFF_LEN;
} else { } else {
printf("get error tag type : %s\n", dataType); errorPrint("get error tag type : %s\n", dataType);
exit(-1); exit(EXIT_FAILURE);
} }
} }
...@@ -2737,7 +2740,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, ...@@ -2737,7 +2740,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
taos_close(taos); taos_close(taos);
errorPrint("%s() LN%d, failed to run command %s\n", errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, command); __func__, __LINE__, command);
exit(-1); exit(EXIT_FAILURE);
} }
int64_t childTblCount = (limit < 0)?10000:limit; int64_t childTblCount = (limit < 0)?10000:limit;
...@@ -2748,7 +2751,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, ...@@ -2748,7 +2751,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
taos_free_result(res); taos_free_result(res);
taos_close(taos); taos_close(taos);
errorPrint("%s() LN%d, failed to allocate memory!\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to allocate memory!\n", __func__, __LINE__);
exit(-1); exit(EXIT_FAILURE);
} }
} }
...@@ -2759,7 +2762,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, ...@@ -2759,7 +2762,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
if (0 == strlen((char *)row[0])) { if (0 == strlen((char *)row[0])) {
errorPrint("%s() LN%d, No.%"PRId64" table return empty name\n", errorPrint("%s() LN%d, No.%"PRId64" table return empty name\n",
__func__, __LINE__, count); __func__, __LINE__, count);
exit(-1); exit(EXIT_FAILURE);
} }
tstrncpy(pTblName, (char *)row[0], len[0]+1); tstrncpy(pTblName, (char *)row[0], len[0]+1);
...@@ -2775,12 +2778,12 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, ...@@ -2775,12 +2778,12 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
(size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN)); (size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN));
} else { } else {
// exit, if allocate more memory failed // exit, if allocate more memory failed
errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n",
__func__, __LINE__, dbName, sTblName);
tmfree(childTblName); tmfree(childTblName);
taos_free_result(res); taos_free_result(res);
taos_close(taos); taos_close(taos);
exit(-1); errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n",
__func__, __LINE__, dbName, sTblName);
exit(EXIT_FAILURE);
} }
} }
pTblName = childTblName + count * TSDB_TABLE_NAME_LEN; pTblName = childTblName + count * TSDB_TABLE_NAME_LEN;
...@@ -2964,10 +2967,10 @@ static int createSuperTable( ...@@ -2964,10 +2967,10 @@ static int createSuperTable(
lenOfOneRow += TIMESTAMP_BUFF_LEN; lenOfOneRow += TIMESTAMP_BUFF_LEN;
} else { } else {
taos_close(taos); taos_close(taos);
free(command);
errorPrint("%s() LN%d, config error data type : %s\n", errorPrint("%s() LN%d, config error data type : %s\n",
__func__, __LINE__, dataType); __func__, __LINE__, dataType);
free(command); exit(EXIT_FAILURE);
exit(-1);
} }
} }
...@@ -2976,11 +2979,11 @@ static int createSuperTable( ...@@ -2976,11 +2979,11 @@ static int createSuperTable(
// save for creating child table // save for creating child table
superTbl->colsOfCreateChildTable = (char*)calloc(len+20, 1); superTbl->colsOfCreateChildTable = (char*)calloc(len+20, 1);
if (NULL == superTbl->colsOfCreateChildTable) { if (NULL == superTbl->colsOfCreateChildTable) {
errorPrint("%s() LN%d, Failed when calloc, size:%d",
__func__, __LINE__, len+1);
taos_close(taos); taos_close(taos);
free(command); free(command);
exit(-1); errorPrint("%s() LN%d, Failed when calloc, size:%d",
__func__, __LINE__, len+1);
exit(EXIT_FAILURE);
} }
snprintf(superTbl->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); snprintf(superTbl->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols);
...@@ -3054,10 +3057,10 @@ static int createSuperTable( ...@@ -3054,10 +3057,10 @@ static int createSuperTable(
lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + DOUBLE_BUFF_LEN; lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + DOUBLE_BUFF_LEN;
} else { } else {
taos_close(taos); taos_close(taos);
free(command);
errorPrint("%s() LN%d, config error tag type : %s\n", errorPrint("%s() LN%d, config error tag type : %s\n",
__func__, __LINE__, dataType); __func__, __LINE__, dataType);
free(command); exit(EXIT_FAILURE);
exit(-1);
} }
} }
...@@ -3100,35 +3103,43 @@ int createDatabasesAndStables(char *command) { ...@@ -3100,35 +3103,43 @@ int createDatabasesAndStables(char *command) {
int dataLen = 0; int dataLen = 0;
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "create database if not exists %s", g_Dbs.db[i].dbName); BUFFER_SIZE - dataLen, "create database if not exists %s",
g_Dbs.db[i].dbName);
if (g_Dbs.db[i].dbCfg.blocks > 0) { if (g_Dbs.db[i].dbCfg.blocks > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " blocks %d", g_Dbs.db[i].dbCfg.blocks); BUFFER_SIZE - dataLen, " blocks %d",
g_Dbs.db[i].dbCfg.blocks);
} }
if (g_Dbs.db[i].dbCfg.cache > 0) { if (g_Dbs.db[i].dbCfg.cache > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cache %d", g_Dbs.db[i].dbCfg.cache); BUFFER_SIZE - dataLen, " cache %d",
g_Dbs.db[i].dbCfg.cache);
} }
if (g_Dbs.db[i].dbCfg.days > 0) { if (g_Dbs.db[i].dbCfg.days > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " days %d", g_Dbs.db[i].dbCfg.days); BUFFER_SIZE - dataLen, " days %d",
g_Dbs.db[i].dbCfg.days);
} }
if (g_Dbs.db[i].dbCfg.keep > 0) { if (g_Dbs.db[i].dbCfg.keep > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " keep %d", g_Dbs.db[i].dbCfg.keep); BUFFER_SIZE - dataLen, " keep %d",
g_Dbs.db[i].dbCfg.keep);
} }
if (g_Dbs.db[i].dbCfg.quorum > 1) { if (g_Dbs.db[i].dbCfg.quorum > 1) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " quorum %d", g_Dbs.db[i].dbCfg.quorum); BUFFER_SIZE - dataLen, " quorum %d",
g_Dbs.db[i].dbCfg.quorum);
} }
if (g_Dbs.db[i].dbCfg.replica > 0) { if (g_Dbs.db[i].dbCfg.replica > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " replica %d", g_Dbs.db[i].dbCfg.replica); BUFFER_SIZE - dataLen, " replica %d",
g_Dbs.db[i].dbCfg.replica);
} }
if (g_Dbs.db[i].dbCfg.update > 0) { if (g_Dbs.db[i].dbCfg.update > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " update %d", g_Dbs.db[i].dbCfg.update); BUFFER_SIZE - dataLen, " update %d",
g_Dbs.db[i].dbCfg.update);
} }
//if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) { //if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) {
// dataLen += snprintf(command + dataLen, // dataLen += snprintf(command + dataLen,
...@@ -3136,42 +3147,48 @@ int createDatabasesAndStables(char *command) { ...@@ -3136,42 +3147,48 @@ int createDatabasesAndStables(char *command) {
//} //}
if (g_Dbs.db[i].dbCfg.minRows > 0) { if (g_Dbs.db[i].dbCfg.minRows > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " minrows %d", g_Dbs.db[i].dbCfg.minRows); BUFFER_SIZE - dataLen, " minrows %d",
g_Dbs.db[i].dbCfg.minRows);
} }
if (g_Dbs.db[i].dbCfg.maxRows > 0) { if (g_Dbs.db[i].dbCfg.maxRows > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " maxrows %d", g_Dbs.db[i].dbCfg.maxRows); BUFFER_SIZE - dataLen, " maxrows %d",
g_Dbs.db[i].dbCfg.maxRows);
} }
if (g_Dbs.db[i].dbCfg.comp > 0) { if (g_Dbs.db[i].dbCfg.comp > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " comp %d", g_Dbs.db[i].dbCfg.comp); BUFFER_SIZE - dataLen, " comp %d",
g_Dbs.db[i].dbCfg.comp);
} }
if (g_Dbs.db[i].dbCfg.walLevel > 0) { if (g_Dbs.db[i].dbCfg.walLevel > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " wal %d", g_Dbs.db[i].dbCfg.walLevel); BUFFER_SIZE - dataLen, " wal %d",
g_Dbs.db[i].dbCfg.walLevel);
} }
if (g_Dbs.db[i].dbCfg.cacheLast > 0) { if (g_Dbs.db[i].dbCfg.cacheLast > 0) {
dataLen += snprintf(command + dataLen, dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cachelast %d", g_Dbs.db[i].dbCfg.cacheLast); BUFFER_SIZE - dataLen, " cachelast %d",
g_Dbs.db[i].dbCfg.cacheLast);
} }
if (g_Dbs.db[i].dbCfg.fsync > 0) { if (g_Dbs.db[i].dbCfg.fsync > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" fsync %d", g_Dbs.db[i].dbCfg.fsync); " fsync %d", g_Dbs.db[i].dbCfg.fsync);
} }
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms"))) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
#if NANO_SECOND_ENABLED == 1 #if NANO_SECOND_ENABLED == 1
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"ns", strlen("ns"))) "ns", 2))
#endif #endif
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"us", strlen("us")))) { "us", 2))) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" precision \'%s\';", g_Dbs.db[i].dbCfg.precision); " precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
} }
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
taos_close(taos); taos_close(taos);
errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); errorPrint( "\ncreate database %s failed!\n\n",
g_Dbs.db[i].dbName);
return -1; return -1;
} }
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
...@@ -3218,7 +3235,7 @@ int createDatabasesAndStables(char *command) { ...@@ -3218,7 +3235,7 @@ int createDatabasesAndStables(char *command) {
static void* createTable(void *sarg) static void* createTable(void *sarg)
{ {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("createTable"); setThreadName("createTable");
...@@ -3229,7 +3246,7 @@ static void* createTable(void *sarg) ...@@ -3229,7 +3246,7 @@ static void* createTable(void *sarg)
pThreadInfo->buffer = calloc(buff_len, 1); pThreadInfo->buffer = calloc(buff_len, 1);
if (pThreadInfo->buffer == NULL) { if (pThreadInfo->buffer == NULL) {
errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__); errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__);
exit(-1); exit(EXIT_FAILURE);
} }
int len = 0; int len = 0;
...@@ -3248,11 +3265,11 @@ static void* createTable(void *sarg) ...@@ -3248,11 +3265,11 @@ static void* createTable(void *sarg)
g_args.tb_prefix, i, g_args.tb_prefix, i,
pThreadInfo->cols); pThreadInfo->cols);
} else { } else {
if (superTblInfo == NULL) { if (stbInfo == NULL) {
free(pThreadInfo->buffer);
errorPrint("%s() LN%d, use metric, but super table info is NULL\n", errorPrint("%s() LN%d, use metric, but super table info is NULL\n",
__func__, __LINE__); __func__, __LINE__);
free(pThreadInfo->buffer); exit(EXIT_FAILURE);
exit(-1);
} else { } else {
if (0 == len) { if (0 == len) {
batchNum = 0; batchNum = 0;
...@@ -3260,29 +3277,35 @@ static void* createTable(void *sarg) ...@@ -3260,29 +3277,35 @@ static void* createTable(void *sarg)
len += snprintf(pThreadInfo->buffer + len, len += snprintf(pThreadInfo->buffer + len,
buff_len - len, "create table "); buff_len - len, "create table ");
} }
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) { if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(superTblInfo, i); tagsValBuf = generateTagValuesForStb(stbInfo, i);
} else { } else {
if (0 == stbInfo->tagSampleCount) {
free(pThreadInfo->buffer);
ERROR_EXIT("use sample file for tag, but has no content!\n");
}
tagsValBuf = getTagValueFromTagSample( tagsValBuf = getTagValueFromTagSample(
superTblInfo, stbInfo,
i % superTblInfo->tagSampleCount); i % stbInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
free(pThreadInfo->buffer); free(pThreadInfo->buffer);
return NULL; ERROR_EXIT("use metric, but tag buffer is NULL\n");
} }
len += snprintf(pThreadInfo->buffer + len, len += snprintf(pThreadInfo->buffer + len,
buff_len - len, buff_len - len,
"if not exists %s.%s%"PRIu64" using %s.%s tags %s ", "if not exists %s.%s%"PRIu64" using %s.%s tags %s ",
pThreadInfo->db_name, superTblInfo->childTblPrefix, pThreadInfo->db_name, stbInfo->childTblPrefix,
i, pThreadInfo->db_name, i, pThreadInfo->db_name,
superTblInfo->sTblName, tagsValBuf); stbInfo->sTblName, tagsValBuf);
free(tagsValBuf); free(tagsValBuf);
batchNum++; batchNum++;
if ((batchNum < superTblInfo->batchCreateTableNum) if ((batchNum < stbInfo->batchCreateTableNum)
&& ((buff_len - len) && ((buff_len - len)
>= (superTblInfo->lenOfTagOfOneRow + 256))) { >= (stbInfo->lenOfTagOfOneRow + 256))) {
continue; continue;
} }
} }
...@@ -3317,14 +3340,13 @@ static void* createTable(void *sarg) ...@@ -3317,14 +3340,13 @@ static void* createTable(void *sarg)
static int startMultiThreadCreateChildTable( static int startMultiThreadCreateChildTable(
char* cols, int threads, uint64_t tableFrom, int64_t ntables, char* cols, int threads, uint64_t tableFrom, int64_t ntables,
char* db_name, SSuperTable* superTblInfo) { char* db_name, SSuperTable* stbInfo) {
pthread_t *pids = calloc(1, threads * sizeof(pthread_t)); pthread_t *pids = calloc(1, threads * sizeof(pthread_t));
threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed\n"); ERROR_EXIT("createChildTable malloc failed\n");
exit(-1);
} }
if (threads < 1) { if (threads < 1) {
...@@ -3344,7 +3366,7 @@ static int startMultiThreadCreateChildTable( ...@@ -3344,7 +3366,7 @@ static int startMultiThreadCreateChildTable(
threadInfo *pThreadInfo = infos + i; threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i; pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN); tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->superTblInfo = superTblInfo; pThreadInfo->stbInfo = stbInfo;
verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name); verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);
pThreadInfo->taos = taos_connect( pThreadInfo->taos = taos_connect(
g_Dbs.host, g_Dbs.host,
...@@ -3451,26 +3473,26 @@ static void createChildTables() { ...@@ -3451,26 +3473,26 @@ static void createChildTables() {
/* /*
Read 10000 lines at most. If more than 10000 lines, continue to read after using Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/ */
static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { static int readTagFromCsvFileToMem(SSuperTable * stbInfo) {
size_t n = 0; size_t n = 0;
ssize_t readLen = 0; ssize_t readLen = 0;
char * line = NULL; char * line = NULL;
FILE *fp = fopen(superTblInfo->tagsFile, "r"); FILE *fp = fopen(stbInfo->tagsFile, "r");
if (fp == NULL) { if (fp == NULL) {
printf("Failed to open tags file: %s, reason:%s\n", printf("Failed to open tags file: %s, reason:%s\n",
superTblInfo->tagsFile, strerror(errno)); stbInfo->tagsFile, strerror(errno));
return -1; return -1;
} }
if (superTblInfo->tagDataBuf) { if (stbInfo->tagDataBuf) {
free(superTblInfo->tagDataBuf); free(stbInfo->tagDataBuf);
superTblInfo->tagDataBuf = NULL; stbInfo->tagDataBuf = NULL;
} }
int tagCount = 10000; int tagCount = 10000;
int count = 0; int count = 0;
char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount); char* tagDataBuf = calloc(1, stbInfo->lenOfTagOfOneRow * tagCount);
if (tagDataBuf == NULL) { if (tagDataBuf == NULL) {
printf("Failed to calloc, reason:%s\n", strerror(errno)); printf("Failed to calloc, reason:%s\n", strerror(errno));
fclose(fp); fclose(fp);
...@@ -3486,20 +3508,20 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { ...@@ -3486,20 +3508,20 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
continue; continue;
} }
memcpy(tagDataBuf + count * superTblInfo->lenOfTagOfOneRow, line, readLen); memcpy(tagDataBuf + count * stbInfo->lenOfTagOfOneRow, line, readLen);
count++; count++;
if (count >= tagCount - 1) { if (count >= tagCount - 1) {
char *tmp = realloc(tagDataBuf, char *tmp = realloc(tagDataBuf,
(size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow); (size_t)tagCount*1.5*stbInfo->lenOfTagOfOneRow);
if (tmp != NULL) { if (tmp != NULL) {
tagDataBuf = tmp; tagDataBuf = tmp;
tagCount = (int)(tagCount*1.5); tagCount = (int)(tagCount*1.5);
memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow, memset(tagDataBuf + count*stbInfo->lenOfTagOfOneRow,
0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow)); 0, (size_t)((tagCount-count)*stbInfo->lenOfTagOfOneRow));
} else { } else {
// exit, if allocate more memory failed // exit, if allocate more memory failed
printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile); printf("realloc fail for save tag val from %s\n", stbInfo->tagsFile);
tmfree(tagDataBuf); tmfree(tagDataBuf);
free(line); free(line);
fclose(fp); fclose(fp);
...@@ -3508,8 +3530,8 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { ...@@ -3508,8 +3530,8 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
} }
} }
superTblInfo->tagDataBuf = tagDataBuf; stbInfo->tagDataBuf = tagDataBuf;
superTblInfo->tagSampleCount = count; stbInfo->tagSampleCount = count;
free(line); free(line);
fclose(fp); fclose(fp);
...@@ -3520,28 +3542,28 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { ...@@ -3520,28 +3542,28 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
Read 10000 lines at most. If more than 10000 lines, continue to read after using Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/ */
static int readSampleFromCsvFileToMem( static int readSampleFromCsvFileToMem(
SSuperTable* superTblInfo) { SSuperTable* stbInfo) {
size_t n = 0; size_t n = 0;
ssize_t readLen = 0; ssize_t readLen = 0;
char * line = NULL; char * line = NULL;
int getRows = 0; int getRows = 0;
FILE* fp = fopen(superTblInfo->sampleFile, "r"); FILE* fp = fopen(stbInfo->sampleFile, "r");
if (fp == NULL) { if (fp == NULL) {
errorPrint( "Failed to open sample file: %s, reason:%s\n", errorPrint( "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno)); stbInfo->sampleFile, strerror(errno));
return -1; return -1;
} }
assert(superTblInfo->sampleDataBuf); assert(stbInfo->sampleDataBuf);
memset(superTblInfo->sampleDataBuf, 0, memset(stbInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow); MAX_SAMPLES_ONCE_FROM_FILE * stbInfo->lenOfOneRow);
while(1) { while(1) {
readLen = tgetline(&line, &n, fp); readLen = tgetline(&line, &n, fp);
if (-1 == readLen) { if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) { if(0 != fseek(fp, 0, SEEK_SET)) {
errorPrint( "Failed to fseek file: %s, reason:%s\n", errorPrint( "Failed to fseek file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno)); stbInfo->sampleFile, strerror(errno));
fclose(fp); fclose(fp);
return -1; return -1;
} }
...@@ -3556,13 +3578,13 @@ static int readSampleFromCsvFileToMem( ...@@ -3556,13 +3578,13 @@ static int readSampleFromCsvFileToMem(
continue; continue;
} }
if (readLen > superTblInfo->lenOfOneRow) { if (readLen > stbInfo->lenOfOneRow) {
printf("sample row len[%d] overflow define schema len[%"PRIu64"], so discard this row\n", printf("sample row len[%d] overflow define schema len[%"PRIu64"], so discard this row\n",
(int32_t)readLen, superTblInfo->lenOfOneRow); (int32_t)readLen, stbInfo->lenOfOneRow);
continue; continue;
} }
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, memcpy(stbInfo->sampleDataBuf + getRows * stbInfo->lenOfOneRow,
line, readLen); line, readLen);
getRows++; getRows++;
...@@ -5047,6 +5069,23 @@ static void postFreeResource() { ...@@ -5047,6 +5069,23 @@ static void postFreeResource() {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf); free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
} }
#if STMT_IFACE_ENABLED == 1
if (g_Dbs.db[i].superTbls[j].sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
g_Dbs.db[i].superTbls[j].sampleBindArray
+ sizeof(uintptr_t *) * k));
for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) {
TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c));
if (bind)
tmfree(bind->buffer);
}
tmfree((char *)tmp);
}
}
tmfree((char *)g_Dbs.db[i].superTbls[j].sampleBindArray);
#endif
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf); free(g_Dbs.db[i].superTbls[j].tagDataBuf);
g_Dbs.db[i].superTbls[j].tagDataBuf = NULL; g_Dbs.db[i].superTbls[j].tagDataBuf = NULL;
...@@ -5067,21 +5106,14 @@ static void postFreeResource() { ...@@ -5067,21 +5106,14 @@ static void postFreeResource() {
tmfree(g_randfloat_buff); tmfree(g_randfloat_buff);
tmfree(g_rand_current_buff); tmfree(g_rand_current_buff);
tmfree(g_rand_phase_buff); tmfree(g_rand_phase_buff);
tmfree(g_randdouble_buff);
} }
static int getRowDataFromSample( static int getRowDataFromSample(
char* dataBuf, int64_t maxLen, int64_t timestamp, char* dataBuf, int64_t maxLen, int64_t timestamp,
SSuperTable* superTblInfo, int64_t* sampleUsePos) SSuperTable* stbInfo, int64_t* sampleUsePos)
{ {
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
/* int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
}
*/
*sampleUsePos = 0; *sampleUsePos = 0;
} }
...@@ -5091,8 +5123,8 @@ static int getRowDataFromSample( ...@@ -5091,8 +5123,8 @@ static int getRowDataFromSample(
"(%" PRId64 ", ", timestamp); "(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%s", "%s",
superTblInfo->sampleDataBuf stbInfo->sampleDataBuf
+ superTblInfo->lenOfOneRow * (*sampleUsePos)); + stbInfo->lenOfOneRow * (*sampleUsePos));
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++; (*sampleUsePos)++;
...@@ -5248,7 +5280,7 @@ static int64_t generateData(char *recBuf, char **data_type, ...@@ -5248,7 +5280,7 @@ static int64_t generateData(char *recBuf, char **data_type,
if (s == NULL) { if (s == NULL) {
errorPrint("%s() LN%d, memory allocation %d bytes failed\n", errorPrint("%s() LN%d, memory allocation %d bytes failed\n",
__func__, __LINE__, lenOfBinary + 1); __func__, __LINE__, lenOfBinary + 1);
exit(-1); exit(EXIT_FAILURE);
} }
rand_string(s, lenOfBinary); rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s); pstr += sprintf(pstr, ",\"%s\"", s);
...@@ -5258,7 +5290,7 @@ static int64_t generateData(char *recBuf, char **data_type, ...@@ -5258,7 +5290,7 @@ static int64_t generateData(char *recBuf, char **data_type,
if (s == NULL) { if (s == NULL) {
errorPrint("%s() LN%d, memory allocation %d bytes failed\n", errorPrint("%s() LN%d, memory allocation %d bytes failed\n",
__func__, __LINE__, lenOfBinary + 1); __func__, __LINE__, lenOfBinary + 1);
exit(-1); exit(EXIT_FAILURE);
} }
rand_string(s, lenOfBinary); rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s); pstr += sprintf(pstr, ",\"%s\"", s);
...@@ -5266,8 +5298,7 @@ static int64_t generateData(char *recBuf, char **data_type, ...@@ -5266,8 +5298,7 @@ static int64_t generateData(char *recBuf, char **data_type,
} }
if (strlen(recBuf) > MAX_DATA_SIZE) { if (strlen(recBuf) > MAX_DATA_SIZE) {
perror("column length too long, abort"); ERROR_EXIT("column length too long, abort");
exit(-1);
} }
} }
...@@ -5278,27 +5309,27 @@ static int64_t generateData(char *recBuf, char **data_type, ...@@ -5278,27 +5309,27 @@ static int64_t generateData(char *recBuf, char **data_type,
return (int32_t)strlen(recBuf); return (int32_t)strlen(recBuf);
} }
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { static int prepareSampleDataForSTable(SSuperTable *stbInfo) {
char* sampleDataBuf = NULL; char* sampleDataBuf = NULL;
sampleDataBuf = calloc( sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) { if (sampleDataBuf == NULL) {
errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n", errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__, __func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno)); strerror(errno));
return -1; return -1;
} }
superTblInfo->sampleDataBuf = sampleDataBuf; stbInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo); int ret = readSampleFromCsvFileToMem(stbInfo);
if (0 != ret) { if (0 != ret) {
errorPrint("%s() LN%d, read sample from csv file failed.\n", errorPrint("%s() LN%d, read sample from csv file failed.\n",
__func__, __LINE__); __func__, __LINE__);
tmfree(sampleDataBuf); tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL; stbInfo->sampleDataBuf = NULL;
return -1; return -1;
} }
...@@ -5308,14 +5339,14 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { ...@@ -5308,14 +5339,14 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
{ {
int32_t affectedRows; int32_t affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer); __func__, __LINE__, pThreadInfo->buffer);
uint16_t iface; uint16_t iface;
if (superTblInfo) if (stbInfo)
iface = superTblInfo->iface; iface = stbInfo->iface;
else { else {
if (g_args.iface == INTERFACE_BUT) if (g_args.iface == INTERFACE_BUT)
iface = TAOSC_IFACE; iface = TAOSC_IFACE;
...@@ -5355,7 +5386,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) ...@@ -5355,7 +5386,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
__func__, __LINE__, taos_stmt_errstr(pThreadInfo->stmt)); __func__, __LINE__, taos_stmt_errstr(pThreadInfo->stmt));
fprintf(stderr, "\n\033[31m === Please reduce batch number if WAL size exceeds limit. ===\033[0m\n\n"); fprintf(stderr, "\n\033[31m === Please reduce batch number if WAL size exceeds limit. ===\033[0m\n\n");
exit(-1); exit(EXIT_FAILURE);
} }
affectedRows = k; affectedRows = k;
break; break;
...@@ -5363,7 +5394,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) ...@@ -5363,7 +5394,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
default: default:
errorPrint("%s() LN%d: unknown insert mode: %d\n", errorPrint("%s() LN%d: unknown insert mode: %d\n",
__func__, __LINE__, superTblInfo->iface); __func__, __LINE__, stbInfo->iface);
affectedRows = 0; affectedRows = 0;
} }
...@@ -5373,24 +5404,24 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) ...@@ -5373,24 +5404,24 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
static void getTableName(char *pTblName, static void getTableName(char *pTblName,
threadInfo* pThreadInfo, uint64_t tableSeq) threadInfo* pThreadInfo, uint64_t tableSeq)
{ {
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (superTblInfo) { if (stbInfo) {
if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) { if (AUTO_CREATE_SUBTBL != stbInfo->autoCreateTable) {
if (superTblInfo->childTblLimit > 0) { if (stbInfo->childTblLimit > 0) {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + stbInfo->childTblName +
(tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); (tableSeq - stbInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else { } else {
verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from, pThreadInfo->start_table_from,
pThreadInfo->ntables, tableSeq); pThreadInfo->ntables, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
} }
} else { } else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
superTblInfo->childTblPrefix, tableSeq); stbInfo->childTblPrefix, tableSeq);
} }
} else { } else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
...@@ -5471,7 +5502,7 @@ static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, ...@@ -5471,7 +5502,7 @@ static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
} }
static int32_t generateStbDataTail( static int32_t generateStbDataTail(
SSuperTable* superTblInfo, SSuperTable* stbInfo,
uint32_t batch, char* buffer, uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows, int64_t remainderBufLen, int64_t insertRows,
uint64_t recordFrom, int64_t startTime, uint64_t recordFrom, int64_t startTime,
...@@ -5481,7 +5512,7 @@ static int32_t generateStbDataTail( ...@@ -5481,7 +5512,7 @@ static int32_t generateStbDataTail(
char *pstr = buffer; char *pstr = buffer;
bool tsRand; bool tsRand;
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true; tsRand = true;
} else { } else {
tsRand = false; tsRand = false;
...@@ -5496,26 +5527,26 @@ static int32_t generateStbDataTail( ...@@ -5496,26 +5527,26 @@ static int32_t generateStbDataTail(
int64_t lenOfRow = 0; int64_t lenOfRow = 0;
if (tsRand) { if (tsRand) {
if (superTblInfo->disorderRatio > 0) { if (stbInfo->disorderRatio > 0) {
lenOfRow = generateStbRowData(superTblInfo, data, lenOfRow = generateStbRowData(stbInfo, data,
remainderBufLen, remainderBufLen,
startTime + getTSRandTail( startTime + getTSRandTail(
superTblInfo->timeStampStep, k, stbInfo->timeStampStep, k,
superTblInfo->disorderRatio, stbInfo->disorderRatio,
superTblInfo->disorderRange) stbInfo->disorderRange)
); );
} else { } else {
lenOfRow = generateStbRowData(superTblInfo, data, lenOfRow = generateStbRowData(stbInfo, data,
remainderBufLen, remainderBufLen,
startTime + superTblInfo->timeStampStep * k startTime + stbInfo->timeStampStep * k
); );
} }
} else { } else {
lenOfRow = getRowDataFromSample( lenOfRow = getRowDataFromSample(
data, data,
(remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE, (remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE,
startTime + superTblInfo->timeStampStep * k, startTime + stbInfo->timeStampStep * k,
superTblInfo, stbInfo,
pSamplePos); pSamplePos);
} }
...@@ -5571,7 +5602,7 @@ static int generateSQLHeadWithoutStb(char *tableName, ...@@ -5571,7 +5602,7 @@ static int generateSQLHeadWithoutStb(char *tableName,
} }
static int generateStbSQLHead( static int generateStbSQLHead(
SSuperTable* superTblInfo, SSuperTable* stbInfo,
char *tableName, int64_t tableSeq, char *tableName, int64_t tableSeq,
char *dbName, char *dbName,
char *buffer, int remainderBufLen) char *buffer, int remainderBufLen)
...@@ -5580,14 +5611,14 @@ static int generateStbSQLHead( ...@@ -5580,14 +5611,14 @@ static int generateStbSQLHead(
char headBuf[HEAD_BUFF_LEN]; char headBuf[HEAD_BUFF_LEN];
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) { if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq); tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else { } else {
tagsValBuf = getTagValueFromTagSample( tagsValBuf = getTagValueFromTagSample(
superTblInfo, stbInfo,
tableSeq % superTblInfo->tagSampleCount); tableSeq % stbInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n", errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
...@@ -5602,10 +5633,10 @@ static int generateStbSQLHead( ...@@ -5602,10 +5633,10 @@ static int generateStbSQLHead(
dbName, dbName,
tableName, tableName,
dbName, dbName,
superTblInfo->sTblName, stbInfo->sTblName,
tagsValBuf); tagsValBuf);
tmfree(tagsValBuf); tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { } else if (TBL_ALREADY_EXISTS == stbInfo->childTblExists) {
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
...@@ -5630,12 +5661,12 @@ static int generateStbSQLHead( ...@@ -5630,12 +5661,12 @@ static int generateStbSQLHead(
} }
static int32_t generateStbInterlaceData( static int32_t generateStbInterlaceData(
SSuperTable *superTblInfo, threadInfo *pThreadInfo,
char *tableName, uint32_t batchPerTbl, char *tableName, uint32_t batchPerTbl,
uint64_t i, uint64_t i,
uint32_t batchPerTblTimes, uint32_t batchPerTblTimes,
uint64_t tableSeq, uint64_t tableSeq,
threadInfo *pThreadInfo, char *buffer, char *buffer,
int64_t insertRows, int64_t insertRows,
int64_t startTime, int64_t startTime,
uint64_t *pRemainderBufLen) uint64_t *pRemainderBufLen)
...@@ -5643,8 +5674,9 @@ static int32_t generateStbInterlaceData( ...@@ -5643,8 +5674,9 @@ static int32_t generateStbInterlaceData(
assert(buffer); assert(buffer);
char *pstr = buffer; char *pstr = buffer;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
int headLen = generateStbSQLHead( int headLen = generateStbSQLHead(
superTblInfo, stbInfo,
tableName, tableSeq, pThreadInfo->db_name, tableName, tableSeq, pThreadInfo->db_name,
pstr, *pRemainderBufLen); pstr, *pRemainderBufLen);
...@@ -5664,12 +5696,12 @@ static int32_t generateStbInterlaceData( ...@@ -5664,12 +5696,12 @@ static int32_t generateStbInterlaceData(
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl); i, batchPerTblTimes, batchPerTbl);
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision); startTime = taosGetTimestamp(pThreadInfo->time_precision);
} }
int32_t k = generateStbDataTail( int32_t k = generateStbDataTail(
superTblInfo, stbInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime, startTime,
&(pThreadInfo->samplePos), &dataLen); &(pThreadInfo->samplePos), &dataLen);
...@@ -5732,8 +5764,206 @@ static int64_t generateInterlaceDataWithoutStb( ...@@ -5732,8 +5764,206 @@ static int64_t generateInterlaceDataWithoutStb(
} }
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, static int32_t prepareStmtBindArrayByType(
char *dataType, int32_t dataLen, char **ptr, char *value) TAOS_BIND *bind,
char *dataType, int32_t dataLen,
int32_t timePrec,
char *value)
{
if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "binary length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_binary;
bind->buffer_type = TSDB_DATA_TYPE_BINARY;
if (value) {
bind_binary = calloc(1, strlen(value) + 1);
strncpy(bind_binary, value, strlen(value));
bind->buffer_length = strlen(bind_binary);
} else {
bind_binary = calloc(1, dataLen + 1);
rand_string(bind_binary, dataLen);
bind->buffer_length = dataLen;
}
bind->length = &bind->buffer_length;
bind->buffer = bind_binary;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"NCHAR", strlen("NCHAR"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_nchar;
bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
if (value) {
bind_nchar = calloc(1, strlen(value) + 1);
strncpy(bind_nchar, value, strlen(value));
} else {
bind_nchar = calloc(1, dataLen + 1);
rand_string(bind_nchar, dataLen);
}
bind->buffer_length = strlen(bind_nchar);
bind->buffer = bind_nchar;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"INT", strlen("INT"))) {
int32_t *bind_int = malloc(sizeof(int32_t));
if (value) {
*bind_int = atoi(value);
} else {
*bind_int = rand_int();
}
bind->buffer_type = TSDB_DATA_TYPE_INT;
bind->buffer_length = sizeof(int32_t);
bind->buffer = bind_int;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = malloc(sizeof(int64_t));
if (value) {
*bind_bigint = atoll(value);
} else {
*bind_bigint = rand_bigint();
}
bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_bigint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = malloc(sizeof(float));
if (value) {
*bind_float = (float)atof(value);
} else {
*bind_float = rand_float();
}
bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
bind->buffer_length = sizeof(float);
bind->buffer = bind_float;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = malloc(sizeof(double));
if (value) {
*bind_double = atof(value);
} else {
*bind_double = rand_double();
}
bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
bind->buffer_length = sizeof(double);
bind->buffer = bind_double;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = malloc(sizeof(int16_t));
if (value) {
*bind_smallint = (int16_t)atoi(value);
} else {
*bind_smallint = rand_smallint();
}
bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
bind->buffer_length = sizeof(int16_t);
bind->buffer = bind_smallint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = malloc(sizeof(int8_t));
if (value) {
*bind_tinyint = (int8_t)atoi(value);
} else {
*bind_tinyint = rand_tinyint();
}
bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = malloc(sizeof(int8_t));
if (value) {
if (strncasecmp(value, "true", 4)) {
*bind_bool = true;
} else {
*bind_bool = false;
}
} else {
*bind_bool = rand_bool();
}
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_bool;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = malloc(sizeof(int64_t));
if (value) {
if (strchr(value, ':') && strchr(value, '-')) {
int i = 0;
while(value[i] != '\0') {
if (value[i] == '\"' || value[i] == '\'') {
value[i] = ' ';
}
i++;
}
int64_t tmpEpoch;
if (TSDB_CODE_SUCCESS != taosParseTime(
value, &tmpEpoch, strlen(value),
timePrec, 0)) {
errorPrint("Input %s, time format error!\n", value);
return -1;
}
*bind_ts2 = tmpEpoch;
} else {
*bind_ts2 = atoll(value);
}
} else {
*bind_ts2 = rand_bigint();
}
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts2;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else {
errorPrint( "No support data type: %s\n", dataType);
return -1;
}
return 0;
}
static int32_t prepareStmtBindArrayByTypeForRand(
TAOS_BIND *bind,
char *dataType, int32_t dataLen,
int32_t timePrec,
char **ptr,
char *value)
{ {
if (0 == strncasecmp(dataType, if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) { "BINARY", strlen("BINARY"))) {
...@@ -5814,7 +6044,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, ...@@ -5814,7 +6044,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
*ptr += bind->buffer_length; *ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType, } else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) { "FLOAT", strlen("FLOAT"))) {
float *bind_float = (float *) *ptr; float *bind_float = (float *)*ptr;
if (value) { if (value) {
*bind_float = (float)atof(value); *bind_float = (float)atof(value);
...@@ -5874,12 +6104,21 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, ...@@ -5874,12 +6104,21 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_tinyint; bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length; bind->length = &bind->buffer_length;
bind->is_null = NULL; bind->is_null = NULL;
*ptr += bind->buffer_length; *ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType, } else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) { "BOOL", strlen("BOOL"))) {
int8_t *bind_bool = (int8_t *)*ptr; int8_t *bind_bool = (int8_t *)*ptr;
if (value) {
if (strncasecmp(value, "true", 4)) {
*bind_bool = true;
} else {
*bind_bool = false;
}
} else {
*bind_bool = rand_bool(); *bind_bool = rand_bool();
}
bind->buffer_type = TSDB_DATA_TYPE_BOOL; bind->buffer_type = TSDB_DATA_TYPE_BOOL;
bind->buffer_length = sizeof(int8_t); bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_bool; bind->buffer = bind_bool;
...@@ -5889,10 +6128,28 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, ...@@ -5889,10 +6128,28 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
*ptr += bind->buffer_length; *ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType, } else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) { "TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = (int64_t *) *ptr; int64_t *bind_ts2 = (int64_t *)*ptr;
if (value) { if (value) {
if (strchr(value, ':') && strchr(value, '-')) {
int i = 0;
while(value[i] != '\0') {
if (value[i] == '\"' || value[i] == '\'') {
value[i] = ' ';
}
i++;
}
int64_t tmpEpoch;
if (TSDB_CODE_SUCCESS != taosParseTime(
value, &tmpEpoch, strlen(value),
timePrec, 0)) {
errorPrint("Input %s, time format error!\n", value);
return -1;
}
*bind_ts2 = tmpEpoch;
} else {
*bind_ts2 = atoll(value); *bind_ts2 = atoll(value);
}
} else { } else {
*bind_ts2 = rand_bigint(); *bind_ts2 = rand_bigint();
} }
...@@ -5912,13 +6169,14 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, ...@@ -5912,13 +6169,14 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
} }
static int32_t prepareStmtWithoutStb( static int32_t prepareStmtWithoutStb(
TAOS_STMT *stmt, threadInfo *pThreadInfo,
char *tableName, char *tableName,
uint32_t batch, uint32_t batch,
int64_t insertRows, int64_t insertRows,
int64_t recordFrom, int64_t recordFrom,
int64_t startTime) int64_t startTime)
{ {
TAOS_STMT *stmt = pThreadInfo->stmt;
int ret = taos_stmt_set_tbname(stmt, tableName); int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) { if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
...@@ -5938,15 +6196,11 @@ static int32_t prepareStmtWithoutStb( ...@@ -5938,15 +6196,11 @@ static int32_t prepareStmtWithoutStb(
int32_t k = 0; int32_t k = 0;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */ /* columnCount + 1 (ts) */
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
int64_t *bind_ts; int64_t *bind_ts = pThreadInfo->bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (g_args.disorderRatio) { if (g_args.disorderRatio) {
...@@ -5962,8 +6216,6 @@ static int32_t prepareStmtWithoutStb( ...@@ -5962,8 +6216,6 @@ static int32_t prepareStmtWithoutStb(
bind->length = &bind->buffer_length; bind->length = &bind->buffer_length;
bind->is_null = NULL; bind->is_null = NULL;
ptr += bind->buffer_length;
for (int i = 0; i < g_args.num_of_CPR; i ++) { for (int i = 0; i < g_args.num_of_CPR; i ++) {
bind = (TAOS_BIND *)((char *)bindArray bind = (TAOS_BIND *)((char *)bindArray
+ (sizeof(TAOS_BIND) * (i + 1))); + (sizeof(TAOS_BIND) * (i + 1)));
...@@ -5971,7 +6223,8 @@ static int32_t prepareStmtWithoutStb( ...@@ -5971,7 +6223,8 @@ static int32_t prepareStmtWithoutStb(
bind, bind,
data_type[i], data_type[i],
g_args.len_of_binary, g_args.len_of_binary,
&ptr, NULL)) { pThreadInfo->time_precision,
NULL)) {
return -1; return -1;
} }
} }
...@@ -5998,10 +6251,42 @@ static int32_t prepareStmtWithoutStb( ...@@ -5998,10 +6251,42 @@ static int32_t prepareStmtWithoutStb(
return k; return k;
} }
static int32_t prepareStbStmtBind( static int32_t prepareStbStmtBindTag(
char *bindArray, SSuperTable *stbInfo, bool sourceRand, char *bindArray, SSuperTable *stbInfo,
char *tagsVal,
int32_t timePrec)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
return -1;
}
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) {
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
timePrec,
NULL)) {
free(bindBuffer);
return -1;
}
}
free(bindBuffer);
return 0;
}
static int32_t prepareStbStmtBindRand(
int64_t *ts,
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq, int64_t startTime, int32_t recSeq,
bool isColumn) int32_t timePrec)
{ {
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) { if (bindBuffer == NULL) {
...@@ -6016,16 +6301,12 @@ static int32_t prepareStbStmtBind( ...@@ -6016,16 +6301,12 @@ static int32_t prepareStbStmtBind(
TAOS_BIND *bind; TAOS_BIND *bind;
if (isColumn) {
int cursor = 0;
for (int i = 0; i < stbInfo->columnCount + 1; i ++) { for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i)); bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
if (i == 0) { if (i == 0) {
int64_t *bind_ts; int64_t *bind_ts = ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) { if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail( *bind_ts = startTime + getTSRandTail(
...@@ -6041,96 +6322,71 @@ static int32_t prepareStbStmtBind( ...@@ -6041,96 +6322,71 @@ static int32_t prepareStbStmtBind(
bind->is_null = NULL; bind->is_null = NULL;
ptr += bind->buffer_length; ptr += bind->buffer_length;
} else { } else if ( -1 == prepareStmtBindArrayByTypeForRand(
if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType(
bind, bind,
stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen, stbInfo->columns[i-1].dataLen,
timePrec,
&ptr, &ptr,
NULL)) { NULL)) {
free(bindBuffer); tmfree(bindBuffer);
return -1; return -1;
} }
} else {
char *restStr = stbInfo->sampleDataBuf + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
} }
memset(bindBuffer, 0, g_args.len_of_binary); tmfree(bindBuffer);
strncpy(bindBuffer, restStr, index); return 0;
cursor += index + 1; // skip ',' too }
if ( -1 == prepareStmtBindArrayByType( static int32_t prepareStbStmtBindWithSample(
bind, int64_t *ts,
stbInfo->columns[i-1].dataType, char *bindArray, SSuperTable *stbInfo,
stbInfo->columns[i-1].dataLen, int64_t startTime, int32_t recSeq,
&ptr, int32_t timePrec,
bindBuffer)) { int64_t samplePos)
free(bindBuffer); {
return -1; TAOS_BIND *bind;
}
}
}
}
} else {
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) { bind = (TAOS_BIND *)bindArray;
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
&ptr,
NULL)) {
free(bindBuffer);
return -1;
}
}
int64_t *bind_ts = ts;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
} }
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
free(bindBuffer);
return 0; return 0;
} }
static int32_t prepareStbStmt( static int32_t prepareStbStmtRand(
SSuperTable *stbInfo, threadInfo *pThreadInfo,
TAOS_STMT *stmt,
char *tableName, char *tableName,
int64_t tableSeq, int64_t tableSeq,
uint32_t batch, uint32_t batch,
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime)
int64_t *pSamplePos)
{ {
int ret; int ret;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
bool sourceRand; TAOS_STMT *stmt = pThreadInfo->stmt;
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) { if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
bool tagRand;
if (0 == stbInfo->tagSource) { if (0 == stbInfo->tagSource) {
tagRand = true;
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq); tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else { } else {
tagRand = false;
tagsValBuf = getTagValueFromTagSample( tagsValBuf = getTagValueFromTagSample(
stbInfo, stbInfo,
tableSeq % stbInfo->tagSampleCount); tableSeq % stbInfo->tagSampleCount);
...@@ -6150,8 +6406,9 @@ static int32_t prepareStbStmt( ...@@ -6150,8 +6406,9 @@ static int32_t prepareStbStmt(
return -1; return -1;
} }
if (-1 == prepareStbStmtBind( if (-1 == prepareStbStmtBindTag(
tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) { tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision
/* is tag */)) {
tmfree(tagsValBuf); tmfree(tagsValBuf);
tmfree(tagsArray); tmfree(tagsArray);
return -1; return -1;
...@@ -6186,8 +6443,12 @@ static int32_t prepareStbStmt( ...@@ -6186,8 +6443,12 @@ static int32_t prepareStbStmt(
uint32_t k; uint32_t k;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */ /* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand, if (-1 == prepareStbStmtBindRand(
startTime, k, true /* is column */)) { pThreadInfo->bind_ts,
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision
/* is column */)) {
free(bindArray); free(bindArray);
return -1; return -1;
} }
...@@ -6210,10 +6471,6 @@ static int32_t prepareStbStmt( ...@@ -6210,10 +6471,6 @@ static int32_t prepareStbStmt(
k++; k++;
recordFrom ++; recordFrom ++;
if (!sourceRand) {
(*pSamplePos) ++;
}
if (recordFrom >= insertRows) { if (recordFrom >= insertRows) {
break; break;
} }
...@@ -6223,9 +6480,8 @@ static int32_t prepareStbStmt( ...@@ -6223,9 +6480,8 @@ static int32_t prepareStbStmt(
return k; return k;
} }
static int32_t prepareStbStmtInterlace( static int32_t prepareStbStmtWithSample(
SSuperTable *stbInfo, threadInfo *pThreadInfo,
TAOS_STMT *stmt,
char *tableName, char *tableName,
int64_t tableSeq, int64_t tableSeq,
uint32_t batch, uint32_t batch,
...@@ -6234,41 +6490,109 @@ static int32_t prepareStbStmtInterlace( ...@@ -6234,41 +6490,109 @@ static int32_t prepareStbStmtInterlace(
int64_t startTime, int64_t startTime,
int64_t *pSamplePos) int64_t *pSamplePos)
{ {
return prepareStbStmt( int ret;
stbInfo, SSuperTable *stbInfo = pThreadInfo->stbInfo;
stmt, TAOS_STMT *stmt = pThreadInfo->stmt;
tableName,
tableSeq,
batch,
insertRows, 0, startTime,
pSamplePos);
}
static int32_t prepareStbStmtProgressive( if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
SSuperTable *stbInfo, char* tagsValBuf = NULL;
TAOS_STMT *stmt,
char *tableName, if (0 == stbInfo->tagSource) {
int64_t tableSeq, tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
uint32_t batch, } else {
uint64_t insertRows, tagsValBuf = getTagValueFromTagSample(
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo, stbInfo,
stmt, tableSeq % stbInfo->tagSampleCount);
tableName, }
tableSeq,
g_args.num_of_RPR,
insertRows, recordFrom, startTime,
pSamplePos);
}
if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount);
if (NULL == tagsArray) {
tmfree(tagsValBuf);
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
if (-1 == prepareStbStmtBindTag(
tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision
/* is tag */)) {
tmfree(tagsValBuf);
tmfree(tagsArray);
return -1;
}
ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray);
tmfree(tagsValBuf);
tmfree(tagsArray);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_set_tbname_tags() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
} else {
ret = taos_stmt_set_tbname(stmt, tableName);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_set_tbname() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
}
uint32_t k;
for (k = 0; k < batch;) {
char *bindArray = (char *)(*((uintptr_t *)
(stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos))));
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBindWithSample(
pThreadInfo->bind_ts,
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision,
*pSamplePos
/* is column */)) {
return -1;
}
ret = taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_bind_param() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
// if msg > 3MB, break
ret = taos_stmt_add_batch(stmt);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_add_batch() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
k++;
recordFrom ++;
(*pSamplePos) ++;
if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
*pSamplePos = 0;
}
if (recordFrom >= insertRows) {
break;
}
}
return k;
}
#endif #endif
static int32_t generateStbProgressiveData( static int32_t generateStbProgressiveData(
SSuperTable *superTblInfo, SSuperTable *stbInfo,
char *tableName, char *tableName,
int64_t tableSeq, int64_t tableSeq,
char *dbName, char *buffer, char *dbName, char *buffer,
...@@ -6282,7 +6606,7 @@ static int32_t generateStbProgressiveData( ...@@ -6282,7 +6606,7 @@ static int32_t generateStbProgressiveData(
memset(pstr, 0, *pRemainderBufLen); memset(pstr, 0, *pRemainderBufLen);
int64_t headLen = generateStbSQLHead( int64_t headLen = generateStbSQLHead(
superTblInfo, stbInfo,
tableName, tableSeq, dbName, tableName, tableSeq, dbName,
buffer, *pRemainderBufLen); buffer, *pRemainderBufLen);
...@@ -6294,7 +6618,7 @@ static int32_t generateStbProgressiveData( ...@@ -6294,7 +6618,7 @@ static int32_t generateStbProgressiveData(
int64_t dataLen; int64_t dataLen;
return generateStbDataTail(superTblInfo, return generateStbDataTail(stbInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen, g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, recordFrom, insertRows, recordFrom,
startTime, startTime,
...@@ -6354,26 +6678,34 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6354,26 +6678,34 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t nTimeStampStep; int64_t nTimeStampStep;
uint64_t insert_interval; uint64_t insert_interval;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; bool sourceRand;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (superTblInfo) { if (stbInfo) {
insertRows = superTblInfo->insertRows; insertRows = stbInfo->insertRows;
if ((superTblInfo->interlaceRows == 0) if ((stbInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) { && (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows; interlaceRows = g_args.interlace_rows;
} else { } else {
interlaceRows = superTblInfo->interlaceRows; interlaceRows = stbInfo->interlaceRows;
}
maxSqlLen = stbInfo->maxSqlLen;
nTimeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval;
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
} }
maxSqlLen = superTblInfo->maxSqlLen;
nTimeStampStep = superTblInfo->timeStampStep;
insert_interval = superTblInfo->insertInterval;
} else { } else {
insertRows = g_args.num_of_DPT; insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows; interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len; maxSqlLen = g_args.max_sql_len;
nTimeStampStep = g_args.timestamp_step; nTimeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval; insert_interval = g_args.insert_interval;
sourceRand = true;
} }
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
...@@ -6456,28 +6788,38 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6456,28 +6788,38 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t oldRemainderLen = remainderBufLen; uint64_t oldRemainderLen = remainderBufLen;
int32_t generated; int32_t generated;
if (superTblInfo) { if (stbInfo) {
if (superTblInfo->iface == STMT_IFACE) { if (stbInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtInterlace( if (sourceRand) {
superTblInfo, generated = prepareStbStmtRand(
pThreadInfo->stmt, pThreadInfo,
tableName, tableName,
tableSeq, tableSeq,
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, 0,
startTime
);
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime, startTime,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos));
}
#else #else
generated = -1; generated = -1;
#endif #endif
} else { } else {
generated = generateStbInterlaceData( generated = generateStbInterlaceData(
superTblInfo, pThreadInfo,
tableName, batchPerTbl, i, tableName, batchPerTbl, i,
batchPerTblTimes, batchPerTblTimes,
tableSeq, tableSeq,
pThreadInfo, pstr, pstr,
insertRows, insertRows,
startTime, startTime,
&remainderBufLen); &remainderBufLen);
...@@ -6490,7 +6832,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6490,7 +6832,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
tableName, batchPerTbl, startTime); tableName, batchPerTbl, startTime);
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb( generated = prepareStmtWithoutStb(
pThreadInfo->stmt, tableName, pThreadInfo,
tableName,
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, i,
startTime); startTime);
...@@ -6639,12 +6982,12 @@ free_of_interlace: ...@@ -6639,12 +6982,12 @@ free_of_interlace:
static void* syncWriteProgressive(threadInfo *pThreadInfo) { static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; uint64_t maxSqlLen = stbInfo?stbInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep = int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:g_args.timestamp_step; stbInfo?stbInfo->timeStampStep:g_args.timestamp_step;
int64_t insertRows = int64_t insertRows =
(superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; (stbInfo)?stbInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows); __func__, __LINE__, insertRows);
...@@ -6663,6 +7006,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6663,6 +7006,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->totalInsertRows = 0; pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0; pThreadInfo->totalAffectedRows = 0;
bool sourceRand;
if (stbInfo) {
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
} else {
sourceRand = true;
}
pThreadInfo->samplePos = 0; pThreadInfo->samplePos = 0;
int percentComplete = 0; int percentComplete = 0;
...@@ -6696,24 +7050,35 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6696,24 +7050,35 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
remainderBufLen -= len; remainderBufLen -= len;
int32_t generated; int32_t generated;
if (superTblInfo) { if (stbInfo) {
if (superTblInfo->iface == STMT_IFACE) { if (stbInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtProgressive( if (sourceRand) {
superTblInfo, generated = prepareStbStmtRand(
pThreadInfo->stmt, pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows,
i, start_time
);
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName, tableName,
tableSeq, tableSeq,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, i, start_time, insertRows, i, start_time,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos));
}
#else #else
generated = -1; generated = -1;
#endif #endif
} else { } else {
generated = generateStbProgressiveData( generated = generateStbProgressiveData(
superTblInfo, stbInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr, tableName, tableSeq,
pThreadInfo->db_name, pstr,
insertRows, i, start_time, insertRows, i, start_time,
&(pThreadInfo->samplePos), &(pThreadInfo->samplePos),
&remainderBufLen); &remainderBufLen);
...@@ -6722,7 +7087,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6722,7 +7087,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (g_args.iface == STMT_IFACE) { if (g_args.iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb( generated = prepareStmtWithoutStb(
pThreadInfo->stmt, pThreadInfo,
tableName, tableName,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, i, insertRows, i,
...@@ -6792,9 +7157,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6792,9 +7157,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
} // num_of_DPT } // num_of_DPT
if ((g_args.verbose_print) && if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) (tableSeq == pThreadInfo->ntables - 1) && (stbInfo)
&& (0 == strncasecmp( && (0 == strncasecmp(
superTblInfo->dataSource, stbInfo->dataSource,
"sample", strlen("sample")))) { "sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n", verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos); __func__, __LINE__, pThreadInfo->samplePos);
...@@ -6812,18 +7177,18 @@ free_of_progressive: ...@@ -6812,18 +7177,18 @@ free_of_progressive:
static void* syncWrite(void *sarg) { static void* syncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("syncWrite"); setThreadName("syncWrite");
uint32_t interlaceRows; uint32_t interlaceRows;
if (superTblInfo) { if (stbInfo) {
if ((superTblInfo->interlaceRows == 0) if ((stbInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) { && (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows; interlaceRows = g_args.interlace_rows;
} else { } else {
interlaceRows = superTblInfo->interlaceRows; interlaceRows = stbInfo->interlaceRows;
} }
} else { } else {
interlaceRows = g_args.interlace_rows; interlaceRows = g_args.interlace_rows;
...@@ -6840,10 +7205,10 @@ static void* syncWrite(void *sarg) { ...@@ -6840,10 +7205,10 @@ static void* syncWrite(void *sarg) {
static void callBack(void *param, TAOS_RES *res, int code) { static void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* pThreadInfo = (threadInfo*)param; threadInfo* pThreadInfo = (threadInfo*)param;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
int insert_interval = int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; stbInfo?stbInfo->insertInterval:g_args.insert_interval;
if (insert_interval) { if (insert_interval) {
pThreadInfo->et = taosGetTimestampMs(); pThreadInfo->et = taosGetTimestampMs();
if ((pThreadInfo->et - pThreadInfo->st) < insert_interval) { if ((pThreadInfo->et - pThreadInfo->st) < insert_interval) {
...@@ -6851,13 +7216,13 @@ static void callBack(void *param, TAOS_RES *res, int code) { ...@@ -6851,13 +7216,13 @@ static void callBack(void *param, TAOS_RES *res, int code) {
} }
} }
char *buffer = calloc(1, pThreadInfo->superTblInfo->maxSqlLen); char *buffer = calloc(1, pThreadInfo->stbInfo->maxSqlLen);
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
char *pstr = buffer; char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values", pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values",
pThreadInfo->db_name, pThreadInfo->tb_prefix, pThreadInfo->db_name, pThreadInfo->tb_prefix,
pThreadInfo->start_table_from); pThreadInfo->start_table_from);
// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) { // if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) {
if (pThreadInfo->counter >= g_args.num_of_RPR) { if (pThreadInfo->counter >= g_args.num_of_RPR) {
pThreadInfo->start_table_from++; pThreadInfo->start_table_from++;
pThreadInfo->counter = 0; pThreadInfo->counter = 0;
...@@ -6871,15 +7236,15 @@ static void callBack(void *param, TAOS_RES *res, int code) { ...@@ -6871,15 +7236,15 @@ static void callBack(void *param, TAOS_RES *res, int code) {
for (int i = 0; i < g_args.num_of_RPR; i++) { for (int i = 0; i < g_args.num_of_RPR; i++) {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->superTblInfo->disorderRatio if (0 != pThreadInfo->stbInfo->disorderRatio
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) { && rand_num < pThreadInfo->stbInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); - (taosRandom() % pThreadInfo->stbInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->superTblInfo, data, generateStbRowData(pThreadInfo->stbInfo, data,
MAX_DATA_SIZE, MAX_DATA_SIZE,
d); d);
} else { } else {
generateStbRowData(pThreadInfo->superTblInfo, generateStbRowData(pThreadInfo->stbInfo,
data, data,
MAX_DATA_SIZE, MAX_DATA_SIZE,
pThreadInfo->lastTs += 1000); pThreadInfo->lastTs += 1000);
...@@ -6887,7 +7252,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { ...@@ -6887,7 +7252,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
pstr += sprintf(pstr, "%s", data); pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++; pThreadInfo->counter++;
if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) { if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) {
break; break;
} }
} }
...@@ -6903,7 +7268,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { ...@@ -6903,7 +7268,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
static void *asyncWrite(void *sarg) { static void *asyncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("asyncWrite"); setThreadName("asyncWrite");
...@@ -6912,7 +7277,7 @@ static void *asyncWrite(void *sarg) { ...@@ -6912,7 +7277,7 @@ static void *asyncWrite(void *sarg) {
pThreadInfo->lastTs = pThreadInfo->start_time; pThreadInfo->lastTs = pThreadInfo->start_time;
int insert_interval = int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; stbInfo?stbInfo->insertInterval:g_args.insert_interval;
if (insert_interval) { if (insert_interval) {
pThreadInfo->st = taosGetTimestampMs(); pThreadInfo->st = taosGetTimestampMs();
} }
...@@ -6949,8 +7314,81 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * ...@@ -6949,8 +7314,81 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
return 0; return 0;
} }
#if STMT_IFACE_ENABLED == 1
static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
{
stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
if (stbInfo->sampleBindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n",
__func__, __LINE__, (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
return -1;
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
TAOS_BIND *bind;
int cursor = 0;
for (int c = 0; c < stbInfo->columnCount + 1; c++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * c));
if (c == 0) {
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = NULL; //bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else {
char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * i + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
char *bindBuffer = calloc(1, index + 1);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, DOUBLE_BUFF_LEN);
return -1;
}
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if (-1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[c-1].dataType,
stbInfo->columns[c-1].dataLen,
timePrec,
bindBuffer)) {
free(bindBuffer);
return -1;
}
free(bindBuffer);
}
}
*((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray;
}
return 0;
}
#endif
static void startMultiThreadInsertData(int threads, char* db_name, static void startMultiThreadInsertData(int threads, char* db_name,
char* precision, SSuperTable* superTblInfo) { char* precision, SSuperTable* stbInfo) {
int32_t timePrec = TSDB_TIME_PRECISION_MILLI; int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) { if (0 != precision[0]) {
...@@ -6964,19 +7402,19 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -6964,19 +7402,19 @@ static void startMultiThreadInsertData(int threads, char* db_name,
#endif #endif
} else { } else {
errorPrint("Not support precision: %s\n", precision); errorPrint("Not support precision: %s\n", precision);
exit(-1); exit(EXIT_FAILURE);
} }
} }
int64_t start_time; int64_t start_time;
if (superTblInfo) { if (stbInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) {
start_time = taosGetTimestamp(timePrec); start_time = taosGetTimestamp(timePrec);
} else { } else {
if (TSDB_CODE_SUCCESS != taosParseTime( if (TSDB_CODE_SUCCESS != taosParseTime(
superTblInfo->startTimestamp, stbInfo->startTimestamp,
&start_time, &start_time,
strlen(superTblInfo->startTimestamp), strlen(stbInfo->startTimestamp),
timePrec, 0)) { timePrec, 0)) {
ERROR_EXIT("failed to parse time!\n"); ERROR_EXIT("failed to parse time!\n");
} }
...@@ -6990,12 +7428,12 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -6990,12 +7428,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t start = taosGetTimestampMs(); int64_t start = taosGetTimestampMs();
// read sample data from file first // read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource,
"sample", strlen("sample")))) { "sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) { if (0 != prepareSampleDataForSTable(stbInfo)) {
errorPrint("%s() LN%d, prepare sample data for stable failed!\n", errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
__func__, __LINE__); __func__, __LINE__);
exit(-1); exit(EXIT_FAILURE);
} }
} }
...@@ -7005,68 +7443,68 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7005,68 +7443,68 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (NULL == taos0) { if (NULL == taos0) {
errorPrint("%s() LN%d, connect to server fail , reason: %s\n", errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
__func__, __LINE__, taos_errstr(NULL)); __func__, __LINE__, taos_errstr(NULL));
exit(-1); exit(EXIT_FAILURE);
} }
int64_t ntables = 0; int64_t ntables = 0;
uint64_t tableFrom; uint64_t tableFrom;
if (superTblInfo) { if (stbInfo) {
int64_t limit; int64_t limit;
uint64_t offset; uint64_t offset;
if ((NULL != g_args.sqlFile) if ((NULL != g_args.sqlFile)
&& (superTblInfo->childTblExists == TBL_NO_EXISTS) && (stbInfo->childTblExists == TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset != 0) && ((stbInfo->childTblOffset != 0)
|| (superTblInfo->childTblLimit >= 0))) { || (stbInfo->childTblLimit >= 0))) {
printf("WARNING: offset and limit will not be used since the child tables not exists!\n"); printf("WARNING: offset and limit will not be used since the child tables not exists!\n");
} }
if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) { if (stbInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((superTblInfo->childTblLimit < 0) if ((stbInfo->childTblLimit < 0)
|| ((superTblInfo->childTblOffset || ((stbInfo->childTblOffset
+ superTblInfo->childTblLimit) + stbInfo->childTblLimit)
> (superTblInfo->childTblCount))) { > (stbInfo->childTblCount))) {
superTblInfo->childTblLimit = stbInfo->childTblLimit =
superTblInfo->childTblCount - superTblInfo->childTblOffset; stbInfo->childTblCount - stbInfo->childTblOffset;
} }
offset = superTblInfo->childTblOffset; offset = stbInfo->childTblOffset;
limit = superTblInfo->childTblLimit; limit = stbInfo->childTblLimit;
} else { } else {
limit = superTblInfo->childTblCount; limit = stbInfo->childTblCount;
offset = 0; offset = 0;
} }
ntables = limit; ntables = limit;
tableFrom = offset; tableFrom = offset;
if ((superTblInfo->childTblExists != TBL_NO_EXISTS) if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset + superTblInfo->childTblLimit ) && ((stbInfo->childTblOffset + stbInfo->childTblLimit)
> superTblInfo->childTblCount)) { > stbInfo->childTblCount)) {
printf("WARNING: specified offset + limit > child table count!\n"); printf("WARNING: specified offset + limit > child table count!\n");
prompt(); prompt();
} }
if ((superTblInfo->childTblExists != TBL_NO_EXISTS) if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& (0 == superTblInfo->childTblLimit)) { && (0 == stbInfo->childTblLimit)) {
printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n"); printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n");
prompt(); prompt();
} }
superTblInfo->childTblName = (char*)calloc(1, stbInfo->childTblName = (char*)calloc(1,
limit * TSDB_TABLE_NAME_LEN); limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) { if (stbInfo->childTblName == NULL) {
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos0); taos_close(taos0);
exit(-1); errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
exit(EXIT_FAILURE);
} }
int64_t childTblCount; int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset( getChildNameOfSuperTableWithLimitAndOffset(
taos0, taos0,
db_name, superTblInfo->sTblName, db_name, stbInfo->sTblName,
&superTblInfo->childTblName, &childTblCount, &stbInfo->childTblName, &childTblCount,
limit, limit,
offset); offset);
} else { } else {
...@@ -7087,11 +7525,11 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7087,11 +7525,11 @@ static void startMultiThreadInsertData(int threads, char* db_name,
b = ntables % threads; b = ntables % threads;
} }
if ((superTblInfo) if ((stbInfo)
&& (superTblInfo->iface == REST_IFACE)) { && (stbInfo->iface == REST_IFACE)) {
if (convertHostToServAddr( if (convertHostToServAddr(
g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) { g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
exit(-1); ERROR_EXIT("convert host to server address");
} }
} }
...@@ -7104,98 +7542,110 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7104,98 +7542,110 @@ static void startMultiThreadInsertData(int threads, char* db_name,
memset(pids, 0, threads * sizeof(pthread_t)); memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo)); memset(infos, 0, threads * sizeof(threadInfo));
#if STMT_IFACE_ENABLED == 1
char *stmtBuffer = calloc(1, BUFFER_SIZE);
assert(stmtBuffer);
if ((g_args.iface == STMT_IFACE)
|| ((stbInfo)
&& (stbInfo->iface == STMT_IFACE))) {
char *pstr = stmtBuffer;
if ((stbInfo)
&& (AUTO_CREATE_SUBTBL
== stbInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
stbInfo->sTblName);
for (int tag = 0; tag < (stbInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
int columnCount;
if (stbInfo) {
columnCount = stbInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource,
"sample", strlen("sample")))) {
parseSampleFileToStmt(stbInfo, timePrec);
}
}
#endif
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i; threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i; pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN); tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->time_precision = timePrec; pThreadInfo->time_precision = timePrec;
pThreadInfo->superTblInfo = superTblInfo; pThreadInfo->stbInfo = stbInfo;
pThreadInfo->start_time = start_time; pThreadInfo->start_time = start_time;
pThreadInfo->minDelay = UINT64_MAX; pThreadInfo->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) || if ((NULL == stbInfo) ||
(superTblInfo->iface != REST_IFACE)) { (stbInfo->iface != REST_IFACE)) {
//t_info->taos = taos; //t_info->taos = taos;
pThreadInfo->taos = taos_connect( pThreadInfo->taos = taos_connect(
g_Dbs.host, g_Dbs.user, g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port); g_Dbs.password, db_name, g_Dbs.port);
if (NULL == pThreadInfo->taos) { if (NULL == pThreadInfo->taos) {
free(infos);
errorPrint( errorPrint(
"%s() LN%d, connect to server fail from insert sub thread, reason: %s\n", "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
__func__, __LINE__, __func__, __LINE__,
taos_errstr(NULL)); taos_errstr(NULL));
free(infos); exit(EXIT_FAILURE);
exit(-1);
} }
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
if ((g_args.iface == STMT_IFACE) if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo) || ((stbInfo)
&& (superTblInfo->iface == STMT_IFACE))) { && (stbInfo->iface == STMT_IFACE))) {
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) { if (NULL == pThreadInfo->stmt) {
free(pids);
free(infos);
errorPrint( errorPrint(
"%s() LN%d, failed init stmt, reason: %s\n", "%s() LN%d, failed init stmt, reason: %s\n",
__func__, __LINE__, __func__, __LINE__,
taos_errstr(NULL)); taos_errstr(NULL));
free(pids); exit(EXIT_FAILURE);
free(infos);
exit(-1);
}
char *buffer = calloc(1, BUFFER_SIZE);
assert(buffer);
char *pstr = buffer;
if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL
== superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
} }
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer); int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0);
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
if (ret != 0){ if (ret != 0){
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
ret, taos_stmt_errstr(pThreadInfo->stmt));
free(pids); free(pids);
free(infos); free(infos);
free(buffer); free(stmtBuffer);
exit(-1); errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
ret, taos_stmt_errstr(pThreadInfo->stmt));
exit(EXIT_FAILURE);
} }
pThreadInfo->bind_ts = malloc(sizeof(int64_t));
free(buffer);
} }
#endif #endif
} else { } else {
pThreadInfo->taos = NULL; pThreadInfo->taos = NULL;
} }
/* if ((NULL == superTblInfo) /* if ((NULL == stbInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) { || (0 == stbInfo->multiThreadWriteOneTbl)) {
*/ */
pThreadInfo->start_table_from = tableFrom; pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a; pThreadInfo->ntables = i<b?a+1:a;
...@@ -7203,7 +7653,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7203,7 +7653,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tableFrom = pThreadInfo->end_table_to + 1; tableFrom = pThreadInfo->end_table_to + 1;
/* } else { /* } else {
pThreadInfo->start_table_from = 0; pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount; pThreadInfo->ntables = stbInfo->childTblCount;
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
} }
*/ */
...@@ -7215,6 +7665,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7215,6 +7665,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} }
} }
#if STMT_IFACE_ENABLED == 1
free(stmtBuffer);
#endif
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
...@@ -7228,11 +7682,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7228,11 +7682,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i; threadInfo *pThreadInfo = infos + i;
tsem_destroy(&(pThreadInfo->lock_sem));
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
if (pThreadInfo->stmt) { if (pThreadInfo->stmt) {
taos_stmt_close(pThreadInfo->stmt); taos_stmt_close(pThreadInfo->stmt);
tmfree((char *)pThreadInfo->bind_ts);
} }
#endif #endif
tsem_destroy(&(pThreadInfo->lock_sem)); tsem_destroy(&(pThreadInfo->lock_sem));
...@@ -7242,9 +7695,9 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7242,9 +7695,9 @@ static void startMultiThreadInsertData(int threads, char* db_name,
__func__, __LINE__, __func__, __LINE__,
pThreadInfo->threadID, pThreadInfo->totalInsertRows, pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows); pThreadInfo->totalAffectedRows);
if (superTblInfo) { if (stbInfo) {
superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows; stbInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows; stbInfo->totalInsertRows += pThreadInfo->totalInsertRows;
} else { } else {
g_args.totalAffectedRows += pThreadInfo->totalAffectedRows; g_args.totalAffectedRows += pThreadInfo->totalAffectedRows;
g_args.totalInsertRows += pThreadInfo->totalInsertRows; g_args.totalInsertRows += pThreadInfo->totalInsertRows;
...@@ -7265,22 +7718,22 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7265,22 +7718,22 @@ static void startMultiThreadInsertData(int threads, char* db_name,
double tInMs = t/1000.0; double tInMs = t/1000.0;
if (superTblInfo) { if (stbInfo) {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, superTblInfo->totalInsertRows, tInMs, stbInfo->totalInsertRows,
superTblInfo->totalAffectedRows, stbInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, stbInfo->sTblName,
(tInMs)? (tInMs)?
(double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX); (double)(stbInfo->totalInsertRows/tInMs):FLT_MAX);
if (g_fpOfInsertResult) { if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, superTblInfo->totalInsertRows, tInMs, stbInfo->totalInsertRows,
superTblInfo->totalAffectedRows, stbInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, stbInfo->sTblName,
(tInMs)? (tInMs)?
(double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX); (double)(stbInfo->totalInsertRows/tInMs):FLT_MAX);
} }
} else { } else {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n", fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
...@@ -7335,8 +7788,8 @@ static void *readTable(void *sarg) { ...@@ -7335,8 +7788,8 @@ static void *readTable(void *sarg) {
} }
int64_t num_of_DPT; int64_t num_of_DPT;
/* if (pThreadInfo->superTblInfo) { /* if (pThreadInfo->stbInfo) {
num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table; num_of_DPT = pThreadInfo->stbInfo->insertRows; // nrecords_per_table;
} else { } else {
*/ */
num_of_DPT = g_args.num_of_DPT; num_of_DPT = g_args.num_of_DPT;
...@@ -7410,7 +7863,7 @@ static void *readMetric(void *sarg) { ...@@ -7410,7 +7863,7 @@ static void *readMetric(void *sarg) {
return NULL; return NULL;
} }
int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows; int64_t num_of_DPT = pThreadInfo->stbInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables; int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc; bool do_aggreFunc = g_Dbs.do_aggreFunc;
...@@ -7549,14 +8002,14 @@ static int insertTestProcess() { ...@@ -7549,14 +8002,14 @@ static int insertTestProcess() {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; SSuperTable* stbInfo = &g_Dbs.db[i].superTbls[j];
if (superTblInfo && (superTblInfo->insertRows > 0)) { if (stbInfo && (stbInfo->insertRows > 0)) {
startMultiThreadInsertData( startMultiThreadInsertData(
g_Dbs.threadCount, g_Dbs.threadCount,
g_Dbs.db[i].dbName, g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision, g_Dbs.db[i].dbCfg.precision,
superTblInfo); stbInfo);
} }
} }
} }
...@@ -7776,7 +8229,7 @@ static int queryTestProcess() { ...@@ -7776,7 +8229,7 @@ static int queryTestProcess() {
if (taos == NULL) { if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL)); taos_errstr(NULL));
exit(-1); exit(EXIT_FAILURE);
} }
if (0 != g_queryInfo.superQueryInfo.sqlCount) { if (0 != g_queryInfo.superQueryInfo.sqlCount) {
...@@ -7796,7 +8249,7 @@ static int queryTestProcess() { ...@@ -7796,7 +8249,7 @@ static int queryTestProcess() {
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) { if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
if (convertHostToServAddr( if (convertHostToServAddr(
g_queryInfo.host, g_queryInfo.port, &g_queryInfo.serv_addr) != 0) g_queryInfo.host, g_queryInfo.port, &g_queryInfo.serv_addr) != 0)
exit(-1); ERROR_EXIT("convert host to server address");
} }
pthread_t *pids = NULL; pthread_t *pids = NULL;
...@@ -8000,10 +8453,10 @@ static void *superSubscribe(void *sarg) { ...@@ -8000,10 +8453,10 @@ static void *superSubscribe(void *sarg) {
setThreadName("superSub"); setThreadName("superSub");
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
free(subSqlStr);
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); pThreadInfo->ntables, MAX_QUERY_SQL_COUNT);
free(subSqlStr); exit(EXIT_FAILURE);
exit(-1);
} }
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
...@@ -8269,7 +8722,7 @@ static int subscribeTestProcess() { ...@@ -8269,7 +8722,7 @@ static int subscribeTestProcess() {
if (taos == NULL) { if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL)); taos_errstr(NULL));
exit(-1); exit(EXIT_FAILURE);
} }
if (0 != g_queryInfo.superQueryInfo.sqlCount) { if (0 != g_queryInfo.superQueryInfo.sqlCount) {
...@@ -8298,7 +8751,7 @@ static int subscribeTestProcess() { ...@@ -8298,7 +8751,7 @@ static int subscribeTestProcess() {
errorPrint("%s() LN%d, sepcified query sqlCount %d.\n", errorPrint("%s() LN%d, sepcified query sqlCount %d.\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.sqlCount);
exit(-1); exit(EXIT_FAILURE);
} }
pids = calloc( pids = calloc(
...@@ -8313,7 +8766,7 @@ static int subscribeTestProcess() { ...@@ -8313,7 +8766,7 @@ static int subscribeTestProcess() {
sizeof(threadInfo)); sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1); exit(EXIT_FAILURE);
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
...@@ -8350,7 +8803,7 @@ static int subscribeTestProcess() { ...@@ -8350,7 +8803,7 @@ static int subscribeTestProcess() {
errorPrint("%s() LN%d, malloc failed for create threads\n", errorPrint("%s() LN%d, malloc failed for create threads\n",
__func__, __LINE__); __func__, __LINE__);
// taos_close(taos); // taos_close(taos);
exit(-1); exit(EXIT_FAILURE);
} }
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
...@@ -8553,8 +9006,7 @@ static int regexMatch(const char *s, const char *reg, int cflags) { ...@@ -8553,8 +9006,7 @@ static int regexMatch(const char *s, const char *reg, int cflags) {
/* Compile regular expression */ /* Compile regular expression */
if (regcomp(&regex, reg, cflags) != 0) { if (regcomp(&regex, reg, cflags) != 0) {
printf("Fail to compile regex\n"); ERROR_EXIT("Fail to compile regex\n");
exit(-1);
} }
/* Execute regular expression */ /* Execute regular expression */
...@@ -8567,9 +9019,9 @@ static int regexMatch(const char *s, const char *reg, int cflags) { ...@@ -8567,9 +9019,9 @@ static int regexMatch(const char *s, const char *reg, int cflags) {
return 0; return 0;
} else { } else {
regerror(reti, &regex, msgbuf, sizeof(msgbuf)); regerror(reti, &regex, msgbuf, sizeof(msgbuf));
printf("Regex match failed: %s\n", msgbuf);
regfree(&regex); regfree(&regex);
exit(-1); printf("Regex match failed: %s\n", msgbuf);
exit(EXIT_FAILURE);
} }
return 0; return 0;
...@@ -8671,7 +9123,7 @@ static void queryResult() { ...@@ -8671,7 +9123,7 @@ static void queryResult() {
if (g_args.use_metric) { if (g_args.use_metric) {
pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount; pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1; pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; pThreadInfo->stbInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(pThreadInfo->tb_prefix, tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, TBNAME_PREFIX_LEN); g_Dbs.db[0].superTbls[0].childTblPrefix, TBNAME_PREFIX_LEN);
} else { } else {
...@@ -8687,10 +9139,10 @@ static void queryResult() { ...@@ -8687,10 +9139,10 @@ static void queryResult() {
g_Dbs.db[0].dbName, g_Dbs.db[0].dbName,
g_Dbs.port); g_Dbs.port);
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
free(pThreadInfo);
errorPrint( "Failed to connect to TDengine, reason:%s\n", errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL)); taos_errstr(NULL));
free(pThreadInfo); exit(EXIT_FAILURE);
exit(-1);
} }
tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN); tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "sample",
"insert_mode": "stmt",
"insert_rate": 0,
"insert_rows": 2592000,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "sample",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 2592000,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "rand",
"insert_mode": "stmt",
"insert_rate": 0,
"insert_rows": 259200,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "sample",
"insert_mode": "stmt",
"insert_rate": 0,
"insert_rows": 259200,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "192.168.1.103",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 1,
"result_file": "1174.out",
"confirm_parameter_prompt": "no",
"num_of_records_per_req": 51,
"databases": [
{
"dbinfo": {
"name": "gdse",
"drop": "yes"
},
"super_tables": [{
"name": "model_1174",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "model_1174_",
"auto_create_table": "no",
"batch_create_tbl_num": 0,
"data_source": "sample",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 259200,
"interlace_rows": 1,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048576,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1000,
"start_timestamp": "2021-05-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "tools/taosdemoAllTest/1174.csv",
"tags_file": "tools/taosdemoAllTest/1174-tag.csv",
"columns": [{"type": "FLOAT", "count": 109}, {"type": "INT", "count": 4}, {"type": "FLOAT", "count": 8}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 5}, {"type": "INT", "count": 47}, {"type": "BOOL", "count": 103}, {"type": "INT", "count": 2}, {"type": "TIMESTAMP", "count": 3}, {"type": "BOOL", "count": 28}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 6}, {"type": "INT", "count": 1}, {"type": "FLOAT", "count": 7}, {"type": "BOOL", "count": 7}, {"type": "FLOAT", "count": 2}, {"type": "INT", "count": 3}, {"type": "FLOAT", "count": 3}, {"type": "INT", "count": 3}, {"type": "BOOL", "count": 1}],
"tags": [{"type": "INT", "count": 1}]
}]
}]
}
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
# tdSql.init(conn.cursor(), logSql)
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
# insert: create one or mutiple tables per sql and insert multiple rows per sql
os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-small-stmt-random.json -y " % binPath)
# sleep(60)
# os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-small-taosc.json -y " % binPath)
# sleep(60)
# os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-small-stmt.json -y " % binPath)
# sleep(60)
# os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-large-taosc.json -y " % binPath)
# sleep(60)
# os.system("%staosdemo -f tools/taosdemoAllTest/stmt/1174-large-stmt.json -y " % binPath)
# tdSql.execute("use db")
# tdSql.query("select count (tbname) from stb0")
# tdSql.checkData(0, 0, 1000)
# tdSql.query("select count (tbname) from stb1")
# tdSql.checkData(0, 0, 1000)
# tdSql.query("select count(*) from stb00_0")
# tdSql.checkData(0, 0, 100)
# tdSql.query("select count(*) from stb0")
# tdSql.checkData(0, 0, 100000)
# tdSql.query("select count(*) from stb01_1")
# tdSql.checkData(0, 0, 200)
# tdSql.query("select count(*) from stb1")
# tdSql.checkData(0, 0, 200000)
testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf ./insert_res.txt")
os.system("rm -rf tools/taosdemoAllTest/%s.sql" % testcaseFilename )
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册