diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2980332f7a3020113810a2412296be616815567e..606d035898d5312dc86aabc57633457e263e8b47 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -717,7 +717,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se return forwardStep; } -static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, bool ascQuery) { +static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, bool ascQuery, bool timeWindowInterpo) { int64_t skey = TSKEY_INITIAL_VAL; int32_t i = 0; for (i = pResultRowInfo->size - 1; i >= 0; --i) { @@ -727,10 +727,22 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } // new closed result rows - if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { - closeResultRow(pResultRowInfo, i); + if (timeWindowInterpo) { + if (pResult->endInterp && ((pResult->win.skey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery))) { + if (i > 0) { // the first time window, the startInterp is false. + assert(pResult->startInterp); + } + + closeResultRow(pResultRowInfo, i); + } else { + skey = pResult->win.skey; + } } else { - skey = pResult->win.skey; + if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { + closeResultRow(pResultRowInfo, i); + } else { + skey = pResult->win.skey; + } } } @@ -751,13 +763,13 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } } -static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery) { +static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery, bool timeWindowInterpo) { if ((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey && (!ascQuery))) { closeAllResultRows(pResultRowInfo); pResultRowInfo->curIndex = pResultRowInfo->size - 1; } else { int32_t step = ascQuery? 1:-1; - doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey - step, ascQuery); + doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey - step, ascQuery, timeWindowInterpo); } } @@ -1076,13 +1088,13 @@ static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32 return true; } -static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) { +static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock, + int32_t rowIndex) { if (pDataBlock == NULL) { return; } SQuery* pQuery = pRuntimeEnv->pQuery; - int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0; for (int32_t k = 0; k < pQuery->numOfCols; ++k) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes); @@ -1265,7 +1277,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * _end: if (pRuntimeEnv->timeWindowInterpo) { - saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock); + int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0; + saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, rowIndex); } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -1512,7 +1525,7 @@ static void setTimeWindowEKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pData } static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, - SResultRowInfo *pWindowResInfo, SArray *pDataBlock) { + SResultRowInfo *pWindowResInfo, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); @@ -1587,7 +1600,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // interval window query, decide the time window according to the primary timestamp if (QUERY_IS_INTERVAL_QUERY(pQuery)) { int32_t prevWindowIndex = curTimeWindowIndex(pWindowResInfo); - int64_t ts = tsCols[offset]; + int64_t ts = tsCols[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); @@ -1629,8 +1642,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS doRowwiseApplyFunctions(pRuntimeEnv, &win, offset); STimeWindow nextWin = win; - int32_t index = pWindowResInfo->curIndex; - while (1) { getNextTimeWindow(pQuery, &nextWin); if ((nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || @@ -1652,7 +1663,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset); } - pWindowResInfo->curIndex = index; } else { // other queries // decide which group this rows belongs to according to current state value if (groupbyColumnValue) { @@ -1686,10 +1696,17 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS _end: assert(offset >= 0); + assert(tsCols != NULL); + if (tsCols != NULL) { - item->lastKey = tsCols[offset] + step; + item->lastKey = prevTs + step; } else { - item->lastKey = (QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey) + step; + item->lastKey = (QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey) + step; + } + + // In case of all rows in current block are not qualified + if (pRuntimeEnv->timeWindowInterpo && prevRowIndex != -1) { + saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, prevRowIndex); } if (pRuntimeEnv->pTsBuf != NULL) { @@ -1729,7 +1746,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl int32_t numOfRes = 0; if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { numOfRes = pResultRowInfo->size; - updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery)); + updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pRuntimeEnv->timeWindowInterpo); } else { // projection query numOfRes = (int32_t) getNumOfResult(pRuntimeEnv); @@ -4210,7 +4227,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc } if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery)); + updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pRuntimeEnv->timeWindowInterpo); } } @@ -4510,7 +4527,11 @@ static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* w static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { SQuery *pQuery = pRuntimeEnv->pQuery; - assert(*start <= pQuery->current->lastKey); + if (QUERY_IS_ASC_QUERY(pQuery)) { + assert(*start <= pQuery->current->lastKey); + } else { + assert(*start >= pQuery->current->lastKey); + } // if queried with value filter, do NOT forward query start position if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->pFillInfo != NULL) { diff --git a/tests/script/general/parser/first_last.sim b/tests/script/general/parser/first_last.sim index 773f92afcfd1fb126f137befd9de1107b050356f..df9a4598e0ba5438e45d299d467d86b2989eb825 100644 --- a/tests/script/general/parser/first_last.sim +++ b/tests/script/general/parser/first_last.sim @@ -106,7 +106,7 @@ while $x < 5000 endw system sh/exec.sh -n dnode1 -s stop -x SIGINT -sleep 3000 +sleep 1000 system sh/exec.sh -n dnode1 -s start print ================== server restart completed sql connect diff --git a/tests/script/general/parser/function.sim b/tests/script/general/parser/function.sim index 110abc40c5c320f75a461cd366111505e3577d2a..78b63c2baf460907a2d88836c52f157a8753c89c 100644 --- a/tests/script/general/parser/function.sim +++ b/tests/script/general/parser/function.sim @@ -311,3 +311,53 @@ if $rows != 6 then return -1 endi +print ==================> td-2624 +sql create table tm2(ts timestamp, k int, b binary(12)); +sql insert into tm2 values('2011-01-02 18:42:45.326', -1,'abc'); +sql insert into tm2 values('2020-07-30 17:44:06.283', 0, null); +sql insert into tm2 values('2020-07-30 17:44:19.578', 9999999, null); +sql insert into tm2 values('2020-07-30 17:46:06.417', NULL, null); +sql insert into tm2 values('2020-11-09 18:42:25.538', 0, null); +sql insert into tm2 values('2020-12-29 17:43:11.641', 0, null); +sql insert into tm2 values('2020-12-29 18:43:17.129', 0, null); +sql insert into tm2 values('2020-12-29 18:46:19.109', NULL, null); +sql insert into tm2 values('2021-01-03 18:40:40.065', 0, null); + +sql select twa(k),first(ts) from tm2 where k <50 interval(17s); +if $rows != 6 then + return -1 +endi + +if $data00 != @11-01-02 18:42:42.000@ then + return -1 +endi + +if $data02 != @11-01-02 18:42:45.326@ then + return -1 +endi + +if $data10 != @20-07-30 17:43:59.000@ then + return -1 +endi + +if $data21 != 0.000000000 then + return -1 +endi + +sql select twa(k),first(ts) from tm2 where k <50 interval(17s) order by ts desc; +if $rows != 6 then + return -1 +endi + +sql select twa(k),first(ts),count(k),first(k) from tm2 interval(17s) limit 20 offset 0; +if $rows != 9 then + return -1 +endi + +if $data00 != @11-01-02 18:42:42.000@ then + return -1 +endi + +if $data10 != @20-07-30 17:43:59.000@ then + return -1 +endi