diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 3109bc49a2d564a126f2694165ce6aec18858f2a..8f7808cf0269c7e0ca9057484457a7da65410799 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -340,6 +340,7 @@ typedef struct SSortExecInfo { #define GROUPID_COLUMN_INDEX 3 #define CALCULATE_START_TS_COLUMN_INDEX 4 #define CALCULATE_END_TS_COLUMN_INDEX 5 +#define TABLE_NAME_COLUMN_INDEX 6 #ifdef __cplusplus } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c525759ac7ee5a6f5fcdbfa9149817ed39b9c23f..fb99ba536128dbcce6b72c66041e1057a287bd93 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1316,8 +1316,8 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { pBlock->info.groupId = 0; pBlock->info.rows = 0; pBlock->info.type = type; - pBlock->info.rowSize = - sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); + pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + + sizeof(TSKEY) + TSDB_TABLE_NAME_LEN; pBlock->info.watermark = INT64_MIN; pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); @@ -1343,6 +1343,11 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { // calculate end ts taosArrayPush(pBlock->pDataBlock, &infoData); + // table name + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = TSDB_TABLE_NAME_LEN; + taosArrayPush(pBlock->pDataBlock, &infoData); + return pBlock; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 6e7428b6628ccc636208926d2294f359e3f1e296..7230b6232f0c6648ec5f1bb2ca0fdc593aeb6f93 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -23,10 +23,19 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl int32_t totRow = pDataBlock->info.rows; SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + for (int32_t row = 0; row < totRow; row++) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); - char* name = buildCtbNameByGroupId(stbFullName, groupId); + char* name; + void* varTbName = colDataGetVarData(pTbNameCol, row); + if (varTbName != NULL && varTbName != (void*)-1) { + name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); + } else { + name = buildCtbNameByGroupId(stbFullName, groupId); + } tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name); SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4b0ce0e545b4eaa0cab6cf1ac45cf2463d6111cc..de4e4dd1f84b6e56c896468b9d6cf6c97c592170 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1067,7 +1067,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); -void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp); +void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName); void printDataBlock(SSDataBlock* pBlock, const char* flag); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 44dc362e2244582d28230388840c1757f310de69..f3ae652987eb1ab133dbe7a211c20a7e36f3b48d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -885,8 +885,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); - void* pData = colDataGetData(pCol, 0); - // TODO check tbname validation + void* pData = colDataGetVarData(pCol, 0); + // TODO check tbname validity if (pData != (void*)-1) { memcpy(pDest->info.parTbName, varDataVal(pData), varDataLen(pData)); } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 670b2a2f5ea6146725f0efc9360fa2d52cf8cf19..b055ba1ce53d56a5a4d020d6381635c6764d8094 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1300,19 +1300,22 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } -void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp) { +void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, + uint64_t* pGp, void* pTbName) { SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false); colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false); colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false); colDataAppend(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false); colDataAppend(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false); + colDataAppend(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL); pBlock->info.rows++; } @@ -1342,10 +1345,12 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock if ((update || closedWin) && out) { qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); uint64_t gpId = 0; - appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId); + appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId, + NULL); if (closedWin && pInfo->partitionSup.needCalc) { gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId); - appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId); + appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId, + NULL); } } } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b0610c71e11e676f9ecbba8d355f54df3c551b6b..69e1b66ecf80988eec1e0657357beacf6474252b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1687,7 +1687,8 @@ static void freeAllPages(SArray* pageIds, SDiskbasedBuf* pDiskBuf) { taosArrayClear(pageIds); } -static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlock) { +static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index, + SSDataBlock* pBlock) { blockDataCleanup(pBlock); int32_t size = taosArrayGetSize(pWins); if (*index == size) { @@ -1699,7 +1700,14 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo uint64_t uid = 0; for (int32_t i = *index; i < size; i++) { SWinKey* pWin = taosArrayGet(pWins, i); - appendOneRow(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId); + char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pWin->groupId, sizeof(int64_t)); + if (tbname == NULL) { + appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); + } else { + char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; + STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); + appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); + } (*index)++; } } @@ -3239,7 +3247,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pPullDataRes; } - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); @@ -3265,7 +3273,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return NULL; } else { if (!IS_FINAL_OP(pInfo)) { - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); @@ -3392,7 +3400,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pPullDataRes; } - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); @@ -4849,7 +4857,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl i, &allEqual, pStDeleted); if (!allEqual) { uint64_t uid = 0; - appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, &uid, &groupId); + appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, + &uid, &groupId, NULL); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); continue; @@ -5645,7 +5654,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } if (pOperator->status == OP_RES_TO_RETURN) { - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "single interval delete"); return pInfo->pDelRes; @@ -5729,7 +5738,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); taosHashCleanup(pUpdatedMap); - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "single interval delete"); return pInfo->pDelRes;