diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index d4dbec72236e1c7d321e4be139295cd6daad0bf3..568b45f5ddb802524567b1930162855ad5e9cba1 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -742,15 +742,15 @@ static void tscKillSTableQuery(SSqlObj *pSql) { } tscAsyncResultOnError(pSubObj); - // taosRelekaseRef(tscObjRef, pSubObj->self); } - pthread_mutex_unlock(&pSql->subState.mutex); } if (pSql->subState.numOfSub <= 0) { tscAsyncResultOnError(pSql); } + pthread_mutex_unlock(&pSql->subState.mutex); } + tscUnlockByThread(&pSql->squeryLock); tscDebug("0x%"PRIx64" super table query cancelled", pSql->self); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index ae2a3f33f9b7afb3b1950c361a664051201dc4ef..28daa227a05d80d5c0986d500abcaf917eb66efe 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -535,26 +535,13 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT * launch secondary stage query to fetch the result that contains timestamp in set */ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { - int32_t numOfSub = 0; SJoinSupporter* pSupporter = NULL; bool success = true; uint32_t stateVersion = 0; { pthread_mutex_lock(&pSql->subState.mutex); - //If the columns are not involved in the final select clause, the corresponding query will not be issued. - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - pSupporter = pSql->pSubs[i]->param; - if (taosArrayGetSize(pSupporter->exprList) > 0) { - ++numOfSub; - } - } - - assert(numOfSub > 0); - // scan all subquery, if one sub query has only ts, ignore it - tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub); - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj *pPrevSub = pSql->pSubs[i]; if (!pPrevSub) continue; @@ -589,6 +576,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; + pSql->subState.version ++; SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd); pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object @@ -1080,6 +1068,11 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar tscDebug("0x%"PRIx64" all subquery retrieve complete, do tags match", pParentSql->self); for (int32_t i = 0; i < joinNum; i++) { + if (!pParentSql->pSubs[i]) { + tscError("0x%"PRIx64"some of subquery objs not exist. numOfSub:%d, i: %d", pParentSql->self, pParentSql->subState.numOfSub, i); + code = TSDB_CODE_FAILED; + break; + } SJoinSupporter* p = pParentSql->pSubs[i]->param; setTidTagType(p, pColSchema->type); @@ -2132,6 +2125,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter } pSql->pSubs[tableIndex] = pNew; + pSql->subState.version ++; if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -2261,7 +2255,9 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { int32_t code = TSDB_CODE_SUCCESS; code = doReInitSubState(pSql, pQueryInfo->numOfTables); - assert (code == TSDB_CODE_SUCCESS && "Out of memory"); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } uint32_t stateVersion = 0; int errflag = 0; @@ -2269,6 +2265,13 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { { pthread_mutex_lock(&pSql->subState.mutex); for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + if (pSql->pSubs[i] != NULL) { + tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, i); + code = TSDB_CODE_FAILED; + errflag = 1; + break; + } + SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i); @@ -2284,6 +2287,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { break; } + // pSub has been recreated. SSqlObj* pSub = pSql->pSubs[i]; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) { @@ -3446,6 +3450,13 @@ static bool needRetryInsert(SSqlObj* pParentObj) { { pthread_mutex_lock(&pParentObj->subState.mutex); for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) { + if (!pParentObj->pSubs[i]) { + tscError("0x%"PRIx64"some of subquery objs not exist. numOfSub:%d, i: %d", pParentObj->self, pParentObj->subState.numOfSub, i); + pParentObj->res.code = TSDB_CODE_FAILED; + ret = false; + break; + } + int32_t code = pParentObj->pSubs[i]->res.code; if (code == TSDB_CODE_SUCCESS) { continue; @@ -3655,8 +3666,16 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub); while(numOfSub < pSql->subState.numOfSub) { + if (pSql->pSubs[numOfSub] != NULL) { + tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, numOfSub); + code = TSDB_CODE_FAILED; + errflag = 1; + break; + } + SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); if (pSupporter == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; errflag = 1; break; } @@ -3667,6 +3686,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); if (pNew == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno)); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; errflag = 1; break; } @@ -3678,10 +3698,11 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pNew->fetchFp = pNew->fp; pSql->pSubs[numOfSub] = pNew; + pSql->subState.version ++; STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub); - pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock); - if (pRes->code == TSDB_CODE_SUCCESS) { + code = tscCopyDataBlockToPayload(pNew, pTableDataBlock); + if (code == TSDB_CODE_SUCCESS) { tscDebug("0x%"PRIx64" sub:%p create subObj success. orderOfSub:%d", pSql->self, pNew, numOfSub); numOfSub++; } else { @@ -3694,7 +3715,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { if (numOfSub < pSql->subState.numOfSub) { tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + pRes->code = code; errflag = 1; } @@ -3725,7 +3746,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; _error: - return TSDB_CODE_TSC_OUT_OF_MEMORY; + return pRes->code; } static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6a33952a2f484ae049f7340a20f5d51a764e1a05..9944c22e89a558a64b702a34208be9bdcfa00fcc 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -4247,6 +4247,13 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { { pthread_mutex_lock(&pSql->subState.mutex); for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + if (pSql->pSubs[i] != NULL) { + tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, i); + code = TSDB_CODE_FAILED; + errflag = 1; + break; + } + SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); pSql->cmd.active = pSub; @@ -4283,6 +4290,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { registerSqlObj(pNew); pSql->pSubs[i] = pNew; + pSql->subState.version ++; SSqlCmd* pCmd = &pNew->cmd; pCmd->command = TSDB_SQL_SELECT;