diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 6241eae3ac1da062a1d541537ca43deec7d86c9a..92e8d90bb81da34c6cb83037aceeab114b01f0b1 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -62,7 +62,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); // partition by tbname - if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { + if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, @@ -175,12 +175,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { return NULL; } } else { - size_t totalGroups = taosArrayGetSize(pTableList->pGroupList); - - while (pInfo->currentGroupIndex < totalGroups) { - SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); + size_t num = getNumOfOutputGroups(pTableList); + while (pInfo->currentGroupIndex < num) { + + STableKeyInfo* p = NULL; + int32_t s = 0; + getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &p, &s); + SArray* x = taosArrayInit(4, sizeof(STableKeyInfo)); + for(int32_t i = 0; i < s; ++i) { + taosArrayPush(x, &p[i]); + } - tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, + tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, x, taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); @@ -195,9 +201,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pInfo->pRes->info.rows > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; - - STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); - pInfo->pRes->info.groupId = pKeyInfo->groupId; + pInfo->pRes->info.groupId = p->groupId; if (taosArrayGetSize(pInfo->pUidList) > 0) { ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 7448f7a8fa881e6f0d7bd0b84835fb959f2f574d..c8fb0e23f8a86f34f10764de1a1980985eac6779 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -997,14 +997,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, } taosArrayDestroy(res); - - pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES); - if (pListInfo->pGroupList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - // put into list as default group, remove it if grouping sorting is required later - taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList); return code; } @@ -1698,7 +1690,7 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI // 1. only one group exists, and 2. one table exists for each group. if (total == 1) { *size = getTotalTables(pTableList); - *pKeyInfo = taosArrayGet(pTableList->pTableList, 0); + *pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0); return TSDB_CODE_SUCCESS; } else if (total == getTotalTables(pTableList)) { *size = 1; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0091b4a31df1a9109bd1424ffe700afa5396cca2..b0480bef35908ff53d83aa5d2f86d782ab9825ba 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4026,15 +4026,15 @@ void doDestroyTableList(STableListInfo* pTableqinfoList) { taosArrayDestroy(pTableqinfoList->pTableList); taosHashCleanup(pTableqinfoList->map); if (pTableqinfoList->needSortTableByGroupId) { - for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { - SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); - if (tmp == pTableqinfoList->pTableList) { - continue; - } - taosArrayDestroy(tmp); - } +// for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { +// SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); +// if (tmp == pTableqinfoList->pTableList) { +// continue; +// } +// taosArrayDestroy(tmp); +// } } - taosArrayDestroy(pTableqinfoList->pGroupList); +// taosArrayDestroy(pTableqinfoList->pGroupList); pTableqinfoList->pTableList = NULL; pTableqinfoList->map = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a4a9f233ab8b44c99ab60c1bcc6dcd3a2fb55391..b1adbd14c03b308dd66802da98c68ffd727b16df 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -749,20 +749,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pInfo->currentGroupId == -1) { pInfo->currentGroupId++; - qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList), - getNumOfOutputGroups(&pTaskInfo->tableqinfoList)); +// qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList), +// getNumOfOutputGroups(&pTaskInfo->tableqinfoList)); if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) { // if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { -// setTaskStatus(pTaskInfo, TASK_COMPLETED); doSetOperatorCompleted(pOperator); return NULL; } - SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); + SArray* p = taosArrayInit(4, sizeof(STableKeyInfo)); tsdbReaderClose(pInfo->dataReader); - int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, + STableKeyInfo* x = NULL; + int32_t num = 0; + getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &x, &num); + for(int32_t i = 0; i < num; ++i) { + taosArrayPush(p, &x[i]); + } + + int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, p, (STsdbReader**)&pInfo->dataReader, GET_TASKID(pTaskInfo)); + taosArrayDestroy(p); + if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); return NULL; @@ -775,11 +783,16 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } pInfo->currentGroupId++; - if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { - setTaskStatus(pTaskInfo, TASK_COMPLETED); + if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { + doSetOperatorCompleted(pOperator); return NULL; } + // reset value for the next group data output + pOperator->status = OP_OPENED; + pInfo->limitInfo.numOfOutputRows = 0; + pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset; + tsdbReaderReset(pInfo->dataReader, &pInfo->cond); pInfo->scanTimes = 0; @@ -1075,39 +1088,59 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { pTableScanInfo->cond.twindows = *pWin; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; -} - -static void freeArray(void* array) { taosArrayDestroy(array); } - -static void resetTableScanOperator(SOperatorInfo* pTableScanOp) { - STableScanInfo* pTableScanInfo = pTableScanOp->info; - pTableScanInfo->cond.startVersion = -1; - pTableScanInfo->cond.endVersion = -1; - SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; - SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList; - taosArrayClearP(gpTbls, freeArray); - taosArrayPush(gpTbls, &allTbls); - STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; - resetTableScanInfo(pTableScanOp->info, &win); + tsdbReaderClose(pTableScanInfo->dataReader); + pTableScanInfo->dataReader = NULL; } static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, int64_t maxVersion) { - SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; - taosArrayClear(gpTbls); STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; - SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo)); - taosArrayPush(tbls, &tblInfo); - taosArrayPush(gpTbls, &tbls); - STimeWindow win = {.skey = startTs, .ekey = endTs}; - STableScanInfo* pTableScanInfo = pTableScanOp->info; - pTableScanInfo->cond.startVersion = -1; - pTableScanInfo->cond.endVersion = maxVersion; - resetTableScanInfo(pTableScanOp->info, &win); - SSDataBlock* pRes = doTableScan(pTableScanOp); - resetTableScanOperator(pTableScanOp); - return pRes; + STableScanInfo* pTableScanInfo = pTableScanOp->info; + SQueryTableDataCond cond = pTableScanInfo->cond; + + cond.startVersion = -1; + cond.endVersion = maxVersion; + cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs}; + + SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo; + + SSDataBlock* pBlock = pTableScanInfo->pResBlock; + blockDataCleanup(pBlock); + + SArray* p = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(p, &tblInfo); + + STsdbReader* pReader = NULL; + int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, p, (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + bool hasBlock = tsdbNextDataBlock(pReader); + if (hasBlock) { + SDataBlockInfo binfo = {0}; + tsdbRetrieveDataBlockInfo(pReader, &binfo); + + SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); + blockDataEnsureCapacity(pBlock, binfo.rows); + + pBlock->info.window = binfo.window; + pBlock->info.uid = binfo.uid; + pBlock->info.rows = binfo.rows; + + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + + pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid); + } + + tsdbReaderClose(pReader); + qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64 + ", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid); + + return pBlock->info.rows > 0 ? pBlock : NULL; } static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -2329,11 +2362,20 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pTSInfo->cond.endVersion = pHandle->version; } - SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); +// SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); + STableKeyInfo* pList = NULL; + int32_t num = 0; + getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num); + + SArray* p = taosArrayInit(4, sizeof(STableKeyInfo)); + for(int32_t i = 0; i < num; ++i) { + taosArrayPush(p, &pList[i]); + } + if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->dataReader = NULL; - if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) { + if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, p, &pTSInfo->dataReader, NULL) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _error; }