From 9a0e9df5678774ec586e6c7a5e367a331f55aad0 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sun, 16 Jul 2023 23:39:41 +0800 Subject: [PATCH] enhance: block is ts sorted and each book is a source --- source/libs/executor/src/scanoperator.c | 46 ++++++++----------------- source/libs/executor/src/tsort.c | 22 ++++++------ 2 files changed, 25 insertions(+), 43 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 93addef91d..7babf68e56 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -53,6 +53,7 @@ typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; uint64_t uid; + STsdbReader* reader; } STableMergeScanSortSourceParam; typedef struct STableCountScanOperatorInfo { @@ -2733,28 +2734,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t readIdx = source->readerIdx; SSDataBlock* pBlock = pInfo->pReaderBlock; int32_t code = 0; int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex); - SReadHandle* pHandle = &pInfo->base.readHandle; - bool hasNext = false; - if (NULL == pInfo->base.dataReader) { - code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, p, 1, pBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL); - if (code != 0) { - T_LONG_JMP(pTaskInfo->env, code); - } - pInfo->readIdx = readIdx + pInfo->tableStartIndex ; - } else if (pInfo->readIdx != readIdx + pInfo->tableStartIndex) { - pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, p, 1); - pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); - pInfo->readIdx = readIdx + pInfo->tableStartIndex ; - } - STsdbReader* reader = pInfo->base.dataReader; while (true) { code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); @@ -2837,6 +2822,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SReadHandle* pHandle = &pInfo->base.readHandle; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; { size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); @@ -2866,21 +2853,15 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { // one table has one data block int32_t numOfTable = tableEndIdx - tableStartIdx + 1; - for (int32_t i = 0; i < numOfTable; ++i) { - STableMergeScanSortSourceParam param = {0}; - param.readerIdx = i; - param.pOperator = pOperator; + STableMergeScanSortSourceParam param = {0}; + param.pOperator = pOperator; + STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); + pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL); - taosArrayPush(pInfo->sortSourceParams, ¶m); - } - - for (int32_t i = 0; i < numOfTable; ++i) { - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); - ps->param = param; - ps->onlyRef = true; - tsortAddSource(pInfo->pSortHandle, ps); - } + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = ¶m; + ps->onlyRef = true; + tsortAddSource(pInfo->pSortHandle, ps); int32_t code = tsortOpen(pInfo->pSortHandle); @@ -2903,7 +2884,10 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; - taosArrayClear(pInfo->sortSourceParams); + if (pInfo->base.dataReader != NULL) { + pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader); + pInfo->base.dataReader = NULL; + } tsortDestroySortHandle(pInfo->pSortHandle); pInfo->pSortHandle = NULL; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c8266ea0d9..c262153464 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -937,22 +937,20 @@ static int32_t createInitialSources(SSortHandle* pHandle) { // pHandle->numOfPages = 1024; //todo check sortbufsize createPageBuf(pHandle); - for (int i = 0; i < nSrc; ++i) { + SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + while (pBlk != NULL) { SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); - - SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); - while (pBlk != NULL) { - addDataBlockToPageBuf(pHandle, pBlk, aPgId); - pBlk = pHandle->fetchfp(pSrc->param); - } - SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); - code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pBlock, &pHandle->sourceId, aPgId); + addDataBlockToPageBuf(pHandle, pBlk, aPgId); + SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); + code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(aExtSrc); - return code; + taosArrayDestroy(aExtSrc); + return code; } + pBlk = pHandle->fetchfp(pSrc->param); } + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); taosArrayDestroy(aExtSrc); -- GitLab