提交 1015b185 编写于 作者: B Benguang Zhao

enh: guard against additional nullptrs of subquery objs in SSqlObj pSubs

上级 062e868f
......@@ -766,15 +766,15 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
}
tscAsyncResultOnError(pSubObj);
// taosRelekaseRef(tscObjRef, pSubObj->self);
}
pthread_mutex_unlock(&pSql->subState.mutex); }
if (pSql->subState.numOfSub <= 0) {
tscAsyncResultOnError(pSql);
}
pthread_mutex_unlock(&pSql->subState.mutex); }
tscUnlockByThread(&pSql->squeryLock);
tscDebug("0x%"PRIx64" super table query cancelled", pSql->self);
......
......@@ -535,26 +535,13 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT
* launch secondary stage query to fetch the result that contains timestamp in set
*/
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL;
bool success = true;
uint32_t stateVersion = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
//If the columns are not involved in the final select clause, the corresponding query will not be issued.
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
pSupporter = pSql->pSubs[i]->param;
if (taosArrayGetSize(pSupporter->exprList) > 0) {
++numOfSub;
}
}
assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it
tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj *pPrevSub = pSql->pSubs[i];
if (!pPrevSub) continue;
......@@ -589,6 +576,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew;
pSql->subState.version ++;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd);
pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object
......@@ -1080,6 +1068,11 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
tscDebug("0x%"PRIx64" all subquery retrieve <tid, tags> complete, do tags match", pParentSql->self);
for (int32_t i = 0; i < joinNum; i++) {
if (!pParentSql->pSubs[i]) {
tscError("0x%"PRIx64"some of subquery objs not exist. numOfSub:%d, i: %d", pParentSql->self, pParentSql->subState.numOfSub, i);
code = TSDB_CODE_FAILED;
break;
}
SJoinSupporter* p = pParentSql->pSubs[i]->param;
setTidTagType(p, pColSchema->type);
......@@ -2130,6 +2123,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
}
pSql->pSubs[tableIndex] = pNew;
pSql->subState.version ++;
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
......@@ -2269,6 +2263,13 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) {
tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, i);
code = TSDB_CODE_FAILED;
errflag = 1;
break;
}
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i);
......@@ -2284,6 +2285,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
break;
}
// pSub has been recreated.
SSqlObj* pSub = pSql->pSubs[i];
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) {
......@@ -3446,6 +3448,13 @@ static bool needRetryInsert(SSqlObj* pParentObj) {
{ pthread_mutex_lock(&pParentObj->subState.mutex);
for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) {
if (!pParentObj->pSubs[i]) {
tscError("0x%"PRIx64"some of subquery objs not exist. numOfSub:%d, i: %d", pParentObj->self, pParentObj->subState.numOfSub, i);
pParentObj->res.code = TSDB_CODE_FAILED;
ret = false;
break;
}
int32_t code = pParentObj->pSubs[i]->res.code;
if (code == TSDB_CODE_SUCCESS) {
continue;
......@@ -3655,8 +3664,16 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub);
while(numOfSub < pSql->subState.numOfSub) {
if (pSql->pSubs[numOfSub] != NULL) {
tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, numOfSub);
code = TSDB_CODE_FAILED;
errflag = 1;
break;
}
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
if (pSupporter == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
errflag = 1;
break;
}
......@@ -3667,6 +3684,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
if (pNew == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
errflag = 1;
break;
}
......@@ -3678,10 +3696,11 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pNew->fetchFp = pNew->fp;
pSql->pSubs[numOfSub] = pNew;
pSql->subState.version ++;
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub);
pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
if (pRes->code == TSDB_CODE_SUCCESS) {
code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
if (code == TSDB_CODE_SUCCESS) {
tscDebug("0x%"PRIx64" sub:%p create subObj success. orderOfSub:%d", pSql->self, pNew, numOfSub);
numOfSub++;
} else {
......@@ -3694,7 +3713,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
if (numOfSub < pSql->subState.numOfSub) {
tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
pRes->code = code;
errflag = 1;
}
......@@ -3725,7 +3744,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
_error:
return TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code;
}
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
......
......@@ -4293,6 +4293,13 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
{ pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) {
tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, i);
code = TSDB_CODE_FAILED;
errflag = 1;
break;
}
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
pSql->cmd.active = pSub;
......@@ -4329,6 +4336,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
registerSqlObj(pNew);
pSql->pSubs[i] = pNew;
pSql->subState.version ++;
SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册