From 2a89ac7d7f3cfa2dc6674fe8f92a61887147ff86 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 14 Feb 2020 14:36:33 +0800 Subject: [PATCH] fix bugs in sliding query processing --- src/client/src/tscSQLParser.c | 4 + src/system/detail/inc/vnodeQueryImpl.h | 2 +- src/system/detail/src/vnodeQueryImpl.c | 364 ++++++++++++---------- src/system/detail/src/vnodeQueryProcess.c | 29 +- 4 files changed, 215 insertions(+), 184 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c37b30c0e2..e14b6ee6de 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -609,6 +609,10 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { // for top/bottom + interval query, we do not add additional timestamp column in the front if (isTopBottomQuery(pQueryInfo)) { + if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_INVALID_SQL; + } + return TSDB_CODE_SUCCESS; } diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index f0c49efa5a..187cca1874 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -282,7 +282,7 @@ void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutpu void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* dst, const SWindowResult* src); void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo* pWindowResInfo); -void clearCompletedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv); +void clearClosedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv); int32_t numOfClosedSlidingWindow(SWindowResInfo* pWindowResInfo); void closeSlidingWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeAllSlidingWindow(SWindowResInfo* pWindowResInfo); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index fe8586edba..a05c563315 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1010,16 +1010,10 @@ SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_ return blockInfo; } -static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQueryRuntimeEnv *pRuntimeEnv) { - SQuery *pQuery = pRuntimeEnv->pQuery; - +static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQuery *pQuery) { if ((QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyFirst > pQuery->ekey) || (!QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyLast < pQuery->ekey)) { - int32_t pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pBlockInfo->size - 1; - - savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pos); setQueryStatus(pQuery, QUERY_COMPLETED); - return false; } @@ -1033,7 +1027,7 @@ static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQueryRuntim * @param forwardStep * @return TRUE means query not completed, FALSE means query is completed */ -static bool queryCompleteInBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, int32_t forwardStep) { +static bool queryPausedInCurrentBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, int32_t forwardStep) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // assert(pQuery->checkBufferInLoop == 1 && pQuery->over == QUERY_RESBUF_FULL && pQuery->pointsOffset == 0); @@ -1466,12 +1460,12 @@ static SWindowResult *doSetSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, S // todo } - pWindowResInfo->capacity = newCap; - for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) { SPosInfo pos = {-1, -1}; createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos); } + + pWindowResInfo->capacity = newCap; } // add a new result set for a new group @@ -1490,13 +1484,16 @@ static STimeWindow getActiveSlidingWindow(SWindowResInfo *pWindowResInfo, int64_ w.skey = pWindowResInfo->prevSKey; w.ekey = w.skey + pQuery->intervalTime - 1; } else { - int32_t slot = curSlidingWindow(pWindowResInfo); - STimeWindow *window = &pWindowResInfo->pResult[slot].window; + int32_t slot = curSlidingWindow(pWindowResInfo); + w = pWindowResInfo->pResult[slot].window; + } +// STimeWindow *window = &pWindowResInfo->pResult[slot].window; - if (window->skey <= ts && window->ekey >= ts) { - w = *window; // belongs to current active window - } else { - int64_t st = window->skey; + if (w.skey > ts || w.ekey < ts) { +// if (w.skey <= ts && w.ekey >= ts) { +// w = *window; // belongs to current active window +// } else { + int64_t st = w.skey; while (st > ts) { st -= pQuery->slidingTime; @@ -1509,7 +1506,6 @@ static STimeWindow getActiveSlidingWindow(SWindowResInfo *pWindowResInfo, int64_ w.skey = st; w.ekey = w.skey + pQuery->intervalTime - 1; } - } assert(ts >= w.skey && ts <= w.ekey); return w; @@ -1582,6 +1578,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes } static SWindowStatus *getSlidingWindowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { + assert(slot >= 0 && slot < pWindowResInfo->size); return &pWindowResInfo->pResult[slot].status; } @@ -1629,8 +1626,14 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, } } + // all windows are closed, set the last one to be the skey + if (skey == 0) { + skey = pWindowResInfo->pResult[pWindowResInfo->size-1].window.skey; + } + pWindowResInfo->prevSKey = skey; - + assert(skey != 0); + // the number of completed slots are larger than the threshold, dump to client immediately. int32_t v = numOfClosedSlidingWindow(pWindowResInfo); if (v > pWindowResInfo->threshold) { @@ -1639,6 +1642,8 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v); } + + assert(pWindowResInfo->prevSKey != 0); } } @@ -1696,8 +1701,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t TSKEY ts = primaryKeyCol[offset]; STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery); - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { return 0; } @@ -1715,24 +1719,28 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep = pQuery->pos + 1; } } - - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - pCtx[k].nStartQueryTimestamp = win.skey; - pCtx[k].size = forwardStep; - pCtx[k].startOffset = pQuery->pos; - - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); + + SWindowStatus* pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); + + if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + pCtx[k].nStartQueryTimestamp = win.skey; + pCtx[k].size = forwardStep; + pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (forwardStep - 1); + + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunction(&pCtx[k]); + } } } - int32_t index = pWindowResInfo->curIndex; + int32_t index = pWindowResInfo->curIndex; STimeWindow nextWin = win; while (1) { getNextLogicalQueryRange(pRuntimeEnv, &nextWin); - + if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { pWindowResInfo->curIndex = index; @@ -1782,23 +1790,21 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep = startPos + 1; } } - - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - pCtx[k].nStartQueryTimestamp = nextWin.skey; - pCtx[k].size = forwardStep; - pCtx[k].startOffset = startPos; - - SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); - if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { - continue; - } - - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); + + pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); + + if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + pCtx[k].nStartQueryTimestamp = nextWin.skey; + pCtx[k].size = forwardStep; + pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery))? startPos : startPos - (forwardStep - 1); + + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunction(&pCtx[k]); + } } } - // } else { // pWindowResInfo->curIndex = index; // break; @@ -1941,8 +1947,8 @@ void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWind } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pOneRes = &pWindowResInfo->pResult[i]; - clearGroupResultBuf(pRuntimeEnv, pOneRes); + SWindowResult *pWindowRes = &pWindowResInfo->pResult[i]; + clearGroupResultBuf(pRuntimeEnv, pWindowRes); } pWindowResInfo->curIndex = -1; @@ -1956,7 +1962,7 @@ void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWind pWindowResInfo->prevSKey = 0; } -void clearCompletedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) { +void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { return; @@ -1972,33 +1978,34 @@ void clearCompletedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) { } } + // no window is closed, no need to clear the window list if (i == 0) { return; } - int32_t remain = pWindowResInfo->size - i; + int32_t unclosed = pWindowResInfo->size - i; - // clear remain list - for (int32_t k = 0; k < remain; ++k) { + // clear all the closed windows from the window list + for (int32_t k = 0; k < unclosed; ++k) { copyGroupResultBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k]); } - for (int32_t k = remain; k < pWindowResInfo->size; ++k) { - SWindowResult *pOneRes = &pWindowResInfo->pResult[k]; - clearGroupResultBuf(pRuntimeEnv, pOneRes); + // move the unclosed window in the front of the window list + for (int32_t k = unclosed; k < pWindowResInfo->size; ++k) { + SWindowResult *pWindowRes = &pWindowResInfo->pResult[k]; + clearGroupResultBuf(pRuntimeEnv, pWindowRes); } - pWindowResInfo->size = remain; + pWindowResInfo->size = unclosed; for (int32_t k = 0; k < pWindowResInfo->size; ++k) { SWindowResult *pResult = &pWindowResInfo->pResult[k]; int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); - int32_t v = *p; - v = (v - i); - + int32_t v = (*p - i); + + //todo add the update function for hash table taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); - taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v, sizeof(int32_t)); } @@ -2213,7 +2220,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * // decide the time window according to the primary timestamp int64_t ts = primaryKeyCol[offset]; STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery); - + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; @@ -2221,33 +2228,29 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * // all startOffset are identical offset -= pCtx[0].startOffset; - - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - pCtx[k].nStartQueryTimestamp = win.skey; - - SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); - - if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { - // qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld", - // GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey); - continue; - } - - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); + + SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); + if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { + + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + pCtx[k].nStartQueryTimestamp = win.skey; + + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pCtx[k], offset); + } } } - + lastKey = ts; - int32_t index = pWindowResInfo->curIndex; - + int32_t prev = pWindowResInfo->curIndex; STimeWindow nextWin = win; + while (1) { getNextLogicalQueryRange(pRuntimeEnv, &nextWin); if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextWin.skey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { - pWindowResInfo->curIndex = index; + pWindowResInfo->curIndex = prev; break; } @@ -2255,27 +2258,23 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * // null data, failed to allocate more memory buffer if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) != TSDB_CODE_SUCCESS) { - pWindowResInfo->curIndex = index; + pWindowResInfo->curIndex = prev; break; } - - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - pCtx[k].nStartQueryTimestamp = nextWin.skey; - - SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); - if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { - // qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld", - // GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey); - continue; - } - - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); + + pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); + if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + pCtx[k].nStartQueryTimestamp = nextWin.skey; + + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pCtx[k], offset); + } } } } else { - pWindowResInfo->curIndex = index; + pWindowResInfo->curIndex = prev; break; } } @@ -2389,47 +2388,37 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo * SQuery *pQuery = pRuntimeEnv->pQuery; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo); if (QUERY_IS_ASC_QUERY(pQuery)) { if (pQuery->ekey < pBlockInfo->keyLast) { forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order, pPrimaryColumn); - assert(forwardStep >= 0); - - if (forwardStep == 0) { - // no qualified data in current block, do not update the lastKey value + if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value assert(pQuery->ekey < pPrimaryColumn[pQuery->pos]); - } else { // todo MAX()! - pQuery->lastKey = pQuery->ekey + step; // pPrimaryColumn[pQuery->pos + (forwardStep - 1)] + step; + } else { + pQuery->lastKey = MAX(pQuery->ekey, pPrimaryColumn[pQuery->pos + (forwardStep - 1)]) + step; } - } else { forwardStep = pBlockInfo->size - pQuery->pos; - assert(forwardStep > 0); - pQuery->lastKey = pBlockInfo->keyLast + step; } } else { // desc if (pQuery->ekey > pBlockInfo->keyFirst) { forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order, pPrimaryColumn); - assert(forwardStep >= 0); - - if (forwardStep == 0) { - // no qualified data in current block, do not update the lastKey value + if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value assert(pQuery->ekey > pPrimaryColumn[pQuery->pos]); } else { - pQuery->lastKey = pQuery->ekey + step; // pPrimaryColumn[pQuery->pos - (forwardStep - 1)] + step; + pQuery->lastKey = MIN(pQuery->ekey, pPrimaryColumn[pQuery->pos - (forwardStep - 1)]) + step; } } else { forwardStep = pQuery->pos + 1; - assert(forwardStep > 0); - pQuery->lastKey = pBlockInfo->keyFirst + step; } } + + assert(forwardStep >= 0); int32_t newForwardStep = reviseForwardSteps(pRuntimeEnv, forwardStep); assert(newForwardStep <= forwardStep && newForwardStep >= 0); @@ -4380,8 +4369,8 @@ static int32_t createQueryResultBuffer(SQueryRuntimeEnv *pRuntimeEnv, int32_t nu // pRuntimeEnv->windowResInfo.pResult = calloc(numOfRows, sizeof(SWindowResult)); // // for (int32_t k = 0; k < numOfRows; ++k) { - // SWindowResult *pOneRes = &pRuntimeEnv->windowResInfo.pResult[k]; - // pOneRes->nAlloc = 1; + // SWindowResult *pWindowRes = &pRuntimeEnv->windowResInfo.pResult[k]; + // pWindowRes->nAlloc = 1; // // /* // * for single table top/bottom query, the output for group by normal column, the output rows is @@ -4391,7 +4380,7 @@ static int32_t createQueryResultBuffer(SQueryRuntimeEnv *pRuntimeEnv, int32_t nu // assert(pQuery->numOfOutputCols > 1); // // SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1]; - // pOneRes->nAlloc = pExpr->pBase.arg[0].argValue.i64; + // pWindowRes->nAlloc = pExpr->pBase.arg[0].argValue.i64; // } // // if (page == NULL || page->numOfElems >= pRuntimeEnv->numOfRowsPerPage) { @@ -4401,7 +4390,7 @@ static int32_t createQueryResultBuffer(SQueryRuntimeEnv *pRuntimeEnv, int32_t nu // assert(pageId >= 0); // // SPosInfo posInfo = {.pageId = pageId, .rowId = page->numOfElems}; - // createQueryResultInfo(pQuery, pOneRes, isSTableQuery, &posInfo); + // createQueryResultInfo(pQuery, pWindowRes, isSTableQuery, &posInfo); // page->numOfElems += 1; // next row is available // } @@ -4563,8 +4552,9 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false); if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { int32_t rows = initialNumOfRows(pSupporter); - - if ((code = createQueryResultBuffer(pRuntimeEnv, rows, false)) != TSDB_CODE_SUCCESS) { + + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); + if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4771,7 +4761,8 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) vnodeRecordAllFiles(pQInfo, pMeter->vnode); if (pQuery->intervalTime == 0 && pQuery->slidingTime <= 0) { - if ((ret = createQueryResultBuffer(pRuntimeEnv, 3, true)) != TSDB_CODE_SUCCESS) { + ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, 3, pQuery->rowSize); + if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -5194,7 +5185,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { cnt += forwardStep; - if (queryCompleteInBlock(pQuery, &blockInfo, forwardStep)) { + if (queryPausedInCurrentBlock(pQuery, &blockInfo, forwardStep)) { int32_t nextPos = accessPos + step; /* @@ -5203,33 +5194,38 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { * 2. multi-output query that may cause buffer overflow. */ if (pQuery->intervalTime > 0 || - (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) && pQuery->checkBufferInLoop == 1)) { + (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)/* && pQuery->checkBufferInLoop == 1*/)) { if (nextPos >= blockInfo.size || nextPos < 0) { moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA); - // slot/pos/fileId is updated in moveToNextBlock function - savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos); + if (!Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { + // slot/pos/fileId is updated in moveToNextBlock function + savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos); + + // check next block + void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); + + int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; + blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); + + // check if need to close window result or not + if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { + TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; + doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); + } + } + } else { savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step); } } break; } else { // query not completed, move to next block - int64_t start = taosGetTimestampUs(); - blockLoadStatus = moveToNextBlock(pRuntimeEnv, step, searchFn, LOAD_DATA); if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { - savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos); setQueryStatus(pQuery, QUERY_COMPLETED); break; } - - int64_t delta = (taosGetTimestampUs() - start); - if (IS_DISK_DATA_BLOCK(pQuery)) { - pSummary->fileTimeUs += delta; - } else { - pSummary->cacheTimeUs += delta; - } } // check next block @@ -5237,9 +5233,23 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); - if (!checkQueryRangeAgainstNextBlock(&blockInfo, pRuntimeEnv)) { + + if ((QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyFirst > pQuery->ekey) || + (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyLast < pQuery->ekey)) { + setQueryStatus(pQuery, QUERY_COMPLETED); break; } + + // check if need to close window result or not + if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { + TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; + doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); + } + + if(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { + break; + } + } // while(1) return cnt; @@ -5914,26 +5924,39 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa } } -void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutputRes) { - if (pOneOutputRes == NULL) { +void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) { + if (pWindowRes == NULL) { return; } for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { - SResultInfo *pResultInfo = &pOneOutputRes->resultInfo[i]; + SResultInfo *pResultInfo = &pWindowRes->resultInfo[i]; - char * s = getPosInResultPage(pRuntimeEnv, i, pOneOutputRes); + char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes); size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes; memset(s, 0, size); resetResultInfo(pResultInfo); } + + pWindowRes->numOfRows = 0; + pWindowRes->nAlloc = 0; + pWindowRes->pos = (SPosInfo){-1, -1}; + pWindowRes->status.closed = false; + pWindowRes->window = (STimeWindow) {0, 0}; } +/** + * The source window result pos attribution of the source window result does not assign to the destination, + * since the attribute of "Pos" is bound to each window result when the window result is created in the + * disk-based result buffer. + */ void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) { dst->numOfRows = src->numOfRows; dst->nAlloc = src->nAlloc; - + dst->window = src->window; + dst->status = src->status; + int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols; for (int32_t i = 0; i < nOutputCols; ++i) { @@ -5956,16 +5979,16 @@ void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const } } -void destroyGroupResultBuf(SWindowResult *pOneOutputRes, int32_t nOutputCols) { - if (pOneOutputRes == NULL) { +void destroyGroupResultBuf(SWindowResult *pWindowRes, int32_t nOutputCols) { + if (pWindowRes == NULL) { return; } for (int32_t i = 0; i < nOutputCols; ++i) { - free(pOneOutputRes->resultInfo[i].interResultBuf); + free(pWindowRes->resultInfo[i].interResultBuf); } - free(pOneOutputRes->resultInfo); + free(pWindowRes->resultInfo); } void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { @@ -6005,7 +6028,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { // set next output position if (IS_OUTER_FORWARD(aAggs[functionId].nStatus)) { - pRuntimeEnv->pCtx[j].aOutputBuf += pRuntimeEnv->pCtx[j].outputBytes * output /** factor*/; + pRuntimeEnv->pCtx[j].aOutputBuf += pRuntimeEnv->pCtx[j].outputBytes * output; } if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { @@ -6239,8 +6262,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { doSingleMeterSupplementScan(pRuntimeEnv); - // update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during - // supplementary scan +// update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during supplementary scan pQuery->skey = newSkey; } @@ -6375,30 +6397,30 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE return; } - int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey); - if (r == QUERY_COMPLETED) { - setQueryStatus(pQuery, QUERY_COMPLETED); - return; - } - - getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalWindow); - - /* ensure the search in cache will return right position */ - pQuery->lastKey = pQuery->skey; - - TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || - (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) || - Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { - setQueryStatus(pQuery, QUERY_COMPLETED); - return; - } - - // bridge the gap in group by time function - if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey); - } +// int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey); +// if (r == QUERY_COMPLETED) { +// setQueryStatus(pQuery, QUERY_COMPLETED); +// return; +// } +// +// getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalWindow); +// +// /* ensure the search in cache will return right position */ +// pQuery->lastKey = pQuery->skey; +// +// TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); +// if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || +// (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) || +// Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { +// setQueryStatus(pQuery, QUERY_COMPLETED); +// return; +// } +// +// // bridge the gap in group by time function +// if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || +// (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { +// getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey); +// } } static int32_t offsetComparator(const void *pLeft, const void *pRight) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index f5beb131c9..9b6404c2af 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -1114,11 +1114,8 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { - assert((pQuery->skey <= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))); - initCtxOutputBuf(pRuntimeEnv); - clearCompletedSlidingWindows(pRuntimeEnv); + clearClosedSlidingWindows(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); if (isQueryKilled(pQuery)) { @@ -1141,16 +1138,17 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter pQuery->limit.offset--; } } else { - pQuery->pointsRead += maxOutput; - forwardCtxOutputBuf(pRuntimeEnv, maxOutput); +// assert(0); +// pQuery->pointsRead += maxOutput; +// forwardCtxOutputBuf(pRuntimeEnv, maxOutput); } if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { break; } - - forwardIntervalQueryRange(pSupporter, pRuntimeEnv); - if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED|QUERY_RESBUF_FULL)) { + + loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); + if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { break; } @@ -1180,7 +1178,13 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { while (1) { resetCtxOutputBuf(pRuntimeEnv); vnodeSingleMeterIntervalMainLooper(pSupporter, pRuntimeEnv); - + + if (pQuery->intervalTime > 0) { + pSupporter->subgroupIdx = 0; + pQuery->pointsRead = 0; + copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); + } + // the offset is handled at prepare stage if no interpolation involved if (pQuery->interpoType == TSDB_INTERPO_NONE) { doRevisedResultsByLimit(pQInfo); @@ -1208,8 +1212,9 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { } } - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->intervalTime > 0)) { - pQInfo->pMeterQuerySupporter->subgroupIdx = 0; + // all data scanned, the group by normal column can return + if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + pSupporter->subgroupIdx = 0; pQuery->pointsRead = 0; copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); } -- GitLab