未验证 提交 d518801d 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #7804 from taosdata/hotfix/TD-6502-2

[td-6502]fix sql obj reused issue
......@@ -55,7 +55,7 @@ typedef struct STidTags {
#pragma pack(pop)
typedef struct SJoinSupporter {
SSqlObj* pObj; // parent SqlObj
int64_t pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
SInterval interval;
SLimitVal limit; // limit info
......
......@@ -386,7 +386,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
return NULL;
}
pSupporter->pObj = pSql;
pSupporter->pObj = pSql->self;
pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
......@@ -1119,7 +1119,10 @@ bool emptyTagList(SArray* resList, int32_t size) {
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
int64_t handle = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1133,16 +1136,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// check for the error code firstly
......@@ -1154,15 +1156,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// keep the results in memory
......@@ -1177,11 +1179,11 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
pSupporter->pIdTagList = tmp;
......@@ -1193,7 +1195,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// query not completed, continue to retrieve tid + tag tuples
if (!pRes->completed) {
taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
return;
goto _return;
}
}
......@@ -1215,14 +1217,14 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
//tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
return;
goto _return;
}
SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *));
......@@ -1234,7 +1236,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscAsyncResultOnError(pParentSql);
taosArrayDestroy(resList);
return;
goto _return;
}
if (emptyTagList(resList, pParentSql->subState.numOfSub)) { // no results,return.
......@@ -1278,12 +1280,18 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
}
taosArrayDestroy(resList);
_return:
taosReleaseRef(tscObjRef, handle);
}
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
int64_t handle = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1295,16 +1303,16 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// check for the error code firstly
......@@ -1315,15 +1323,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (numOfRows > 0) { // write the compressed timestamp to disk file
......@@ -1335,12 +1343,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
}
......@@ -1354,12 +1362,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (pSupporter->pTSBuf == NULL) {
......@@ -1378,7 +1386,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pRes->row = pRes->numOfRows;
taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
return;
goto _return;
}
}
......@@ -1406,11 +1414,11 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return;
goto _return;
}
tscDebug("0x%"PRIx64" all subquery retrieve ts complete, do ts block intersect", pParentSql->self);
......@@ -1424,7 +1432,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
return;
goto _return;
}
// launch the query the retrieve actual results from vnode along with the filtered timestamp
......@@ -1433,12 +1441,17 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
//update the vgroup that involved in real data query
tscLaunchRealSubqueries(pParentSql);
_return:
taosReleaseRef(tscObjRef, handle);
}
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
int64_t handle = pSupporter->pObj;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1449,15 +1462,15 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
......@@ -1468,7 +1481,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
tscError("0x%"PRIx64" retrieve failed, index:%d, code:%s", pSql->self, pSupporter->subqueryIndex, tstrerror(numOfRows));
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (numOfRows >= 0) {
......@@ -1494,7 +1507,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
} else {
tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self);
}
......@@ -1502,7 +1515,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
//tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub);
return;
goto _return;
}
tscDebug("0x%"PRIx64" all %d secondary subqueries retrieval completed, code:%d", pSql->self, pState->numOfSub, pParentSql->res.code);
......@@ -1540,6 +1553,9 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
// data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql);
_return:
taosReleaseRef(tscObjRef, handle);
}
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
......@@ -1787,7 +1803,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
int64_t handle = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
......@@ -1799,16 +1818,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, code, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// TODO here retry is required, not directly returns to client
......@@ -1819,16 +1838,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pParentSql->res.code = code;
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// retrieve <tid, tag> tuples from vnode
......@@ -1836,7 +1855,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pSql->fp = tidTagRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// retrieve ts_comp info from vnode
......@@ -1844,13 +1863,13 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pSql->fp = tsCompRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return;
goto _return;
}
}
......@@ -1873,6 +1892,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscAsyncResultOnError(pParentSql);
}
}
_return:
taosReleaseRef(tscObjRef, handle);
}
/////////////////////////////////////////////////////////////////////////////////////////
......@@ -1971,9 +1995,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
"0x%"PRIX64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
"exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
pSql->self, pNew->self, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
} else {
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
......@@ -2006,9 +2030,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
"0x%"PRIX64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
pSql->self, pNew->self, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
}
} else {
......
......@@ -442,7 +442,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
}
released = 1;
} else {
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid);
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, count:%d", rsetId, pNode->p, rid, pNode->count);
}
} else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册