diff --git a/src/kit/taosdemo/inc/demo.h b/src/kit/taosdemo/inc/demo.h index 37dd01449da7e67e9165e0b22d7160d508595e52..a9b6b83580215f3a9e49c3fac98d0cd62e7ca1e6 100644 --- a/src/kit/taosdemo/inc/demo.h +++ b/src/kit/taosdemo/inc/demo.h @@ -585,6 +585,7 @@ extern int64_t g_totalChildTables; extern int64_t g_actualChildTables; extern SQueryMetaInfo g_queryInfo; extern FILE * g_fpOfInsertResult; +extern bool g_fail; #define min(a, b) (((a) < (b)) ? (a) : (b)) diff --git a/src/kit/taosdemo/src/demoInsert.c b/src/kit/taosdemo/src/demoInsert.c index fe36aff5644c452ccb4216b75c6d7057ee6cee76..8209524dc4c0dd30982ab062531fd7ff55403338 100644 --- a/src/kit/taosdemo/src/demoInsert.c +++ b/src/kit/taosdemo/src/demoInsert.c @@ -808,7 +808,8 @@ int createDatabasesAndStables(char *command) { static void *createTable(void *sarg) { threadInfo * pThreadInfo = (threadInfo *)sarg; SSuperTable *stbInfo = pThreadInfo->stbInfo; - + int32_t* code = calloc(1, sizeof(int32_t)); + *code = -1; setThreadName("createTable"); uint64_t lastPrintTime = taosGetTimestampMs(); @@ -818,7 +819,7 @@ static void *createTable(void *sarg) { pThreadInfo->buffer = calloc(1, buff_len); if (NULL == pThreadInfo->buffer) { errorPrint("%s", "failed to allocate memory\n"); - return NULL; + goto create_table_end; } int len = 0; @@ -840,11 +841,10 @@ static void *createTable(void *sarg) { batchNum++; } else { if (stbInfo == NULL) { - free(pThreadInfo->buffer); errorPrint( "%s() LN%d, use metric, but super table info is NULL\n", __func__, __LINE__); - exit(EXIT_FAILURE); + goto create_table_end; } else { if (0 == len) { batchNum = 0; @@ -856,14 +856,13 @@ static void *createTable(void *sarg) { char *tagsValBuf = (char *)calloc(TSDB_MAX_SQL_LEN + 1, 1); if (NULL == tagsValBuf) { errorPrint("%s", "failed to allocate memory\n"); - return NULL; + goto create_table_end; } if (0 == stbInfo->tagSource) { if (generateTagValuesForStb(stbInfo, i, tagsValBuf)) { tmfree(tagsValBuf); - tmfree(pThreadInfo->buffer); - exit(EXIT_FAILURE); + goto create_table_end; } } else { snprintf(tagsValBuf, TSDB_MAX_SQL_LEN, "(%s)", @@ -895,7 +894,7 @@ static void *createTable(void *sarg) { NO_INSERT_TYPE, false)) { errorPrint("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer); - free(pThreadInfo->buffer); + goto create_table_end; return NULL; } pThreadInfo->tables_created += batchNum; @@ -913,11 +912,14 @@ static void *createTable(void *sarg) { NO_INSERT_TYPE, false)) { errorPrint("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer); + goto create_table_end; } pThreadInfo->tables_created += batchNum; } - free(pThreadInfo->buffer); - return NULL; + *code = 0; + create_table_end: + tmfree(pThreadInfo->buffer); + return code; } int startMultiThreadCreateChildTable(char *cols, int threads, @@ -976,7 +978,12 @@ int startMultiThreadCreateChildTable(char *cols, int threads, } for (int i = 0; i < threads; i++) { - pthread_join(pids[i], NULL); + void* result; + pthread_join(pids[i], &result); + if (*(int32_t*)result) { + g_fail = true; + } + tmfree(result); } for (int i = 0; i < threads; i++) { @@ -988,6 +995,9 @@ int startMultiThreadCreateChildTable(char *cols, int threads, free(pids); free(infos); + if (g_fail) { + return -1; + } return 0; } @@ -1579,7 +1589,8 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t interlaceRows) { debugPrint("[%d] %s() LN%d: ### stmt interlace write\n", pThreadInfo->threadID, __func__, __LINE__); - + int32_t* code = calloc(1, sizeof (int32_t)); + *code = -1; int64_t insertRows; int64_t timeStampStep; uint64_t insert_interval; @@ -1644,7 +1655,7 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, if (0 == strlen(tableName)) { errorPrint("[%d] %s() LN%d, getTableName return null\n", pThreadInfo->threadID, __func__, __LINE__); - return NULL; + goto free_of_interlace_stmt; } samplePos = pThreadInfo->samplePos; @@ -1777,16 +1788,17 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, } if (percentComplete < 100) printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); - -free_of_interlace_stmt: + *code = 0; printStatPerThread(pThreadInfo); - return NULL; +free_of_interlace_stmt: + return code; } void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { debugPrint("[%d] %s() LN%d: ### interlace write\n", pThreadInfo->threadID, __func__, __LINE__); - + int32_t* code = calloc(1, sizeof (int32_t)); + *code = -1; int64_t insertRows; uint64_t maxSqlLen; int64_t timeStampStep; @@ -1824,7 +1836,7 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { pThreadInfo->buffer = calloc(maxSqlLen, 1); if (NULL == pThreadInfo->buffer) { errorPrint("%s", "failed to allocate memory\n"); - return NULL; + goto free_of_interlace; } pThreadInfo->totalInsertRows = 0; @@ -1874,8 +1886,7 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { if (0 == strlen(tableName)) { errorPrint("[%d] %s() LN%d, getTableName return null\n", pThreadInfo->threadID, __func__, __LINE__); - free(pThreadInfo->buffer); - return NULL; + goto free_of_interlace; } uint64_t oldRemainderLen = remainderBufLen; @@ -2017,22 +2028,23 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { } if (percentComplete < 100) printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); - + *code = 0; + printStatPerThread(pThreadInfo); free_of_interlace: tmfree(pThreadInfo->buffer); - printStatPerThread(pThreadInfo); - return NULL; + return code; } static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, uint32_t interlaceRows) { + int32_t* code = calloc(1, sizeof (int32_t)); + *code = -1; debugPrint("[%d] %s() LN%d: ### interlace schemaless write\n", pThreadInfo->threadID, __func__, __LINE__); int64_t insertRows; uint64_t maxSqlLen; int64_t timeStampStep; uint64_t insert_interval; - int32_t code = 0; SSuperTable *stbInfo = pThreadInfo->stbInfo; @@ -2072,7 +2084,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, smlList = (char **)calloc(pThreadInfo->ntables, sizeof(char *)); if (NULL == smlList) { errorPrint("%s", "failed to allocate memory\n"); - return NULL; + goto free_of_interlace_sml; } for (int t = 0; t < pThreadInfo->ntables; t++) { @@ -2081,8 +2093,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, errorPrint("%s", "failed to allocate memory\n"); goto free_smlheadlist_interlace_sml; } - code = generateSmlConstPart(sml, stbInfo, pThreadInfo, t); - if (code) { + if (generateSmlConstPart(sml, stbInfo, pThreadInfo, t)) { goto free_smlheadlist_interlace_sml; } smlList[t] = sml; @@ -2105,8 +2116,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, jsonArray = cJSON_CreateArray(); tagsList = cJSON_CreateArray(); for (int t = 0; t < pThreadInfo->ntables; t++) { - code = generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t); - if (code) { + if (generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t)) { goto free_json_interlace_sml; } } @@ -2156,17 +2166,15 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, tagsList, (int)(tableSeq - pThreadInfo->start_table_from)), true); - code = generateSmlJsonCols(jsonArray, tag, stbInfo, - pThreadInfo, timestamp); - if (code) { + if (generateSmlJsonCols(jsonArray, tag, stbInfo, + pThreadInfo, timestamp)) { goto free_json_interlace_sml; } } else { - code = generateSmlMutablePart( - pThreadInfo->lines[j], - smlList[tableSeq - pThreadInfo->start_table_from], - stbInfo, pThreadInfo, timestamp); - if (code) { + if (generateSmlMutablePart( + pThreadInfo->lines[j], + smlList[tableSeq - pThreadInfo->start_table_from], + stbInfo, pThreadInfo, timestamp)) { goto free_lines_interlace_sml; } } @@ -2302,7 +2310,9 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, if (percentComplete < 100) printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); + *code = 0; printStatPerThread(pThreadInfo); + free_of_interlace_sml: if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) { tmfree(pThreadInfo->lines); free_json_interlace_sml: @@ -2324,12 +2334,13 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo, } tmfree(smlList); } - return NULL; + return code; } void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### stmt progressive write\n", __func__, __LINE__); - + int32_t* code = calloc(1, sizeof (int32_t)); + *code = -1; SSuperTable *stbInfo = pThreadInfo->stbInfo; int64_t timeStampStep = stbInfo ? stbInfo->timeStampStep : g_args.timestamp_step; @@ -2362,7 +2373,7 @@ void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) { if (0 == strlen(tableName)) { errorPrint("[%d] %s() LN%d, getTableName return null\n", pThreadInfo->threadID, __func__, __LINE__); - return NULL; + goto free_of_stmt_progressive; } // measure prepare + insert @@ -2448,16 +2459,17 @@ void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) { if (percentComplete < 100) { printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); } - + *code = 0; + printStatPerThread(pThreadInfo); free_of_stmt_progressive: tmfree(pThreadInfo->buffer); - printStatPerThread(pThreadInfo); - return NULL; + return code; } void *syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); - + int32_t* code = calloc(1, sizeof (int32_t)); + *code = -1; SSuperTable *stbInfo = pThreadInfo->stbInfo; uint64_t maxSqlLen = stbInfo ? stbInfo->maxSqlLen : g_args.max_sql_len; int64_t timeStampStep = @@ -2469,7 +2481,7 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->buffer = calloc(maxSqlLen, 1); if (NULL == pThreadInfo->buffer) { errorPrint("%s", "failed to allocate memory\n"); - return NULL; + goto free_of_progressive; } uint64_t lastPrintTime = taosGetTimestampMs(); @@ -2497,8 +2509,7 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) { if (0 == strlen(tableName)) { errorPrint("[%d] %s() LN%d, getTableName return null\n", pThreadInfo->threadID, __func__, __LINE__); - free(pThreadInfo->buffer); - return NULL; + goto free_of_progressive; } int64_t remainderBufLen = maxSqlLen - 2000; @@ -2609,16 +2620,17 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) { if (percentComplete < 100) { printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); } - + *code = 0; + printStatPerThread(pThreadInfo); free_of_progressive: tmfree(pThreadInfo->buffer); - printStatPerThread(pThreadInfo); - return NULL; + return code; } void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### sml progressive write\n", __func__, __LINE__); - int32_t code = 0; + int32_t * code = calloc(1, sizeof (int32_t)); + *code = -1; SSuperTable *stbInfo = pThreadInfo->stbInfo; int64_t timeStampStep = stbInfo->timeStampStep; int64_t insertRows = stbInfo->insertRows; @@ -2645,7 +2657,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { smlList = (char **)calloc(pThreadInfo->ntables, sizeof(char *)); if (NULL == smlList) { errorPrint("%s", "failed to allocate memory\n"); - return NULL; + goto free_of_progressive_sml; } for (int t = 0; t < pThreadInfo->ntables; t++) { char *sml = (char *)calloc(1, stbInfo->lenOfOneRow); @@ -2653,8 +2665,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { errorPrint("%s", "failed to allocate memory\n"); goto free_smlheadlist_progressive_sml; } - code = generateSmlConstPart(sml, stbInfo, pThreadInfo, t); - if (code) { + if (generateSmlConstPart(sml, stbInfo, pThreadInfo, t)) { goto free_smlheadlist_progressive_sml; } smlList[t] = sml; @@ -2677,8 +2688,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { jsonArray = cJSON_CreateArray(); tagsList = cJSON_CreateArray(); for (int t = 0; t < pThreadInfo->ntables; t++) { - code = generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t); - if (code) { + if (generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t)) { goto free_json_progressive_sml; } } @@ -2699,16 +2709,14 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) { cJSON *tag = cJSON_Duplicate( cJSON_GetArrayItem(tagsList, (int)i), true); - code = generateSmlJsonCols(jsonArray, tag, stbInfo, - pThreadInfo, timestamp); - if (code) { + if (generateSmlJsonCols(jsonArray, tag, stbInfo, + pThreadInfo, timestamp)) { goto free_json_progressive_sml; } } else { - code = generateSmlMutablePart(pThreadInfo->lines[k], - smlList[i], stbInfo, - pThreadInfo, timestamp); - if (code) { + if (generateSmlMutablePart(pThreadInfo->lines[k], + smlList[i], stbInfo, + pThreadInfo, timestamp)) { goto free_lines_progressive_sml; } } @@ -2770,6 +2778,8 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { } } + *code = 0; + free_of_progressive_sml: if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) { tmfree(pThreadInfo->lines); free_json_progressive_sml: @@ -2791,7 +2801,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) { } tmfree(smlList); } - return NULL; + return code; } void *syncWrite(void *sarg) { @@ -3290,7 +3300,12 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision, int64_t start = taosGetTimestampUs(); for (int i = 0; i < threads; i++) { - pthread_join(pids[i], NULL); + void* result; + pthread_join(pids[i], &result); + if (*(int32_t*)result){ + g_fail = true; + } + tmfree(result); } uint64_t totalDelay = 0; @@ -3343,6 +3358,13 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision, if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay; } + free(pids); + free(infos); + + if (g_fail){ + return -1; + } + if (cntDelay == 0) cntDelay = 1; avgDelay = (double)totalDelay / cntDelay; @@ -3404,8 +3426,6 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision, // taos_close(taos); - free(pids); - free(infos); return 0; } diff --git a/src/kit/taosdemo/src/demoMain.c b/src/kit/taosdemo/src/demoMain.c index 9941f608efa7c0d8a72171383bade43e309fede0..d5e9467b223718338d280dce9b8582dd3de00cd6 100644 --- a/src/kit/taosdemo/src/demoMain.c +++ b/src/kit/taosdemo/src/demoMain.c @@ -20,6 +20,7 @@ FILE * g_fpOfInsertResult = NULL; char * g_dupstr = NULL; SDbs g_Dbs; SQueryMetaInfo g_queryInfo; +bool g_fail = false; SArguments g_args = { DEFAULT_METAFILE, // metaFile diff --git a/src/kit/taosdemo/src/demoQuery.c b/src/kit/taosdemo/src/demoQuery.c index d8e8438fa7177db993c6da1cc5ac5cad98ef3010..ffae0ff10a643c97fbb2f291ae168d7a23dce545 100644 --- a/src/kit/taosdemo/src/demoQuery.c +++ b/src/kit/taosdemo/src/demoQuery.c @@ -44,7 +44,8 @@ void selectAndGetResult(threadInfo *pThreadInfo, char *command) { void *specifiedTableQuery(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; - + int32_t *code = calloc(1, sizeof (int32_t)); + *code = -1; setThreadName("specTableQuery"); if (pThreadInfo->taos == NULL) { @@ -54,7 +55,7 @@ void *specifiedTableQuery(void *sarg) { if (taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", pThreadInfo->threadID, taos_errstr(NULL)); - return NULL; + goto end_of_specified_query; } else { pThreadInfo->taos = taos; } @@ -65,7 +66,7 @@ void *specifiedTableQuery(void *sarg) { if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { taos_close(pThreadInfo->taos); errorPrint("use database %s failed!\n\n", g_queryInfo.dbName); - return NULL; + goto end_of_specified_query; } uint64_t st = 0; @@ -118,14 +119,18 @@ void *specifiedTableQuery(void *sarg) { lastPrintTime = currentPrintTime; } } - return NULL; + *code = 0; + end_of_specified_query: + return code; } void *superTableQuery(void *sarg) { + int32_t * code = calloc(1, sizeof (int32_t)); + *code = -1; char *sqlstr = calloc(1, BUFFER_SIZE); if (NULL == sqlstr) { errorPrint("%s", "failed to allocate memory\n"); - return NULL; + goto free_of_super_query; } threadInfo *pThreadInfo = (threadInfo *)sarg; @@ -139,8 +144,7 @@ void *superTableQuery(void *sarg) { if (taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", pThreadInfo->threadID, taos_errstr(NULL)); - free(sqlstr); - return NULL; + goto free_of_super_query; } else { pThreadInfo->taos = taos; } @@ -200,9 +204,10 @@ void *superTableQuery(void *sarg) { taosGetSelfPthreadId(), pThreadInfo->start_table_from, pThreadInfo->end_table_to, (double)(et - st) / 1000.0); } - - free(sqlstr); - return NULL; + *code = 0; + free_of_super_query: + tmfree(sqlstr); + return code; } int queryTestProcess() { @@ -398,7 +403,12 @@ int queryTestProcess() { if ((nSqlCount > 0) && (nConcurrent > 0)) { for (int i = 0; i < nConcurrent; i++) { for (int j = 0; j < nSqlCount; j++) { - pthread_join(pids[i * nSqlCount + j], NULL); + void* result; + pthread_join(pids[i * nSqlCount + j], &result); + if (*(int32_t*)result) { + g_fail = true; + } + tmfree(result); if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { threadInfo *pThreadInfo = infos + i * nSqlCount + j; #ifdef WINDOWS @@ -416,7 +426,12 @@ int queryTestProcess() { tmfree((char *)infos); for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { - pthread_join(pidsOfSub[i], NULL); + void* result; + pthread_join(pidsOfSub[i], &result); + if (*(int32_t*)result) { + g_fail = true; + } + tmfree(result); if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) { threadInfo *pThreadInfo = infosOfSub + i; #ifdef WINDOWS @@ -431,6 +446,10 @@ int queryTestProcess() { tmfree((char *)pidsOfSub); tmfree((char *)infosOfSub); + if (g_fail) { + return -1; + } + // taos_close(taos);// workaround to use separate taos connection; uint64_t endTs = taosGetTimestampMs(); diff --git a/src/kit/taosdemo/src/demoSubscribe.c b/src/kit/taosdemo/src/demoSubscribe.c index 1386193f4059ad60eb66c08e7078173a99c29da2..fb9800d0cdc0be90d82b72d91f0e20902c082b4f 100644 --- a/src/kit/taosdemo/src/demoSubscribe.c +++ b/src/kit/taosdemo/src/demoSubscribe.c @@ -71,6 +71,8 @@ TAOS_SUB *subscribeImpl(QUERY_CLASS class, threadInfo *pThreadInfo, char *sql, } void *specifiedSubscribe(void *sarg) { + int32_t * code = calloc(1, sizeof (int32_t)); + *code = -1; threadInfo *pThreadInfo = (threadInfo *)sarg; // TAOS_SUB* tsub = NULL; @@ -83,15 +85,14 @@ void *specifiedSubscribe(void *sarg) { if (pThreadInfo->taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", pThreadInfo->threadID, taos_errstr(NULL)); - return NULL; + goto free_of_specified_subscribe; } } char sqlStr[TSDB_DB_NAME_LEN + 5]; sprintf(sqlStr, "USE %s", g_queryInfo.dbName); if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { - taos_close(pThreadInfo->taos); - return NULL; + goto free_of_specified_subscribe; } sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], @@ -110,8 +111,7 @@ void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) { - taos_close(pThreadInfo->taos); - return NULL; + goto free_of_specified_subscribe; } // start loop to consume result @@ -171,36 +171,37 @@ void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == g_queryInfo.specifiedQueryInfo .tsub[pThreadInfo->threadID]) { - taos_close(pThreadInfo->taos); - return NULL; + goto free_of_specified_subscribe; } } } } + *code = 0; taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]); + free_of_specified_subscribe: taos_close(pThreadInfo->taos); - - return NULL; + return code; } static void *superSubscribe(void *sarg) { + int32_t * code = calloc(1, sizeof (int32_t)); + *code = -1; threadInfo *pThreadInfo = (threadInfo *)sarg; + TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT] = {0}; + uint64_t tsubSeq; char * subSqlStr = calloc(1, BUFFER_SIZE); if (NULL == subSqlStr) { errorPrint("%s", "failed to allocate memory\n"); + goto free_of_super_subscribe; } - TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT] = {0}; - uint64_t tsubSeq; - setThreadName("superSub"); if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { - free(subSqlStr); errorPrint("The table number(%" PRId64 ") of the thread is more than max query sql count: %d\n", pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); - exit(EXIT_FAILURE); + goto free_of_super_subscribe; } if (pThreadInfo->taos == NULL) { @@ -210,18 +211,15 @@ static void *superSubscribe(void *sarg) { if (pThreadInfo->taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", pThreadInfo->threadID, taos_errstr(NULL)); - free(subSqlStr); - return NULL; + goto free_of_super_subscribe; } } char sqlStr[TSDB_DB_NAME_LEN + 5]; sprintf(sqlStr, "USE %s", g_queryInfo.dbName); if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { - taos_close(pThreadInfo->taos); errorPrint("use database %s failed!\n\n", g_queryInfo.dbName); - free(subSqlStr); - return NULL; + goto free_of_super_subscribe; } char topic[32] = {0}; @@ -252,9 +250,7 @@ static void *superSubscribe(void *sarg) { g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeInterval); if (NULL == tsub[tsubSeq]) { - taos_close(pThreadInfo->taos); - free(subSqlStr); - return NULL; + goto free_of_super_subscribe; } } @@ -321,9 +317,7 @@ static void *superSubscribe(void *sarg) { g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeInterval); if (NULL == tsub[tsubSeq]) { - taos_close(pThreadInfo->taos); - free(subSqlStr); - return NULL; + goto free_of_super_subscribe; } } } @@ -340,10 +334,11 @@ static void *superSubscribe(void *sarg) { tsubSeq = i - pThreadInfo->start_table_from; taos_unsubscribe(tsub[tsubSeq], 0); } - + *code = 0; + free_of_super_subscribe: taos_close(pThreadInfo->taos); - free(subSqlStr); - return NULL; + tmfree(subSqlStr); + return code; } int subscribeTestProcess() { @@ -482,7 +477,12 @@ int subscribeTestProcess() { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int j = 0; j < threads; j++) { uint64_t seq = i * threads + j; - pthread_join(pidsOfStable[seq], NULL); + void* result; + pthread_join(pidsOfStable[seq], &result); + if (*(int32_t*)result) { + g_fail = true; + } + tmfree(result); } } } @@ -491,7 +491,12 @@ int subscribeTestProcess() { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) { uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j; - pthread_join(pids[seq], NULL); + void* result; + pthread_join(pids[seq], &result); + if (*(int32_t*)result) { + g_fail = true; + } + tmfree(result); } } @@ -501,5 +506,8 @@ int subscribeTestProcess() { tmfree((char *)pidsOfStable); tmfree((char *)infosOfStable); // taos_close(taos); + if (g_fail) { + return -1; + } return 0; } \ No newline at end of file diff --git a/tests/pytest/tools/taosdemoAllTest/TD-10539/create_taosdemo.py b/tests/pytest/tools/taosdemoAllTest/TD-10539/create_taosdemo.py index d64bf201f6cd7d9a1ce7870c578e7a80761f3c9c..6e722d3243c4a69b921e7dbe17d38c7339c847e9 100644 --- a/tests/pytest/tools/taosdemoAllTest/TD-10539/create_taosdemo.py +++ b/tests/pytest/tools/taosdemoAllTest/TD-10539/create_taosdemo.py @@ -171,8 +171,8 @@ class TDTestCase: #print("==============taosdemo——json_no,#create stable,table; insert table; show table; select table; drop table") - assert os.system("%staosdemo -f tools/taosdemoAllTest/TD-10539/create_taosdemo_no.json -y " % binPath) == 0 - tdSql.query("show dbno.tables ") + os.system("%staosdemo -f tools/taosdemoAllTest/TD-10539/create_taosdemo_no.json -y " % binPath) + tdSql.query("show dbno.tables;") tdSql.checkRows(0)