diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 9acebc3a28d5ffab6404f4fa847312f13ccc1208..409c41fb1a2d0952090ff2ef90581e37007e3679 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -766,15 +766,15 @@ static void tscKillSTableQuery(SSqlObj *pSql) { } tscAsyncResultOnError(pSubObj); - // taosRelekaseRef(tscObjRef, pSubObj->self); } - pthread_mutex_unlock(&pSql->subState.mutex); } if (pSql->subState.numOfSub <= 0) { tscAsyncResultOnError(pSql); } + pthread_mutex_unlock(&pSql->subState.mutex); } + tscUnlockByThread(&pSql->squeryLock); tscDebug("0x%"PRIx64" super table query cancelled", pSql->self); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d0a403f6435b378882e8d5198ec640b90ceede5c..fc72943c6ec445d4e2cc160b729a316861eb373c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -535,26 +535,13 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT * launch secondary stage query to fetch the result that contains timestamp in set */ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { - int32_t numOfSub = 0; SJoinSupporter* pSupporter = NULL; bool success = true; uint32_t stateVersion = 0; { pthread_mutex_lock(&pSql->subState.mutex); - //If the columns are not involved in the final select clause, the corresponding query will not be issued. - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - pSupporter = pSql->pSubs[i]->param; - if (taosArrayGetSize(pSupporter->exprList) > 0) { - ++numOfSub; - } - } - - assert(numOfSub > 0); - // scan all subquery, if one sub query has only ts, ignore it - tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub); - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj *pPrevSub = pSql->pSubs[i]; if (!pPrevSub) continue; @@ -589,6 +576,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; + pSql->subState.version ++; SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd); pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object @@ -1080,6 +1068,11 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar tscDebug("0x%"PRIx64" all subquery retrieve complete, do tags match", pParentSql->self); for (int32_t i = 0; i < joinNum; i++) { + if (!pParentSql->pSubs[i]) { + tscError("0x%"PRIx64"some of subquery objs not exist. numOfSub:%d, i: %d", pParentSql->self, pParentSql->subState.numOfSub, i); + code = TSDB_CODE_FAILED; + break; + } SJoinSupporter* p = pParentSql->pSubs[i]->param; setTidTagType(p, pColSchema->type); @@ -2130,6 +2123,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter } pSql->pSubs[tableIndex] = pNew; + pSql->subState.version ++; if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -2269,6 +2263,13 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { { pthread_mutex_lock(&pSql->subState.mutex); for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + if (pSql->pSubs[i] != NULL) { + tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, i); + code = TSDB_CODE_FAILED; + errflag = 1; + break; + } + SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i); @@ -2284,6 +2285,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { break; } + // pSub has been recreated. SSqlObj* pSub = pSql->pSubs[i]; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) { @@ -3446,6 +3448,13 @@ static bool needRetryInsert(SSqlObj* pParentObj) { { pthread_mutex_lock(&pParentObj->subState.mutex); for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) { + if (!pParentObj->pSubs[i]) { + tscError("0x%"PRIx64"some of subquery objs not exist. numOfSub:%d, i: %d", pParentObj->self, pParentObj->subState.numOfSub, i); + pParentObj->res.code = TSDB_CODE_FAILED; + ret = false; + break; + } + int32_t code = pParentObj->pSubs[i]->res.code; if (code == TSDB_CODE_SUCCESS) { continue; @@ -3655,8 +3664,16 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub); while(numOfSub < pSql->subState.numOfSub) { + if (pSql->pSubs[numOfSub] != NULL) { + tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, numOfSub); + code = TSDB_CODE_FAILED; + errflag = 1; + break; + } + SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); if (pSupporter == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; errflag = 1; break; } @@ -3667,6 +3684,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); if (pNew == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno)); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; errflag = 1; break; } @@ -3678,10 +3696,11 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pNew->fetchFp = pNew->fp; pSql->pSubs[numOfSub] = pNew; + pSql->subState.version ++; STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub); - pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock); - if (pRes->code == TSDB_CODE_SUCCESS) { + code = tscCopyDataBlockToPayload(pNew, pTableDataBlock); + if (code == TSDB_CODE_SUCCESS) { tscDebug("0x%"PRIx64" sub:%p create subObj success. orderOfSub:%d", pSql->self, pNew, numOfSub); numOfSub++; } else { @@ -3694,7 +3713,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { if (numOfSub < pSql->subState.numOfSub) { tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + pRes->code = code; errflag = 1; } @@ -3725,7 +3744,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; _error: - return TSDB_CODE_TSC_OUT_OF_MEMORY; + return pRes->code; } static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 5a9ba219674a37b16c63753efb2afcad29b723a4..2af31303ca3a40c81a0e757fd2bd4788e8d6ded3 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -4286,6 +4286,13 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { { pthread_mutex_lock(&pSql->subState.mutex); for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + if (pSql->pSubs[i] != NULL) { + tscError("0x%"PRIx64"some of subquery objs already set. numOfSub:%d, i: %d", pSql->self, pSql->subState.numOfSub, i); + code = TSDB_CODE_FAILED; + errflag = 1; + break; + } + SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); pSql->cmd.active = pSub; @@ -4322,6 +4329,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { registerSqlObj(pNew); pSql->pSubs[i] = pNew; + pSql->subState.version ++; SSqlCmd* pCmd = &pNew->cmd; pCmd->command = TSDB_SQL_SELECT; diff --git a/src/kit/taos-tools b/src/kit/taos-tools index 8207c74c4e8a832f2d7c9986b8ffd3356ace7f2d..cf1df1cb05e660d8c79754dabe93386f79a21ce5 160000 --- a/src/kit/taos-tools +++ b/src/kit/taos-tools @@ -1 +1 @@ -Subproject commit 8207c74c4e8a832f2d7c9986b8ffd3356ace7f2d +Subproject commit cf1df1cb05e660d8c79754dabe93386f79a21ce5