diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index ed7d4dfc602fde29538183d8f691a24f5d63ec56..81c3a88c50b98c5b40cfc7e90a517c6e6737a2c4 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2162,7 +2162,7 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { // only the first_stage_merge is directly written data into final output buffer if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) { return (STopBotInfo*) pCtx->aOutputBuf; - } else { // for normal table query and super table at the secondary_stage, result is written to intermediate buffer + } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer return pResInfo->interResultBuf; } } diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 187cca1874c6ec1f6df2eb0dfbd1c2b4b12d06a6..2e2a43711e567fd15cf74435062cd247cf956cc4 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -175,8 +175,7 @@ void copyFromGroupBuf(SQInfo* pQInfo, SWindowResult* result); SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType); SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot); -void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32_t blockStatus, - SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields, +void queryOnBlock(SMeterQuerySupportObj* pSupporter, int32_t blockStatus, SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields, __block_search_fn_t searchFn); int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet, SMeterDataInfo *pMeterDataInfo, @@ -278,14 +277,17 @@ void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows); void vnodePrintQueryStatistics(SMeterQuerySupportObj* 
pSupporter); -void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutputRes); -void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* dst, const SWindowResult* src); +void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutputRes); +void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* dst, const SWindowResult* src); -void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo* pWindowResInfo); -void clearClosedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv); -int32_t numOfClosedSlidingWindow(SWindowResInfo* pWindowResInfo); -void closeSlidingWindow(SWindowResInfo* pWindowResInfo, int32_t slot); -void closeAllSlidingWindow(SWindowResInfo* pWindowResInfo); +int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size, int32_t threshold, int16_t type); + +void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv); +void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo* pWindowResInfo); +void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); +int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); +void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); +void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); #ifdef __cplusplus } diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 13834a07004d681127772c2407a8d135adb882e4..07b0f5765b162ee29f35ef0300d11a721624438c 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -197,7 +197,6 @@ typedef struct SMeterQueryInfo { int16_t lastResRows; int64_t tag; STSCursor cur; - SWindowResult* pWindowRes; int32_t sid; // for retrieve the page id list SWindowResInfo windowResInfo; @@ -279,7 +278,7 @@ typedef struct _qinfo { int (*fp)(SMeterObj*, SQuery*); } SQInfo; -int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, 
SMeterQuerySupportObj* pSMultiMeterObj, +int32_t vnodeQuerySingleTablePrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj, void* param); void vnodeQueryFreeQInfoEx(SQInfo* pQInfo); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 226c2df35c38efb22a046db03bf1c44089e7ce49..6a2cfeca9cb48bbaaae663c6ba523b380537b23e 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -65,7 +65,7 @@ static TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t inde static void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, int32_t pos); static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t step); -static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); +static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY keyInData, TSKEY skey, TSKEY ekey); @@ -83,7 +83,7 @@ static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast, int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey); -static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow); +static void getNextTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow); // check the offset value integrity static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data, @@ -588,7 +588,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool 
isSTableQuery, SPosInfo *posInfo); -static void destroyGroupResultBuf(SWindowResult *pOneOutputRes, int32_t nOutputCols); +static void destroyTimeWindowRes(SWindowResult *pOneOutputRes, int32_t nOutputCols); static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { int32_t firstSlot = 0; @@ -1432,17 +1432,17 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa return dataBlock; } -static bool slidingWindowClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { +static bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { return (pWindowResInfo->pResult[slot].status.closed == true); } -static int32_t curSlidingWindow(SWindowResInfo *pWindowResInfo) { +static int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) { assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size); return pWindowResInfo->curIndex; } -static SWindowResult *doSetSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, - char *pData, int16_t bytes) { +static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, + int16_t bytes) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t *p1 = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, pData, bytes); @@ -1464,7 +1464,7 @@ static SWindowResult *doSetSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, S SPosInfo pos = {-1, -1}; createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos); } - + pWindowResInfo->capacity = newCap; } @@ -1476,37 +1476,34 @@ static SWindowResult *doSetSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, S return &pWindowResInfo->pResult[pWindowResInfo->curIndex]; } -// get the correct sliding window according to the handled timestamp -static STimeWindow getActiveSlidingWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) { +// get the correct time window 
according to the handled timestamp +static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) { STimeWindow w = {0}; - if (pWindowResInfo->curIndex == -1) { // the first window, from the prevous stored value + if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value w.skey = pWindowResInfo->prevSKey; w.ekey = w.skey + pQuery->intervalTime - 1; } else { - int32_t slot = curSlidingWindow(pWindowResInfo); + int32_t slot = curTimeWindow(pWindowResInfo); w = pWindowResInfo->pResult[slot].window; } -// STimeWindow *window = &pWindowResInfo->pResult[slot].window; - - if (w.skey > ts || w.ekey < ts) { -// if (w.skey <= ts && w.ekey >= ts) { -// w = *window; // belongs to current active window -// } else { - int64_t st = w.skey; - - while (st > ts) { - st -= pQuery->slidingTime; - } - while ((st + pQuery->intervalTime - 1) < ts) { - st += pQuery->slidingTime; - } + if (w.skey > ts || w.ekey < ts) { + int64_t st = w.skey; + + if (st > ts) { + st -= ((st - ts + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; + } - w.skey = st; - w.ekey = w.skey + pQuery->intervalTime - 1; + int64_t et = st + pQuery->intervalTime - 1; + if (et < ts) { + st += ((ts - et + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; } + w.skey = st; + w.ekey = w.skey + pQuery->intervalTime - 1; + } + assert(ts >= w.skey && ts <= w.ekey); return w; } @@ -1555,7 +1552,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes assert(win->skey < win->ekey); SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - SWindowResult *pWindowRes = doSetSlidingWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE); if (pWindowRes == NULL) { return -1; } @@ -1571,13 +1568,13 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv 
*pRuntimeEnv, SWindowRes // set time window for current result pWindowRes->window = *win; - setGroupOutputBuffer(pRuntimeEnv, pWindowRes); + setWindowResOutputBuf(pRuntimeEnv, pWindowRes); initCtxOutputBuf(pRuntimeEnv); return TSDB_CODE_SUCCESS; } -static SWindowStatus *getSlidingWindowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { +static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(slot >= 0 && slot < pWindowResInfo->size); return &pWindowResInfo->pResult[slot].status; } @@ -1606,11 +1603,11 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { // query completed if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeAllSlidingWindow(pWindowResInfo); + closeAllTimeWindow(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); - } else { + } else { // set the current index to be the last unclosed window int32_t i = 0; int64_t skey = 0; @@ -1622,7 +1619,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || (pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeSlidingWindow(pWindowResInfo, i); + closeTimeWindow(pWindowResInfo, i); } else { skey = pResult->window.skey; break; @@ -1631,24 +1628,109 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, // all windows are closed, set the last one to be the skey if (skey == 0) { - skey = pWindowResInfo->pResult[pWindowResInfo->size-1].window.skey; + assert(i == pWindowResInfo->size); + pWindowResInfo->curIndex = pWindowResInfo->size - 1; + } else { + pWindowResInfo->curIndex = i; } - - pWindowResInfo->prevSKey = skey; - + + pWindowResInfo->prevSKey = 
pWindowResInfo->pResult[pWindowResInfo->curIndex].window.skey; + // the number of completed slots are larger than the threshold, dump to client immediately. - int32_t v = numOfClosedSlidingWindow(pWindowResInfo); - if (v > pWindowResInfo->threshold) { + int32_t n = numOfClosedTimeWindow(pWindowResInfo); + if (n > pWindowResInfo->threshold) { setQueryStatus(pQuery, QUERY_RESBUF_FULL); } - dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v); + dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, n); } - + assert(pWindowResInfo->prevSKey != 0); } } +static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, TSKEY *pPrimaryColumn, int32_t startPos, + TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { + assert(startPos >= 0 && startPos < pBlockInfo->size); + + int32_t forwardStep = -1; + int32_t order = pQuery->order.order; + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); + + if (QUERY_IS_ASC_QUERY(pQuery)) { + if (ekey < pBlockInfo->keyLast) { + forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, ekey, startPos, order, pPrimaryColumn); + if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value + assert(ekey < pPrimaryColumn[startPos]); + } else { + if (updateLastKey) { + pQuery->lastKey = MAX(ekey, pPrimaryColumn[startPos + (forwardStep - 1)]) + step; + } + } + } else { + forwardStep = pBlockInfo->size - startPos; + if (updateLastKey) { + pQuery->lastKey = pBlockInfo->keyLast + step; + } + } + } else { // desc + if (ekey > pBlockInfo->keyFirst) { + forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, ekey, startPos, order, pPrimaryColumn); + if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value + assert(ekey > pPrimaryColumn[startPos]); + } else { + if (updateLastKey) { + pQuery->lastKey = MIN(ekey, pPrimaryColumn[startPos - (forwardStep - 1)]) + 
step; + } + } + } else { + forwardStep = startPos + 1; + if (updateLastKey) { + pQuery->lastKey = pBlockInfo->keyFirst + step; + } + } + } + + assert(forwardStep >= 0); + return forwardStep; +} + +static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin, + int32_t startPos, int32_t forwardStep) { + SQuery * pQuery = pRuntimeEnv->pQuery; + SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; + + if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + pCtx[k].nStartQueryTimestamp = pWin->skey; + pCtx[k].size = forwardStep; + pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1); + + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunction(&pCtx[k]); + } + } + } +} + +static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin) { + SQuery * pQuery = pRuntimeEnv->pQuery; + SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; + + if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + pCtx[k].nStartQueryTimestamp = pWin->skey; + + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunction(&pCtx[k]); + } + } + } +} + /** * * @param pRuntimeEnv @@ -1659,11 +1741,12 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, * @return the incremental number of output value, so it maybe 0 for fixed number of query, * such as count/min/max etc. 
*/ -static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep, TSKEY *primaryKeyCol, - SField *pFields, SBlockInfo *pBlockInfo, SWindowResInfo *pWindowResInfo, +static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep, SField *pFields, + SBlockInfo *pBlockInfo, SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; + TSKEY * primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); int64_t prevNumOfRes = getNumOfResult(pRuntimeEnv); @@ -1702,50 +1785,25 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); TSKEY ts = primaryKeyCol[offset]; - STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery); + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { return 0; } - if (QUERY_IS_ASC_QUERY(pQuery)) { //todo refactor - if (win.ekey < pBlockInfo->keyLast) { - forwardStep = - getForwardStepsInBlock(pBlockInfo->size, searchFn, win.ekey, pQuery->pos, pQuery->order.order, primaryKeyCol); - } else { - forwardStep = pBlockInfo->size - pQuery->pos; - } - } else { - if (win.skey > pBlockInfo->keyFirst) { - forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, win.skey, pQuery->pos, pQuery->order.order, primaryKeyCol); - } else { - forwardStep = pQuery->pos + 1; - } - } - - SWindowStatus* pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); - - if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - pCtx[k].nStartQueryTimestamp = win.skey; - pCtx[k].size = forwardStep; - pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? 
pQuery->pos : pQuery->pos - (forwardStep - 1); - - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); - } - } - } + TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey; + forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, false); - int32_t index = pWindowResInfo->curIndex; + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep); + + int32_t index = pWindowResInfo->curIndex; STimeWindow nextWin = win; while (1) { - getNextLogicalQueryRange(pRuntimeEnv, &nextWin); - + getNextTimeWindow(pRuntimeEnv, &nextWin); + if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - pWindowResInfo->curIndex = index; break; } @@ -1755,62 +1813,29 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t break; } - // if (pBlockInfo->keyLast >= nextWin.skey && pBlockInfo->keyFirst <= nextWin.ekey) { - int32_t startPos = -1; - if (QUERY_IS_ASC_QUERY(pQuery)) { - startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.skey, TSQL_SO_ASC); - } else { - startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.ekey, TSQL_SO_DESC); - } - + TSKEY startKey = QUERY_IS_ASC_QUERY(pQuery) ? 
nextWin.skey : nextWin.ekey; + int32_t startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, startKey, pQuery->order.order); + /* * This time window does not cover any data, try next time window * when the time window is too small, this case may happen */ if ((primaryKeyCol[startPos] > nextWin.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (primaryKeyCol[startPos] < nextWin.skey && !QUERY_IS_ASC_QUERY(pQuery))) { + (primaryKeyCol[startPos] < nextWin.skey && !QUERY_IS_ASC_QUERY(pQuery))) { continue; } // null data, failed to allocate more memory buffer - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) != - TSDB_CODE_SUCCESS) { - pRuntimeEnv->windowResInfo.curIndex = index; + int32_t sid = pRuntimeEnv->pMeterObj->sid; + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) { break; } - if (QUERY_IS_ASC_QUERY(pQuery)) { //todo refactor - if (nextWin.ekey < pBlockInfo->keyLast) { - forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.ekey, startPos, pQuery->order.order, primaryKeyCol); - } else { - forwardStep = pBlockInfo->size - startPos; - } - } else { - if (nextWin.skey > pBlockInfo->keyFirst) { - forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.skey, startPos, pQuery->order.order, primaryKeyCol); - } else { - forwardStep = startPos + 1; - } - } - - pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); - - if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - pCtx[k].nStartQueryTimestamp = nextWin.skey; - pCtx[k].size = forwardStep; - pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery))? 
startPos : startPos - (forwardStep - 1); - - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); - } - } - } - // } else { - // pWindowResInfo->curIndex = index; - // break; - // } + ekey = QUERY_IS_ASC_QUERY(pQuery) ? nextWin.ekey : nextWin.skey; + forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, false); + + pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep); } pWindowResInfo->curIndex = index; @@ -1828,9 +1853,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } } - TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; - doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); - /* * No need to calculate the number of output results for group-by normal columns, interval query * because the results of group by normal column is put into intermediate buffer. 
@@ -1901,19 +1923,24 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx * } } - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - int32_t functId = pQuery->pSelectExpr[i].pBase.functionId; - if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { - return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max); - } - } + // todo disable this opt code block temporarily + // for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + // int32_t functId = pQuery->pSelectExpr[i].pBase.functionId; + // if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { + // return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max); + // } + // } return true; } -static int32_t initSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t threshold, - int16_t type) { - pWindowResInfo->capacity = threshold; +int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size, + int32_t threshold, int16_t type) { + if (size < threshold) { + size = threshold; + } + + pWindowResInfo->capacity = size; pWindowResInfo->threshold = threshold; pWindowResInfo->type = type; @@ -1934,23 +1961,29 @@ static int32_t initSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResIn return TSDB_CODE_SUCCESS; } -static void destroySlidingWindowInfo(SWindowResInfo *pWindowResInfo) { +void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv) { if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL); return; } + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SWindowResult *pResult = &pWindowResInfo->pResult[i]; + destroyTimeWindowRes(pResult, pRuntimeEnv->pQuery->numOfOutputCols); + } + taosCleanUpHashTable(pWindowResInfo->hashList); + tfree(pWindowResInfo->pResult); } -void 
resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo) { +void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo) { if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { return; } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SWindowResult *pWindowRes = &pWindowResInfo->pResult[i]; - clearGroupResultBuf(pRuntimeEnv, pWindowRes); + clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); } pWindowResInfo->curIndex = -1; @@ -1964,7 +1997,7 @@ void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWind pWindowResInfo->prevSKey = 0; } -void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) { +void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { return; @@ -1989,13 +2022,13 @@ void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) { // clear all the closed windows from the window list for (int32_t k = 0; k < unclosed; ++k) { - copyGroupResultBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k]); + copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k]); } // move the unclosed window in the front of the window list for (int32_t k = unclosed; k < pWindowResInfo->size; ++k) { SWindowResult *pWindowRes = &pWindowResInfo->pResult[k]; - clearGroupResultBuf(pRuntimeEnv, pWindowRes); + clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); } pWindowResInfo->size = unclosed; @@ -2004,9 +2037,9 @@ void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) { SWindowResult *pResult = &pWindowResInfo->pResult[k]; int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); - int32_t v = (*p - i); - - //todo add the update function for hash table + int32_t v = (*p - i); + + // todo add the update 
function for hash table taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v, sizeof(int32_t)); @@ -2015,7 +2048,7 @@ void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) { pWindowResInfo->curIndex = -1; } -int32_t numOfClosedSlidingWindow(SWindowResInfo *pWindowResInfo) { +int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) { int32_t i = 0; while (i < pWindowResInfo->size && pWindowResInfo->pResult[i].status.closed) { ++i; @@ -2024,12 +2057,12 @@ int32_t numOfClosedSlidingWindow(SWindowResInfo *pWindowResInfo) { return i; } -void closeSlidingWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { +void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(slot >= 0 && slot < pWindowResInfo->size); pWindowResInfo->pResult[slot].status.closed = true; } -void closeAllSlidingWindow(SWindowResInfo *pWindowResInfo) { +void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); for (int32_t i = 0; i < pWindowResInfo->size; ++i) { @@ -2042,12 +2075,12 @@ static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, return -1; } - SWindowResult *pWindowRes = doSetSlidingWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes); if (pWindowRes == NULL) { return -1; } - setGroupOutputBuffer(pRuntimeEnv, pWindowRes); + setWindowResOutputBuf(pRuntimeEnv, pWindowRes); initCtxOutputBuf(pRuntimeEnv); return TSDB_CODE_SUCCESS; @@ -2139,10 +2172,11 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return true; } -static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *forwardStep, TSKEY *primaryKeyCol, - SField 
*pFields, SBlockInfo *pBlockInfo, SWindowResInfo *pWindowResInfo) { +static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *forwardStep, SField *pFields, + SBlockInfo *pBlockInfo, SWindowResInfo *pWindowResInfo) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; + TSKEY * primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); SData **data = pRuntimeEnv->colDataBuffer; @@ -2221,8 +2255,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) { // decide the time window according to the primary timestamp int64_t ts = primaryKeyCol[offset]; - STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery); - + STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; @@ -2230,26 +2264,16 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * // all startOffset are identical offset -= pCtx[0].startOffset; - - SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); - if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { - - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - pCtx[k].nStartQueryTimestamp = win.skey; - - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); - } - } - } - + + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win); + lastKey = ts; - int32_t prev = pWindowResInfo->curIndex; + int32_t prev = pWindowResInfo->curIndex; STimeWindow nextWin = win; - + 
while (1) { - getNextLogicalQueryRange(pRuntimeEnv, &nextWin); + getNextTimeWindow(pRuntimeEnv, &nextWin); if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextWin.skey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { pWindowResInfo->curIndex = prev; @@ -2263,18 +2287,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * pWindowResInfo->curIndex = prev; break; } - - pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); - if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - pCtx[k].nStartQueryTimestamp = nextWin.skey; - - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); - } - } - } + + pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin); } else { pWindowResInfo->curIndex = prev; break; @@ -2383,40 +2398,13 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, int64_t *pPrimaryColumn, SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes, SWindowResInfo *pWindowResInfo) { - int32_t forwardStep = 0; SQuery *pQuery = pRuntimeEnv->pQuery; + validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo); + int32_t forwardStep = + getNumOfRowsInTimeWindow(pQuery, pBlockInfo, pPrimaryColumn, pQuery->pos, pQuery->ekey, searchFn, true); - if (QUERY_IS_ASC_QUERY(pQuery)) { - if (pQuery->ekey < pBlockInfo->keyLast) { - forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, 
pQuery->order.order, - pPrimaryColumn); - if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value - assert(pQuery->ekey < pPrimaryColumn[pQuery->pos]); - } else { - pQuery->lastKey = MAX(pQuery->ekey, pPrimaryColumn[pQuery->pos + (forwardStep - 1)]) + step; - } - } else { - forwardStep = pBlockInfo->size - pQuery->pos; - pQuery->lastKey = pBlockInfo->keyLast + step; - } - } else { // desc - if (pQuery->ekey > pBlockInfo->keyFirst) { - forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order, - pPrimaryColumn); - if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value - assert(pQuery->ekey > pPrimaryColumn[pQuery->pos]); - } else { - pQuery->lastKey = MIN(pQuery->ekey, pPrimaryColumn[pQuery->pos - (forwardStep - 1)]) + step; - } - } else { - forwardStep = pQuery->pos + 1; - pQuery->lastKey = pBlockInfo->keyFirst + step; - } - } - assert(forwardStep >= 0); int32_t newForwardStep = reviseForwardSteps(pRuntimeEnv, forwardStep); @@ -2427,18 +2415,15 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo * pQuery->lastKey = pPrimaryColumn[pQuery->pos + (newForwardStep - 1) * step] + step; } - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr) /*|| - (pQuery->slidingTime != -1 && pQuery->intervalTime > 0)*/) { - *numOfRes = - rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, pFields, pBlockInfo, pWindowResInfo); + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + *numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pFields, pBlockInfo, pWindowResInfo); } else { - *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, pFields, pBlockInfo, - pWindowResInfo, searchFn); + *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, 
pFields, pBlockInfo, pWindowResInfo, searchFn); } - + TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); - + assert(*numOfRes >= 0); // check if buffer is large enough for accommodating all qualified points @@ -2761,7 +2746,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } tfree(pRuntimeEnv->secondaryUnzipBuffer); - destroySlidingWindowInfo(&pRuntimeEnv->windowResInfo); + cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv); if (pRuntimeEnv->pCtx != NULL) { for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { @@ -4360,44 +4345,6 @@ static int32_t initialNumOfRows(SMeterQuerySupportObj *pSupporter) { return numOfRows; } -static int32_t createQueryResultBuffer(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRows, bool isSTableQuery) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfRows, pQuery->rowSize); - - // int32_t pageId = -1; - // tFilePage* page = NULL; - // - // pRuntimeEnv->windowResInfo.pResult = calloc(numOfRows, sizeof(SWindowResult)); - // - // for (int32_t k = 0; k < numOfRows; ++k) { - // SWindowResult *pWindowRes = &pRuntimeEnv->windowResInfo.pResult[k]; - // pWindowRes->nAlloc = 1; - // - // /* - // * for single table top/bottom query, the output for group by normal column, the output rows is - // * equals to the maximum rows, instead of 1. 
- // */ - // if (!isSTableQuery && isTopBottomQuery(pQuery)) { - // assert(pQuery->numOfOutputCols > 1); - // - // SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1]; - // pWindowRes->nAlloc = pExpr->pBase.arg[0].argValue.i64; - // } - // - // if (page == NULL || page->numOfElems >= pRuntimeEnv->numOfRowsPerPage) { - // page = getNewDataBuf(pRuntimeEnv->pResultBuf, 0, &pageId); - // } - // - // assert(pageId >= 0); - // - // SPosInfo posInfo = {.pageId = pageId, .rowId = page->numOfElems}; - // createQueryResultInfo(pQuery, pWindowRes, isSTableQuery, &posInfo); - // page->numOfElems += 1; // next row is available - // } - - return TSDB_CODE_SUCCESS; -} static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -4481,7 +4428,7 @@ static char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnInd pQuery->pSelectExpr[columnIndex].resBytes * realRowId; } -int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMeterQuerySupportObj *pSupporter, +int32_t vnodeQuerySingleTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMeterQuerySupportObj *pSupporter, void *param) { SQuery *pQuery = &pQInfo->query; int32_t code = TSDB_CODE_SUCCESS; @@ -4554,7 +4501,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false); if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { int32_t rows = initialNumOfRows(pSupporter); - + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4567,7 +4514,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete type = TSDB_DATA_TYPE_TIMESTAMP; } - initSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo, rows, type); + initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 
4096, type); } pSupporter->rawSKey = pQuery->skey; @@ -4597,6 +4544,13 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) { STimeWindow win = {0}; getActualRange(pSupporter, &win); + + // there is no qualified data with respect to the primary timestamp + if (win.skey > win.ekey) { + sem_post(&pQInfo->dataReady); + pQInfo->over = 1; + return TSDB_CODE_SUCCESS; + } TSKEY skey1, ekey1; TSKEY windowSKey = 0, windowEKey = 0; @@ -4680,7 +4634,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } for (int32_t i = 0; i < size; ++i) { - // destroyGroupResultBuf(&pSupporter->pResult[i], pQInfo->query.numOfOutputCols); + // destroyTimeWindowRes(&pSupporter->pResult[i], pQInfo->query.numOfOutputCols); } } @@ -4770,7 +4724,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - initSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo, 128, type); + initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 128, 4096, type); } } else { // one page for each table at least @@ -5133,7 +5087,7 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl } } -static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow) { +static void getNextTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -5194,33 +5148,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { * 1. interval query. * 2. multi-output query that may cause buffer overflow. 
*/ -// if (pQuery->intervalTime > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { - if (nextPos >= blockInfo.size || nextPos < 0) { - moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA); - -// if (!Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { - // slot/pos/fileId is updated in moveToNextBlock function - savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos); - -// // check next block -// void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); -// -// int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; -// blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); -// -// // check if need to close window result or not -// if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { -// TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; -// doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); -// } -// } - - } else { - savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step); - } -// } else { -// assert(0); -// } + // if (pQuery->intervalTime > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { + if (nextPos >= blockInfo.size || nextPos < 0) { + moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA); + // slot/pos/fileId is updated in moveToNextBlock function + savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos); + } else { + savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step); + } + break; } else { // query not completed, move to next block blockLoadStatus = moveToNextBlock(pRuntimeEnv, step, searchFn, LOAD_DATA); @@ -5235,36 +5171,30 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? 
BLK_FILE_BLOCK : BLK_CACHE_BLOCK; blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); - + if ((QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyFirst > pQuery->ekey) || (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyLast < pQuery->ekey)) { setQueryStatus(pQuery, QUERY_COMPLETED); break; } - -// // check if need to close window result or not -// if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { -// TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; -// doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); -// } - - if(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { + + if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { break; } } // while(1) - + if (pQuery->intervalTime > 0) { - if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED|QUERY_NO_DATA_TO_CHECK)) { - closeAllSlidingWindow(&pRuntimeEnv->windowResInfo); - } else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed + if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + closeAllTimeWindow(&pRuntimeEnv->windowResInfo); + } else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); - int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; + int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); // check if need to close window result or not - TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; + TSKEY t = (QUERY_IS_ASC_QUERY(pQuery)) ? 
blockInfo.keyFirst : blockInfo.keyLast; doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); } } @@ -5274,9 +5204,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { static void updatelastkey(SQuery *pQuery, SMeterQueryInfo *pMeterQInfo) { pMeterQInfo->lastKey = pQuery->lastKey; } -void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32_t blockStatus, - SBlockInfo *pBlockBasicInfo, SMeterDataInfo *pMeterDataInfo, SField *pFields, - __block_search_fn_t searchFn) { +void queryOnBlock(SMeterQuerySupportObj *pSupporter, int32_t blockStatus, SBlockInfo *pBlockBasicInfo, + SMeterDataInfo *pMeterDataInfo, SField *pFields, __block_search_fn_t searchFn) { /* cache blocks may be assign to other meter, abort */ if (pBlockBasicInfo->size <= 0) { return; @@ -5284,15 +5213,19 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32 SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; + + TSKEY* primaryKeys = (TSKEY*) pRuntimeEnv->primaryColBuffer->data; if (pQuery->intervalTime == 0) { // not interval query + assert(0); + int32_t numOfRes = 0; applyFunctionsOnBlock(pRuntimeEnv, pBlockBasicInfo, primaryKeys, pFields, searchFn, &numOfRes, - &pMeterDataInfo->pMeterQInfo->windowResInfo); + &pMeterDataInfo->pMeterQInfo->windowResInfo);///????bug // note: only fixed number of output for each group by operation - if (numOfRes > 0) { - pRuntimeEnv->windowResInfo.pResult[pMeterDataInfo->groupIdx].numOfRows = numOfRes; + if (numOfRes > 0) {//??? 
+ pRuntimeEnv->windowResInfo.pResult[pMeterDataInfo->groupIdx].numOfRows = numOfRes;////????bug } // used to decide the correct start position in cache after check all data in files @@ -5879,7 +5812,7 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, i); + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, i); if (!pStatus->closed) { continue; } @@ -5941,7 +5874,7 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa } } -void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) { +void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) { if (pWindowRes == NULL) { return; } @@ -5955,12 +5888,12 @@ void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRe resetResultInfo(pResultInfo); } - + pWindowRes->numOfRows = 0; pWindowRes->nAlloc = 0; pWindowRes->pos = (SPosInfo){-1, -1}; pWindowRes->status.closed = false; - pWindowRes->window = (STimeWindow) {0, 0}; + pWindowRes->window = (STimeWindow){0, 0}; } /** @@ -5968,12 +5901,12 @@ void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRe * since the attribute of "Pos" is bound to each window result when the window result is created in the * disk-based result buffer. 
*/ -void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) { +void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) { dst->numOfRows = src->numOfRows; dst->nAlloc = src->nAlloc; dst->window = src->window; dst->status = src->status; - + int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols; for (int32_t i = 0; i < nOutputCols; ++i) { @@ -5996,7 +5929,7 @@ void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const } } -void destroyGroupResultBuf(SWindowResult *pWindowRes, int32_t nOutputCols) { +void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) { if (pWindowRes == NULL) { return; } @@ -6237,7 +6170,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { continue; } - setGroupOutputBuffer(pRuntimeEnv, pResult); + setWindowResOutputBuf(pRuntimeEnv, pResult); for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { aAggs[pQuery->pSelectExpr[j].pBase.functionId].xNextStep(&pRuntimeEnv->pCtx[j]); @@ -6279,7 +6212,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { doSingleMeterSupplementScan(pRuntimeEnv); -// update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during supplementary scan + // update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during supplementary scan pQuery->skey = newSkey; } @@ -6290,16 +6223,16 @@ void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { // for each group result, call the finalize function for each column SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - closeAllSlidingWindow(pWindowResInfo); + closeAllTimeWindow(pWindowResInfo); } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SWindowResult *buf = &pWindowResInfo->pResult[i]; - if (!slidingWindowClosed(pWindowResInfo, i)) { + if (!isWindowResClosed(pWindowResInfo, i)) { continue; } - setGroupOutputBuffer(pRuntimeEnv, 
buf); + setWindowResOutputBuf(pRuntimeEnv, buf); for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); @@ -6414,30 +6347,30 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE return; } -// int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey); -// if (r == QUERY_COMPLETED) { -// setQueryStatus(pQuery, QUERY_COMPLETED); -// return; -// } -// -// getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalWindow); -// -// /* ensure the search in cache will return right position */ -// pQuery->lastKey = pQuery->skey; -// -// TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); -// if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || -// (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) || -// Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { -// setQueryStatus(pQuery, QUERY_COMPLETED); -// return; -// } -// -// // bridge the gap in group by time function -// if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || -// (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { -// getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey); -// } + // int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey); + // if (r == QUERY_COMPLETED) { + // setQueryStatus(pQuery, QUERY_COMPLETED); + // return; + // } + // + // getNextTimeWindow(pRuntimeEnv, &pRuntimeEnv->intervalWindow); + // + // /* ensure the search in cache will return right position */ + // pQuery->lastKey = pQuery->skey; + // + // TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); + // if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || + // (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) || + // 
Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { + // setQueryStatus(pQuery, QUERY_COMPLETED); + // return; + // } + // + // // bridge the gap in group by time function + // if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + // (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + // getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey); + // } } static int32_t offsetComparator(const void *pLeft, const void *pRight) { @@ -6594,7 +6527,7 @@ SMeterQueryInfo *createMeterQueryInfo(SMeterQuerySupportObj *pSupporter, int32_t pMeterQueryInfo->sid = sid; pMeterQueryInfo->cur.vnodeIndex = -1; - initSlidingWindowInfo(pRuntimeEnv, &pMeterQueryInfo->windowResInfo, 100, TSDB_DATA_TYPE_INT); + initWindowResInfo(&pMeterQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT); return pMeterQueryInfo; } @@ -6618,11 +6551,13 @@ void changeMeterQueryInfoForSuppleQuery(SQueryDiskbasedResultBuf *pResultBuf, SM return; } - pMeterQueryInfo->skey = skey; - pMeterQueryInfo->ekey = ekey; +// pMeterQueryInfo->skey = skey; +// pMeterQueryInfo->ekey = ekey; + SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY); + pMeterQueryInfo->lastKey = pMeterQueryInfo->skey; - pMeterQueryInfo->queryRangeSet = 0; +// pMeterQueryInfo->queryRangeSet = 0; pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1; pMeterQueryInfo->cur.vnodeIndex = -1; @@ -7029,7 +6964,7 @@ void setExecutionContext(SMeterQuerySupportObj *pSupporter, SWindowResult *outpu int32_t groupIdx, SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; - setGroupOutputBuffer(pRuntimeEnv, &outputRes[groupIdx]); + setWindowResOutputBuf(pRuntimeEnv, &outputRes[groupIdx]); initCtxOutputBuf(pRuntimeEnv); vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pSupporter->pMeterSidExtInfo[meterIdx]); @@ -7046,7 +6981,7 @@ void setExecutionContext(SMeterQuerySupportObj *pSupporter, SWindowResult *outpu } 
} -static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { +static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->numOfElems == 0, there is only fixed number of results for each group @@ -7067,9 +7002,9 @@ static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *p // set super table query flag SResultInfo *pResInfo = GET_RES_INFO(pCtx); - if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - pResInfo->superTableQ = true; - } + // if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + pResInfo->superTableQ = pRuntimeEnv->stableQuery; + // } } } @@ -7160,9 +7095,9 @@ int32_t setOutputBufferForIntervalQuery(SQueryRuntimeEnv *pRuntimeEnv, SMeterQue SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SWindowResInfo * pWindowResInfo = &pMeterQueryInfo->windowResInfo; - STimeWindow win = getActiveSlidingWindow(pWindowResInfo, pMeterQueryInfo->lastKey, pRuntimeEnv->pQuery); + STimeWindow win = getActiveTimeWindow(pWindowResInfo, pMeterQueryInfo->lastKey, pRuntimeEnv->pQuery); - SWindowResult *pWindowRes = doSetSlidingWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE); if (pWindowRes == NULL) { return -1; } @@ -7190,8 +7125,8 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3 SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; if (IS_MASTER_SCAN(pRuntimeEnv)) { + // not enough disk space or memory buffer for intermediate results if (setOutputBufferForIntervalQuery(pRuntimeEnv, pMeterQueryInfo) != TSDB_CODE_SUCCESS) { - // not enough disk space or memory buffer for intermediate results return -1; } @@ -7234,172 +7169,6 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3 return 0; } -// static void 
doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, -// SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, -// __block_search_fn_t searchFn) { -// SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; -// SQuery * pQuery = pRuntimeEnv->pQuery; -// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); -// -// int64_t nextKey = -1; -// bool queryCompleted = false; -// -// while (1) { -// int32_t numOfRes = 0; -// int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes); -// assert(steps > 0); -// -// // NOTE: in case of stable query, only ONE(or ZERO) row of pos generated for each query range -// if (pMeterQueryInfo->lastResRows == 0) { -// pMeterQueryInfo->lastResRows = numOfRes; -// } else { -// assert(pMeterQueryInfo->lastResRows == 1); -// } -// -// int32_t pos = pQuery->pos + steps * factor; -// -// // query does not reach the end of current block -// if ((pos < pBlockInfo->size && QUERY_IS_ASC_QUERY(pQuery)) || (pos >= 0 && !QUERY_IS_ASC_QUERY(pQuery))) { -// nextKey = pPrimaryCol[pos]; -// } else { -// assert((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || -// (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))); -// } -// -// // all data satisfy current query are checked, query completed -// if (QUERY_IS_ASC_QUERY(pQuery)) { -// queryCompleted = (nextKey > pQuery->ekey || pQuery->ekey <= pBlockInfo->keyLast); -// } else { -// queryCompleted = (nextKey < pQuery->ekey || pQuery->ekey >= pBlockInfo->keyFirst); -// } -// -// /* -// * 1. there may be more date that satisfy current query interval, other than -// * current block, we need to try next data blocks -// * 2. 
query completed, since reaches the upper bound of the main query range -// */ -// if (QUERY_IS_ASC_QUERY(pQuery)) { -// if (pQuery->lastKey > pBlockInfo->keyLast || pQuery->lastKey > pSupporter->rawEKey || -// nextKey > pSupporter->rawEKey) { -// /* -// * current interval query is completed, set query pos flag closed and -// * try next data block if pQuery->ekey == pSupporter->rawEKey, whole query is completed -// */ -// if (pQuery->lastKey > pBlockInfo->keyLast) { -// assert(pQuery->ekey >= pBlockInfo->keyLast); -// } -// -// if (pQuery->lastKey > pSupporter->rawEKey || nextKey > pSupporter->rawEKey) { -// /* whole query completed, save pos and abort */ -// assert(queryCompleted); -// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); -// -// // save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache. -// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); -// } else if (pQuery->ekey == pBlockInfo->keyLast) { -// /* current interval query is completed, set the next query range on other data blocks if exist */ -// int64_t prevEKey = pQuery->ekey; -// -// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); -// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); -// -// assert(queryCompleted && prevEKey < pQuery->skey); -// if (pMeterQueryInfo->lastResRows > 0) { -// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); -// } -// } else { -// /* -// * Data that satisfy current query range may locate in current block and blocks that are directly right -// * next to current block. Therefore, we need to keep the query range(interval) unchanged until reaching -// * the direct next data block, while only forwards the pQuery->lastKey. -// * -// * With the information of the directly next data block, whether locates in cache or disk, -// * current interval query being completed or not can be decided. 
-// */ -// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); -// assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey); -// -// /* -// * if current block is the last block of current file, we still close the pos flag, and -// * merge with other meters in the same group -// */ -// if (queryCompleted) { -// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); -// } -// } -// -// break; -// } -// } else { -// if (pQuery->lastKey < pBlockInfo->keyFirst || pQuery->lastKey < pSupporter->rawEKey || -// nextKey < pSupporter->rawEKey) { -// if (pQuery->lastKey < pBlockInfo->keyFirst) { -// assert(pQuery->ekey <= pBlockInfo->keyFirst); -// } -// -// if (pQuery->lastKey < pSupporter->rawEKey || (nextKey < pSupporter->rawEKey && nextKey != -1)) { -// /* whole query completed, save pos and abort */ -// assert(queryCompleted); -// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); -// -// /* -// * save the pQuery->lastKey for retrieve data in cache, actually, -// * there will be no qualified data in cache. -// */ -// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); -// } else if (pQuery->ekey == pBlockInfo->keyFirst) { -// // current interval query is completed, set the next query range on other data blocks if exist -// int64_t prevEKey = pQuery->ekey; -// -// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); -// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); -// -// assert(queryCompleted && prevEKey > pQuery->skey); -// if (pMeterQueryInfo->lastResRows > 0) { -// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); -// } -// } else { -// /* -// * Data that satisfy current query range may locate in current block and blocks that are -// * directly right next to current block. Therefore, we need to keep the query range(interval) -// * unchanged until reaching the direct next data block, while only forwards the pQuery->lastKey. 
-// * -// * With the information of the directly next data block, whether locates in cache or disk, -// * current interval query being completed or not can be decided. -// */ -// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); -// assert(pQuery->lastKey < pBlockInfo->keyFirst && pQuery->lastKey >= pQuery->ekey); -// -// /* -// * if current block is the last block of current file, we still close the pos -// * flag, and merge with other meters in the same group -// */ -// if (queryCompleted) { -// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); -// } -// } -// -// break; -// } -// } -// -// assert(queryCompleted); -// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); -// -// assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) || -// (nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); -// -// /* still in the same block to query */ -// getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey); -// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); -// -// int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order); -// assert(newPos == pQuery->pos + steps * factor); -// -// pQuery->pos = newPos; -// } -//} - static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, __block_search_fn_t searchFn) { @@ -7443,7 +7212,7 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM /* * 1. there may be more date that satisfy current query interval, other than * current block, we need to try next data blocks - * 2. query completed, since reaches the upper bound of the main query range + * 2. 
query completed, since it reaches the upper bound of the main query range */ if (!completed) { /* @@ -7501,13 +7270,14 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM pQuery->pos = newPos; } } + int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) { assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1); TSKEY key = -1; if (IS_DATA_BLOCK_LOADED(blockStatus)) { key = pPrimaryCol[pQuery->pos]; - } else { + } else {// while the data block is not loaded, the position must be the first or last position assert(pQuery->pos == pBlockInfo->size - 1 || pQuery->pos == 0); key = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->keyFirst : pBlockInfo->keyLast; } @@ -7530,55 +7300,66 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportO SQuery * pQuery = pRuntimeEnv->pQuery; if (pMeterQueryInfo->queryRangeSet) { - assert((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->skey) || - (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->skey)); - - if ((pQuery->ekey < key && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->ekey > key && !QUERY_IS_ASC_QUERY(pQuery))) { - /* - * last query on this block of the meter is done, start next interval on this block - * otherwise, keep the previous query range and proceed - */ - getAlignedIntervalQueryRange(pRuntimeEnv, key, pSupporter->rawSKey, pSupporter->rawEKey); - saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - - // previous query does not be closed, save the results and close it - if (pMeterQueryInfo->lastResRows > 0) { - saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); - } - } else { - /* current query not completed, continue. 
do nothing with respect to query range, */ - } +// assert((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->skey) || +// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->skey)); +// +// if ((pQuery->ekey < key && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->ekey > key && !QUERY_IS_ASC_QUERY(pQuery))) { +// /* +// * last query on this block of the meter is done, start next interval on this block +// * otherwise, keep the previous query range and proceed +// */ +// getAlignedIntervalQueryRange(pRuntimeEnv, key, pSupporter->rawSKey, pSupporter->rawEKey); +// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); +// +// // previous query does not be closed, save the results and close it +// if (pMeterQueryInfo->lastResRows > 0) { +// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); +// } +// } else { +// /* current query not completed, continue. do nothing with respect to query range, */ +// } } else { pQuery->skey = key; assert(pMeterQueryInfo->lastResRows == 0); + // the query range is too small: no data falls in this interval. if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->ekey < pQuery->skey)) || (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->skey < pQuery->ekey))) { - // for too small query range, no data in this interval. return; } - - // todo - if (pMeterQueryInfo->windowResInfo.prevSKey == 0) { - pMeterQueryInfo->windowResInfo.prevSKey = key; + + /** + * When handling both ascending and descending order super table queries, we need to find the first qualified + * timestamp of this table, and then set the first qualified start timestamp. + * In an ascending query, key is the first qualified timestamp. However, in a descending order query, additional + * operations are involved. 
+ */ + if (!QUERY_IS_ASC_QUERY(pQuery)) { + + } + + TSKEY skey1, ekey1; + TSKEY windowSKey = 0, windowEKey = 0; + STimeWindow win = {.skey = key, pSupporter->rawEKey}; + SWindowResInfo* pWindowResInfo = &pMeterQueryInfo->windowResInfo; + + doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); + pWindowResInfo->startTime = windowSKey; + + if (pWindowResInfo->prevSKey == 0) { + pWindowResInfo->prevSKey = windowSKey; } - STimeWindow win = getActiveSlidingWindow(&pMeterQueryInfo->windowResInfo, key, pQuery); - int64_t st = win.skey; - - SWindowResult *pWindowRes = - doSetSlidingWindowFromKey(pRuntimeEnv, &pMeterQueryInfo->windowResInfo, (char *)&st, TSDB_KEYSIZE); + win = getActiveTimeWindow(pWindowResInfo, key, pQuery); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE); if (pWindowRes == NULL) { return; } pWindowRes->window = win; - // setGroupOutputBuffer(pRuntimeEnv, pWindowRes); - // initCtxOutputBuf(pRuntimeEnv); - - // getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->skey, pSupporter->rawSKey, pSupporter->rawEKey); - // saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); pMeterQueryInfo->queryRangeSet = 1; + pMeterQueryInfo->lastKey = win.skey; + pMeterQueryInfo->skey = win.skey; } } @@ -7844,7 +7625,7 @@ static int32_t getNumOfSubset(SMeterQuerySupportObj *pSupporter) { int32_t totalSubset = 0; if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { - totalSubset = numOfClosedSlidingWindow(&pSupporter->runtimeEnv.windowResInfo); + totalSubset = numOfClosedTimeWindow(&pSupporter->runtimeEnv.windowResInfo); } else { totalSubset = pSupporter->pSidSet->numOfSubSet; } @@ -7935,19 +7716,44 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; SMeterQueryInfo * 
pMeterQueryInfo = pMeterDataInfo->pMeterQInfo; - + SWindowResInfo* pWindowResInfo = &pMeterQueryInfo->windowResInfo; + int64_t *pPrimaryKey = (int64_t *)pRuntimeEnv->primaryColBuffer->data; + /* * for each block, we need to handle the previous query, since the determination of previous query being completed * or not is based on the start key of current block. */ TSKEY key = getNextAccessedKeyInData(pQuery, pPrimaryKey, pBlockInfo, blockStatus); - setIntervalQueryRange(pMeterDataInfo->pMeterQInfo, pSupporter, key); - - if (((pQuery->skey > pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) || - ((pQuery->skey < pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) { - return; + setIntervalQueryRange(pMeterQueryInfo, pSupporter, key); + + int32_t forwardStep = + getNumOfRowsInTimeWindow(pQuery, pBlockInfo, pPrimaryKey, pQuery->pos, pQuery->ekey, searchFn, true); + + int32_t numOfRes = 0; + int64_t st = taosGetTimestampUs(); + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { + numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pBlockInfo, pWindowResInfo); + } else { + numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, forwardStep, pFields, pBlockInfo, pWindowResInfo, searchFn); } + + int64_t e = taosGetTimestampUs() - st; + printf("-------------------------------%lld\n", e); + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + if ((pQuery->lastKey > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || + (pQuery->lastKey < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) { + pMeterQueryInfo->ekey = pQuery->lastKey - step; + } + +// doCheckQueryCompleted(pRuntimeEnv, pQuery->lastKey, pWindowResInfo); + +// +// if (((pQuery->skey > pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) || +// ((pQuery->skey < pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) { +// return; +// } // if (((pBlockInfo->keyLast < pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) || // ((pBlockInfo->keyFirst > pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) { @@ 
-7963,7 +7769,7 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD // assert(pMeterQueryInfo->lastResRows == 1 || pMeterQueryInfo->lastResRows == 0); // saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); // } else { - doApplyIntervalQueryOnBlock_rv(pSupporter, pMeterQueryInfo, pBlockInfo, pPrimaryKey, pFields, searchFn); +// doApplyIntervalQueryOnBlock_rv(pSupporter, pMeterQueryInfo, pBlockInfo, pPrimaryKey, pFields, searchFn); // } } diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 9b6404c2af9b8692d580cfc5349b9f44f2e5025e..6ca2666c949189a87d0365c503a428b6dbef5f25 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -241,7 +241,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { pRuntimeEnv->blockStatus); totalBlocks++; - queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, &pMeterInfo[k], NULL, searchFn); + queryOnBlock(pSupporter, pRuntimeEnv->blockStatus, &binfo, &pMeterInfo[k], NULL, searchFn); if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) { break; @@ -447,24 +447,8 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo (pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); } - if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { - assert(pMeterQueryInfo->lastKey <= nextKey && QUERY_IS_ASC_QUERY(pQuery)); - - pMeterQueryInfo->lastKey = nextKey; - pQuery->lastKey = nextKey; - if (pMeterQueryInfo->windowResInfo.prevSKey == 0) { - // normalize the window prev time window - - TSKEY skey1, ekey1; - TSKEY windowSKey = 0, windowEKey = 0; - TSKEY skey2 = MIN(pSupporter->rawSKey, pSupporter->rawEKey); - TSKEY ekey2 = MAX(pSupporter->rawSKey, pSupporter->rawEKey); - - doGetAlignedIntervalQueryRangeImpl(pQuery, nextKey, skey2, ekey2, &skey1, &ekey1, &windowSKey, &windowEKey); - - 
pMeterQueryInfo->windowResInfo.prevSKey = windowSKey; - } + setIntervalQueryRange(pMeterQueryInfo, pSupporter, nextKey); ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); if (ret != TSDB_CODE_SUCCESS) { @@ -474,8 +458,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo } } - queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, &binfo, pOneMeterDataInfo, pInfoEx->pBlock.fields, - searchFn); + queryOnBlock(pSupporter, pRuntimeEnv->blockStatus, &binfo, pOneMeterDataInfo, pInfoEx->pBlock.fields, searchFn); } tfree(pReqMeterDataInfo); @@ -711,7 +694,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { } resetCtxOutputBuf(pRuntimeEnv); - resetSlidingWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); + resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; @@ -1115,7 +1098,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter while (1) { initCtxOutputBuf(pRuntimeEnv); - clearClosedSlidingWindows(pRuntimeEnv); + clearClosedTimeWindow(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); if (isQueryKilled(pQuery)) { @@ -1124,8 +1107,6 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_NOT_COMPLETED)); - // clear tag, used to decide if the whole interval query is completed or not - pQuery->over &= (~QUERY_COMPLETED); doFinalizeResult(pRuntimeEnv); int64_t maxOutput = getNumOfResult(pRuntimeEnv); @@ -1143,7 +1124,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter // forwardCtxOutputBuf(pRuntimeEnv, maxOutput); } - if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { + if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { break; } diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c 
index 9709f5a9a1ab4f9a5e7fb724d05eb887549ba344..8ed8b7a3863c94878f87e7becdcd934f9abc4271 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -670,7 +670,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE tsBufNextPos(pTSBuf); } - if (((*code) = vnodeQuerySingleMeterPrepare(pQInfo, pQInfo->pObj, pSupporter, pTSBuf)) != TSDB_CODE_SUCCESS) { + if (((*code) = vnodeQuerySingleTablePrepare(pQInfo, pQInfo->pObj, pSupporter, pTSBuf)) != TSDB_CODE_SUCCESS) { goto _error; }