提交 f5aa8a20 编写于 作者: W wpan

td-6502

上级 57a995ff
......@@ -55,7 +55,7 @@ typedef struct STidTags {
#pragma pack(pop)
typedef struct SJoinSupporter {
SSqlObj* pObj; // parent SqlObj
int64_t pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
SInterval interval;
SLimitVal limit; // limit info
......
......@@ -386,7 +386,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
return NULL;
}
pSupporter->pObj = pSql;
pSupporter->pObj = pSql->self;
pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
......@@ -1119,7 +1119,8 @@ bool emptyTagList(SArray* resList, int32_t size) {
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSupporter->pObj);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1133,16 +1134,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// check for the error code firstly
......@@ -1154,15 +1154,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// keep the results in memory
......@@ -1177,11 +1177,11 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
pSupporter->pIdTagList = tmp;
......@@ -1193,7 +1193,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// query not completed, continue to retrieve tid + tag tuples
if (!pRes->completed) {
taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
return;
goto _return;
}
}
......@@ -1215,14 +1215,14 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
//tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
return;
goto _return;
}
SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *));
......@@ -1234,7 +1234,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscAsyncResultOnError(pParentSql);
taosArrayDestroy(resList);
return;
goto _return;
}
if (emptyTagList(resList, pParentSql->subState.numOfSub)) { // no results,return.
......@@ -1278,12 +1278,16 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
}
taosArrayDestroy(resList);
_return:
taosReleaseRef(tscObjRef, (int64_t)pSupporter->pObj);
}
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSupporter->pObj);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1295,16 +1299,16 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// check for the error code firstly
......@@ -1315,15 +1319,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (numOfRows > 0) { // write the compressed timestamp to disk file
......@@ -1335,12 +1339,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
}
......@@ -1354,12 +1358,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (pSupporter->pTSBuf == NULL) {
......@@ -1378,7 +1382,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pRes->row = pRes->numOfRows;
taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
return;
goto _return;
}
}
......@@ -1406,11 +1410,11 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return;
goto _return;
}
tscDebug("0x%"PRIx64" all subquery retrieve ts complete, do ts block intersect", pParentSql->self);
......@@ -1424,7 +1428,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
return;
goto _return;
}
// launch the query the retrieve actual results from vnode along with the filtered timestamp
......@@ -1433,12 +1437,16 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
//update the vgroup that involved in real data query
tscLaunchRealSubqueries(pParentSql);
_return:
taosReleaseRef(tscObjRef, (int64_t)pSupporter->pObj);
}
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSupporter->pObj);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1449,15 +1457,15 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
......@@ -1468,7 +1476,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
tscError("0x%"PRIx64" retrieve failed, index:%d, code:%s", pSql->self, pSupporter->subqueryIndex, tstrerror(numOfRows));
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (numOfRows >= 0) {
......@@ -1494,7 +1502,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
} else {
tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self);
}
......@@ -1502,7 +1510,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
//tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub);
return;
goto _return;
}
tscDebug("0x%"PRIx64" all %d secondary subqueries retrieval completed, code:%d", pSql->self, pState->numOfSub, pParentSql->res.code);
......@@ -1540,6 +1548,9 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
// data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql);
_return:
taosReleaseRef(tscObjRef, (int64_t)pSupporter->pObj);
}
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
......@@ -1787,7 +1798,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSupporter->pObj);
if (pParentSql == NULL) return;
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
......@@ -1799,16 +1811,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, code, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// TODO here retry is required, not directly returns to client
......@@ -1819,16 +1831,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pParentSql->res.code = code;
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// retrieve <tid, tag> tuples from vnode
......@@ -1836,7 +1848,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pSql->fp = tidTagRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// retrieve ts_comp info from vnode
......@@ -1844,13 +1856,13 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pSql->fp = tsCompRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return;
goto _return;
}
}
......@@ -1873,6 +1885,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscAsyncResultOnError(pParentSql);
}
}
_return:
taosReleaseRef(tscObjRef, (int64_t)pSupporter->pObj);
}
/////////////////////////////////////////////////////////////////////////////////////////
......
......@@ -442,7 +442,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
}
released = 1;
} else {
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid);
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, count:%d", rsetId, pNode->p, rid, pNode->count);
}
} else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册