提交 c5e29cd1 编写于 作者: wmmhello's avatar wmmhello

rollback rm useless code & add filter logic for tail function

上级 fd2ada64
...@@ -249,7 +249,6 @@ typedef struct { ...@@ -249,7 +249,6 @@ typedef struct {
} TailUnit; } TailUnit;
typedef struct STailInfo { typedef struct STailInfo {
int32_t offset;
int32_t num; int32_t num;
TailUnit **res; TailUnit **res;
} STailInfo; } STailInfo;
...@@ -969,6 +968,23 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_ ...@@ -969,6 +968,23 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_
} }
} }
static int32_t tailFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
// not initialized yet, it is the first block, load it.
if (pCtx->pOutput == NULL) {
return BLK_DATA_ALL_NEEDED;
}
// the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is
// the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid
STailInfo *pInfo = (STailInfo*) (pCtx->pOutput);
TailUnit **pList = pInfo->res;
if (pInfo->num >= pCtx->param[0].i64 && pList[0]->timestamp > w->ekey){
return BLK_DATA_NO_NEEDED;
} else {
return BLK_DATA_ALL_NEEDED;
}
}
////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////
/* /*
* The intermediate result of average is kept in the interResultBuf. * The intermediate result of average is kept in the interResultBuf.
...@@ -6140,6 +6156,6 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ ...@@ -6140,6 +6156,6 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
tail_function, tail_function,
tail_func_finalizer, tail_func_finalizer,
tail_func_merge, tail_func_merge,
dataBlockRequired, tailFuncRequired,
} }
}; };
...@@ -6841,7 +6841,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6841,7 +6841,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
} }
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
STableId prevId = {0, 0}; STableId prevId = {0, 0};
...@@ -6871,6 +6871,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6871,6 +6871,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv); doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册