diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 20de7f73bf6baa993885469f85838c6f46386482..a9145ac69df04fdb692f78e78535aeda0df9a101 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -423,6 +423,8 @@ typedef struct SGroupKeyInfo { (_p).val = (_v); \ } while (0) +static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst); + bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { if (pResultInfo->initialized) { return false; @@ -457,11 +459,12 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts > pSBuf->ts)) { - memcpy(pDBuf->buf, pSBuf->buf, bytes); - pDBuf->ts = pSBuf->ts; - pDResInfo->numOfRes = 1; + if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pSBuf, pDBuf, true)) { + pDBuf->hasResult = true; } + + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); + pDResInfo->isNullRes &= pSResInfo->isNullRes; return TSDB_CODE_SUCCESS; } @@ -1274,17 +1277,8 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type; - if (IS_SIGNED_NUMERIC_TYPE(type)) { - pDBuf->isum += pSBuf->isum; - pDBuf->quadraticISum += pSBuf->quadraticISum; - } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { - pDBuf->usum += pSBuf->usum; - pDBuf->quadraticUSum += pSBuf->quadraticUSum; - } else { - pDBuf->dsum += pSBuf->dsum; - pDBuf->quadraticDSum += pSBuf->quadraticDSum; - } - pDBuf->count += pSBuf->count; + stddevTransferInfo(pSBuf, pDBuf); + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->isNullRes &= pSResInfo->isNullRes; return TSDB_CODE_SUCCESS; @@ -2289,16 +2283,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, - int32_t rowIndex) { +static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) { if (pOutput->hasResult) { if (isFirst) { if (pInput->ts > pOutput->ts) { - return; + return TSDB_CODE_FAILED; } } else { if (pInput->ts < pOutput->ts) { - return; + return TSDB_CODE_FAILED; } } } @@ -2308,9 +2301,15 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S pOutput->bytes = pInput->bytes; memcpy(pOutput->buf, pInput->buf, pOutput->bytes); - firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); + return TSDB_CODE_SUCCESS; +} - pOutput->hasResult = true; +static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, + int32_t rowIndex) { + if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) { + firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); + pOutput->hasResult = true; + } } static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) { @@ -2378,7 +2377,6 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return 1; } -// todo rewrite: int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); @@ -2387,11 +2385,12 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts < pSBuf->ts)) { - memcpy(pDBuf->buf, pSBuf->buf, bytes); - pDBuf->ts = pSBuf->ts; - pDResInfo->numOfRes = 1; + if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pSBuf, pDBuf, false)) { + pDBuf->hasResult = true; } + + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); + pDResInfo->isNullRes &= pSResInfo->isNullRes; return TSDB_CODE_SUCCESS; } diff --git a/tests/script/tsim/stream/session1.sim b/tests/script/tsim/stream/session1.sim index ee6eefde2633f6422dc51927421ef109392dc2ae..f535fd619fbedfa87598569ba7edfde82bd18086 100644 --- a/tests/script/tsim/stream/session1.sim +++ b/tests/script/tsim/stream/session1.sim @@ -197,4 +197,98 @@ if $data01 != 1 then return -1 endi +sql create database test1 vgroups 1; +sql use test1; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams3 trigger at_once into streamt3 as select _wstart, count(*) c1 from t1 where a > 5 session(ts, 5s); +sql insert into t1 values(1648791213000,1,2,3,1.0); + +$loop_count = 0 +loop13: +sleep 200 + +sql select * from streamt3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 0 then + print =====rows=$rows + goto loop13 +endi + +sql insert into t1 values(1648791213000,6,2,3,1.0); + +$loop_count = 0 +loop14: +sleep 200 +sql select * from streamt3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop14 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); + +$loop_count = 0 +loop15: +sleep 200 +sql select * from streamt3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 0 then + print =====rows=$rows + goto loop15 +endi + + +sql insert into t1 values(1648791223000,2,2,3,1.0); +sql insert into t1 values(1648791223000,10,2,3,1.0); +sql insert into t1 values(1648791233000,10,2,3,1.0); + +$loop_count = 0 +loop16: +sleep 200 +sql select * from streamt3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop16 +endi + +sql insert into t1 values(1648791233000,2,2,3,1.0); + +$loop_count = 0 +loop17: +sleep 200 +sql select * from streamt3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop17 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT