提交 de1ab400 编写于 作者: D dapan1121

fix: fix taosd/taosc crash issue

上级 db9dad4a
...@@ -1261,8 +1261,6 @@ static void insertBatchClean(STscStmt* pStmt) { ...@@ -1261,8 +1261,6 @@ static void insertBatchClean(STscStmt* pStmt) {
taosHashClear(pCmd->insertParam.pTableBlockHashList); taosHashClear(pCmd->insertParam.pTableBlockHashList);
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
} }
static int insertBatchStmtExecute(STscStmt* pStmt) { static int insertBatchStmtExecute(STscStmt* pStmt) {
......
...@@ -330,7 +330,7 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -330,7 +330,7 @@ void checkBrokenQueries(STscObj *pTscObj) {
SSqlObj *pSql = pTscObj->sqlList; SSqlObj *pSql = pTscObj->sqlList;
while (pSql) { while (pSql) {
// avoid sqlobj may not be correctly removed from sql list // avoid sqlobj may not be correctly removed from sql list
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL || pSql->signature != pSql) {
pSql = pSql->next; pSql = pSql->next;
continue; continue;
} }
...@@ -3392,7 +3392,6 @@ int tscRenewTableMeta(SSqlObj *pSql) { ...@@ -3392,7 +3392,6 @@ int tscRenewTableMeta(SSqlObj *pSql) {
pthread_mutex_lock(&rootSql->mtxSubs); pthread_mutex_lock(&rootSql->mtxSubs);
tscFreeSubobj(rootSql); tscFreeSubobj(rootSql);
pthread_mutex_unlock(&rootSql->mtxSubs); pthread_mutex_unlock(&rootSql->mtxSubs);
tfree(rootSql->pSubs);
tscResetSqlCmd(&rootSql->cmd, true, rootSql->self); tscResetSqlCmd(&rootSql->cmd, true, rootSql->self);
code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
......
...@@ -232,8 +232,6 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -232,8 +232,6 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscFreeSqlResult(pStream->pSql); tscFreeSqlResult(pStream->pSql);
tscFreeSubobj(pStream->pSql); tscFreeSubobj(pStream->pSql);
tfree(pStream->pSql->pSubs);
pStream->pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay); tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
...@@ -610,8 +608,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -610,8 +608,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql); tscSetNextLaunchTimer(pStream, pSql);
} }
......
...@@ -695,6 +695,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -695,6 +695,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
} }
void freeJoinSubqueryObj(SSqlObj* pSql) { void freeJoinSubqueryObj(SSqlObj* pSql) {
if (pSql->subState.numOfSub == 0) {
return;
}
pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -707,13 +713,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { ...@@ -707,13 +713,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
taos_free_result(pSub); taos_free_result(pSub);
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states); tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
} }
static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
...@@ -901,7 +907,6 @@ bool tscReparseSql(SSqlObj *sql, int32_t code){ ...@@ -901,7 +907,6 @@ bool tscReparseSql(SSqlObj *sql, int32_t code){
} }
tscFreeSubobj(sql); tscFreeSubobj(sql);
tfree(sql->pSubs);
sql->res.code = TSDB_CODE_SUCCESS; sql->res.code = TSDB_CODE_SUCCESS;
sql->retry++; sql->retry++;
...@@ -2180,7 +2185,6 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2180,7 +2185,6 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pSql->subState.numOfSub = pQueryInfo->numOfTables;
if (pSql->subState.states == NULL) { if (pSql->subState.states == NULL) {
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
...@@ -2192,6 +2196,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2192,6 +2196,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
pthread_mutex_init(&pSql->subState.mutex, NULL); pthread_mutex_init(&pSql->subState.mutex, NULL);
} }
pSql->subState.numOfSub = pQueryInfo->numOfTables;
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables); tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
...@@ -2251,7 +2257,12 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2251,7 +2257,12 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
} }
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0); pthread_mutex_lock(&pSql->subState.mutex);
if (numOfSubs > pSql->subState.numOfSub || numOfSubs <= 0 || pSql->subState.numOfSub <= 0) {
pthread_mutex_unlock(&pSql->subState.mutex);
return;
}
for(int32_t i = 0; i < numOfSubs; ++i) { for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
...@@ -2261,6 +2272,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { ...@@ -2261,6 +2272,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
taos_free_result(pSub); taos_free_result(pSub);
} }
pthread_mutex_unlock(&pSql->subState.mutex);
} }
void tscLockByThread(int64_t *lockedBy) { void tscLockByThread(int64_t *lockedBy) {
...@@ -2365,8 +2377,10 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -2365,8 +2377,10 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
pthread_mutex_lock(&pParent->subState.mutex);
pParent->subState.numOfSub = 0; pParent->subState.numOfSub = 0;
tfree(pParent->pSubs); tfree(pParent->pSubs);
pthread_mutex_unlock(&pParent->subState.mutex);
pParent->res.code = code; pParent->res.code = code;
tscAsyncResultOnError(pParent); tscAsyncResultOnError(pParent);
return; return;
...@@ -2469,9 +2483,11 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -2469,9 +2483,11 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
pthread_mutex_lock(&pParent->subState.mutex);
pParent->subState.numOfSub = 0; pParent->subState.numOfSub = 0;
tfree(pParent->pSubs); tfree(pParent->pSubs);
pthread_mutex_unlock(&pParent->subState.mutex);
if (resRows == 0) { if (resRows == 0) {
pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParent->fp)(pParent->param, pParent, 0); (*pParent->fp)(pParent->param, pParent, 0);
...@@ -2493,8 +2509,10 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { ...@@ -2493,8 +2509,10 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
pthread_mutex_lock(&parent->subState.mutex);
parent->subState.numOfSub = 0; parent->subState.numOfSub = 0;
tfree(parent->pSubs); tfree(parent->pSubs);
pthread_mutex_unlock(&parent->subState.mutex);
parent->res.code = c; parent->res.code = c;
tscAsyncResultOnError(parent); tscAsyncResultOnError(parent);
return; return;
...@@ -3014,7 +3032,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -3014,7 +3032,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
} }
tscFreeSubobj(userSql); tscFreeSubobj(userSql);
tfree(userSql->pSubs);
userSql->res.code = TSDB_CODE_SUCCESS; userSql->res.code = TSDB_CODE_SUCCESS;
userSql->retry++; userSql->retry++;
...@@ -3382,7 +3399,9 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { ...@@ -3382,7 +3399,9 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
} }
static void doFreeInsertSupporter(SSqlObj* pSqlObj) { static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0); if (pSqlObj == NULL || pSqlObj->subState.numOfSub <= 0) {
return;
}
for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) {
SSqlObj* pSql = pSqlObj->pSubs[i]; SSqlObj* pSql = pSqlObj->pSubs[i];
......
...@@ -1682,6 +1682,8 @@ void tscFreeSubobj(SSqlObj* pSql) { ...@@ -1682,6 +1682,8 @@ void tscFreeSubobj(SSqlObj* pSql) {
return; return;
} }
pthread_mutex_lock(&pSql->subState.mutex);
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
...@@ -1695,12 +1697,14 @@ void tscFreeSubobj(SSqlObj* pSql) { ...@@ -1695,12 +1697,14 @@ void tscFreeSubobj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states); tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
tfree(pSql->pSubs);
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
} }
/** /**
...@@ -1768,9 +1772,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -1768,9 +1772,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->fp = NULL; pSql->fp = NULL;
tfree(pSql->sqlstr); tfree(pSql->sqlstr);
tfree(pSql->pBuf); tfree(pSql->pBuf);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->self = 0; pSql->self = 0;
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
...@@ -4161,10 +4162,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -4161,10 +4162,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
} }
tscFreeSubobj(pParentSql); tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs);
tscFreeSubobj(rootObj); tscFreeSubobj(rootObj);
tfree(rootObj->pSubs);
rootObj->res.code = TSDB_CODE_SUCCESS; rootObj->res.code = TSDB_CODE_SUCCESS;
rootObj->retry++; rootObj->retry++;
...@@ -4207,17 +4205,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -4207,17 +4205,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
//bug fix. Above doInitSubState level, the loop invocation with the same SSqlObj will be fail. //bug fix. Above doInitSubState level, the loop invocation with the same SSqlObj will be fail.
//assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL); //assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL);
if(pSql->pSubs) { tscFreeSubobj(pSql);
free(pSql->pSubs);
pSql->pSubs = NULL;
}
if(pSql->subState.states) {
free(pSql->subState.states);
pSql->subState.states = NULL;
}
pSql->subState.numOfSub = numOfSubqueries;
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
...@@ -4227,6 +4215,8 @@ int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { ...@@ -4227,6 +4215,8 @@ int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pSql->subState.numOfSub = numOfSubqueries;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4332,6 +4322,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4332,6 +4322,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
return; return;
_error: _error:
for(int32_t i = 0; i < numOfInit; ++i) { for(int32_t i = 0; i < numOfInit; ++i) {
SSqlObj* p = pSql->pSubs[i]; SSqlObj* p = pSql->pSubs[i];
tscFreeSqlObj(p); tscFreeSqlObj(p);
...@@ -4616,16 +4607,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -4616,16 +4607,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
pRes->final = finalBk; pRes->final = finalBk;
pRes->numOfTotal = num; pRes->numOfTotal = num;
pthread_mutex_lock(&pSql->subState.mutex); tscFreeSubobj(pSql);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
taos_free_result(pSql->pSubs[i]);
}
tfree(pSql->pSubs);
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
pSql->fp = fp; pSql->fp = fp;
......
...@@ -9946,7 +9946,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -9946,7 +9946,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pTableqinfo->numOfTables = pTableGroupInfo->numOfTables; pTableqinfo->numOfTables = pTableGroupInfo->numOfTables;
pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
} }
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
...@@ -10146,10 +10146,13 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { ...@@ -10146,10 +10146,13 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) {
} }
taosArrayDestroy(&pTableqinfoGroupInfo->pGroupList); taosArrayDestroy(&pTableqinfoGroupInfo->pGroupList);
taosHashCleanup(pTableqinfoGroupInfo->map);
SHashObj *pmap = pTableqinfoGroupInfo->map;
if (pmap == atomic_val_compare_exchange_ptr(&pTableqinfoGroupInfo->map, pmap, NULL)) {
taosHashCleanup(pmap);
}
pTableqinfoGroupInfo->pGroupList = NULL; pTableqinfoGroupInfo->pGroupList = NULL;
pTableqinfoGroupInfo->map = NULL;
pTableqinfoGroupInfo->numOfTables = 0; pTableqinfoGroupInfo->numOfTables = 0;
} }
......
...@@ -611,7 +611,6 @@ void taosHashClear(SHashObj *pHashObj) { ...@@ -611,7 +611,6 @@ void taosHashClear(SHashObj *pHashObj) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) { for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i]; SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) { if (pEntry->num == 0) {
assert(pEntry->next == NULL);
continue; continue;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册