diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 665efa4c6dbca0437540ee1dd9875267bc2d8b72..8e4c417fbfe759bc383191b9d7a9b49f41978435 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1261,8 +1261,6 @@ static void insertBatchClean(STscStmt* pStmt) { taosHashClear(pCmd->insertParam.pTableBlockHashList); tscFreeSqlResult(pSql); tscFreeSubobj(pSql); - tfree(pSql->pSubs); - pSql->subState.numOfSub = 0; } static int insertBatchStmtExecute(STscStmt* pStmt) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 92a6f6e149306a7449c823311aaa92061974d39d..779847d7c89deb35f6747e2c89c17cc061ce6445 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -330,7 +330,7 @@ void checkBrokenQueries(STscObj *pTscObj) { SSqlObj *pSql = pTscObj->sqlList; while (pSql) { // avoid sqlobj may not be correctly removed from sql list - if (pSql->sqlstr == NULL) { + if (pSql->sqlstr == NULL || pSql->signature != pSql) { pSql = pSql->next; continue; } @@ -3392,7 +3392,6 @@ int tscRenewTableMeta(SSqlObj *pSql) { pthread_mutex_lock(&rootSql->mtxSubs); tscFreeSubobj(rootSql); pthread_mutex_unlock(&rootSql->mtxSubs); - tfree(rootSql->pSubs); tscResetSqlCmd(&rootSql->cmd, true, rootSql->self); code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 0ed4c0a270ca675dfc445f387ad48de5171df7d9..eab9527f16e439e48016faff33a280c206818e15 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -232,8 +232,6 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf tscFreeSqlResult(pStream->pSql); tscFreeSubobj(pStream->pSql); - tfree(pStream->pSql->pSubs); - pStream->pSql->subState.numOfSub = 0; pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); tscSetRetryTimer(pStream, pStream->pSql, retryDelay); @@ -610,8 +608,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf tscFreeSqlResult(pSql); tscFreeSubobj(pSql); - tfree(pSql->pSubs); - pSql->subState.numOfSub = 0; pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); tscSetNextLaunchTimer(pStream, pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1805c22a9d77181fd5d5e2b3f31a25186741ac74..a8c2eea195992c4a6e66cbfd49bf9e36f2cf4c8c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -695,6 +695,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { } void freeJoinSubqueryObj(SSqlObj* pSql) { + if (pSql->subState.numOfSub == 0) { + return; + } + + pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -707,13 +713,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { taos_free_result(pSub); pSql->pSubs[i] = NULL; } - - if (pSql->subState.states) { - pthread_mutex_destroy(&pSql->subState.mutex); - } tfree(pSql->subState.states); pSql->subState.numOfSub = 0; + + pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_destroy(&pSql->subState.mutex); } static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { @@ -901,7 +907,6 @@ bool tscReparseSql(SSqlObj *sql, int32_t code){ } tscFreeSubobj(sql); - tfree(sql->pSubs); sql->res.code = TSDB_CODE_SUCCESS; sql->retry++; @@ -2180,7 +2185,6 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); int32_t code = TSDB_CODE_SUCCESS; - pSql->subState.numOfSub = pQueryInfo->numOfTables; if (pSql->subState.states == NULL) { pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); @@ -2192,6 +2196,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { pthread_mutex_init(&pSql->subState.mutex, NULL); } + pSql->subState.numOfSub = pQueryInfo->numOfTables; + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables); @@ -2251,7 +2257,12 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { - assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0); + pthread_mutex_lock(&pSql->subState.mutex); + if (numOfSubs > pSql->subState.numOfSub || numOfSubs <= 0 || pSql->subState.numOfSub <= 0) { + pthread_mutex_unlock(&pSql->subState.mutex); + return; + } + for(int32_t i = 0; i < numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -2261,6 +2272,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { taos_free_result(pSub); } + pthread_mutex_unlock(&pSql->subState.mutex); } void tscLockByThread(int64_t *lockedBy) { @@ -2365,8 +2377,10 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { if (code != TSDB_CODE_SUCCESS) { tscFreeFirstRoundSup(¶m); taos_free_result(pSql); + pthread_mutex_lock(&pParent->subState.mutex); pParent->subState.numOfSub = 0; tfree(pParent->pSubs); + pthread_mutex_unlock(&pParent->subState.mutex); pParent->res.code = code; tscAsyncResultOnError(pParent); return; @@ -2469,9 +2483,11 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscFreeFirstRoundSup(¶m); taos_free_result(pSql); + pthread_mutex_lock(&pParent->subState.mutex); pParent->subState.numOfSub = 0; tfree(pParent->pSubs); - + pthread_mutex_unlock(&pParent->subState.mutex); + if (resRows == 0) { pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pParent->fp)(pParent->param, pParent, 0); @@ -2493,8 +2509,10 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { tscFreeFirstRoundSup(¶m); taos_free_result(pSql); + pthread_mutex_lock(&parent->subState.mutex); parent->subState.numOfSub = 0; tfree(parent->pSubs); + pthread_mutex_unlock(&parent->subState.mutex); parent->res.code = c; tscAsyncResultOnError(parent); return; @@ -3014,7 +3032,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } tscFreeSubobj(userSql); - tfree(userSql->pSubs); userSql->res.code = TSDB_CODE_SUCCESS; userSql->retry++; @@ -3382,7 +3399,9 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { } static void doFreeInsertSupporter(SSqlObj* pSqlObj) { - assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0); + if (pSqlObj == NULL || pSqlObj->subState.numOfSub <= 0) { + return; + } for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) { SSqlObj* pSql = pSqlObj->pSubs[i]; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6c03aeefd789825ddabe1cf1a27bf1f5f28995ea..e1717269de6d2152acc127e6d704080e3e57b62d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1682,6 +1682,8 @@ void tscFreeSubobj(SSqlObj* pSql) { return; } + pthread_mutex_lock(&pSql->subState.mutex); + tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { @@ -1695,12 +1697,14 @@ void tscFreeSubobj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } - if (pSql->subState.states) { - pthread_mutex_destroy(&pSql->subState.mutex); - } - tfree(pSql->subState.states); pSql->subState.numOfSub = 0; + + tfree(pSql->pSubs); + + pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_destroy(&pSql->subState.mutex); } /** @@ -1768,9 +1772,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { pSql->fp = NULL; tfree(pSql->sqlstr); tfree(pSql->pBuf); - - tfree(pSql->pSubs); - pSql->subState.numOfSub = 0; pSql->self = 0; tscFreeSqlResult(pSql); @@ -4161,10 +4162,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { } tscFreeSubobj(pParentSql); - tfree(pParentSql->pSubs); - tscFreeSubobj(rootObj); - tfree(rootObj->pSubs); rootObj->res.code = TSDB_CODE_SUCCESS; rootObj->retry++; @@ -4207,19 +4205,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { //bug fix. Above doInitSubState level, the loop invocation with the same SSqlObj will be fail. //assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL); - if(pSql->pSubs) { - free(pSql->pSubs); - pSql->pSubs = NULL; - } - - if(pSql->subState.states) { - free(pSql->subState.states); - pSql->subState.states = NULL; - } - - pSql->subState.numOfSub = numOfSubqueries; + tscFreeSubobj(pSql); - pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); + pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); int32_t code = pthread_mutex_init(&pSql->subState.mutex, NULL); @@ -4227,6 +4215,8 @@ int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pSql->subState.numOfSub = numOfSubqueries; + return TSDB_CODE_SUCCESS; } @@ -4332,6 +4322,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { return; _error: + for(int32_t i = 0; i < numOfInit; ++i) { SSqlObj* p = pSql->pSubs[i]; tscFreeSqlObj(p); @@ -4616,16 +4607,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { pRes->final = finalBk; pRes->numOfTotal = num; - pthread_mutex_lock(&pSql->subState.mutex); - for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - taos_free_result(pSql->pSubs[i]); - } - - tfree(pSql->pSubs); - tfree(pSql->subState.states); - pSql->subState.numOfSub = 0; - pthread_mutex_unlock(&pSql->subState.mutex); - pthread_mutex_destroy(&pSql->subState.mutex); + tscFreeSubobj(pSql); pSql->fp = fp; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6ba582138c2aecbad0c0dde95177b67a4a0d72fb..7ff2ff790fe5e1d98f147fb4e4f0e2ea7e13b43b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -9946,7 +9946,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pTableqinfo->numOfTables = pTableGroupInfo->numOfTables; - pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); } pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); @@ -10146,10 +10146,13 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { } taosArrayDestroy(&pTableqinfoGroupInfo->pGroupList); - taosHashCleanup(pTableqinfoGroupInfo->map); + + SHashObj *pmap = pTableqinfoGroupInfo->map; + if (pmap == atomic_val_compare_exchange_ptr(&pTableqinfoGroupInfo->map, pmap, NULL)) { + taosHashCleanup(pmap); + } pTableqinfoGroupInfo->pGroupList = NULL; - pTableqinfoGroupInfo->map = NULL; pTableqinfoGroupInfo->numOfTables = 0; } diff --git a/src/util/src/hash.c b/src/util/src/hash.c index d4d42976155cb1e11b4abdf6c1d6fa6855921971..3cbcc7ed6e7243dc1dcaec17b3c35892c011461e 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -611,7 +611,6 @@ void taosHashClear(SHashObj *pHashObj) { for (int32_t i = 0; i < pHashObj->capacity; ++i) { SHashEntry *pEntry = pHashObj->hashList[i]; if (pEntry->num == 0) { - assert(pEntry->next == NULL); continue; }