diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ecb04bf5bc3cc2afbd9eb3db9309a09c94ed0df7..4b0ce0e545b4eaa0cab6cf1ac45cf2463d6111cc 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -478,6 +478,7 @@ typedef struct SStreamScanInfo { uint64_t tableUid; // queried super table uid SExprInfo* pPseudoExpr; int32_t numOfPseudoExpr; + SExprSupp tbnameCalSup; int32_t primaryTsIndex; // primary time stamp slot id SReadHandle readHandle; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 44cd665052f22e0d855b2cd1ea410602de4b97b3..2f191db74db0a3a509a021101d929ea8f8e9695d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4249,6 +4249,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); if (tbname != NULL) { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } else { + pBlock->info.parTbName[0] = 0; } } else { // current value belongs to different group, it can't be packed into one datablock diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5e7afa1303b408c4e640b2c92ecb665ef67aba93..44dc362e2244582d28230388840c1757f310de69 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -873,7 +873,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull); } pDest->info.rows++; - if (i == 0) { + if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) { SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); SSDataBlock* pResBlock = createDataBlock(); pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; @@ -892,6 +892,9 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { } else { pDest->info.parTbName[0] = 0; } + /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ + blockDataDestroy(pTmpBlock); + blockDataDestroy(pResBlock); } } blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); @@ -915,7 +918,6 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat } else { SPartitionDataInfo newParData = {0}; newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen); - /*newParData.tbname = */ newParData.rowIds = taosArrayInit(64, sizeof(int32_t)); taosArrayPush(newParData.rowIds, &i); taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo)); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e43a6011de1f5af4a55ccb58338e1e87e7ae02b9..670b2a2f5ea6146725f0efc9360fa2d52cf8cf19 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1415,6 +1415,30 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); + + if (pInfo->tbnameCalSup.numOfExprs > 0 && pInfo->pRes->info.rows > 0) { + SSDataBlock* pTmpBlock = blockCopyOneRow(pInfo->pRes, 0); + SSDataBlock* pResBlock = createDataBlock(); + pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; + SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); + taosArrayPush(pResBlock->pDataBlock, &data); + blockDataEnsureCapacity(pResBlock, 1); + projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); + ASSERT(pResBlock->info.rows == 1); + ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); + ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); + void* pData = colDataGetData(pCol, 0); + // TODO check tbname validation + if (pData != (void*)-1) { + memcpy(pInfo->pRes->info.parTbName, varDataVal(pData), varDataLen(pData)); + } else { + pInfo->pRes->info.parTbName[0] = 0; + } + blockDataDestroy(pTmpBlock); + blockDataDestroy(pResBlock); + } + return 0; } @@ -2089,6 +2113,19 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } } + if (pTableScanNode->pSubtable != NULL) { + SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo)); + if (pSubTableExpr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + pInfo->tbnameCalSup.pExprInfo = pSubTableExpr; + createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0); + if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) { + goto _error; + } + } + pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); if (pInfo->pBlockLists == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index e0a5bbffa45fac6eff4e24ce2270037883e64552..b0610c71e11e676f9ecbba8d355f54df3c551b6b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1761,6 +1761,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupGroupResInfo(&pInfo->groupResInfo); + taosHashCleanup(pInfo->pGroupIdTbNameMap); taosMemoryFreeClear(param); } @@ -3291,6 +3292,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + } + ASSERT(pBlock->info.type != STREAM_INVERT); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; @@ -3501,6 +3507,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; + pInfo->pGroupIdTbNameMap = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; @@ -5666,6 +5675,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "single interval recv"); + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ + } + if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap); @@ -5807,6 +5822,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; + pInfo->pGroupIdTbNameMap = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + pOperator->name = "StreamIntervalOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; pOperator->blocking = true;