diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1207ea83cc4ea057336a5eae0e4fb76c51b663bf..bbfddba4fb1b53d403e5fd3987ce885621f3d882 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -624,6 +624,7 @@ typedef struct SIndefOperatorInfo { typedef struct SFillOperatorInfo { struct SFillInfo* pFillInfo; SSDataBlock* pRes; + SSDataBlock* pFinalRes; int64_t totalInputRows; void** p; SSDataBlock* existNewGroupBlock; @@ -631,6 +632,7 @@ typedef struct SFillOperatorInfo { SNode* pCondition; SArray* pColMatchColInfo; int32_t primaryTsCol; + int32_t primarySrcSlotId; uint64_t curGroupId; // current handled group id SExprInfo* pExprInfo; int32_t numOfExpr; diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 9f3a95aca80aa6f93b06168be3efbe7aa0c036a1..ae3c010ac3045300dd2f4b9d39e530ade0dd33c6 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -28,9 +28,7 @@ struct SSDataBlock; typedef struct SFillColInfo { SExprInfo *pExpr; -// int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN bool notFillCol; // denote if this column needs fill operation -// int16_t tagIndex; // index of current tag in SFillTagColInfo array list SVariant fillVal; } SFillColInfo; @@ -38,6 +36,11 @@ typedef struct { SSchema col; char* tagVal; } SFillTagColInfo; + +typedef struct { + int64_t key; + SArray* pRowVal; +} SRowVal; typedef struct SFillInfo { TSKEY start; // start timestamp @@ -53,9 +56,8 @@ typedef struct SFillInfo { int32_t numOfCurrent; // number of filled rows in current results int32_t numOfCols; // number of columns, including the tags columns SInterval interval; - - SArray *prev; - SArray *next; + SRowVal prev; + SRowVal next; SSDataBlock *pSrcBlock; int32_t alloc; // data buffer size in rows diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2fc113c7d65edcfee0402614bae26635d1ca8e2f..fff0600fe49693dede4d51eadc6276f77cda0e86 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3212,8 +3212,8 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { - int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows; - taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows); + int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; + taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); pInfo->pRes->info.groupId = pInfo->curGroupId; return; } @@ -3229,9 +3229,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SResultInfo* pResultInfo = &pOperator->resultInfo; - SSDataBlock* pResBlock = pInfo->pRes; + SSDataBlock* pResBlock = pInfo->pFinalRes; blockDataCleanup(pResBlock); + blockDataCleanup(pInfo->pRes); + + int32_t order = TSDB_ORDER_ASC; + int32_t scanFlag = MAIN_SCAN; doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > 0) { @@ -3251,17 +3255,23 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey); } else { blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol); - SExprSupp* pSup = &pOperator->exprSupp; + + getTableScanInfo(pOperator, &order, &scanFlag); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); + pInfo->pRes->info.groupId = pBlock->info.groupId; - if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) { - pInfo->curGroupId = pBlock->info.groupId; // the first data block + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsCol); + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId); + colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info); - pInfo->totalInputRows += pBlock->info.rows; + if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) { + pInfo->curGroupId = pInfo->pRes->info.groupId; // the first data block + pInfo->totalInputRows += pInfo->pRes->info.rows; - taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); - taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); + taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.ekey); + taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes); } else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block pInfo->existNewGroupBlock = pBlock; @@ -3649,7 +3659,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr); - pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; + pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId; + pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId; int32_t numOfOutputCols = 0; SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, @@ -3663,6 +3674,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* } pInfo->pRes = pResBlock; + pInfo->pFinalRes = createOneDataBlock(pResBlock, false); + blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); + pInfo->pCondition = pPhyFillNode->node.pConditions; pInfo->pColMatchColInfo = pColMatchColInfo; pOperator->name = "FillOperator"; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 4a885fb2ce5a357993ce1a38d631d60ff4307bb3..f9897c4253097ec94668b3e4bbeab0c784cbd77a 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -65,7 +65,7 @@ static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowInd if (pDstColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { colDataAppend(pDstColInfo, rowIndex, (const char*)&pFillInfo->currentKey, false); } else { - SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal; SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDstColInfo, rowIndex, pKey); } @@ -105,7 +105,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* // set the other values if (pFillInfo->type == TSDB_FILL_PREV) { - SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; + SArray* p = FILL_IS_ASC_FILL(pFillInfo)? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal; for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; @@ -120,7 +120,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* } } } else if (pFillInfo->type == TSDB_FILL_NEXT) { - SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev; + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal; // todo refactor: start from 0 not 1 for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; @@ -149,21 +149,21 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* if (type == TSDB_DATA_TYPE_TIMESTAMP) { colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false); } else { - SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal; SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDstCol, index, pKey); } } else { - SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i); + SGroupKeys* pKey = taosArrayGet(pFillInfo->prev.pRowVal, i); if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) { colDataAppendNULL(pDstCol, index); continue; } - SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, pFillInfo->tsSlotId); + SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev.pRowVal, pFillInfo->tsSlotId); int64_t prevTs = *(int64_t*)pKey1->pData; - int32_t srcSlotId = GET_SRC_SLOT_ID(pCol); + int32_t srcSlotId = GET_DEST_SLOT_ID(pCol); SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId); char* data = colDataGetData(pSrcCol, pFillInfo->index); @@ -192,7 +192,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false); } else { - SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal; SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDst, index, pKey); } @@ -220,7 +220,7 @@ void doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey } static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) { - if (taosArrayGetSize(pFillInfo->next) > 0) { + if (taosArrayGetSize(pFillInfo->next.pRowVal) > 0) { return; } @@ -234,10 +234,10 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) { key.bytes = pSchema->bytes; key.type = pSchema->type; - taosArrayPush(pFillInfo->next, &key); + taosArrayPush(pFillInfo->next.pRowVal, &key); key.pData = taosMemoryMalloc(pSchema->bytes); - taosArrayPush(pFillInfo->prev, &key); + taosArrayPush(pFillInfo->prev.pRowVal, &key); } } @@ -245,13 +245,24 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - int32_t srcSlotId = GET_SRC_SLOT_ID(&pFillInfo->pFillCol[i]); + int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType; + if (type == QUERY_NODE_COLUMN) { + int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]); - SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); + SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); - bool isNull = colDataIsNull_s(pSrcCol, rowIndex); - char* p = colDataGetData(pSrcCol, rowIndex); - saveColData(pRow, i, p, isNull); + bool isNull = colDataIsNull_s(pSrcCol, rowIndex); + char* p = colDataGetData(pSrcCol, rowIndex); + saveColData(pRow, i, p, isNull); + } else if (type == QUERY_NODE_OPERATOR) { + SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, i); + + bool isNull = colDataIsNull_s(pSrcCol, rowIndex); + char* p = colDataGetData(pSrcCol, rowIndex); + saveColData(pRow, i, p, isNull); + } else { + ASSERT(0); + } } } @@ -272,7 +283,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t // set the next value for interpolation if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) { - copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next); + copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next.pRowVal); } if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && @@ -294,39 +305,38 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) { int32_t nextRowIndex = pFillInfo->index + 1; - copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next); + copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next.pRowVal); } // copy rows to dst buffer for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - int32_t srcSlotId = GET_SRC_SLOT_ID(pCol); int32_t dstSlotId = GET_DEST_SLOT_ID(pCol); SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId); - SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); + SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, dstSlotId); char* src = colDataGetData(pSrc, pFillInfo->index); if (!colDataIsNull_s(pSrc, pFillInfo->index)) { colDataAppend(pDst, index, src, false); - saveColData(pFillInfo->prev, i, src, false); + saveColData(pFillInfo->prev.pRowVal, i, src, false); } else { // the value is null if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false); } else { // i > 0 and data is null , do interpolation if (pFillInfo->type == TSDB_FILL_PREV) { - SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal; SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDst, index, pKey); } else if (pFillInfo->type == TSDB_FILL_LINEAR) { bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); colDataAppend(pDst, index, src, isNull); - saveColData(pFillInfo->prev, i, src, isNull); // todo: + saveColData(pFillInfo->prev.pRowVal, i, src, isNull); // todo: } else if (pFillInfo->type == TSDB_FILL_NULL) { colDataAppendNULL(pDst, index); } else if (pFillInfo->type == TSDB_FILL_NEXT) { - SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev; + SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal; SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDst, index, pKey); } else { @@ -413,7 +423,7 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t for(int32_t i = 0; i < numOfNotFillCols; ++i) { SFillColInfo* p = &pCol[i + numOfFillCols]; - int32_t srcSlotId = GET_SRC_SLOT_ID(p); + int32_t srcSlotId = GET_DEST_SLOT_ID(p); if (srcSlotId == primaryTsSlotId) { pFillInfo->tsSlotId = i + numOfFillCols; break; @@ -453,8 +463,8 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t pFillInfo->id = id; pFillInfo->interval = *pInterval; - pFillInfo->next = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys)); - pFillInfo->prev = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys)); + pFillInfo->next.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys)); + pFillInfo->prev.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys)); initBeforeAfterDataBuf(pFillInfo); return pFillInfo; @@ -474,16 +484,16 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { if (pFillInfo == NULL) { return NULL; } - for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev); ++i) { - SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i); + for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev.pRowVal); ++i) { + SGroupKeys* pKey = taosArrayGet(pFillInfo->prev.pRowVal, i); taosMemoryFree(pKey->pData); } - taosArrayDestroy(pFillInfo->prev); - for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next); ++i) { - SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i); + taosArrayDestroy(pFillInfo->prev.pRowVal); + for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next.pRowVal); ++i) { + SGroupKeys* pKey = taosArrayGet(pFillInfo->next.pRowVal, i); taosMemoryFree(pKey->pData); } - taosArrayDestroy(pFillInfo->next); + taosArrayDestroy(pFillInfo->next.pRowVal); // for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) { // taosMemoryFreeClear(pFillInfo->pTags[i].tagVal); @@ -624,10 +634,6 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index); nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); } - -// if (pExprInfo->base.numOfParams > 0) { -// pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query -// } } for(int32_t i = 0; i < numOfNotFillExpr; ++i) { diff --git a/tests/script/tsim/parser/fill.sim b/tests/script/tsim/parser/fill.sim index 396bdd1e568c819132ebfa05011d973176a19401..24196450a5e10218a7f61b97c34aacab25ec4a1c 100644 --- a/tests/script/tsim/parser/fill.sim +++ b/tests/script/tsim/parser/fill.sim @@ -885,15 +885,15 @@ if $data10 != @20-01-01 01:01:10.000@ then return -1 endi -if $data11 != 1.000000000 then +if $data11 != 99.000000000 then return -1 endi -if $data12 != 1.000000000 then +if $data12 != 91.000000000 then return -1 endi -if $data13 != -87.000000000 then +if $data13 != 90.000000000 then return -1 endi @@ -917,15 +917,15 @@ if $data70 != @20-01-01 01:02:10.000@ then return -1 endi -if $data71 != 1.000000000 then +if $data71 != 99.000000000 then return -1 endi -if $data72 != 1.000000000 then +if $data72 != 91.000000000 then return -1 endi -if $data73 != -87.000000000 then +if $data73 != 90.000000000 then return -1 endi @@ -994,19 +994,19 @@ if $data10 != @20-01-01 01:01:10.000@ then return -1 endi -if $data11 != 1.000000000 then +if $data11 != 99.000000000 then return -1 endi -if $data12 != 1.000000000 then +if $data12 != 91.000000000 then return -1 endi -if $data13 != -87.000000000 then +if $data13 != 90.000000000 then return -1 endi -if $data14 != 86 then +if $data14 != 89 then return -1 endi diff --git a/tests/script/tsim/parser/fill_stb.sim b/tests/script/tsim/parser/fill_stb.sim index 107bac7089dd60dadca1abc51d4956009ffcecf4..c1d568594a466ec6fe3e5a99566983ce44e79ef6 100644 --- a/tests/script/tsim/parser/fill_stb.sim +++ b/tests/script/tsim/parser/fill_stb.sim @@ -111,13 +111,15 @@ endi if $data12 != -2 then return -1 endi -if $data13 != -3.00000 then +if $data13 != -3 then return -1 endi -if $data14 != -4.000000000 then +if $data14 != -4.00000 then + print expect -4.00000, actual: $data14 return -1 endi -if $data15 != -5 then +if $data15 != -5.000000000 then + print expect -5.000000000, actual: $data15 return -1 endi if $data31 != -1 then @@ -126,10 +128,10 @@ endi if $data52 != -2 then return -1 endi -if $data73 != -3.00000 then +if $data73 != -3 then return -1 endi -if $data74 != -4.000000000 then +if $data74 != -4.00000 then return -1 endi