diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c70b40cc2fb6531de92454d216724af72ac208fd..055a7caba3844f9d147ccea27a591d82058cc3aa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1919,9 +1919,10 @@ typedef struct STableMergeScanInfo { int32_t tableStartIndex; int32_t tableEndIndex; bool hasGroupId; + uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; + SArray* dataReaders; // array of tsdbReaderT* + SReadHandle readHandle; int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort @@ -2218,6 +2219,18 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + { + size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); + int32_t i = pInfo->tableStartIndex + 1; + for (; i < tableListSize; ++i) { + STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i); + if (tableKeyInfo->groupId != pInfo->groupId) { + break; + } + } + pInfo->tableEndIndex = i - 1; + } + int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; @@ -2314,20 +2327,38 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } + size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; + + if (tableListSize == 0) { + doSetOperatorCompleted(pOperator); + return NULL; + } pInfo->tableStartIndex = 0; - pInfo->tableEndIndex = taosArrayGetSize(pInfo->tableListInfo->pTableList) - 1; + pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } - SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); - - if (pBlock != NULL) { - pOperator->resultInfo.totalRows += pBlock->info.rows; - } else { - stopGroupTableMergeScan(pOperator); - doSetOperatorCompleted(pOperator); + SSDataBlock* pBlock = NULL; + while (pInfo->tableStartIndex < tableListSize) { + pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); + if (pBlock != NULL) { + pBlock->info.groupId = pInfo->groupId; + pOperator->resultInfo.totalRows += pBlock->info.rows; + return pBlock; + } else { + stopGroupTableMergeScan(pOperator); + if (pInfo->tableEndIndex >= tableListSize - 1) { + doSetOperatorCompleted(pOperator); + break; + } + pInfo->tableStartIndex = pInfo->tableEndIndex + 1; + pInfo->groupId = + ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; + startGroupTableMergeScan(pOperator); + } } + return pBlock; } @@ -2335,7 +2366,6 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); - if (pTableScanInfo->pColMatchInfo != NULL) { taosArrayDestroy(pTableScanInfo->pColMatchInfo); } @@ -2424,8 +2454,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN initResultSizeInfo(pOperator, 1024); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, - NULL, NULL, getTableMergeScanExplainExecInfo); + createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL, + NULL, getTableMergeScanExplainExecInfo); pOperator->cost.openCost = 0; return pOperator;