提交 8c47b350 编写于 作者: H Haojun Liao

fix(query): Pseudo time window result expands to multiple rows in case of...

fix(query): Pseudo time window result expands to multiple rows in case of multiple rows aggregates function existing.
上级 6c30d170
...@@ -619,7 +619,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* ...@@ -619,7 +619,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
continue; continue;
} }
if (functionNeedToExecute(&pCtx[k])) { if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
pCtx[k].fpSet.process(&pCtx[k]); pCtx[k].fpSet.process(&pCtx[k]);
} }
...@@ -802,11 +802,14 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -802,11 +802,14 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) { if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs; // this can be set during create the struct pCtx[k].startTs = startTs;
if (pCtx[k].fpSet.process != NULL) // this can be set during create the struct
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) {
pCtx[k].fpSet.process(&pCtx[k]); pCtx[k].fpSet.process(&pCtx[k]);
} }
} }
}
} }
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
...@@ -1087,7 +1090,7 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -1087,7 +1090,7 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) { if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
pValCtx[num++] = &pCtx[i]; pValCtx[num++] = &pCtx[i];
} else { } else if (fmIsAggFunc(pCtx[i].functionId)) {
p = &pCtx[i]; p = &pCtx[i];
} }
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { // if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
...@@ -2179,17 +2182,15 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased ...@@ -2179,17 +2182,15 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
int32_t start = 0; int32_t start = 0;
int32_t step = -1; int32_t step = 1;
// qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv)); // qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv));
assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC); assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC);
if (orderType == TSDB_ORDER_ASC) { if (orderType == TSDB_ORDER_ASC) {
start = pGroupResInfo->index; start = pGroupResInfo->index;
step = 1;
} else { // desc order copy all data } else { // desc order copy all data
start = numOfRows - pGroupResInfo->index - 1; start = numOfRows - pGroupResInfo->index - 1;
step = -1;
} }
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
...@@ -2219,10 +2220,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased ...@@ -2219,10 +2220,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor // do nothing, todo refactor
} else { } else {
// expand the result into multiple rows. E.g., _wstartts, top(k, 20)
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
colDataAppend(pColInfoData, pBlock->info.rows, in, pCtx[j].resultInfo->isNullRes); for(int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
} }
} }
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult); int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
......
...@@ -1064,7 +1064,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1064,7 +1064,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateSelectValue, .translateFunc = translateSelectValue,
.getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
.initFunc = functionSetup, .initFunc = functionSetup,
.sprocessFunc = NULL, .processFunc = NULL,
.finalizeFunc = NULL .finalizeFunc = NULL
} }
}; };
......
...@@ -184,7 +184,6 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -184,7 +184,6 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
/*cleanupResultRowEntry(pResInfo);*/
char* in = GET_ROWCELL_INTERBUF(pResInfo); char* in = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
...@@ -192,6 +191,10 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -192,6 +191,10 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) {
return 0;
}
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) { int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册