diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 7674b9e479e45303b8c0c46478c96dd15693cedb..a736b6e13fec311e1e720f92ff27bb111555ef83 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1282,7 +1282,9 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pSrcBlock); } -static void buildDeleteRange(TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) { +static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) { + SStreamState* pState = pOp->pTaskInfo->streamInfo.pState; + SSDataBlock* pBlock = delRes; SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1290,25 +1292,42 @@ static void buildDeleteRange(TSKEY start, TSKEY end, uint64_t groupId, SSDataBlo SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + SColumnInfoData* pTbNameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); colDataAppend(pStartCol, pBlock->info.rows, (const char*)&start, false); colDataAppend(pEndCol, pBlock->info.rows, (const char*)&end, false); colDataAppendNULL(pUidCol, pBlock->info.rows); colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&groupId, false); colDataAppendNULL(pCalStartCol, pBlock->info.rows); colDataAppendNULL(pCalEndCol, pBlock->info.rows); + + SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + + void* tbname = NULL; + streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname); + if (tbname == NULL) { + colDataAppendNULL(pTableCol, pBlock->info.rows); + } else { + char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; + STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); + colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); + } + tdbFree(tbname); + pBlock->info.rows++; } -static void buildDeleteResult(SStreamFillSupporter* pFillSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, +static void buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSDataBlock* delRes) { + SStreamFillOperatorInfo* pInfo = pOperator->info; + SStreamFillSupporter* pFillSup = pInfo->pFillSup; if (hasPrevWindow(pFillSup)) { TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval); - buildDeleteRange(start, endTs, groupId, delRes); + buildDeleteRange(pOperator, start, endTs, groupId, delRes); } else if (hasNextWindow(pFillSup)) { TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval); - buildDeleteRange(startTs, end, groupId, delRes); + buildDeleteRange(pOperator, startTs, end, groupId, delRes); } else { - buildDeleteRange(startTs, endTs, groupId, delRes); + buildDeleteRange(pOperator, startTs, endTs, groupId, delRes); } } @@ -1319,7 +1338,7 @@ static void doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKE SWinKey key = {.ts = startTs, .groupId = groupId}; if (!pInfo->pFillInfo->needFill) { streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); - buildDeleteResult(pInfo->pFillSup, startTs, endTs, groupId, pInfo->pDelRes); + buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes); } else { STimeRange tw = { .skey = startTs, @@ -1578,7 +1597,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod destroyStreamFillSupporter(pFillSup); return NULL; } - + SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols); if (code != TSDB_CODE_SUCCESS) { @@ -1715,9 +1734,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi } pInfo->srcRowIndex = 0; - setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL); + setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) {