From de1ab40099ac98549fe41ed1ea79db4c8f5e14dd Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 14 Sep 2022 10:13:45 +0800 Subject: [PATCH] fix: fix taosd/taosc crash issue --- src/client/src/tscPrepare.c | 2 -- src/client/src/tscServer.c | 3 +-- src/client/src/tscStream.c | 4 ---- src/client/src/tscSubquery.c | 39 ++++++++++++++++++++++++-------- src/client/src/tscUtil.c | 44 +++++++++++------------------------- src/query/src/qExecutor.c | 9 +++++--- src/util/src/hash.c | 1 - 7 files changed, 49 insertions(+), 53 deletions(-) diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 665efa4c6d..8e4c417fbf 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1261,8 +1261,6 @@ static void insertBatchClean(STscStmt* pStmt) { taosHashClear(pCmd->insertParam.pTableBlockHashList); tscFreeSqlResult(pSql); tscFreeSubobj(pSql); - tfree(pSql->pSubs); - pSql->subState.numOfSub = 0; } static int insertBatchStmtExecute(STscStmt* pStmt) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 92a6f6e149..779847d7c8 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -330,7 +330,7 @@ void checkBrokenQueries(STscObj *pTscObj) { SSqlObj *pSql = pTscObj->sqlList; while (pSql) { // avoid sqlobj may not be correctly removed from sql list - if (pSql->sqlstr == NULL) { + if (pSql->sqlstr == NULL || pSql->signature != pSql) { pSql = pSql->next; continue; } @@ -3392,7 +3392,6 @@ int tscRenewTableMeta(SSqlObj *pSql) { pthread_mutex_lock(&rootSql->mtxSubs); tscFreeSubobj(rootSql); pthread_mutex_unlock(&rootSql->mtxSubs); - tfree(rootSql->pSubs); tscResetSqlCmd(&rootSql->cmd, true, rootSql->self); code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 0ed4c0a270..eab9527f16 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -232,8 +232,6 @@ static void 
tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf tscFreeSqlResult(pStream->pSql); tscFreeSubobj(pStream->pSql); - tfree(pStream->pSql->pSubs); - pStream->pSql->subState.numOfSub = 0; pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); tscSetRetryTimer(pStream, pStream->pSql, retryDelay); @@ -610,8 +608,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf tscFreeSqlResult(pSql); tscFreeSubobj(pSql); - tfree(pSql->pSubs); - pSql->subState.numOfSub = 0; pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); tscSetNextLaunchTimer(pStream, pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1805c22a9d..a8c2eea195 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -695,6 +695,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { } void freeJoinSubqueryObj(SSqlObj* pSql) { + if (pSql->subState.numOfSub == 0) { + return; + } + + pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -707,13 +713,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { taos_free_result(pSub); pSql->pSubs[i] = NULL; } - - if (pSql->subState.states) { - pthread_mutex_destroy(&pSql->subState.mutex); - } tfree(pSql->subState.states); pSql->subState.numOfSub = 0; + + pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_destroy(&pSql->subState.mutex); } static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { @@ -901,7 +907,6 @@ bool tscReparseSql(SSqlObj *sql, int32_t code){ } tscFreeSubobj(sql); - tfree(sql->pSubs); sql->res.code = TSDB_CODE_SUCCESS; sql->retry++; @@ -2180,7 +2185,6 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); int32_t code = TSDB_CODE_SUCCESS; - pSql->subState.numOfSub = pQueryInfo->numOfTables; if 
(pSql->subState.states == NULL) { pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); @@ -2192,6 +2196,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { pthread_mutex_init(&pSql->subState.mutex, NULL); } + pSql->subState.numOfSub = pQueryInfo->numOfTables; + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables); @@ -2251,7 +2257,12 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { - assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0); + pthread_mutex_lock(&pSql->subState.mutex); + if (numOfSubs > pSql->subState.numOfSub || numOfSubs <= 0 || pSql->subState.numOfSub <= 0) { + pthread_mutex_unlock(&pSql->subState.mutex); + return; + } + for(int32_t i = 0; i < numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -2261,6 +2272,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { taos_free_result(pSub); } + pthread_mutex_unlock(&pSql->subState.mutex); } void tscLockByThread(int64_t *lockedBy) { @@ -2365,8 +2377,10 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { if (code != TSDB_CODE_SUCCESS) { tscFreeFirstRoundSup(&param); taos_free_result(pSql); + pthread_mutex_lock(&pParent->subState.mutex); pParent->subState.numOfSub = 0; tfree(pParent->pSubs); + pthread_mutex_unlock(&pParent->subState.mutex); pParent->res.code = code; tscAsyncResultOnError(pParent); return; @@ -2469,9 +2483,11 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscFreeFirstRoundSup(&param); taos_free_result(pSql); + pthread_mutex_lock(&pParent->subState.mutex); pParent->subState.numOfSub = 0; tfree(pParent->pSubs); - + pthread_mutex_unlock(&pParent->subState.mutex); + if (resRows == 0) { pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; 
(*pParent->fp)(pParent->param, pParent, 0); @@ -2493,8 +2509,10 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { tscFreeFirstRoundSup(&param); taos_free_result(pSql); + pthread_mutex_lock(&parent->subState.mutex); parent->subState.numOfSub = 0; tfree(parent->pSubs); + pthread_mutex_unlock(&parent->subState.mutex); parent->res.code = c; tscAsyncResultOnError(parent); return; @@ -3014,7 +3032,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } tscFreeSubobj(userSql); - tfree(userSql->pSubs); userSql->res.code = TSDB_CODE_SUCCESS; userSql->retry++; @@ -3382,7 +3399,9 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { } static void doFreeInsertSupporter(SSqlObj* pSqlObj) { - assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0); + if (pSqlObj == NULL || pSqlObj->subState.numOfSub <= 0) { + return; + } for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) { SSqlObj* pSql = pSqlObj->pSubs[i]; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6c03aeefd7..9ed7302f27 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1682,6 +1682,8 @@ void tscFreeSubobj(SSqlObj* pSql) { return; } + pthread_mutex_lock(&pSql->subState.mutex); + tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { @@ -1695,12 +1697,14 @@ void tscFreeSubobj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } - if (pSql->subState.states) { - pthread_mutex_destroy(&pSql->subState.mutex); - } - tfree(pSql->subState.states); pSql->subState.numOfSub = 0; + + tfree(pSql->pSubs); + + pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_destroy(&pSql->subState.mutex); } /** @@ -1768,9 +1772,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { pSql->fp = NULL; tfree(pSql->sqlstr); tfree(pSql->pBuf); - - tfree(pSql->pSubs); - pSql->subState.numOfSub = 0; pSql->self = 0; tscFreeSqlResult(pSql); @@ -4161,10 
+4162,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { } tscFreeSubobj(pParentSql); - tfree(pParentSql->pSubs); - tscFreeSubobj(rootObj); - tfree(rootObj->pSubs); rootObj->res.code = TSDB_CODE_SUCCESS; rootObj->retry++; @@ -4207,17 +4205,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { //bug fix. Above doInitSubState level, the loop invocation with the same SSqlObj will be fail. //assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL); - if(pSql->pSubs) { - free(pSql->pSubs); - pSql->pSubs = NULL; - } - - if(pSql->subState.states) { - free(pSql->subState.states); - pSql->subState.states = NULL; - } - - pSql->subState.numOfSub = numOfSubqueries; + tscFreeSubobj(pSql); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); @@ -4227,6 +4215,8 @@ int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pSql->subState.numOfSub = numOfSubqueries; + return TSDB_CODE_SUCCESS; } @@ -4332,6 +4322,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { return; _error: + for(int32_t i = 0; i < numOfInit; ++i) { SSqlObj* p = pSql->pSubs[i]; tscFreeSqlObj(p); @@ -4616,16 +4607,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { pRes->final = finalBk; pRes->numOfTotal = num; - pthread_mutex_lock(&pSql->subState.mutex); - for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - taos_free_result(pSql->pSubs[i]); - } - - tfree(pSql->pSubs); - tfree(pSql->subState.states); - pSql->subState.numOfSub = 0; - pthread_mutex_unlock(&pSql->subState.mutex); - pthread_mutex_destroy(&pSql->subState.mutex); + tscFreeSubobj(pSql); pSql->fp = fp; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6ba582138c..7ff2ff790f 100644 --- 
a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -9946,7 +9946,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pTableqinfo->numOfTables = pTableGroupInfo->numOfTables; - pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); } pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); @@ -10146,10 +10146,13 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { } taosArrayDestroy(&pTableqinfoGroupInfo->pGroupList); - taosHashCleanup(pTableqinfoGroupInfo->map); + + SHashObj *pmap = pTableqinfoGroupInfo->map; + if (pmap == atomic_val_compare_exchange_ptr(&pTableqinfoGroupInfo->map, pmap, NULL)) { + taosHashCleanup(pmap); + } pTableqinfoGroupInfo->pGroupList = NULL; - pTableqinfoGroupInfo->map = NULL; pTableqinfoGroupInfo->numOfTables = 0; } diff --git a/src/util/src/hash.c b/src/util/src/hash.c index d4d4297615..3cbcc7ed6e 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -611,7 +611,6 @@ void taosHashClear(SHashObj *pHashObj) { for (int32_t i = 0; i < pHashObj->capacity; ++i) { SHashEntry *pEntry = pHashObj->hashList[i]; if (pEntry->num == 0) { - assert(pEntry->next == NULL); continue; } -- GitLab