未验证 提交 8698a2ff 编写于 作者: Y Yang Zhao 提交者: GitHub

[TD-11379]<fix>taosdemo lead potential taosc coredump (#8811)

* [TD-11379]<fix>potential taosc coredump taosdemo

* modify test case
上级 9cda2b25
...@@ -585,6 +585,7 @@ extern int64_t g_totalChildTables; ...@@ -585,6 +585,7 @@ extern int64_t g_totalChildTables;
extern int64_t g_actualChildTables; extern int64_t g_actualChildTables;
extern SQueryMetaInfo g_queryInfo; extern SQueryMetaInfo g_queryInfo;
extern FILE * g_fpOfInsertResult; extern FILE * g_fpOfInsertResult;
extern bool g_fail;
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
......
...@@ -808,7 +808,8 @@ int createDatabasesAndStables(char *command) { ...@@ -808,7 +808,8 @@ int createDatabasesAndStables(char *command) {
static void *createTable(void *sarg) { static void *createTable(void *sarg) {
threadInfo * pThreadInfo = (threadInfo *)sarg; threadInfo * pThreadInfo = (threadInfo *)sarg;
SSuperTable *stbInfo = pThreadInfo->stbInfo; SSuperTable *stbInfo = pThreadInfo->stbInfo;
int32_t* code = calloc(1, sizeof(int32_t));
*code = -1;
setThreadName("createTable"); setThreadName("createTable");
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
...@@ -818,7 +819,7 @@ static void *createTable(void *sarg) { ...@@ -818,7 +819,7 @@ static void *createTable(void *sarg) {
pThreadInfo->buffer = calloc(1, buff_len); pThreadInfo->buffer = calloc(1, buff_len);
if (NULL == pThreadInfo->buffer) { if (NULL == pThreadInfo->buffer) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
return NULL; goto create_table_end;
} }
int len = 0; int len = 0;
...@@ -840,11 +841,10 @@ static void *createTable(void *sarg) { ...@@ -840,11 +841,10 @@ static void *createTable(void *sarg) {
batchNum++; batchNum++;
} else { } else {
if (stbInfo == NULL) { if (stbInfo == NULL) {
free(pThreadInfo->buffer);
errorPrint( errorPrint(
"%s() LN%d, use metric, but super table info is NULL\n", "%s() LN%d, use metric, but super table info is NULL\n",
__func__, __LINE__); __func__, __LINE__);
exit(EXIT_FAILURE); goto create_table_end;
} else { } else {
if (0 == len) { if (0 == len) {
batchNum = 0; batchNum = 0;
...@@ -856,14 +856,13 @@ static void *createTable(void *sarg) { ...@@ -856,14 +856,13 @@ static void *createTable(void *sarg) {
char *tagsValBuf = (char *)calloc(TSDB_MAX_SQL_LEN + 1, 1); char *tagsValBuf = (char *)calloc(TSDB_MAX_SQL_LEN + 1, 1);
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
return NULL; goto create_table_end;
} }
if (0 == stbInfo->tagSource) { if (0 == stbInfo->tagSource) {
if (generateTagValuesForStb(stbInfo, i, tagsValBuf)) { if (generateTagValuesForStb(stbInfo, i, tagsValBuf)) {
tmfree(tagsValBuf); tmfree(tagsValBuf);
tmfree(pThreadInfo->buffer); goto create_table_end;
exit(EXIT_FAILURE);
} }
} else { } else {
snprintf(tagsValBuf, TSDB_MAX_SQL_LEN, "(%s)", snprintf(tagsValBuf, TSDB_MAX_SQL_LEN, "(%s)",
...@@ -895,7 +894,7 @@ static void *createTable(void *sarg) { ...@@ -895,7 +894,7 @@ static void *createTable(void *sarg) {
NO_INSERT_TYPE, false)) { NO_INSERT_TYPE, false)) {
errorPrint("queryDbExec() failed. buffer:\n%s\n", errorPrint("queryDbExec() failed. buffer:\n%s\n",
pThreadInfo->buffer); pThreadInfo->buffer);
free(pThreadInfo->buffer); goto create_table_end;
return NULL; return NULL;
} }
pThreadInfo->tables_created += batchNum; pThreadInfo->tables_created += batchNum;
...@@ -913,11 +912,14 @@ static void *createTable(void *sarg) { ...@@ -913,11 +912,14 @@ static void *createTable(void *sarg) {
NO_INSERT_TYPE, false)) { NO_INSERT_TYPE, false)) {
errorPrint("queryDbExec() failed. buffer:\n%s\n", errorPrint("queryDbExec() failed. buffer:\n%s\n",
pThreadInfo->buffer); pThreadInfo->buffer);
goto create_table_end;
} }
pThreadInfo->tables_created += batchNum; pThreadInfo->tables_created += batchNum;
} }
free(pThreadInfo->buffer); *code = 0;
return NULL; create_table_end:
tmfree(pThreadInfo->buffer);
return code;
} }
int startMultiThreadCreateChildTable(char *cols, int threads, int startMultiThreadCreateChildTable(char *cols, int threads,
...@@ -976,7 +978,12 @@ int startMultiThreadCreateChildTable(char *cols, int threads, ...@@ -976,7 +978,12 @@ int startMultiThreadCreateChildTable(char *cols, int threads,
} }
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL); void* result;
pthread_join(pids[i], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
} }
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
...@@ -988,6 +995,9 @@ int startMultiThreadCreateChildTable(char *cols, int threads, ...@@ -988,6 +995,9 @@ int startMultiThreadCreateChildTable(char *cols, int threads,
free(pids); free(pids);
free(infos); free(infos);
if (g_fail) {
return -1;
}
return 0; return 0;
} }
...@@ -1579,7 +1589,8 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, ...@@ -1579,7 +1589,8 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo,
uint32_t interlaceRows) { uint32_t interlaceRows) {
debugPrint("[%d] %s() LN%d: ### stmt interlace write\n", debugPrint("[%d] %s() LN%d: ### stmt interlace write\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
int64_t insertRows; int64_t insertRows;
int64_t timeStampStep; int64_t timeStampStep;
uint64_t insert_interval; uint64_t insert_interval;
...@@ -1644,7 +1655,7 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, ...@@ -1644,7 +1655,7 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo,
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
return NULL; goto free_of_interlace_stmt;
} }
samplePos = pThreadInfo->samplePos; samplePos = pThreadInfo->samplePos;
...@@ -1777,16 +1788,17 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, ...@@ -1777,16 +1788,17 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo,
} }
if (percentComplete < 100) if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
*code = 0;
free_of_interlace_stmt:
printStatPerThread(pThreadInfo); printStatPerThread(pThreadInfo);
return NULL; free_of_interlace_stmt:
return code;
} }
void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
debugPrint("[%d] %s() LN%d: ### interlace write\n", pThreadInfo->threadID, debugPrint("[%d] %s() LN%d: ### interlace write\n", pThreadInfo->threadID,
__func__, __LINE__); __func__, __LINE__);
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
int64_t insertRows; int64_t insertRows;
uint64_t maxSqlLen; uint64_t maxSqlLen;
int64_t timeStampStep; int64_t timeStampStep;
...@@ -1824,7 +1836,7 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { ...@@ -1824,7 +1836,7 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
pThreadInfo->buffer = calloc(maxSqlLen, 1); pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) { if (NULL == pThreadInfo->buffer) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
return NULL; goto free_of_interlace;
} }
pThreadInfo->totalInsertRows = 0; pThreadInfo->totalInsertRows = 0;
...@@ -1874,8 +1886,7 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { ...@@ -1874,8 +1886,7 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer); goto free_of_interlace;
return NULL;
} }
uint64_t oldRemainderLen = remainderBufLen; uint64_t oldRemainderLen = remainderBufLen;
...@@ -2017,22 +2028,23 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { ...@@ -2017,22 +2028,23 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
} }
if (percentComplete < 100) if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
*code = 0;
printStatPerThread(pThreadInfo);
free_of_interlace: free_of_interlace:
tmfree(pThreadInfo->buffer); tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo); return code;
return NULL;
} }
static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
uint32_t interlaceRows) { uint32_t interlaceRows) {
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
debugPrint("[%d] %s() LN%d: ### interlace schemaless write\n", debugPrint("[%d] %s() LN%d: ### interlace schemaless write\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows; int64_t insertRows;
uint64_t maxSqlLen; uint64_t maxSqlLen;
int64_t timeStampStep; int64_t timeStampStep;
uint64_t insert_interval; uint64_t insert_interval;
int32_t code = 0;
SSuperTable *stbInfo = pThreadInfo->stbInfo; SSuperTable *stbInfo = pThreadInfo->stbInfo;
...@@ -2072,7 +2084,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, ...@@ -2072,7 +2084,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
smlList = (char **)calloc(pThreadInfo->ntables, sizeof(char *)); smlList = (char **)calloc(pThreadInfo->ntables, sizeof(char *));
if (NULL == smlList) { if (NULL == smlList) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
return NULL; goto free_of_interlace_sml;
} }
for (int t = 0; t < pThreadInfo->ntables; t++) { for (int t = 0; t < pThreadInfo->ntables; t++) {
...@@ -2081,8 +2093,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, ...@@ -2081,8 +2093,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
goto free_smlheadlist_interlace_sml; goto free_smlheadlist_interlace_sml;
} }
code = generateSmlConstPart(sml, stbInfo, pThreadInfo, t); if (generateSmlConstPart(sml, stbInfo, pThreadInfo, t)) {
if (code) {
goto free_smlheadlist_interlace_sml; goto free_smlheadlist_interlace_sml;
} }
smlList[t] = sml; smlList[t] = sml;
...@@ -2105,8 +2116,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, ...@@ -2105,8 +2116,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
jsonArray = cJSON_CreateArray(); jsonArray = cJSON_CreateArray();
tagsList = cJSON_CreateArray(); tagsList = cJSON_CreateArray();
for (int t = 0; t < pThreadInfo->ntables; t++) { for (int t = 0; t < pThreadInfo->ntables; t++) {
code = generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t); if (generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t)) {
if (code) {
goto free_json_interlace_sml; goto free_json_interlace_sml;
} }
} }
...@@ -2156,17 +2166,15 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, ...@@ -2156,17 +2166,15 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
tagsList, tagsList,
(int)(tableSeq - pThreadInfo->start_table_from)), (int)(tableSeq - pThreadInfo->start_table_from)),
true); true);
code = generateSmlJsonCols(jsonArray, tag, stbInfo, if (generateSmlJsonCols(jsonArray, tag, stbInfo,
pThreadInfo, timestamp); pThreadInfo, timestamp)) {
if (code) {
goto free_json_interlace_sml; goto free_json_interlace_sml;
} }
} else { } else {
code = generateSmlMutablePart( if (generateSmlMutablePart(
pThreadInfo->lines[j], pThreadInfo->lines[j],
smlList[tableSeq - pThreadInfo->start_table_from], smlList[tableSeq - pThreadInfo->start_table_from],
stbInfo, pThreadInfo, timestamp); stbInfo, pThreadInfo, timestamp)) {
if (code) {
goto free_lines_interlace_sml; goto free_lines_interlace_sml;
} }
} }
...@@ -2302,7 +2310,9 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, ...@@ -2302,7 +2310,9 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
if (percentComplete < 100) if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
*code = 0;
printStatPerThread(pThreadInfo); printStatPerThread(pThreadInfo);
free_of_interlace_sml:
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) { if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
tmfree(pThreadInfo->lines); tmfree(pThreadInfo->lines);
free_json_interlace_sml: free_json_interlace_sml:
...@@ -2324,12 +2334,13 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, ...@@ -2324,12 +2334,13 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
} }
tmfree(smlList); tmfree(smlList);
} }
return NULL; return code;
} }
void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) { void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### stmt progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### stmt progressive write\n", __func__, __LINE__);
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
SSuperTable *stbInfo = pThreadInfo->stbInfo; SSuperTable *stbInfo = pThreadInfo->stbInfo;
int64_t timeStampStep = int64_t timeStampStep =
stbInfo ? stbInfo->timeStampStep : g_args.timestamp_step; stbInfo ? stbInfo->timeStampStep : g_args.timestamp_step;
...@@ -2362,7 +2373,7 @@ void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) { ...@@ -2362,7 +2373,7 @@ void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) {
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
return NULL; goto free_of_stmt_progressive;
} }
// measure prepare + insert // measure prepare + insert
...@@ -2448,16 +2459,17 @@ void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) { ...@@ -2448,16 +2459,17 @@ void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) {
if (percentComplete < 100) { if (percentComplete < 100) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
} }
*code = 0;
printStatPerThread(pThreadInfo);
free_of_stmt_progressive: free_of_stmt_progressive:
tmfree(pThreadInfo->buffer); tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo); return code;
return NULL;
} }
void *syncWriteProgressive(threadInfo *pThreadInfo) { void *syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
SSuperTable *stbInfo = pThreadInfo->stbInfo; SSuperTable *stbInfo = pThreadInfo->stbInfo;
uint64_t maxSqlLen = stbInfo ? stbInfo->maxSqlLen : g_args.max_sql_len; uint64_t maxSqlLen = stbInfo ? stbInfo->maxSqlLen : g_args.max_sql_len;
int64_t timeStampStep = int64_t timeStampStep =
...@@ -2469,7 +2481,7 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -2469,7 +2481,7 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->buffer = calloc(maxSqlLen, 1); pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) { if (NULL == pThreadInfo->buffer) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
return NULL; goto free_of_progressive;
} }
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
...@@ -2497,8 +2509,7 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -2497,8 +2509,7 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) {
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer); goto free_of_progressive;
return NULL;
} }
int64_t remainderBufLen = maxSqlLen - 2000; int64_t remainderBufLen = maxSqlLen - 2000;
...@@ -2609,16 +2620,17 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -2609,16 +2620,17 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) {
if (percentComplete < 100) { if (percentComplete < 100) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
} }
*code = 0;
printStatPerThread(pThreadInfo);
free_of_progressive: free_of_progressive:
tmfree(pThreadInfo->buffer); tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo); return code;
return NULL;
} }
void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### sml progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### sml progressive write\n", __func__, __LINE__);
int32_t code = 0; int32_t * code = calloc(1, sizeof (int32_t));
*code = -1;
SSuperTable *stbInfo = pThreadInfo->stbInfo; SSuperTable *stbInfo = pThreadInfo->stbInfo;
int64_t timeStampStep = stbInfo->timeStampStep; int64_t timeStampStep = stbInfo->timeStampStep;
int64_t insertRows = stbInfo->insertRows; int64_t insertRows = stbInfo->insertRows;
...@@ -2645,7 +2657,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { ...@@ -2645,7 +2657,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
smlList = (char **)calloc(pThreadInfo->ntables, sizeof(char *)); smlList = (char **)calloc(pThreadInfo->ntables, sizeof(char *));
if (NULL == smlList) { if (NULL == smlList) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
return NULL; goto free_of_progressive_sml;
} }
for (int t = 0; t < pThreadInfo->ntables; t++) { for (int t = 0; t < pThreadInfo->ntables; t++) {
char *sml = (char *)calloc(1, stbInfo->lenOfOneRow); char *sml = (char *)calloc(1, stbInfo->lenOfOneRow);
...@@ -2653,8 +2665,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { ...@@ -2653,8 +2665,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
goto free_smlheadlist_progressive_sml; goto free_smlheadlist_progressive_sml;
} }
code = generateSmlConstPart(sml, stbInfo, pThreadInfo, t); if (generateSmlConstPart(sml, stbInfo, pThreadInfo, t)) {
if (code) {
goto free_smlheadlist_progressive_sml; goto free_smlheadlist_progressive_sml;
} }
smlList[t] = sml; smlList[t] = sml;
...@@ -2677,8 +2688,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { ...@@ -2677,8 +2688,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
jsonArray = cJSON_CreateArray(); jsonArray = cJSON_CreateArray();
tagsList = cJSON_CreateArray(); tagsList = cJSON_CreateArray();
for (int t = 0; t < pThreadInfo->ntables; t++) { for (int t = 0; t < pThreadInfo->ntables; t++) {
code = generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t); if (generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t)) {
if (code) {
goto free_json_progressive_sml; goto free_json_progressive_sml;
} }
} }
...@@ -2699,16 +2709,14 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { ...@@ -2699,16 +2709,14 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) { if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
cJSON *tag = cJSON_Duplicate( cJSON *tag = cJSON_Duplicate(
cJSON_GetArrayItem(tagsList, (int)i), true); cJSON_GetArrayItem(tagsList, (int)i), true);
code = generateSmlJsonCols(jsonArray, tag, stbInfo, if (generateSmlJsonCols(jsonArray, tag, stbInfo,
pThreadInfo, timestamp); pThreadInfo, timestamp)) {
if (code) {
goto free_json_progressive_sml; goto free_json_progressive_sml;
} }
} else { } else {
code = generateSmlMutablePart(pThreadInfo->lines[k], if (generateSmlMutablePart(pThreadInfo->lines[k],
smlList[i], stbInfo, smlList[i], stbInfo,
pThreadInfo, timestamp); pThreadInfo, timestamp)) {
if (code) {
goto free_lines_progressive_sml; goto free_lines_progressive_sml;
} }
} }
...@@ -2770,6 +2778,8 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { ...@@ -2770,6 +2778,8 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
} }
} }
*code = 0;
free_of_progressive_sml:
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) { if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
tmfree(pThreadInfo->lines); tmfree(pThreadInfo->lines);
free_json_progressive_sml: free_json_progressive_sml:
...@@ -2791,7 +2801,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { ...@@ -2791,7 +2801,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
} }
tmfree(smlList); tmfree(smlList);
} }
return NULL; return code;
} }
void *syncWrite(void *sarg) { void *syncWrite(void *sarg) {
...@@ -3290,7 +3300,12 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision, ...@@ -3290,7 +3300,12 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision,
int64_t start = taosGetTimestampUs(); int64_t start = taosGetTimestampUs();
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL); void* result;
pthread_join(pids[i], &result);
if (*(int32_t*)result){
g_fail = true;
}
tmfree(result);
} }
uint64_t totalDelay = 0; uint64_t totalDelay = 0;
...@@ -3343,6 +3358,13 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision, ...@@ -3343,6 +3358,13 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision,
if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay; if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay;
} }
free(pids);
free(infos);
if (g_fail){
return -1;
}
if (cntDelay == 0) cntDelay = 1; if (cntDelay == 0) cntDelay = 1;
avgDelay = (double)totalDelay / cntDelay; avgDelay = (double)totalDelay / cntDelay;
...@@ -3404,8 +3426,6 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision, ...@@ -3404,8 +3426,6 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision,
// taos_close(taos); // taos_close(taos);
free(pids);
free(infos);
return 0; return 0;
} }
......
...@@ -20,6 +20,7 @@ FILE * g_fpOfInsertResult = NULL; ...@@ -20,6 +20,7 @@ FILE * g_fpOfInsertResult = NULL;
char * g_dupstr = NULL; char * g_dupstr = NULL;
SDbs g_Dbs; SDbs g_Dbs;
SQueryMetaInfo g_queryInfo; SQueryMetaInfo g_queryInfo;
bool g_fail = false;
SArguments g_args = { SArguments g_args = {
DEFAULT_METAFILE, // metaFile DEFAULT_METAFILE, // metaFile
......
...@@ -44,7 +44,8 @@ void selectAndGetResult(threadInfo *pThreadInfo, char *command) { ...@@ -44,7 +44,8 @@ void selectAndGetResult(threadInfo *pThreadInfo, char *command) {
void *specifiedTableQuery(void *sarg) { void *specifiedTableQuery(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
int32_t *code = calloc(1, sizeof (int32_t));
*code = -1;
setThreadName("specTableQuery"); setThreadName("specTableQuery");
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
...@@ -54,7 +55,7 @@ void *specifiedTableQuery(void *sarg) { ...@@ -54,7 +55,7 @@ void *specifiedTableQuery(void *sarg) {
if (taos == NULL) { if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL)); pThreadInfo->threadID, taos_errstr(NULL));
return NULL; goto end_of_specified_query;
} else { } else {
pThreadInfo->taos = taos; pThreadInfo->taos = taos;
} }
...@@ -65,7 +66,7 @@ void *specifiedTableQuery(void *sarg) { ...@@ -65,7 +66,7 @@ void *specifiedTableQuery(void *sarg) {
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
errorPrint("use database %s failed!\n\n", g_queryInfo.dbName); errorPrint("use database %s failed!\n\n", g_queryInfo.dbName);
return NULL; goto end_of_specified_query;
} }
uint64_t st = 0; uint64_t st = 0;
...@@ -118,14 +119,18 @@ void *specifiedTableQuery(void *sarg) { ...@@ -118,14 +119,18 @@ void *specifiedTableQuery(void *sarg) {
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
} }
return NULL; *code = 0;
end_of_specified_query:
return code;
} }
void *superTableQuery(void *sarg) { void *superTableQuery(void *sarg) {
int32_t * code = calloc(1, sizeof (int32_t));
*code = -1;
char *sqlstr = calloc(1, BUFFER_SIZE); char *sqlstr = calloc(1, BUFFER_SIZE);
if (NULL == sqlstr) { if (NULL == sqlstr) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
return NULL; goto free_of_super_query;
} }
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
...@@ -139,8 +144,7 @@ void *superTableQuery(void *sarg) { ...@@ -139,8 +144,7 @@ void *superTableQuery(void *sarg) {
if (taos == NULL) { if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL)); pThreadInfo->threadID, taos_errstr(NULL));
free(sqlstr); goto free_of_super_query;
return NULL;
} else { } else {
pThreadInfo->taos = taos; pThreadInfo->taos = taos;
} }
...@@ -200,9 +204,10 @@ void *superTableQuery(void *sarg) { ...@@ -200,9 +204,10 @@ void *superTableQuery(void *sarg) {
taosGetSelfPthreadId(), pThreadInfo->start_table_from, taosGetSelfPthreadId(), pThreadInfo->start_table_from,
pThreadInfo->end_table_to, (double)(et - st) / 1000.0); pThreadInfo->end_table_to, (double)(et - st) / 1000.0);
} }
*code = 0;
free(sqlstr); free_of_super_query:
return NULL; tmfree(sqlstr);
return code;
} }
int queryTestProcess() { int queryTestProcess() {
...@@ -398,7 +403,12 @@ int queryTestProcess() { ...@@ -398,7 +403,12 @@ int queryTestProcess() {
if ((nSqlCount > 0) && (nConcurrent > 0)) { if ((nSqlCount > 0) && (nConcurrent > 0)) {
for (int i = 0; i < nConcurrent; i++) { for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) { for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL); void* result;
pthread_join(pids[i * nSqlCount + j], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infos + i * nSqlCount + j; threadInfo *pThreadInfo = infos + i * nSqlCount + j;
#ifdef WINDOWS #ifdef WINDOWS
...@@ -416,7 +426,12 @@ int queryTestProcess() { ...@@ -416,7 +426,12 @@ int queryTestProcess() {
tmfree((char *)infos); tmfree((char *)infos);
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL); void* result;
pthread_join(pidsOfSub[i], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infosOfSub + i; threadInfo *pThreadInfo = infosOfSub + i;
#ifdef WINDOWS #ifdef WINDOWS
...@@ -431,6 +446,10 @@ int queryTestProcess() { ...@@ -431,6 +446,10 @@ int queryTestProcess() {
tmfree((char *)pidsOfSub); tmfree((char *)pidsOfSub);
tmfree((char *)infosOfSub); tmfree((char *)infosOfSub);
if (g_fail) {
return -1;
}
// taos_close(taos);// workaround to use separate taos connection; // taos_close(taos);// workaround to use separate taos connection;
uint64_t endTs = taosGetTimestampMs(); uint64_t endTs = taosGetTimestampMs();
......
...@@ -71,6 +71,8 @@ TAOS_SUB *subscribeImpl(QUERY_CLASS class, threadInfo *pThreadInfo, char *sql, ...@@ -71,6 +71,8 @@ TAOS_SUB *subscribeImpl(QUERY_CLASS class, threadInfo *pThreadInfo, char *sql,
} }
void *specifiedSubscribe(void *sarg) { void *specifiedSubscribe(void *sarg) {
int32_t * code = calloc(1, sizeof (int32_t));
*code = -1;
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
// TAOS_SUB* tsub = NULL; // TAOS_SUB* tsub = NULL;
...@@ -83,15 +85,14 @@ void *specifiedSubscribe(void *sarg) { ...@@ -83,15 +85,14 @@ void *specifiedSubscribe(void *sarg) {
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL)); pThreadInfo->threadID, taos_errstr(NULL));
return NULL; goto free_of_specified_subscribe;
} }
} }
char sqlStr[TSDB_DB_NAME_LEN + 5]; char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "USE %s", g_queryInfo.dbName); sprintf(sqlStr, "USE %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos); goto free_of_specified_subscribe;
return NULL;
} }
sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
...@@ -110,8 +111,7 @@ void *specifiedSubscribe(void *sarg) { ...@@ -110,8 +111,7 @@ void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) { if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos); goto free_of_specified_subscribe;
return NULL;
} }
// start loop to consume result // start loop to consume result
...@@ -171,36 +171,37 @@ void *specifiedSubscribe(void *sarg) { ...@@ -171,36 +171,37 @@ void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo if (NULL == g_queryInfo.specifiedQueryInfo
.tsub[pThreadInfo->threadID]) { .tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos); goto free_of_specified_subscribe;
return NULL;
} }
} }
} }
} }
*code = 0;
taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]); taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]);
free_of_specified_subscribe:
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return code;
return NULL;
} }
static void *superSubscribe(void *sarg) { static void *superSubscribe(void *sarg) {
int32_t * code = calloc(1, sizeof (int32_t));
*code = -1;
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq;
char * subSqlStr = calloc(1, BUFFER_SIZE); char * subSqlStr = calloc(1, BUFFER_SIZE);
if (NULL == subSqlStr) { if (NULL == subSqlStr) {
errorPrint("%s", "failed to allocate memory\n"); errorPrint("%s", "failed to allocate memory\n");
goto free_of_super_subscribe;
} }
TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq;
setThreadName("superSub"); setThreadName("superSub");
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
free(subSqlStr);
errorPrint("The table number(%" PRId64 errorPrint("The table number(%" PRId64
") of the thread is more than max query sql count: %d\n", ") of the thread is more than max query sql count: %d\n",
pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); pThreadInfo->ntables, MAX_QUERY_SQL_COUNT);
exit(EXIT_FAILURE); goto free_of_super_subscribe;
} }
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
...@@ -210,18 +211,15 @@ static void *superSubscribe(void *sarg) { ...@@ -210,18 +211,15 @@ static void *superSubscribe(void *sarg) {
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL)); pThreadInfo->threadID, taos_errstr(NULL));
free(subSqlStr); goto free_of_super_subscribe;
return NULL;
} }
} }
char sqlStr[TSDB_DB_NAME_LEN + 5]; char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "USE %s", g_queryInfo.dbName); sprintf(sqlStr, "USE %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
errorPrint("use database %s failed!\n\n", g_queryInfo.dbName); errorPrint("use database %s failed!\n\n", g_queryInfo.dbName);
free(subSqlStr); goto free_of_super_subscribe;
return NULL;
} }
char topic[32] = {0}; char topic[32] = {0};
...@@ -252,9 +250,7 @@ static void *superSubscribe(void *sarg) { ...@@ -252,9 +250,7 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[tsubSeq]) { if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos); goto free_of_super_subscribe;
free(subSqlStr);
return NULL;
} }
} }
...@@ -321,9 +317,7 @@ static void *superSubscribe(void *sarg) { ...@@ -321,9 +317,7 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[tsubSeq]) { if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos); goto free_of_super_subscribe;
free(subSqlStr);
return NULL;
} }
} }
} }
...@@ -340,10 +334,11 @@ static void *superSubscribe(void *sarg) { ...@@ -340,10 +334,11 @@ static void *superSubscribe(void *sarg) {
tsubSeq = i - pThreadInfo->start_table_from; tsubSeq = i - pThreadInfo->start_table_from;
taos_unsubscribe(tsub[tsubSeq], 0); taos_unsubscribe(tsub[tsubSeq], 0);
} }
*code = 0;
free_of_super_subscribe:
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
free(subSqlStr); tmfree(subSqlStr);
return NULL; return code;
} }
int subscribeTestProcess() { int subscribeTestProcess() {
...@@ -482,7 +477,12 @@ int subscribeTestProcess() { ...@@ -482,7 +477,12 @@ int subscribeTestProcess() {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
for (int j = 0; j < threads; j++) { for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j; uint64_t seq = i * threads + j;
pthread_join(pidsOfStable[seq], NULL); void* result;
pthread_join(pidsOfStable[seq], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
} }
} }
} }
...@@ -491,7 +491,12 @@ int subscribeTestProcess() { ...@@ -491,7 +491,12 @@ int subscribeTestProcess() {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) { for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j; uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
pthread_join(pids[seq], NULL); void* result;
pthread_join(pids[seq], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
} }
} }
...@@ -501,5 +506,8 @@ int subscribeTestProcess() { ...@@ -501,5 +506,8 @@ int subscribeTestProcess() {
tmfree((char *)pidsOfStable); tmfree((char *)pidsOfStable);
tmfree((char *)infosOfStable); tmfree((char *)infosOfStable);
// taos_close(taos); // taos_close(taos);
if (g_fail) {
return -1;
}
return 0; return 0;
} }
\ No newline at end of file
...@@ -171,8 +171,8 @@ class TDTestCase: ...@@ -171,8 +171,8 @@ class TDTestCase:
#print("==============taosdemo——json_no,#create stable,table; insert table; show table; select table; drop table") #print("==============taosdemo——json_no,#create stable,table; insert table; show table; select table; drop table")
assert os.system("%staosdemo -f tools/taosdemoAllTest/TD-10539/create_taosdemo_no.json -y " % binPath) == 0 os.system("%staosdemo -f tools/taosdemoAllTest/TD-10539/create_taosdemo_no.json -y " % binPath)
tdSql.query("show dbno.tables ") tdSql.query("show dbno.tables;")
tdSql.checkRows(0) tdSql.checkRows(0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册