提交 129c9251 编写于 作者: X Xiaoyu Wang

fix: csum/diff/mavg support subquery

上级 e235c983
...@@ -1944,7 +1944,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1944,7 +1944,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "irate", .name = "irate",
.type = FUNCTION_TYPE_IRATE, .type = FUNCTION_TYPE_IRATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateIrate, .translateFunc = translateIrate,
.getEnvFunc = getIrateFuncEnv, .getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup, .initFunc = irateFuncSetup,
...@@ -2151,7 +2151,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2151,7 +2151,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "stateduration", .name = "stateduration",
.type = FUNCTION_TYPE_STATE_DURATION, .type = FUNCTION_TYPE_STATE_DURATION,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC,
.translateFunc = translateStateDuration, .translateFunc = translateStateDuration,
.getEnvFunc = getStateFuncEnv, .getEnvFunc = getStateFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
......
...@@ -80,7 +80,7 @@ typedef struct STopBotRes { ...@@ -80,7 +80,7 @@ typedef struct STopBotRes {
typedef struct SFirstLastRes { typedef struct SFirstLastRes {
bool hasResult; bool hasResult;
bool isNull; //used for last_row function only bool isNull; // used for last_row function only
int32_t bytes; int32_t bytes;
char buf[]; char buf[];
} SFirstLastRes; } SFirstLastRes;
...@@ -2802,7 +2802,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -2802,7 +2802,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
} }
*(TSKEY*)(pInfo->buf) = cts; *(TSKEY*)(pInfo->buf) = cts;
numOfElems++; numOfElems++;
//handle selectivity // handle selectivity
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) { if (!pInfo->hasResult) {
...@@ -2812,7 +2812,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -2812,7 +2812,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
} }
} }
pInfo->hasResult = true; pInfo->hasResult = true;
//DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
} }
break; break;
...@@ -2834,7 +2834,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -2834,7 +2834,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
} }
*(TSKEY*)(pInfo->buf) = cts; *(TSKEY*)(pInfo->buf) = cts;
numOfElems++; numOfElems++;
//handle selectivity // handle selectivity
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) { if (!pInfo->hasResult) {
...@@ -2845,7 +2845,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -2845,7 +2845,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
} }
pInfo->hasResult = true; pInfo->hasResult = true;
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
//DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
} }
break; break;
} }
...@@ -2855,7 +2855,6 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -2855,7 +2855,6 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
...@@ -2864,14 +2863,13 @@ int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2864,14 +2863,13 @@ int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, pRes->buf + sizeof(TSKEY), pRes->isNull); colDataAppend(pCol, pBlock->info.rows, pRes->buf + sizeof(TSKEY), pRes->isNull);
//handle selectivity // handle selectivity
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDiffInfo); pEnv->calcMemSize = sizeof(SDiffInfo);
return true; return true;
...@@ -2885,7 +2883,11 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { ...@@ -2885,7 +2883,11 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->hasPrev = false; pDiffInfo->hasPrev = false;
pDiffInfo->prev.i64 = 0; pDiffInfo->prev.i64 = 0;
if (pCtx->numOfParams > 1) {
pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param
} else {
pDiffInfo->ignoreNegative = false;
}
pDiffInfo->includeNull = false; pDiffInfo->includeNull = false;
pDiffInfo->firstOutput = false; pDiffInfo->firstOutput = false;
return true; return true;
...@@ -3000,10 +3002,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { ...@@ -3000,10 +3002,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
int32_t numOfElems = 0; int32_t numOfElems = 0;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
int32_t startOffset = pCtx->offset; int32_t startOffset = pCtx->offset;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
...@@ -3015,9 +3015,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { ...@@ -3015,9 +3015,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) { if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos); colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems += 1; numOfElems += 1;
} }
...@@ -3028,9 +3025,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { ...@@ -3028,9 +3025,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
if (pDiffInfo->hasPrev) { if (pDiffInfo->hasPrev) {
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems++; numOfElems++;
} else { } else {
...@@ -3046,9 +3040,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { ...@@ -3046,9 +3040,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) { if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos); colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems += 1; numOfElems += 1;
} }
...@@ -3060,9 +3051,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { ...@@ -3060,9 +3051,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
// there is a row of previous data block to be handled in the first place. // there is a row of previous data block to be handled in the first place.
if (pDiffInfo->hasPrev) { if (pDiffInfo->hasPrev) {
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
}
numOfElems++; numOfElems++;
} else { } else {
...@@ -3070,9 +3058,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { ...@@ -3070,9 +3058,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
} }
pDiffInfo->hasPrev = true; pDiffInfo->hasPrev = true;
if (pTsOutput != NULL) {
pDiffInfo->prevTs = tsList[i];
}
} }
} }
...@@ -3573,7 +3558,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo ...@@ -3573,7 +3558,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo
pInfo->min = MAX_TS_KEY; pInfo->min = MAX_TS_KEY;
pInfo->max = 0; pInfo->max = 0;
if (pCtx->numOfParams > 2) { if (pCtx->numOfParams > 1) {
pInfo->timeUnit = pCtx->param[1].param.i; pInfo->timeUnit = pCtx->param[1].param.i;
} else { } else {
pInfo->timeUnit = 1; pInfo->timeUnit = 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册