From 1d52dd9d430586df706e78484bdbd741d71877ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 26 Mar 2022 15:02:29 +0800 Subject: [PATCH] [td-13039] fix bug. --- source/libs/executor/inc/executorimpl.h | 5 ++- source/libs/executor/src/executorimpl.c | 59 ++++++++++++++++++++++--- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c582873315..b6b5cdb727 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -415,6 +415,7 @@ typedef struct STableScanInfo { int32_t* rowCellInfoOffset; SExprInfo* pExpr; SSDataBlock block; + SArray* pColMatchInfo; int32_t numOfOutput; int64_t elapsedTime; int32_t prevGroupId; // previous table group id @@ -646,8 +647,8 @@ typedef struct SDistinctOperatorInfo { } SDistinctOperatorInfo; SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, - int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, + int32_t reverseTime, SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a601b9b0b7..872ce173ad 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -66,6 +66,11 @@ typedef enum SResultTsInterpType { RESULT_ROW_END_INTERP = 2, } SResultTsInterpType; +typedef struct SColMatchInfo { + int32_t colId; + int32_t targetSlotId; +} SColMatchInfo; + #if 0 static UNUSED_FUNC void *u_malloc (size_t __size) { uint32_t v = taosRand(); @@ -2944,12 +2949,21 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, *status = BLK_DATA_ALL_NEEDED; - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); - if (pBlock->pDataBlock == NULL) { + SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); + if (pCols == NULL) { return terrno; - } else { - return TSDB_CODE_SUCCESS; } + + int32_t numOfCols = pBlock->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pCols, i); + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + ASSERT(pColMatchInfo->colId == p->info.colId); + + taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); + } + + return TSDB_CODE_SUCCESS; } int32_t loadDataBlockOnDemand(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { @@ -5374,7 +5388,8 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { return pResBlock; } -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, + SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); @@ -5387,12 +5402,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, return NULL; } + pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData idata = {0}; + taosArrayPush(pInfo->block.pDataBlock, &idata); + } + pInfo->pTsdbReadHandle = pTsdbReadHandle; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; pInfo->order = order; pInfo->current = 0; pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; @@ -8497,6 +8519,7 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList); +static SArray* extractColMatchInfo(SNodeList* pNodeList); SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { @@ -8505,7 +8528,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols); + + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); @@ -8683,6 +8708,28 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { return pList; } +SArray* extractColMatchInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for(int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i); + SColumnNode* pColNode = (SColumnNode*) pNode->pExpr; + + SColMatchInfo c = {0}; + c.colId = pColNode->colId; + c.targetSlotId = pNode->slotId; + + taosArrayPush(pList, &c); + } + + return pList; +} + int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) { int32_t code = 0; if (tableType == TSDB_SUPER_TABLE) { -- GitLab