From 52c3777823376836714ba0570a956897c1e471a6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 30 Oct 2022 03:01:40 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/common/tcommon.h | 8 - source/dnode/vnode/src/inc/vnodeInt.h | 3 - source/libs/executor/inc/executil.h | 29 ++++ source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/cachescanoperator.c | 12 +- source/libs/executor/src/executil.c | 165 ++++++++++++++++--- source/libs/executor/src/executor.c | 8 +- source/libs/executor/src/executorimpl.c | 73 ++++++-- source/libs/executor/src/scanoperator.c | 44 +++-- 9 files changed, 259 insertions(+), 85 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2b45a5d206..3bfbb85958 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -164,14 +164,6 @@ typedef enum EStreamType { STREAM_FILL_OVER, } EStreamType; -typedef struct { - SArray* pGroupList; - SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid - bool needSortTableByGroupId; - uint64_t suid; -} STableListInfo; - #pragma pack(push, 1) typedef struct SColumnDataAgg { int16_t colId; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a5dc4431ab..94178727be 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - void* pMemRef); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); -int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list); // tq int tqInit(); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 0cfef7dc24..9f77178966 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -97,6 +97,35 @@ typedef struct SColMatchInfo { struct SqlFunctionCtx; +// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly +// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups +//typedef struct STableListInfo { +// bool oneTableForEachGroup; +// int32_t numOfOuputGroups; // the data block will be generated one by one +// int32_t* groupOffset; // keep the offset value for each group in the tableList +// SArray* pTableList; +// SHashObj* map; // speedup acquire the tableQueryInfo by table uid +// uint64_t suid; +//} STableListInfo; +typedef struct { + bool oneTableForEachGroup; + int32_t numOfOuputGroups; // the data block will be generated one by one + int32_t* groupOffset; // keep the offset value for each group in the tableList + SArray* pGroupList; + SArray* pTableList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid + bool needSortTableByGroupId; + uint64_t suid; +} STableListInfo; + +void destroyTableList(STableListInfo* pTableList); +int32_t getNumOfOutputGroups(const STableListInfo* pTableList); +bool oneTableForEachGroup(const STableListInfo* pTableList); +int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid); +int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); +uint64_t getTotalTables(const STableListInfo* pTableList); + size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); void closeResultRow(SResultRow* pResultRow); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8769e8ac2f..31bf410f0a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); bool groupbyTbname(SNodeList* pGroupList); -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); +int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey, bool groupSort); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index f645e71c6e..6241eae3ac 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -167,17 +167,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - if (pTableList->map != NULL) { - int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); - if (groupId != NULL) { - pInfo->pRes->info.groupId = *groupId; - } - } else { - ASSERT(taosArrayGetSize(pTableList->pTableList) == 1); - STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0); - pInfo->pRes->info.groupId = pKeyInfo->groupId; - } - + pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid); pInfo->indexOfBufferedRes += 1; return pInfo->pRes; } else { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 971b28eb09..7448f7a8fa 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -733,7 +733,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis int32_t len = (int32_t)(pStart - (char*)keyBuf); info->groupId = calcGroupId(keyBuf, len); - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } // int64_t st2 = taosGetTimestampUs(); @@ -751,14 +750,70 @@ end: return code; } +static int32_t nameComparFn(const void* p1, const void* p2) { + const char* pName1 = *(const char**)p1; + const char* pName2 = *(const char**)p2; + + int32_t ret = strcmp(pName1, pName2); + if (ret == 0) { + return 0; + } else { + return (ret > 0) ? 1 : -1; + } +} + +static SArray* getTableNameList(const SNodeListNode* pList) { + int32_t len = LIST_LENGTH(pList->pNodeList); + SListCell* cell = pList->pNodeList->pHead; + + SArray* pTbList = taosArrayInit(len, POINTER_BYTES); + for (int i = 0; i < pList->pNodeList->length; i++) { + SValueNode* valueNode = (SValueNode*)cell->pNode; + if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { + terrno = TSDB_CODE_INVALID_PARA; + taosArrayDestroy(pTbList); + return NULL; + } + + char* name = varDataVal(valueNode->datum.p); + taosArrayPush(pTbList, &name); + cell = cell->pNext; + } + + size_t numOfTables = taosArrayGetSize(pTbList); + + // order the name + taosArraySort(pTbList, nameComparFn); + + // remove the duplicates + SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*)); + taosArrayPush(pNewList, taosArrayGet(pTbList, 0)); + + for (int32_t i = 1; i < numOfTables; ++i) { + char** name = taosArrayGetLast(pNewList); + char** nameInOldList = taosArrayGet(pTbList, i); + if (strcmp(*name, *nameInOldList) == 0) { + continue; + } + + taosArrayPush(pNewList, nameInOldList); + } + + taosArrayDestroy(pTbList); + return pNewList; +} + static int tableUidCompare(const void* a, const void* b) { - int64_t u1 = *(uint64_t*)a; - int64_t u2 = *(uint64_t*)b; + uint64_t u1 = *(uint64_t*)a; + uint64_t u2 = *(uint64_t*)b; + if (u1 == u2) { return 0; } + return u1 < u2 ? -1 : 1; } + static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) { int32_t ret = -1; if (nodeType(cond) == QUERY_NODE_OPERATOR) { @@ -778,7 +833,9 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list SNodeList* pList = (SNodeList*)pNode->pParameterList; int32_t len = LIST_LENGTH(pList); - if (len <= 0) return ret; + if (len <= 0) { + return ret; + } SListCell* cell = pList->pHead; for (int i = 0; i < len; i++) { @@ -789,6 +846,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list } cell = cell->pNext; } + taosArraySort(list, tableUidCompare); taosArrayRemoveDuplicate(list, tableUidCompare, NULL); @@ -796,6 +854,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list ret = metaGetTableTagsByUids(metaHandle, suid, list, tags); removeInvalidTable(list, tags); } + return ret; } @@ -831,23 +890,13 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* SNodeListNode* pList = (SNodeListNode*)pNode->pRight; int32_t len = LIST_LENGTH(pList->pNodeList); - if (len <= 0) return -1; - - SListCell* cell = pList->pNodeList->pHead; - - SArray* pTbList = taosArrayInit(len, sizeof(void*)); - for (int i = 0; i < pList->pNodeList->length; i++) { - SValueNode* valueNode = (SValueNode*)cell->pNode; - if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { - taosArrayDestroy(pTbList); - return -1; - } - char* name = varDataVal(valueNode->datum.p); - taosArrayPush(pTbList, &name); - cell = cell->pNext; + if (len <= 0) { + return -1; } - for (int i = 0; i < taosArrayGetSize(pTbList); i++) { + SArray* pTbList = getTableNameList(pList); + size_t num = taosArrayGetSize(pTbList); + for (int i = 0; i < num; i++) { char* name = taosArrayGetP(pTbList, i); uint64_t uid = 0; if (metaGetTableUidByName(metaHandle, name, &uid) == 0) { @@ -866,8 +915,10 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* taosArrayDestroy(pTbList); return 0; } + return -1; } + int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1604,3 +1655,79 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit pLimitInfo->remainOffset = limit.offset; pLimitInfo->remainGroupOffset = slimit.offset; } + +uint64_t getTotalTables(const STableListInfo* pTableList) { + ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); + return taosArrayGetSize(pTableList->pTableList); +} + +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { + int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); + ASSERT(pTableList->map != NULL && slot != NULL); + + STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot); + ASSERT(pKeyInfo->uid == tableUid); + + return pKeyInfo->groupId; +} + +int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { + if (pTableList->map == NULL) { + ASSERT(taosArrayGetSize(pTableList->pTableList) == 0); + pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + } + + STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; + taosArrayPush(pTableList->pTableList, &keyInfo); + + int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1; + taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot)); + + qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1); + return TSDB_CODE_SUCCESS; +} + +int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, + int32_t* size) { + int32_t total = getNumOfOutputGroups(pTableList); + if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { + return TSDB_CODE_INVALID_PARA; + } + + // here handle two special cases: + // 1. only one group exists, and 2. one table exists for each group. + if (total == 1) { + *size = getTotalTables(pTableList); + *pKeyInfo = taosArrayGet(pTableList->pTableList, 0); + return TSDB_CODE_SUCCESS; + } else if (total == getTotalTables(pTableList)) { + *size = 1; + *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex); + return TSDB_CODE_SUCCESS; + } + + int32_t offset = pTableList->groupOffset[ordinalGroupIndex]; + if (ordinalGroupIndex < total - 1) { + *size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset]; + } else { + *size = total - pTableList->groupOffset[offset] - 1; + } + + *pKeyInfo = taosArrayGet(pTableList->pTableList, offset); + return TSDB_CODE_SUCCESS; +} + +int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; } + +// todo remove it +bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; } + +void destroyTableList(STableListInfo* pTableqinfoList) { + pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList); + taosMemoryFreeClear(pTableqinfoList->groupOffset); + + taosHashCleanup(pTableqinfoList->map); + + pTableqinfoList->pTableList = NULL; + pTableqinfoList->map = NULL; +} diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9546c3895e..12f3874e70 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -293,10 +293,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); } - if (pListInfo->map == NULL) { - pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - } - // traverse to the stream scanner node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -358,8 +354,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (!exists) { #endif - taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); - taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); + + addTableIntoTableList(&pTaskInfo->tableqinfoList, keyInfo.uid, keyInfo.groupId); } if (keyBuf != NULL) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 75db42fccc..0091b4a31d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3366,7 +3366,47 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } + +static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { + STableKeyInfo* pInfo1 = (STableKeyInfo*) p1; + STableKeyInfo* pInfo2 = (STableKeyInfo*) p2; + + if (pInfo1->groupId == pInfo2->groupId) { + return 0; + } else { + return pInfo1->groupId < pInfo2->groupId? -1:1; + } +} + static int32_t sortTableGroup(STableListInfo* pTableListInfo) { + int32_t code = TSDB_CODE_SUCCESS; + + taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); + int32_t size = taosArrayGetSize(pTableListInfo->pTableList); + + SArray* pList = taosArrayInit(4, sizeof(int32_t)); + + STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); + uint64_t gid = pInfo->groupId; + + int32_t start = 0; + taosArrayPush(pList, &start); + + for(int32_t i = 1; i < size; ++i) { + pInfo = taosArrayGet(pTableListInfo->pTableList, i); + if (pInfo->groupId != gid) { + taosArrayPush(pList, &i); + gid = pInfo->groupId; + } + } + + pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); + pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + taosArrayDestroy(pList); + return TSDB_CODE_SUCCESS; + +#if 0 taosArrayClear(pTableListInfo->pGroupList); SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -3422,6 +3462,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { } taosArrayDestroy(sortSupport); return TDB_CODE_SUCCESS; +#endif } bool groupbyTbname(SNodeList* pGroupList) { @@ -3437,35 +3478,41 @@ bool groupbyTbname(SNodeList* pGroupList) { return bytbname; } -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) { - if (group == NULL) { - return TDB_CODE_SUCCESS; - } - +int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (pTableListInfo->map == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - bool assignUid = groupbyTbname(group); - + bool groupByTbname = groupbyTbname(group); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - if (assignUid) { + if (groupByTbname || group == NULL) { for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - info->groupId = info->uid; - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); + info->groupId = groupByTbname? info->uid:0; + } + + pTableListInfo->oneTableForEachGroup = groupByTbname; + + if (groupSort && groupByTbname) { + taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); + pTableListInfo->numOfOuputGroups = numOfTables; } } else { int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; } + + if (groupSort) { + return sortTableGroup(pTableListInfo); + } } - if (pTableListInfo->needSortTableByGroupId) { - return sortTableGroup(pTableListInfo); + for(int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t)); } return TDB_CODE_SUCCESS; @@ -3551,7 +3598,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { int32_t code = - createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); if (code) { pTaskInfo->code = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e2f3b1c6c4..a4a9f233ab 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -626,10 +626,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pBlock->info = binfo; ASSERT(binfo.uid != 0); - uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } + pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -683,10 +680,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - qDebug( - "%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks " - "due to query func required", - GET_TASKID(pTaskInfo)); + qDebug( "%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -755,8 +749,12 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pInfo->currentGroupId == -1) { pInfo->currentGroupId++; - if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { - setTaskStatus(pTaskInfo, TASK_COMPLETED); + qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList), + getNumOfOutputGroups(&pTaskInfo->tableqinfoList)); + if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) { +// if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { +// setTaskStatus(pTaskInfo, TASK_COMPLETED); + doSetOperatorCompleted(pOperator); return NULL; } @@ -790,7 +788,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - setTaskStatus(pTaskInfo, TASK_COMPLETED); + doSetOperatorCompleted(pOperator); return NULL; } @@ -1122,12 +1120,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { - SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map; - uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t)); - if (groupId) { - return *groupId; - } - return 0; + return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid); } static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -1549,12 +1542,13 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupIdPre) { - pInfo->pRes->info.groupId = *groupIdPre; - } else { - pInfo->pRes->info.groupId = 0; - } + pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); +// uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); +// if (groupIdPre) { +// pInfo->pRes->info.groupId = *groupIdPre; +// } else { +// pInfo->pRes->info.groupId = 0; +// } // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -4202,6 +4196,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags return code; } + pTableListInfo->numOfOuputGroups = 1; + int64_t st1 = taosGetTimestampUs(); qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr); @@ -4211,7 +4207,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } pTableListInfo->needSortTableByGroupId = groupSort; - code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags); + code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort); if (code != TSDB_CODE_SUCCESS) { return code; } -- GitLab