未验证 提交 01763823 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17003 from taosdata/FIX/TD-19011-2.6

enh: guard against possible nullptrs of subquery objs in SSqlObj pSubs additionally
...@@ -766,15 +766,15 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -766,15 +766,15 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
} }
tscAsyncResultOnError(pSubObj); tscAsyncResultOnError(pSubObj);
// taosRelekaseRef(tscObjRef, pSubObj->self);
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
if (pSql->subState.numOfSub <= 0) { if (pSql->subState.numOfSub <= 0) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
tscUnlockByThread(&pSql->squeryLock); tscUnlockByThread(&pSql->squeryLock);
tscDebug("0x%"PRIx64" super table query cancelled", pSql->self); tscDebug("0x%"PRIx64" super table query cancelled", pSql->self);
......
...@@ -535,26 +535,13 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT ...@@ -535,26 +535,13 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT
* launch secondary stage query to fetch the result that contains timestamp in set * launch secondary stage query to fetch the result that contains timestamp in set
*/ */
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
bool success = true; bool success = true;
uint32_t stateVersion = 0; uint32_t stateVersion = 0;
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
//If the columns are not involved in the final select clause, the corresponding query will not be issued.
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
pSupporter = pSql->pSubs[i]->param;
if (taosArrayGetSize(pSupporter->exprList) > 0) {
++numOfSub;
}
}
assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj *pPrevSub = pSql->pSubs[i]; SSqlObj *pPrevSub = pSql->pSubs[i];
if (!pPrevSub) continue; if (!pPrevSub) continue;
...@@ -589,6 +576,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -589,6 +576,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscClearSubqueryInfo(&pNew->cmd); tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
pSql->subState.version ++;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd);
pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object
...@@ -1080,6 +1068,11 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -1080,6 +1068,11 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
tscDebug("0x%"PRIx64" all subquery retrieve <tid, tags> complete, do tags match", pParentSql->self); tscDebug("0x%"PRIx64" all subquery retrieve <tid, tags> complete, do tags match", pParentSql->self);
for (int32_t i = 0; i < joinNum; i++) { for (int32_t i = 0; i < joinNum; i++) {
if (!pParentSql->pSubs[i]) {
tscError("0x%"PRIx64"some of subquery objs not exist. numOfSub:%d, i: %d", pParentSql->self, pParentSql->subState.numOfSub, i);
code = TSDB_CODE_FAILED;
break;
}
SJoinSupporter* p = pParentSql->pSubs[i]->param; SJoinSupporter* p = pParentSql->pSubs[i]->param;
setTidTagType(p, pColSchema->type); setTidTagType(p, pColSchema->type);
...@@ -2130,6 +2123,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -2130,6 +2123,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
} }
pSql->pSubs[tableIndex] = pNew; pSql->pSubs[tableIndex] = pNew;
pSql->subState.version ++;
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
...@@ -2269,6 +2263,13 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2269,6 +2263,13 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) {
tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, i);
code = TSDB_CODE_FAILED;
errflag = 1;
break;
}
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i); tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i);
...@@ -2284,6 +2285,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2284,6 +2285,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
break; break;
} }
// pSub has been recreated.
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) {
...@@ -3446,6 +3448,13 @@ static bool needRetryInsert(SSqlObj* pParentObj) { ...@@ -3446,6 +3448,13 @@ static bool needRetryInsert(SSqlObj* pParentObj) {
{ pthread_mutex_lock(&pParentObj->subState.mutex); { pthread_mutex_lock(&pParentObj->subState.mutex);
for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) { for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) {
if (!pParentObj->pSubs[i]) {
tscError("0x%"PRIx64"some of subquery objs not exist. numOfSub:%d, i: %d", pParentObj->self, pParentObj->subState.numOfSub, i);
pParentObj->res.code = TSDB_CODE_FAILED;
ret = false;
break;
}
int32_t code = pParentObj->pSubs[i]->res.code; int32_t code = pParentObj->pSubs[i]->res.code;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
continue; continue;
...@@ -3655,8 +3664,16 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3655,8 +3664,16 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub); tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub);
while(numOfSub < pSql->subState.numOfSub) { while(numOfSub < pSql->subState.numOfSub) {
if (pSql->pSubs[numOfSub] != NULL) {
tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, numOfSub);
code = TSDB_CODE_FAILED;
errflag = 1;
break;
}
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
if (pSupporter == NULL) { if (pSupporter == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
errflag = 1; errflag = 1;
break; break;
} }
...@@ -3667,6 +3684,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3667,6 +3684,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno)); tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
errflag = 1; errflag = 1;
break; break;
} }
...@@ -3678,10 +3696,11 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3678,10 +3696,11 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pNew->fetchFp = pNew->fp; pNew->fetchFp = pNew->fp;
pSql->pSubs[numOfSub] = pNew; pSql->pSubs[numOfSub] = pNew;
pSql->subState.version ++;
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub);
pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock); code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
if (pRes->code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tscDebug("0x%"PRIx64" sub:%p create subObj success. orderOfSub:%d", pSql->self, pNew, numOfSub); tscDebug("0x%"PRIx64" sub:%p create subObj success. orderOfSub:%d", pSql->self, pNew, numOfSub);
numOfSub++; numOfSub++;
} else { } else {
...@@ -3694,7 +3713,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3694,7 +3713,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
if (numOfSub < pSql->subState.numOfSub) { if (numOfSub < pSql->subState.numOfSub) {
tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self); tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = code;
errflag = 1; errflag = 1;
} }
...@@ -3725,7 +3744,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3725,7 +3744,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
return TSDB_CODE_TSC_OUT_OF_MEMORY; return pRes->code;
} }
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) { static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
......
...@@ -4293,6 +4293,13 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4293,6 +4293,13 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) {
tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, i);
code = TSDB_CODE_FAILED;
errflag = 1;
break;
}
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
pSql->cmd.active = pSub; pSql->cmd.active = pSub;
...@@ -4329,6 +4336,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4329,6 +4336,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
registerSqlObj(pNew); registerSqlObj(pNew);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
pSql->subState.version ++;
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册