diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 81e430f5c2ae2e8e4c0dda10ae3daff6d90d897c..fcf61580c8967756614efdce55010fca920b12c2 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -55,7 +55,7 @@ typedef struct STidTags { #pragma pack(pop) typedef struct SJoinSupporter { - SSqlObj* pObj; // parent SqlObj + int64_t pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query SInterval interval; SLimitVal limit; // limit info diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 390c2f882b763a39b0401f512aa4d3c21ad108a9..b293d92aa4a530905266e8f2a7483f5e4f8fcc80 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -386,7 +386,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) { return NULL; } - pSupporter->pObj = pSql; + pSupporter->pObj = pSql->self; pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); @@ -1119,7 +1119,10 @@ bool emptyTagList(SArray* resList, int32_t size) { static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; + int64_t handle = pSupporter->pObj; + + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); + if (pParentSql == NULL) return; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -1133,16 +1136,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - - return; + goto _return; } // check for the error code firstly @@ -1154,15 +1156,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = numOfRows; if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } // keep the results in memory @@ -1177,11 +1179,11 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } pSupporter->pIdTagList = tmp; @@ -1193,7 +1195,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // query not completed, continue to retrieve tid + tag tuples if (!pRes->completed) { taos_fetch_rows_a(tres, tidTagRetrieveCallback, param); - return; + goto _return; } } @@ -1215,14 +1217,14 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set the callback function pSql->fp = tscJoinQueryCallback; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { //tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); - return; + goto _return; } SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *)); @@ -1234,7 +1236,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscAsyncResultOnError(pParentSql); taosArrayDestroy(resList); - return; + goto _return; } if (emptyTagList(resList, pParentSql->subState.numOfSub)) { // no results,return. @@ -1278,12 +1280,18 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow } taosArrayDestroy(resList); + +_return: + taosReleaseRef(tscObjRef, handle); } static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; + int64_t handle = pSupporter->pObj; + + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); + if (pParentSql == NULL) return; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -1295,16 +1303,16 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code); if (quitAllSubquery(pSql, pParentSql, pSupporter)){ - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } // check for the error code firstly @@ -1315,15 +1323,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = numOfRows; if (quitAllSubquery(pSql, pParentSql, pSupporter)){ - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } if (numOfRows > 0) { // write the compressed timestamp to disk file @@ -1335,12 +1343,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } } @@ -1354,12 +1362,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); if (quitAllSubquery(pSql, pParentSql, pSupporter)){ - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } if (pSupporter->pTSBuf == NULL) { @@ -1378,7 +1386,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pRes->row = pRes->numOfRows; taos_fetch_rows_a(tres, tsCompRetrieveCallback, param); - return; + goto _return; } } @@ -1406,11 +1414,11 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set the callback function pSql->fp = tscJoinQueryCallback; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { - return; + goto _return; } tscDebug("0x%"PRIx64" all subquery retrieve ts complete, do ts block intersect", pParentSql->self); @@ -1424,7 +1432,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set no result command pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pParentSql->fp)(pParentSql->param, pParentSql, 0); - return; + goto _return; } // launch the query the retrieve actual results from vnode along with the filtered timestamp @@ -1433,12 +1441,17 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow //update the vgroup that involved in real data query tscLaunchRealSubqueries(pParentSql); + +_return: + taosReleaseRef(tscObjRef, handle); } static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; + int64_t handle = pSupporter->pObj; - SSqlObj* pParentSql = pSupporter->pObj; + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); + if (pParentSql == NULL) return; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -1449,15 +1462,15 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } @@ -1468,7 +1481,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR tscError("0x%"PRIx64" retrieve failed, index:%d, code:%s", pSql->self, pSupporter->subqueryIndex, tstrerror(numOfRows)); tscAsyncResultOnError(pParentSql); - return; + goto _return; } if (numOfRows >= 0) { @@ -1494,7 +1507,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pSql->fp = tscJoinQueryCallback; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } else { tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self); } @@ -1502,7 +1515,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { //tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub); - return; + goto _return; } tscDebug("0x%"PRIx64" all %d secondary subqueries retrieval completed, code:%d", pSql->self, pState->numOfSub, pParentSql->res.code); @@ -1540,6 +1553,9 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR // data has retrieved to client, build the join results tscBuildResFromSubqueries(pParentSql); + +_return: + taosReleaseRef(tscObjRef, handle); } void tscFetchDatablockForSubquery(SSqlObj* pSql) { @@ -1787,7 +1803,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pSql = (SSqlObj*)tres; SJoinSupporter* pSupporter = (SJoinSupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; + int64_t handle = pSupporter->pObj; + + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); + if (pParentSql == NULL) return; // There is only one subquery and table for each subquery. SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); @@ -1799,16 +1818,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, code, pParentSql->res.code); if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } // TODO here retry is required, not directly returns to client @@ -1819,16 +1838,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pParentSql->res.code = code; if (quitAllSubquery(pSql, pParentSql, pSupporter)) { - return; + goto _return; } if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { - return; + goto _return; } tscAsyncResultOnError(pParentSql); - return; + goto _return; } // retrieve tuples from vnode @@ -1836,7 +1855,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pSql->fp = tidTagRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } // retrieve ts_comp info from vnode @@ -1844,13 +1863,13 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pSql->fp = tsCompRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; tscBuildAndSendRequest(pSql, NULL); - return; + goto _return; } // In case of consequence query from other vnode, do not wait for other query response here. if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { - return; + goto _return; } } @@ -1873,6 +1892,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscAsyncResultOnError(pParentSql); } } + + +_return: + taosReleaseRef(tscObjRef, handle); + } ///////////////////////////////////////////////////////////////////////////////////////// @@ -1971,9 +1995,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscDebug( - "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " + "0x%"PRIX64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo), + pSql->self, pNew->self, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo), numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name)); } else { SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; @@ -2006,9 +2030,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscDebug( - "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, " + "0x%"PRIX64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, " "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo), + pSql->self, pNew->self, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo), numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name)); } } else { diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 7d64bd1f83fb8d235c825057251a5e76e0b96b2a..33323889c68162219b3c6faf886ac29b2a975ffa 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -442,7 +442,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { } released = 1; } else { - uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, count:%d", rsetId, pNode->p, rid, pNode->count); } } else { uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);