From f5aa8a20002f65984a9eacfe99826ca90d234318 Mon Sep 17 00:00:00 2001 From: wpan Date: Tue, 7 Sep 2021 09:08:20 +0800 Subject: [PATCH] td-6502 --- src/client/inc/tscUtil.h | 2 +- src/client/src/tscSubquery.c | 111 ++++++++++++++++++++--------------- src/util/src/tref.c | 2 +- 3 files changed, 66 insertions(+), 49 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 81e430f5c2..fcf61580c8 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -55,7 +55,7 @@ typedef struct STidTags { #pragma pack(pop) typedef struct SJoinSupporter { - SSqlObj* pObj; // parent SqlObj + int64_t pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query SInterval interval; SLimitVal limit; // limit info diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 8e579ad221..e16989e23c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -386,7 +386,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) { return NULL; } - pSupporter->pObj = pSql; + pSupporter->pObj = pSql->self; pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); @@ -1119,7 +1119,8 @@ bool emptyTagList(SArray* resList, int32_t size) { static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSupporter->pObj); + if (pParentSql == NULL) return; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -1133,16 +1134,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - - return; + goto _return; } // check for the error code firstly @@ -1154,15 +1154,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = numOfRows; if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } // keep the results in memory @@ -1177,11 +1177,11 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } pSupporter->pIdTagList = tmp; @@ -1193,7 +1193,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // query not completed, continue to retrieve tid + tag tuples if (!pRes->completed) { taos_fetch_rows_a(tres, tidTagRetrieveCallback, param); - return; + goto _return; } } @@ -1215,14 +1215,14 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set the callback function pSql->fp = tscJoinQueryCallback; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { //tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); - return; + goto _return; } SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *)); @@ -1234,7 +1234,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscAsyncResultOnError(pParentSql); taosArrayDestroy(resList); - return; + goto _return; } if (emptyTagList(resList, pParentSql->subState.numOfSub)) { // no results,return. @@ -1278,12 +1278,16 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow } taosArrayDestroy(resList); + +_return: + taosReleaseRef(tscObjRef, (int64_t)pSupporter->pObj); } static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSupporter->pObj); + if (pParentSql == NULL) return; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -1295,16 +1299,16 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code); if (quitAllSubquery(pSql, pParentSql, pSupporter)){ - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } // check for the error code firstly @@ -1315,15 +1319,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = numOfRows; if (quitAllSubquery(pSql, pParentSql, pSupporter)){ - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } if (numOfRows > 0) { // write the compressed timestamp to disk file @@ -1335,12 +1339,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } } @@ -1354,12 +1358,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); if (quitAllSubquery(pSql, pParentSql, pSupporter)){ - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } if (pSupporter->pTSBuf == NULL) { @@ -1378,7 +1382,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pRes->row = pRes->numOfRows; taos_fetch_rows_a(tres, tsCompRetrieveCallback, param); - return; + goto _return; } } @@ -1406,11 +1410,11 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set the callback function pSql->fp = tscJoinQueryCallback; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { - return; + goto _return; } tscDebug("0x%"PRIx64" all subquery retrieve ts complete, do ts block intersect", pParentSql->self); @@ -1424,7 +1428,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set no result command pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pParentSql->fp)(pParentSql->param, pParentSql, 0); - return; + goto _return; } // launch the query the retrieve actual results from vnode along with the filtered timestamp @@ -1433,12 +1437,16 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow //update the vgroup that involved in real data query tscLaunchRealSubqueries(pParentSql); + +_return: + taosReleaseRef(tscObjRef, (int64_t)pSupporter->pObj); } static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSupporter->pObj); + if (pParentSql == NULL) return; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -1449,15 +1457,15 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } @@ -1468,7 +1476,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR tscError("0x%"PRIx64" retrieve failed, index:%d, code:%s", pSql->self, pSupporter->subqueryIndex, tstrerror(numOfRows)); tscAsyncResultOnError(pParentSql); - return; + goto _return; } if (numOfRows >= 0) { @@ -1494,7 +1502,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pSql->fp = tscJoinQueryCallback; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } else { tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self); } @@ -1502,7 +1510,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { //tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub); - return; + goto _return; } tscDebug("0x%"PRIx64" all %d secondary subqueries retrieval completed, code:%d", pSql->self, pState->numOfSub, pParentSql->res.code); @@ -1540,6 +1548,9 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR // data has retrieved to client, build the join results tscBuildResFromSubqueries(pParentSql); + +_return: + taosReleaseRef(tscObjRef, (int64_t)pSupporter->pObj); } void tscFetchDatablockForSubquery(SSqlObj* pSql) { @@ -1787,7 +1798,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pSql = (SSqlObj*)tres; SJoinSupporter* pSupporter = (SJoinSupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSupporter->pObj); + if (pParentSql == NULL) return; // There is only one subquery and table for each subquery. SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); @@ -1799,16 +1811,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, code, pParentSql->res.code); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } // TODO here retry is required, not directly returns to client @@ -1819,16 +1831,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pParentSql->res.code = code; if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } // retrieve tuples from vnode @@ -1836,7 +1848,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pSql->fp = tidTagRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } // retrieve ts_comp info from vnode @@ -1844,13 +1856,13 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pSql->fp = tsCompRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } // In case of consequence query from other vnode, do not wait for other query response here. if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { - return; + goto _return; } } @@ -1873,6 +1885,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscAsyncResultOnError(pParentSql); } } + + +_return: + taosReleaseRef(tscObjRef, (int64_t)pSupporter->pObj); + } ///////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 7d64bd1f83..33323889c6 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -442,7 +442,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { } released = 1; } else { - uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, count:%d", rsetId, pNode->p, rid, pNode->count); } } else { uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); -- GitLab