From 47a983fbf7640d397ba575f7446f7d47a9cc4298 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sun, 19 Apr 2020 01:15:08 +0800 Subject: [PATCH] [td-98] fix bugs in reversed single table query --- src/query/inc/queryExecutor.h | 4 +- src/query/src/queryExecutor.c | 206 ++++++++++++++++++++-------------- 2 files changed, 121 insertions(+), 89 deletions(-) diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 244f15e1dd..c07897abf6 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -170,11 +170,11 @@ typedef struct SQInfo { int32_t pointsInterpo; int32_t code; // error code to returned to client sem_t dataReady; - STableGroupInfo groupInfo; // table id list void* tsdb; + STableGroupInfo groupInfo; // table id list SQueryRuntimeEnv runtimeEnv; - int32_t subgroupIdx; + int32_t groupIndex; int32_t offset; /* offset in group result set of subgroup */ T_REF_DECLARE() diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 9206becaab..6ca8b3ab4a 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2900,14 +2900,14 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - while (pQInfo->subgroupIdx < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx); + while (pQInfo->groupIndex < numOfGroups) { + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); ret = mergeIntoGroupResultImpl(pQInfo, group); if (ret < 0) { // not enough disk space to save the data into disk return -1; } - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; // this group generates at least one result, return results if (ret > 0) { @@ -2915,11 +2915,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { } assert(pQInfo->numOfGroupResultPages == 0); - dTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->subgroupIdx - 1); + dTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1); } dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", - pQInfo, pQInfo->subgroupIdx - 1, numOfGroups, taosGetTimestampMs() - st); + pQInfo, pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st); return TSDB_CODE_SUCCESS; } @@ -2934,7 +2934,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { } // set current query completed - // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { + // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == pQInfo->pSidSet->numOfSubSet) { // pQInfo->tableIndex = pQInfo->pSidSet->numOfTables; // return; // } @@ -2943,7 +2943,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - int32_t id = getGroupResultId(pQInfo->subgroupIdx - 1); + int32_t id = getGroupResultId(pQInfo->groupIndex - 1); SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id); int32_t total = 0; @@ -3149,7 +3149,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { r = capacity; } - int32_t id = getGroupResultId(pQInfo->subgroupIdx) + pQInfo->numOfGroupResultPages; + int32_t id = getGroupResultId(pQInfo->groupIndex) + pQInfo->numOfGroupResultPages; tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId); // pagewise copy to dest buffer @@ -3198,8 +3198,8 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) { + if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || + ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { buf->resultInfo[j].complete = false; } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { buf->resultInfo[j].complete = true; @@ -3208,32 +3208,28 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * } } -void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { +void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - + int32_t order = pQuery->order.order; + // group by normal columns and interval query on normal table - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; - } - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); } else { // for simple result of table query, for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; + int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; + SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) { + if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || + ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { pCtx->resultInfo->complete = false; } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { pCtx->resultInfo->complete = true; } } } - - pQuery->order.order = pQuery->order.order ^ 1u; } void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { @@ -3259,14 +3255,11 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { pQuery->order.order = (pQuery->order.order) ^ 1u; } -void enableFuncForForwardScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { +void setCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; + pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; } - - pQuery->order.order = (pQuery->order.order) ^ 1u; } void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) { @@ -3381,8 +3374,8 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { } typedef struct SQueryStatus { - int8_t overStatus; - TSKEY lastKey; + int8_t status; +// TSKEY lastKey; STSCursor cur; } SQueryStatus; @@ -3390,8 +3383,8 @@ typedef struct SQueryStatus { static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; - pStatus->overStatus = pQuery->status; - pStatus->lastKey = pQuery->lastKey; + pStatus->status = pQuery->status; +// pStatus->lastKey = pQuery->lastKey; pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor @@ -3402,21 +3395,21 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - pQuery->lastKey = pQuery->window.skey; +// SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); +// pQuery->lastKey = pQuery->window.skey; } static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - pQuery->lastKey = pStatus->lastKey; - pQuery->status = pStatus->overStatus; +// pQuery->lastKey = pStatus->lastKey; + pQuery->status = pStatus->status; tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); } -static void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { +static UNUSED_FUNC void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryStatus qStatus = {0}; @@ -3428,19 +3421,15 @@ static void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); // close necessary function execution during supplementary scan - disableFuncInReverseScan(pRuntimeEnv, pQuery->order.order); + disableFuncInReverseScan(pRuntimeEnv); queryStatusSave(pRuntimeEnv, &qStatus); -// STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; - // reverse scan from current position -// tsdbpos_t current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); -// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order); - doScanAllDataBlocks(pRuntimeEnv); queryStatusRestore(pRuntimeEnv, &qStatus); - enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); + setCtxOrder(pRuntimeEnv); + SET_MASTER_SCAN_FLAG(pRuntimeEnv); } @@ -3508,7 +3497,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int64_t skey = pQuery->lastKey; int32_t status = pQuery->status; - int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex; + int32_t prevSlot = pRuntimeEnv->windowResInfo.curIndex; SET_MASTER_SCAN_FLAG(pRuntimeEnv); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -3517,55 +3506,98 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { doScanAllDataBlocks(pRuntimeEnv); if (!needScanDataBlocksAgain(pRuntimeEnv)) { - // restore the status if (pRuntimeEnv->scanFlag == REPEAT_SCAN) { - pQuery->status = status; + pQuery->status = status; // restore the status code when abort from repeat scan } break; } STsdbQueryCond cond = { - .twindow = {pQuery->window.skey, pQuery->lastKey}, + .twindow = {.skey = skey, .ekey = pQuery->lastKey - step}, .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; - if (pRuntimeEnv->pSecQueryHandle != NULL) { + if (pRuntimeEnv->pSecQueryHandle == NULL) { pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); } - status = pQuery->status; - pRuntimeEnv->windowResInfo.curIndex = activeSlot; + status = pQuery->status; // backup the status + pRuntimeEnv->windowResInfo.curIndex = prevSlot; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; // check if query is killed or not - if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { + if (isQueryKilled(pQInfo)) { return; } } + + if (!needReverseScan(pQuery)) { + return; + } + + // save the query time window + STimeWindow prev = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey}; - // no need to set the end key - TSKEY lkey = pQuery->lastKey; - TSKEY ekey = pQuery->window.ekey; - - pQuery->window.skey = skey; - pQuery->window.ekey = pQuery->lastKey - step; -// /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); - - doReverseScan(pRuntimeEnv); - - // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan - pQuery->lastKey = lkey; - pQuery->window.ekey = ekey; + // reverse order time range + pQuery->window.skey = pQuery->lastKey - step; + pQuery->window.ekey = skey; + + pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, + }; + + // clean unused handle + if (pRuntimeEnv->pSecQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); + } -// STimeWindow win = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; -// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &win, current, pQuery->order.order); -// tsdbNextDataBlock(pRuntimeEnv->pQueryHandle); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + + dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); + SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); + + int32_t status1 = pQuery->status; + + STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor + if (pRuntimeEnv->pTSBuf) { + pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; + tsBufNextPos(pRuntimeEnv->pTSBuf); + } + + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + + setCtxOrder(pRuntimeEnv); + disableFuncInReverseScan(pRuntimeEnv); + + // reverse scan from current position + doScanAllDataBlocks(pRuntimeEnv); + + pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + tsBufSetCursor(pRuntimeEnv->pTSBuf, &cur); + if (pRuntimeEnv->pTSBuf) { + pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; + } + + setCtxOrder(pRuntimeEnv); + + SET_MASTER_SCAN_FLAG(pRuntimeEnv); + + // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query + // during reverse scan + pQuery->lastKey = prev.skey; + pQuery->status = status1; + pQuery->window.ekey = prev.ekey; } void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { @@ -3859,17 +3891,17 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde int32_t totalSubset = getNumOfSubset(pQInfo); if (orderType == TSDB_ORDER_ASC) { - startIdx = pQInfo->subgroupIdx; + startIdx = pQInfo->groupIndex; step = 1; } else { // desc order copy all data - startIdx = totalSubset - pQInfo->subgroupIdx - 1; + startIdx = totalSubset - pQInfo->groupIndex - 1; step = -1; } for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) { if (result[i].numOfRows == 0) { pQInfo->offset = 0; - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; continue; } @@ -3887,7 +3919,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde pQInfo->offset += numOfRowsToCopy; } else { pQInfo->offset = 0; - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; } for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -4474,14 +4506,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); #if 0 - while (pQInfo->subgroupIdx < numOfGroups) { + while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx); + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); size_t numOfTable = taosArrayGetSize(group); if (isFirstLastRowQuery(pQuery)) { dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, - pQInfo->subgroupIdx); + pQInfo->groupIndex); TSKEY key = -1; int32_t index = -1; @@ -4502,7 +4534,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // assert(num >= 0); } else { dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, - pQInfo->subgroupIdx); + pQInfo->groupIndex); for (int32_t k = start; k <= end; ++k) { if (isQueryKilled(pQInfo)) { @@ -4520,7 +4552,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - pSupporter->subgroupIdx++; + pSupporter->groupIndex++; // output buffer is full, return to client if (pQuery->size >= pQuery->pointsToRead) { @@ -4537,7 +4569,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ - if (pQInfo->subgroupIdx > 0) { + if (pQInfo->groupIndex > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.total += pQuery->rec.rows; @@ -4667,7 +4699,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - pQInfo->subgroupIdx = 0; + pQInfo->groupIndex = 0; pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); } @@ -4744,7 +4776,7 @@ static void doRestoreContext(SQInfo* pQInfo) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } - enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); + setCtxOrder(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); } @@ -4777,9 +4809,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - if (pQInfo->subgroupIdx > 0) { + if (pQInfo->groupIndex > 0) { /* - * if the subgroupIdx > 0, the query process must be completed yet, we only need to + * if the groupIndex > 0, the query process must be completed yet, we only need to * copy the data into output buffer */ if (isIntervalQuery(pQuery)) { @@ -4841,7 +4873,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { } if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { -// assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); +// assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0); if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pQInfo, pQuery); @@ -4979,11 +5011,11 @@ static void tableIntervalProcess(SQInfo *pQInfo) { tableIntervalProcessImpl(pRuntimeEnv); if (isIntervalQuery(pQuery)) { - pQInfo->subgroupIdx = 0; // always start from 0 + pQInfo->groupIndex = 0; // always start from 0 pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); } // the offset is handled at prepare stage if no interpolation involved @@ -5015,10 +5047,10 @@ static void tableIntervalProcess(SQInfo *pQInfo) { // all data scanned, the group by normal column can return if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result - pQInfo->subgroupIdx = 0; + pQInfo->groupIndex = 0; pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); } pQInfo->pointsInterpo += numOfInterpo; @@ -5054,13 +5086,13 @@ static void tableQueryImpl(SQInfo* pQInfo) { // todo limit the output for interval query? pQuery->rec.rows = 0; - pQInfo->subgroupIdx = 0; // always start from 0 + pQInfo->groupIndex = 0; // always start from 0 if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.rows += pQuery->rec.rows; - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); if (pQuery->rec.rows > 0) { dTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); -- GitLab