未验证 提交 79cd86d5 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18705 from taosdata/fix/TD-20907

fix:stddev\fisrt\last combine function
......@@ -423,6 +423,8 @@ typedef struct SGroupKeyInfo {
(_p).val = (_v); \
} while (0)
static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst);
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
if (pResultInfo->initialized) {
return false;
......@@ -457,11 +459,12 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts > pSBuf->ts)) {
memcpy(pDBuf->buf, pSBuf->buf, bytes);
pDBuf->ts = pSBuf->ts;
pDResInfo->numOfRes = 1;
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pSBuf, pDBuf, true)) {
pDBuf->hasResult = true;
}
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
......@@ -1274,17 +1277,8 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pDBuf->isum += pSBuf->isum;
pDBuf->quadraticISum += pSBuf->quadraticISum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pDBuf->usum += pSBuf->usum;
pDBuf->quadraticUSum += pSBuf->quadraticUSum;
} else {
pDBuf->dsum += pSBuf->dsum;
pDBuf->quadraticDSum += pSBuf->quadraticDSum;
}
pDBuf->count += pSBuf->count;
stddevTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
......@@ -2289,16 +2283,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
int32_t rowIndex) {
static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) {
if (pOutput->hasResult) {
if (isFirst) {
if (pInput->ts > pOutput->ts) {
return;
return TSDB_CODE_FAILED;
}
} else {
if (pInput->ts < pOutput->ts) {
return;
return TSDB_CODE_FAILED;
}
}
}
......@@ -2308,9 +2301,15 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
pOutput->bytes = pInput->bytes;
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
return TSDB_CODE_SUCCESS;
}
pOutput->hasResult = true;
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
int32_t rowIndex) {
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
pOutput->hasResult = true;
}
}
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
......@@ -2378,7 +2377,6 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return 1;
}
// todo rewrite:
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
......@@ -2387,11 +2385,12 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts < pSBuf->ts)) {
memcpy(pDBuf->buf, pSBuf->buf, bytes);
pDBuf->ts = pSBuf->ts;
pDResInfo->numOfRes = 1;
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pSBuf, pDBuf, false)) {
pDBuf->hasResult = true;
}
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
......
......@@ -197,4 +197,98 @@ if $data01 != 1 then
return -1
endi
sql create database test1 vgroups 1;
sql use test1;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once into streamt3 as select _wstart, count(*) c1 from t1 where a > 5 session(ts, 5s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop13:
sleep 200
sql select * from streamt3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 0 then
print =====rows=$rows
goto loop13
endi
sql insert into t1 values(1648791213000,6,2,3,1.0);
$loop_count = 0
loop14:
sleep 200
sql select * from streamt3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop14
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop15:
sleep 200
sql select * from streamt3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows
goto loop15
endi
sql insert into t1 values(1648791223000,2,2,3,1.0);
sql insert into t1 values(1648791223000,10,2,3,1.0);
sql insert into t1 values(1648791233000,10,2,3,1.0);
$loop_count = 0
loop16:
sleep 200
sql select * from streamt3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop16
endi
sql insert into t1 values(1648791233000,2,2,3,1.0);
$loop_count = 0
loop17:
sleep 200
sql select * from streamt3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop17
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册