提交 14fe0655 编写于 作者: H Haojun Liao

fix(query): fix no-fill expression calculation bug.

上级 3fcd60e6
...@@ -672,8 +672,9 @@ typedef struct SFillOperatorInfo { ...@@ -672,8 +672,9 @@ typedef struct SFillOperatorInfo {
uint64_t curGroupId; // current handled group id uint64_t curGroupId; // current handled group id
SExprInfo* pExprInfo; SExprInfo* pExprInfo;
int32_t numOfExpr; int32_t numOfExpr;
SExprInfo* pNotFillExprInfo; SExprSupp noFillExprSupp;
int32_t numOfNotFillExpr; // SExprInfo* noFillExprInfo;
// int32_t numOfNoFillExpr;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
...@@ -1017,8 +1018,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1017,8 +1018,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList); int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
int32_t scanFlag, bool createDummyCol);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
......
...@@ -403,25 +403,24 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo ...@@ -403,25 +403,24 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo
} }
} }
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
int32_t scanFlag, bool createDummyCol); bool createDummyCol);
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
int32_t order) { SqlFunctionCtx* pCtx = pExprSup->pCtx;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].input.numOfRows = pBlock->info.rows; pCtx[i].input.numOfRows = pBlock->info.rows;
setBlockSMAInfo(&pCtx[i], &pOperator->exprSupp.pExprInfo[i], pBlock); setBlockSMAInfo(&pCtx[i], &pExprSup->pExprInfo[i], pBlock);
pCtx[i].pSrcBlock = pBlock; pCtx[i].pSrcBlock = pBlock;
} }
} }
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
int32_t scanFlag, bool createDummyCol) {
if (pBlock->pBlockAgg != NULL) { if (pBlock->pBlockAgg != NULL) {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); doSetInputDataBlockInfo(pExprSup, pBlock, order);
} else { } else {
doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol); doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
} }
} }
...@@ -468,11 +467,12 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc ...@@ -468,11 +467,12 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
int32_t scanFlag, bool createDummyCol) { bool createDummyCol) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SqlFunctionCtx* pCtx = pExprSup->pCtx;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].input.numOfRows = pBlock->info.rows; pCtx[i].input.numOfRows = pBlock->info.rows;
...@@ -483,7 +483,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -483,7 +483,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput->uid = pBlock->info.uid; pInput->uid = pBlock->info.uid;
pInput->colDataAggIsSet = false; pInput->colDataAggIsSet = false;
SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i]; SExprInfo* pOneExpr = &pExprSup->pExprInfo[i];
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
...@@ -2434,7 +2434,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -2434,7 +2434,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId); setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true); setInputDataBlock(pSup, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, pSup->pCtx); code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) { if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
...@@ -2733,28 +2733,16 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera ...@@ -2733,28 +2733,16 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
SFillOperatorInfo* pInfo = pOperator->info; SFillOperatorInfo* pInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pResBlock = pInfo->pFinalRes; setInputDataBlock(pSup, pBlock, order, scanFlag, false);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pInfo->pRes->info.groupId = pBlock->info.groupId;
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsCol);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr];
ASSERT(pCol->notFillCol);
SExprInfo* pExpr = pCol->pExpr; // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; pInfo->pRes->info.rows = 0;
int32_t dstSlotId = pExpr->base.resSchema.slotId; SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false);
SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId); projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs, NULL);
SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId); pInfo->pRes->info.groupId = pBlock->info.groupId;
colDataAssign(pDst1, pSrc1, pInfo->pRes->info.rows, &pResBlock->info);
}
} }
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
...@@ -3142,10 +3130,7 @@ void destroyFillOperatorInfo(void* param) { ...@@ -3142,10 +3130,7 @@ void destroyFillOperatorInfo(void* param) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes); pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);
if (pInfo->pNotFillExprInfo != NULL) { cleanupExprSupp(&pInfo->noFillExprSupp);
destroyExprInfo(pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr);
taosMemoryFree(pInfo->pNotFillExprInfo);
}
taosMemoryFreeClear(pInfo->p); taosMemoryFreeClear(pInfo->p);
taosArrayDestroy(pInfo->pColMatchColInfo); taosArrayDestroy(pInfo->pColMatchColInfo);
...@@ -3210,11 +3195,12 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t ...@@ -3210,11 +3195,12 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
} }
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) { static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
if (pInfo->numOfNotFillExpr == 0) { if (pInfo->noFillExprSupp.numOfExprs == 0) {
return false; return false;
} }
for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
SExprInfo* exprInfo = pInfo->pNotFillExprInfo + i; for (int32_t i = 0; i < pInfo->noFillExprSupp.numOfExprs; ++i) {
SExprInfo* exprInfo = pInfo->noFillExprSupp.pExprInfo + i;
if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 && if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 &&
exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) { exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
return true; return true;
...@@ -3223,25 +3209,24 @@ static bool isWstartColumnExist(SFillOperatorInfo* pInfo) { ...@@ -3223,25 +3209,24 @@ static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
return false; return false;
} }
static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode) { static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
const char* idStr) {
bool wstartExist = isWstartColumnExist(pInfo); bool wstartExist = isWstartColumnExist(pInfo);
if (wstartExist == false) { if (wstartExist == false) {
if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) { if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
qError("pWStartTs of fill physical node is not a target node"); qError("pWStartTs of fill physical node is not a target node, %s", idStr);
return TSDB_CODE_QRY_SYS_ERROR; return TSDB_CODE_QRY_SYS_ERROR;
} }
SExprInfo* notFillExprs = SExprInfo* pExpr = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
taosMemoryRealloc(pInfo->pNotFillExprInfo, (pInfo->numOfNotFillExpr + 1) * sizeof(SExprInfo)); if (pExpr == NULL) {
if (notFillExprs == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
createExprFromTargetNode(notFillExprs + pInfo->numOfNotFillExpr, (STargetNode*)pPhyFillNode->pWStartTs); createExprFromTargetNode(&pExpr[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
pExprSupp->numOfExprs += 1;
++pInfo->numOfNotFillExpr; pExprSupp->pExprInfo = pExpr;
pInfo->pNotFillExprInfo = notFillExprs;
return TSDB_CODE_SUCCESS;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3259,8 +3244,14 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3259,8 +3244,14 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr); SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
int32_t code = createWStartTsAsNotFillExpr(pInfo, pPhyFillNode); pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs);
int32_t code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -3289,7 +3280,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3289,7 +3280,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo->pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, pInfo->pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
pTaskInfo->id.str, pInterval, type, order); pTaskInfo->id.str, pInterval, type, order);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -360,7 +360,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -360,7 +360,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pBlock, order, scanFlag, true); setInputDataBlock(&pOperator->exprSupp, pBlock, order, scanFlag, true);
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) { if (pInfo->scalarSup.pExprInfo != NULL) {
......
...@@ -285,7 +285,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -285,7 +285,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); setInputDataBlock(pSup, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
...@@ -446,7 +446,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp ...@@ -446,7 +446,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
} }
} }
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); setInputDataBlock(pSup, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
......
...@@ -272,7 +272,17 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray ...@@ -272,7 +272,17 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray
char* p = colDataGetData(pSrcCol, rowIndex); char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull); saveColData(pRow, i, p, isNull);
} else if (type == QUERY_NODE_OPERATOR) { } else if (type == QUERY_NODE_OPERATOR) {
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, i); int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
} else if (type == QUERY_NODE_FUNCTION) {
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex); bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex); char* p = colDataGetData(pSrcCol, rowIndex);
...@@ -621,8 +631,8 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca ...@@ -621,8 +631,8 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca
int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; } int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; }
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr, SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
int32_t numOfNotFillExpr, const struct SNodeListNode* pValNode) { int32_t numOfNoFillExpr, const struct SNodeListNode* pValNode) {
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNotFillExpr, sizeof(SFillColInfo)); SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr, sizeof(SFillColInfo));
if (pFillCol == NULL) { if (pFillCol == NULL) {
return NULL; return NULL;
} }
...@@ -643,7 +653,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn ...@@ -643,7 +653,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
} }
} }
for (int32_t i = 0; i < numOfNotFillExpr; ++i) { for (int32_t i = 0; i < numOfNoFillExpr; ++i) {
SExprInfo* pExprInfo = &pNotFillExpr[i]; SExprInfo* pExprInfo = &pNotFillExpr[i];
pFillCol[i + numOfFillExpr].pExpr = pExprInfo; pFillCol[i + numOfFillExpr].pExpr = pExprInfo;
pFillCol[i + numOfFillExpr].notFillCol = true; pFillCol[i + numOfFillExpr].notFillCol = true;
...@@ -1403,7 +1413,7 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock ...@@ -1403,7 +1413,7 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock
blockDataCleanup(pDstBlock); blockDataCleanup(pDstBlock);
blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows); blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
setInputDataBlock(pOperator, pSup->pCtx, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false); setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL); projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pDstBlock->info.groupId = pSrcBlock->info.groupId; pDstBlock->info.groupId = pSrcBlock->info.groupId;
...@@ -1551,8 +1561,8 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod ...@@ -1551,8 +1561,8 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
} }
pFillSup->numOfFillCols = numOfFillCols; pFillSup->numOfFillCols = numOfFillCols;
int32_t numOfNotFillCols = 0; int32_t numOfNotFillCols = 0;
SExprInfo* pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); SExprInfo* noFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, pNotFillExprInfo, numOfNotFillCols, pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
(const SNodeListNode*)(pPhyFillNode->pValues)); (const SNodeListNode*)(pPhyFillNode->pValues));
pFillSup->type = convertFillType(pPhyFillNode->mode); pFillSup->type = convertFillType(pPhyFillNode->mode);
pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols; pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
......
...@@ -1107,7 +1107,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1107,7 +1107,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true); setInputDataBlock(pSup, pBlock, pInfo->inputOrder, scanFlag, true);
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
...@@ -1237,7 +1237,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) { ...@@ -1237,7 +1237,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
break; break;
} }
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.
...@@ -2049,7 +2049,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -2049,7 +2049,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
doSessionWindowAggImpl(pOperator, pInfo, pBlock); doSessionWindowAggImpl(pOperator, pInfo, pBlock);
...@@ -2400,7 +2400,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2400,7 +2400,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
...@@ -3365,7 +3365,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3365,7 +3365,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp* pExprSup = &pInfo->scalarSupp; SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
} }
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
...@@ -3383,7 +3383,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3383,7 +3383,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.groupId, NULL); doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.groupId, NULL);
} }
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
...@@ -4424,7 +4424,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4424,7 +4424,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo)); doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
...@@ -4439,7 +4439,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4439,7 +4439,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
taosArrayPush(pInfo->pChildren, &pChildOp); taosArrayPush(pInfo->pChildren, &pChildOp);
} }
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true); doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
} }
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
...@@ -4569,7 +4569,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -4569,7 +4569,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, NULL, false); doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, NULL, false);
maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
} }
...@@ -4973,7 +4973,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4973,7 +4973,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
} }
...@@ -5252,7 +5252,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5252,7 +5252,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
} }
getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true); setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
doFilter(pMiaInfo->pCondition, pRes, NULL, NULL); doFilter(pMiaInfo->pCondition, pRes, NULL, NULL);
...@@ -5581,7 +5581,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5581,7 +5581,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
} }
getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
setInputDataBlock(pOperator, pExpSupp->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true); setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
...@@ -5760,7 +5760,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5760,7 +5760,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now. // caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
if (pInfo->invertible) { if (pInfo->invertible) {
setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册