diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 549ce6ae79f2dfe9ac24092628c167b52b05bf06..1712cba0f53ac3e3f5cf69d3ad8e257851fd55f0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -873,7 +873,7 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, int64_t* pData); -void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId, +void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock); SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 41e4c990f851a3cbcaf3a5982e16dc024caf0eab..f30fe30e351767280bb0e254b20d6f7cd140f273 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -1259,9 +1259,11 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pInfo->srcRowIndex = 0; } break; + case STREAM_CREATE_CHILD_TABLE: { + return pBlock; + } break; default: - ASSERT(0); - break; + ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index a2c637244bfe3159d45782e3c82c9ac467f1c87d..9fd8f7d3a24e79aba16b266ba4bfa3d681d1cf61 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -985,11 +985,13 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { return pDest; } -void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId, +void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) { void* pValue = NULL; if (streamStateGetParName(pState, groupId, &pValue) != 0) { SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); + memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); + pTmpBlock->info.id.groupId = groupId; if (pTableSup->numOfExprs > 0) { projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); @@ -999,6 +1001,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); memcpy(tbName, varDataVal(pData), len); streamStatePutParName(pState, groupId, tbName); + memcpy(pTmpBlock->info.parTbName, tbName, len); pDestBlock->info.rows--; } else { void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); @@ -1013,7 +1016,6 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false); - pDestBlock->info.id.groupId = groupId; pDestBlock->info.rows++; blockDataDestroy(pTmpBlock); }