From c5e29cd1170f8db7493aa093fba7807dbd6c8424 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 4 Mar 2022 18:19:52 +0800 Subject: [PATCH] rollback rm useless code & add filter logic for tail function --- src/query/src/qAggMain.c | 20 ++++++++++++++++++-- src/query/src/qExecutor.c | 3 ++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 8f3cf9350a..4f221d6837 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -249,7 +249,6 @@ typedef struct { } TailUnit; typedef struct STailInfo { - int32_t offset; int32_t num; TailUnit **res; } STailInfo; @@ -969,6 +968,23 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_ } } +static int32_t tailFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { + // not initialized yet, it is the first block, load it. + if (pCtx->pOutput == NULL) { + return BLK_DATA_ALL_NEEDED; + } + + // the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is + // the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid + STailInfo *pInfo = (STailInfo*) (pCtx->pOutput); + TailUnit **pList = pInfo->res; + if (pInfo->num >= pCtx->param[0].i64 && pList[0]->timestamp > w->ekey){ + return BLK_DATA_NO_NEEDED; + } else { + return BLK_DATA_ALL_NEEDED; + } +} + ////////////////////////////////////////////////////////////////////////////////////////////// /* * The intermediate result of average is kept in the interResultBuf. @@ -6140,6 +6156,6 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ tail_function, tail_func_finalizer, tail_func_merge, - dataBlockRequired, + tailFuncRequired, } }; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6cc3db3f39..9279b66e64 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6841,7 +6841,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { } SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - + int32_t order = pQueryAttr->order.order; SOperatorInfo* upstream = pOperator->upstream[0]; STableId prevId = {0, 0}; @@ -6871,6 +6871,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; + pQueryAttr->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); -- GitLab