diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 04ffe3d900618430bbe3b336d698f666696873b9..5f90c567bead7ef3d25955b919baced1e57e5ff5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { init = true; TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + code = tRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1881,7 +1881,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow1); } else { init = true; - int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 1965a8cbf68ece68e889d1a7df9241b461370c24..76a3359b5b15fef1909a95322e6f22bd1786ce29 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -23,6 +23,7 @@ #include "tpagedbuf.h" #include "tsimplehash.h" #include "vnode.h" +#include "executor.h" #define T_LONG_JMP(_obj, _c) \ do { \ @@ -95,27 +96,24 @@ typedef struct SColMatchInfo { int32_t matchType; // determinate the source according to col id or slot id } SColMatchInfo; -// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly -// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups -typedef struct STableListInfo { - bool oneTableForEachGroup; - int32_t numOfOuputGroups; // the data block will be generated one by one - int32_t* groupOffset; // keep the offset value for each group in the tableList - SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid - uint64_t suid; -} STableListInfo; - -void destroyTableList(STableListInfo* pTableList); -int32_t getNumOfOutputGroups(const STableListInfo* pTableList); -bool oneTableForEachGroup(const STableListInfo* pTableList); -uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); -int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid); -int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); -uint64_t getTotalTables(const STableListInfo* pTableList); - +typedef struct STableListInfo STableListInfo; struct SqlFunctionCtx; +int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* id); + +STableListInfo* tableListCreate(); +void* tableListDestroy(STableListInfo* pTableListInfo); +void tableListClear(STableListInfo* pTableListInfo); +int32_t tableListGetOutputGroups(const STableListInfo* pTableList); +bool oneTableForEachGroup(const STableListInfo* pTableList); +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); +int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); +int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); +uint64_t tableListGetSize(const STableListInfo* pTableList); +uint64_t tableListGetSuid(const STableListInfo* pTableList); +STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); + size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); void closeResultRow(SResultRow* pResultRow); @@ -128,6 +126,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo if (forUpdate) { setBufPageDirty(bufPage, true); } + SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); return pRow; } @@ -143,10 +142,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); -int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, - STableListInfo* pListInfo); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); -int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo); size_t getTableTagsBufLen(const SNodeList* pGroups); SArray* createSortInfo(SNodeList* pNodeList); @@ -169,9 +165,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); int32_t convertFillType(int32_t mode); - int32_t resultrowComparAsc(const void* p1, const void* p2); - int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified); #endif // TDENGINE_QUERYUTIL_H diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 17ab7569339056c9d48e8213a024519e464bec4a..1191b6a485655418b1d674f36a895a238a42acaf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -184,7 +184,7 @@ typedef struct SExecTaskInfo { int64_t version; // used for stream to record wal version SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; - STableListInfo tableqinfoList; // this is a table list + STableListInfo* pTableInfoList; // this is a table list const char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] @@ -555,7 +555,7 @@ typedef struct SSysTableScanInfo { typedef struct SBlockDistInfo { SSDataBlock* pResBlock; - void* pHandle; + STsdbReader* pHandle; SReadHandle readHandle; uint64_t uid; // table uid } SBlockDistInfo; @@ -970,8 +970,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, - SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); @@ -1065,10 +1065,6 @@ uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, S int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, - const char* idstr); - SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, @@ -1077,7 +1073,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); bool groupbyTbname(SNodeList* pGroupList); -int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index d37efbe9414ca65f236ffce67a7ede60db1594b5..92e98d3eab205f8bb75fbdc28494e4d06c396193 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -59,22 +59,23 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe goto _error; } - STableListInfo* pTableList = &pTaskInfo->tableqinfoList; + STableListInfo* pTableList = pTaskInfo->pTableInfoList; initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); // partition by tbname, todo opt perf - if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) { + if (oneTableForEachGroup(pTableList) || (tableListGetSize(pTableList) == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); - STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0); - size_t num = taosArrayGetSize(pTableList->pTableList); + STableKeyInfo* pList = tableListGetInfo(pTableList, 0); + size_t num = tableListGetSize(pTableList); + uint64_t suid = tableListGetSuid(pTableList); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, - taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -120,8 +121,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SLastrowScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableListInfo* pTableList = &pTaskInfo->tableqinfoList; - int32_t size = taosArrayGetSize(pTableList->pTableList); + STableListInfo* pTableList = pTaskInfo->pTableInfoList; + + uint64_t suid = tableListGetSuid(pTableList); + int32_t size = tableListGetSize(pTableList); if (size == 0) { doSetOperatorCompleted(pOperator); return NULL; @@ -184,20 +187,19 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { return NULL; } } else { - size_t totalGroups = getNumOfOutputGroups(pTableList); + size_t totalGroups = tableListGetOutputGroups(pTableList); while (pInfo->currentGroupIndex < totalGroups) { - STableKeyInfo* pList = NULL; int32_t num = 0; - int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num); + int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, - taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 0f2e5194c1ec908ecf25d7a8d4e4d534403305ba..bb87b5029ae56020f81c2f41681ba3b112999a36 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -26,9 +26,31 @@ #include "executorimpl.h" #include "tcompression.h" -static int32_t removeInvalidTable(SArray* list, SHashObj* tags); +// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly +// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups +struct STableListInfo { + bool oneTableForEachGroup; + int32_t numOfOuputGroups; // the data block will be generated one by one + int32_t* groupOffset; // keep the offset value for each group in the tableList + SArray* pTableList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid + uint64_t suid; +}; + +typedef struct tagFilterAssist { + SHashObj* colHash; + int32_t index; + SArray* cInfoList; +} tagFilterAssist; + +static int32_t removeInvalidTable(SArray* uids, SHashObj* tags); static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SHashObj* tags); static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond); +static int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, + SNode* pTagIndexCond, STableListInfo* pListInfo); + +static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } +static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } void initResultRowInfo(SResultRowInfo* pResultRowInfo) { pResultRowInfo->size = 0; @@ -301,12 +323,6 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, return TSDB_CODE_SUCCESS; } -typedef struct tagFilterAssist { - SHashObj* colHash; - int32_t index; - SArray* cInfoList; -} tagFilterAssist; - static EDealRes getColumn(SNode** pNode, void* pContext) { SColumnNode* pSColumnNode = NULL; if (QUERY_NODE_COLUMN == nodeType((*pNode))) { @@ -482,6 +498,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray* } } } + pResBlock->info.rows = rows; // int64_t st1 = taosGetTimestampUs(); @@ -766,6 +783,7 @@ static int tableUidCompare(const void* a, const void* b) { } return u1 < u2 ? -1 : 1; } + static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) { int32_t ret = -1; if (nodeType(cond) == QUERY_NODE_OPERATOR) { @@ -820,6 +838,7 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) { taosArrayPush(validUid, uid); } } + taosArraySwap(uids, validUid); taosArrayDestroy(validUid); return 0; @@ -930,11 +949,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; - pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); - if (pListInfo->pTableList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - uint64_t tableUid = pScanNode->uid; pListInfo->suid = pScanNode->suid; SArray* res = taosArrayInit(8, sizeof(uint64_t)); @@ -994,13 +1008,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, size_t numOfTables = taosArrayGetSize(res); for (int i = 0; i < numOfTables; i++) { STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0}; - void* p = taosArrayPush(pListInfo->pTableList, &info); + + void* p = taosArrayPush(pListInfo->pTableList, &info); if (p == NULL) { taosArrayDestroy(res); return TSDB_CODE_OUT_OF_MEMORY; } - qDebug("tagfilter get uid:%" PRId64 "", info.uid); + qDebug("tagfilter get uid:%" PRIu64 "", info.uid); } taosArrayDestroy(res); @@ -1642,9 +1657,6 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { pLimitInfo->slimit.offset != -1); } -static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } -static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } - void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) { SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)}; SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)}; @@ -1655,11 +1667,23 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit pLimitInfo->remainGroupOffset = slimit.offset; } -uint64_t getTotalTables(const STableListInfo* pTableList) { +uint64_t tableListGetSize(const STableListInfo* pTableList) { ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); return taosArrayGetSize(pTableList->pTableList); } +uint64_t tableListGetSuid(const STableListInfo* pTableList) { + return pTableList->suid; +} + +STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) { + if (taosArrayGetSize(pTableList->pTableList) == 0) { + return NULL; + } + + return taosArrayGet(pTableList->pTableList, index); +} + uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); ASSERT(pTableList->map != NULL && slot != NULL); @@ -1670,7 +1694,8 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { return pKeyInfo->groupId; } -int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { +// TODO handle the group offset info, fix it, the rule of group output will be broken by this function +int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { if (pTableList->map == NULL) { ASSERT(taosArrayGetSize(pTableList->pTableList) == 0); pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1686,9 +1711,9 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t return TSDB_CODE_SUCCESS; } -int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, +int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) { - int32_t total = getNumOfOutputGroups(pTableList); + int32_t total = tableListGetOutputGroups(pTableList); if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { return TSDB_CODE_INVALID_PARA; } @@ -1696,10 +1721,10 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI // here handle two special cases: // 1. only one group exists, and 2. one table exists for each group. if (total == 1) { - *size = getTotalTables(pTableList); + *size = tableListGetSize(pTableList); *pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0); return TSDB_CODE_SUCCESS; - } else if (total == getTotalTables(pTableList)) { + } else if (total == tableListGetSize(pTableList)) { *size = 1; *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex); return TSDB_CODE_SUCCESS; @@ -1716,16 +1741,178 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI return TSDB_CODE_SUCCESS; } -int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; } +int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; } bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; } -void destroyTableList(STableListInfo* pTableqinfoList) { - pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList); - taosMemoryFreeClear(pTableqinfoList->groupOffset); +STableListInfo* tableListCreate() { + STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo)); + if (pListInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); + if (pListInfo->pTableList == NULL) { + goto _error; + } + + pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (pListInfo->map == NULL) { + goto _error; + } + + pListInfo->numOfOuputGroups = 1; + return pListInfo; + +_error: + tableListDestroy(pListInfo); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; +} + +void* tableListDestroy(STableListInfo* pTableListInfo) { + if (pTableListInfo == NULL) { + return NULL; + } + + pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList); + taosMemoryFreeClear(pTableListInfo->groupOffset); + + taosHashCleanup(pTableListInfo->map); + + pTableListInfo->pTableList = NULL; + pTableListInfo->map = NULL; + taosMemoryFree(pTableListInfo); + return NULL; +} + +void tableListClear(STableListInfo* pTableListInfo) { + if (pTableListInfo == NULL) { + return; + } + + taosArrayClear(pTableListInfo->pTableList); + taosHashClear(pTableListInfo->map); + taosMemoryFree(pTableListInfo->groupOffset); + pTableListInfo->numOfOuputGroups = 1; + pTableListInfo->oneTableForEachGroup = false; +} + +static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { + STableKeyInfo* pInfo1 = (STableKeyInfo*) p1; + STableKeyInfo* pInfo2 = (STableKeyInfo*) p2; + + if (pInfo1->groupId == pInfo2->groupId) { + return 0; + } else { + return pInfo1->groupId < pInfo2->groupId? -1:1; + } +} + +static int32_t sortTableGroup(STableListInfo* pTableListInfo) { + int32_t code = TSDB_CODE_SUCCESS; + + taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); + int32_t size = taosArrayGetSize(pTableListInfo->pTableList); + + SArray* pList = taosArrayInit(4, sizeof(int32_t)); + + STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); + uint64_t gid = pInfo->groupId; + + int32_t start = 0; + taosArrayPush(pList, &start); + + for(int32_t i = 1; i < size; ++i) { + pInfo = taosArrayGet(pTableListInfo->pTableList, i); + if (pInfo->groupId != gid) { + taosArrayPush(pList, &i); + gid = pInfo->groupId; + } + } + + pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); + pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + taosArrayDestroy(pList); + return TDB_CODE_SUCCESS; +} + +int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { + int32_t code = TSDB_CODE_SUCCESS; + ASSERT(pTableListInfo->map != NULL); + + bool groupByTbname = groupbyTbname(group); + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + if (group == NULL || groupByTbname) { + for (int32_t i = 0; i < numOfTables; i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + info->groupId = groupByTbname? info->uid:0; + } + + pTableListInfo->oneTableForEachGroup = groupByTbname; - taosHashCleanup(pTableqinfoList->map); + if (groupSort && groupByTbname) { + taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); + pTableListInfo->numOfOuputGroups = numOfTables; + } else { + pTableListInfo->numOfOuputGroups = 1; + } + } else { + code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - pTableqinfoList->pTableList = NULL; - pTableqinfoList->map = NULL; -} \ No newline at end of file + if (groupSort) { + code = sortTableGroup(pTableListInfo); + } + } + + // add all table entry in the hash map + size_t size = taosArrayGetSize(pTableListInfo->pTableList); + for(int32_t i = 0; i < size; ++i) { + STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i); + taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t)); + } + + return code; +} + +int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, + const char* idStr) { + int64_t st = taosGetTimestampUs(); + + if (pHandle == NULL) { + qError("invalid handle, in creating operator tree, %s", idStr); + return TSDB_CODE_INVALID_PARA; + } + + int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to getTableList, code: %s", tstrerror(code)); + return code; + } + + ASSERT(pTableListInfo->numOfOuputGroups == 1); + + int64_t st1 = taosGetTimestampUs(); + qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr); + + if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { + qDebug("no table qualified for query, %s" PRIx64, idStr); + return TSDB_CODE_SUCCESS; + } + + code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int64_t st2 = taosGetTimestampUs(); + qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr); + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 765968999ac18c834b26c4b077631a306d964ab0..bbd62a991233d648afbac081a4adb6b068c63c0c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -287,14 +287,11 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - STableListInfo* pListInfo = &pTaskInfo->tableqinfoList; if (isAdd) { qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); } - - // traverse to the stream scanner node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -328,7 +325,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList; + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; for (int32_t i = 0; i < numOfQualifiedTables; ++i) { uint64_t* uid = taosArrayGet(qa, i); @@ -361,7 +358,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (!exists) { #endif - addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId); + tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId); } if (keyBuf != NULL) { @@ -925,8 +922,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int64_t ts = pOffset->ts; if (uid == 0) { - if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) { + STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0); uid = pTableInfo->uid; ts = INT64_MIN; } else { @@ -937,7 +934,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList); + int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); #ifndef NDEBUG qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid, @@ -947,7 +944,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT bool found = false; for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); + STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); if (pTableInfo->uid == uid) { found = true; pTableScanInfo->currentTable = i; @@ -959,8 +956,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ASSERT(found); if (pTableScanInfo->dataReader == NULL) { - STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - int32_t num = getTotalTables(&pTaskInfo->tableqinfoList); + STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); + int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, &pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) { @@ -993,22 +990,28 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); tsdbReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; + cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); - taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList); - if (mtInfo.uid == 0) return 0; // no data + tableListClear(pTaskInfo->pTableInfoList); + + if (mtInfo.uid == 0) { + return 0; // no data + } initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; - STableListInfo* pListInfo = &pTaskInfo->tableqinfoList; + if (pTaskInfo->pTableInfoList == NULL) { + pTaskInfo->pTableInfoList = tableListCreate(); + } - pListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); - taosArrayPush(pListInfo->pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); + tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0); - STableKeyInfo* pList = taosArrayGet(pListInfo->pTableList, 0); + STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); + int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); + ASSERT(size == 1); - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, taosArrayGetSize(pListInfo->pTableList), - &pInfo->dataReader, NULL); + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 54c92c10bd17d33587a063ed5a978a97b91ce907..11dbfaff9f5552b44ccd543bb54c16bd7e302459 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3273,6 +3273,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; + pTaskInfo->pTableInfoList = tableListCreate(); char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); @@ -3364,117 +3365,6 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } -static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { - STableKeyInfo* pInfo1 = (STableKeyInfo*) p1; - STableKeyInfo* pInfo2 = (STableKeyInfo*) p2; - - if (pInfo1->groupId == pInfo2->groupId) { - return 0; - } else { - return pInfo1->groupId < pInfo2->groupId? -1:1; - } -} - -static int32_t sortTableGroup(STableListInfo* pTableListInfo) { - int32_t code = TSDB_CODE_SUCCESS; - - taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); - int32_t size = taosArrayGetSize(pTableListInfo->pTableList); - - SArray* pList = taosArrayInit(4, sizeof(int32_t)); - - STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); - uint64_t gid = pInfo->groupId; - - int32_t start = 0; - taosArrayPush(pList, &start); - - for(int32_t i = 1; i < size; ++i) { - pInfo = taosArrayGet(pTableListInfo->pTableList, i); - if (pInfo->groupId != gid) { - taosArrayPush(pList, &i); - gid = pInfo->groupId; - } - } - - pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); - pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); - memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); - taosArrayDestroy(pList); - -# if 0 - SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); - if (sortSupport == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - size_t num = taosArrayGetSize(pTableListInfo->pTableList); - for (int32_t i = 0; i < num; i++) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t)); - - int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ); - if (index == -1) { - void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT); - - SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo)); - if (tGroup == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - if (taosArrayPush(tGroup, info) == NULL) { - qError("taos push info array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - if (p == NULL) { - if (taosArrayPush(sortSupport, groupId) == NULL) { - qError("taos push support array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) { - qError("taos push group array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - } else { - int32_t pos = TARRAY_ELEM_IDX(sortSupport, p); - if (taosArrayInsert(sortSupport, pos, groupId) == NULL) { - qError("taos insert support array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) { - qError("taos insert group array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - } - } else { - SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index); - if (taosArrayPush(tGroup, info) == NULL) { - qError("taos push uid array error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - } - } - - taosArrayDestroy(sortSupport); -#endif - - return TDB_CODE_SUCCESS; - - _error: -// taosArrayDestroy(sortSupport); - return code; -} - bool groupbyTbname(SNodeList* pGroupList) { bool bytbname = false; if (LIST_LENGTH(pGroupList) == 1) { @@ -3488,80 +3378,11 @@ bool groupbyTbname(SNodeList* pGroupList) { return bytbname; } -int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { - int32_t code = TSDB_CODE_SUCCESS; - - pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (pTableListInfo->map == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } - - bool groupByTbname = groupbyTbname(group); - size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - if (group == NULL || groupByTbname) { - for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - info->groupId = groupByTbname? info->uid:0; - } - - pTableListInfo->oneTableForEachGroup = groupByTbname; - - if (groupSort && groupByTbname) { - taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); - pTableListInfo->numOfOuputGroups = numOfTables; - } else { - pTableListInfo->numOfOuputGroups = 1; - } - } else { - code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (groupSort) { - code = sortTableGroup(pTableListInfo); - } - } - - // add all table entry in the hash map - size_t size = taosArrayGetSize(pTableListInfo->pTableList); - for(int32_t i = 0; i < size; ++i) { - STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i); - taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t)); - } - - return code; -} - -static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { - memset(pCond, 0, sizeof(SQueryTableDataCond)); - - pCond->order = TSDB_ORDER_ASC; - pCond->numOfCols = 1; - pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); - if (pCond->colList == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return terrno; - } - - pCond->colList->colId = 1; - pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; - pCond->colList->bytes = sizeof(TSKEY); - - pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; - pCond->suid = uid; - pCond->type = TIMEWINDOW_RANGE_CONTAINED; - pCond->startVersion = -1; - pCond->endVersion = -1; - - return TSDB_CODE_SUCCESS; -} - -SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, - const char* pUser) { +SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, + SNode* pTagIndexCond, const char* pUser) { int32_t type = nodeType(pPhyNode); + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + const char* idstr = GET_TASKID(pTaskInfo); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { SOperatorInfo* pOperator = NULL; @@ -3576,10 +3397,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; - qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo)); + qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr); return NULL; } @@ -3596,7 +3417,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); @@ -3621,7 +3442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pHandle->vnode) { int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, - pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pHandle, pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); @@ -3629,11 +3450,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } #ifndef NDEBUG - int32_t sz = taosArrayGetSize(pTableListInfo->pTableList); + int32_t sz = tableListGetSize(pTableListInfo); qDebug("create stream task, total:%d", sz); for (int32_t i = 0; i < sz; i++) { - STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i); + STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId); } #endif @@ -3646,7 +3467,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo); + + int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, + pTagIndexCond, idstr); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; qError("failed to getTableList, code: %s", tstrerror(code)); @@ -3656,8 +3479,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; - pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); - pTableListInfo->numOfOuputGroups = 1; if (pBlockNode->tableType == TSDB_SUPER_TABLE) { SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo)); @@ -3667,38 +3488,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) { + for(int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) { STableKeyInfo* p = taosArrayGet(pList, i); - addTableIntoTableList(pTableListInfo, p->uid, 0); + tableListAddTableInfo(pTableListInfo, p->uid, 0); } taosArrayDestroy(pList); - } else { // Create one table group. - addTableIntoTableList(pTableListInfo, pBlockNode->uid, 0); + } else { // Create group with only one table + tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); } - SQueryTableDataCond cond = {0}; - - int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - - size_t num = getTotalTables(pTableListInfo); - void* pList = NULL; - if (num > 0) { - pList = taosArrayGet(pTableListInfo->pTableList, 0); - } - - STsdbReader* pReader = NULL; - tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, ""); - cleanupQueryTableDataCond(&cond); - - pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); + pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, - pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pTagCond, pTagIndexCond, idstr); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -3725,17 +3529,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } size_t size = LIST_LENGTH(pPhyNode->pChildren); - SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); + if (ops == NULL) { + return NULL; + } + for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser); + ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser); if (ops[i] == NULL) { taosMemoryFree(ops); return NULL; } - - ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId; } SOperatorInfo* pOptr = NULL; @@ -4000,15 +3805,18 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT if (NULL == pDeleterParam) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList); - pDeleterParam->suid = pTask->tableqinfoList.suid; + int32_t tbNum = tableListGetSize(pTask->pTableInfoList); + pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList); + + // TODO extract uid list pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); if (NULL == pDeleterParam->pUidList) { taosMemoryFree(pDeleterParam); return TSDB_CODE_OUT_OF_MEMORY; } + for (int32_t i = 0; i < tbNum; ++i) { - STableKeyInfo* pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i); + STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i); taosArrayPush(pDeleterParam->pUidList, &pTable->uid); } @@ -4044,8 +3852,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead sql = NULL; (*pTaskInfo)->pSubplan = pPlan; - (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, - pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); + (*pTaskInfo)->pRoot = + createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); if (NULL == (*pTaskInfo)->pRoot) { code = (*pTaskInfo)->code; @@ -4064,7 +3872,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); - destroyTableList(&pTaskInfo->tableqinfoList); + pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList); destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 415fa60287c7229dd9a16803629e89dfb18c198b..b9324d0c64b6fc59aaf9919d5f9cdb2d8dc7daed 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -623,7 +623,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pBlock->info = binfo; ASSERT(binfo.uid != 0); - pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); + pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -719,7 +719,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // scan table one by one sequentially if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); while (1) { SSDataBlock* result = doTableScanGroup(pOperator); @@ -733,7 +733,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); + STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); tsdbSetTableList(pInfo->dataReader, pTableInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str); @@ -743,14 +743,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } } else { // scan table group by group sequentially if (pInfo->currentGroupId == -1) { - if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { doSetOperatorCompleted(pOperator); return NULL; } int32_t num = 0; STableKeyInfo* pList = NULL; - getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num); + tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); ASSERT(pInfo->dataReader == NULL); int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader, @@ -766,7 +766,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { doSetOperatorCompleted(pOperator); return NULL; } @@ -778,7 +778,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num); + tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); tsdbSetTableList(pInfo->dataReader, pList, num); tsdbReaderReset(pInfo->dataReader, &pInfo->cond); @@ -1000,8 +1000,31 @@ static void destroyBlockDistScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, - SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { +static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { + memset(pCond, 0, sizeof(SQueryTableDataCond)); + + pCond->order = TSDB_ORDER_ASC; + pCond->numOfCols = 1; + pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (pCond->colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return terrno; + } + + pCond->colList->colId = 1; + pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; + pCond->colList->bytes = sizeof(TSKEY); + + pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pCond->suid = uid; + pCond->type = TIMEWINDOW_RANGE_CONTAINED; + pCond->startVersion = -1; + pCond->endVersion = -1; + + return TSDB_CODE_SUCCESS; +} + +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1009,9 +1032,24 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re goto _error; } - pInfo->pHandle = dataReader; + { + SQueryTableDataCond cond = {0}; + + int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + size_t num = tableListGetSize(pTableListInfo); + void* pList = tableListGetInfo(pTableListInfo, 0); + + tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str); + cleanupQueryTableDataCond(&cond); + } + pInfo->readHandle = *readHandle; - pInfo->uid = uid; + pInfo->uid = pBlockScanNode->suid; pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); int32_t numOfCols = 0; @@ -1119,7 +1157,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); - pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid); + pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, binfo.uid); } tsdbReaderClose(pReader); @@ -1139,8 +1177,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { - return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid); -// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map; + return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid); +// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map; // uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t)); // if (groupId) { // return *groupId; @@ -1564,7 +1602,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); + pInfo->pRes->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -2076,12 +2114,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } } -static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { +static SArray* extractTableIdList(const STableListInfo* pTableListInfo) { SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); // Transfer the Array of STableKeyInfo into uid list. - for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) { - STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i); + size_t size = tableListGetSize(pTableListInfo); + for (int32_t i = 0; i < size; ++i) { + STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i); taosArrayPush(tableIdList, &pkeyInfo->uid); } @@ -2350,7 +2389,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys STableKeyInfo* pList = NULL; int32_t num = 0; - getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num); + tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num); if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; @@ -2383,7 +2422,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); - SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); + SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList); code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); @@ -4082,7 +4121,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); - int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList); + int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); if (size == 0) { setTaskStatus(pTaskInfo, TASK_COMPLETED); return NULL; @@ -4094,7 +4133,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos); + STableKeyInfo* item = tableListGetInfo(pInfo->pTableList, pInfo->curPos); int32_t code = metaGetTableEntryByUid(&mr, item->uid); tDecoderClear(&mr.coder); if (code != TSDB_CODE_SUCCESS) { @@ -4209,47 +4248,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi return NULL; } -int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, - const char* idStr) { - int64_t st = taosGetTimestampUs(); - - if (pHandle == NULL) { - qError("invalid handle, in creating operator tree, %s", idStr); - return TSDB_CODE_INVALID_PARA; - } - - int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to getTableList, code: %s", tstrerror(code)); - return code; - } - - pTableListInfo->numOfOuputGroups = 1; - - int64_t st1 = taosGetTimestampUs(); - qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr); - - if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { - qDebug("no table qualified for query, %s" PRIx64, idStr); - return TSDB_CODE_SUCCESS; - } - - code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - int64_t st2 = taosGetTimestampUs(); - qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr); - - return TSDB_CODE_SUCCESS; -} - int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) { for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { - STableKeyInfo* pList = taosArrayGet(pTableListInfo->pTableList, i); + STableKeyInfo* pList = tableListGetInfo(pTableListInfo, i); STsdbReader* pReader = NULL; tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr); taosArrayPush(arrayReader, &pReader); @@ -4262,7 +4264,7 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, STsdbReader** ppReader, const char* idstr) { STsdbReader* pReader = NULL; - void* pStart = taosArrayGet(pTableListInfo->pTableList, tableStartIdx); + void* pStart = tableListGetInfo(pTableListInfo, tableStartIdx); int32_t num = tableEndIdx - tableStartIdx + 1; int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr); @@ -4522,7 +4524,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { int64_t st = taosGetTimestampUs(); - void* p =taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex); + void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->readHandle; tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); @@ -4565,7 +4567,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { continue; } - pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); + pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -4622,7 +4624,7 @@ static SSDataBlock* getTableDataBlock2(void* param) { continue; } - pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); + pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -4676,7 +4678,7 @@ static SSDataBlock* getTableDataBlock(void* param) { continue; } - pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); + pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -4719,10 +4721,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; { - size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); + size_t numOfTables = tableListGetSize(pInfo->tableListInfo); int32_t i = pInfo->tableStartIndex + 1; - for (; i < tableListSize; ++i) { - STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i); + for (; i < numOfTables; ++i) { + STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->tableListInfo, i); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -4847,7 +4849,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); + size_t tableListSize = tableListGetSize(pInfo->tableListInfo); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; @@ -4856,7 +4858,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return NULL; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; + pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } @@ -4875,8 +4877,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { break; } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = - ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; + pInfo->groupId = tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); } }