From 8b6d7db7adc727f69d380a7eae108ba82c7fcb9a Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sun, 16 Jul 2023 17:54:39 +0800 Subject: [PATCH] enhance: use one tsdb reader to read table sequentially --- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/inc/tsort.h | 1 + source/libs/executor/src/scanoperator.c | 84 +++++-------------------- source/libs/executor/src/tsort.c | 84 +++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 69 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index b3d0ff8225..3b3f2cbb8a 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -232,7 +232,6 @@ typedef struct STableMergeScanInfo { int32_t tableEndIndex; bool hasGroupId; uint64_t groupId; - SArray* queryConds; // array of queryTableDataCond STableScanBase base; int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort @@ -245,6 +244,7 @@ typedef struct STableMergeScanInfo { int64_t numOfRows; SScanInfo scanInfo; int32_t scanTimes; + int32_t readIdx; SSDataBlock* pResBlock; SSampleExecInfo sample; // sample execution info SSortExecInfo sortExecInfo; diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 627aa825c6..a2cb1234ff 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -26,6 +26,7 @@ extern "C" { enum { SORT_MULTISOURCE_MERGE = 0x1, SORT_SINGLESOURCE_SORT = 0x2, + SORT_TABLE_MERGE_SCAN = 0x3 }; typedef struct SMultiMergeSource { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9abe4ffef6..cf1e93d3f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -// clang-format off - #include "executorInt.h" #include "filter.h" #include "function.h" @@ -56,7 +54,6 @@ typedef struct STableMergeScanSortSourceParam { int32_t readerIdx; uint64_t uid; SSDataBlock* inputBlock; - STsdbReader* dataReader; } STableMergeScanSortSourceParam; typedef struct STableCountScanOperatorInfo { @@ -2741,28 +2738,28 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SSDataBlock* pBlock = source->inputBlock; int32_t code = 0; - SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); - int64_t st = taosGetTimestampUs(); void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - if (NULL == source->dataReader) { - code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL); + + bool hasNext = false; + + if (NULL == pInfo->base.dataReader) { + code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, p, 1, pBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } + pInfo->readIdx = readIdx + pInfo->tableStartIndex ; + } else if (pInfo->readIdx != readIdx + pInfo->tableStartIndex) { + pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, p, 1); + pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); } - pInfo->base.dataReader = source->dataReader; STsdbReader* reader = pInfo->base.dataReader; - bool hasNext = false; - qTrace("tsdb/read-table-data: %p, enter next reader", reader); - while (true) { code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, code); } @@ -2772,7 +2769,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { if (isTaskKilled(pTaskInfo)) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -2782,12 +2778,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - if (pQueryCond->order == TSDB_ORDER_ASC) { - pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; - } else { - pQueryCond->twindows.ekey = pBlock->info.window.skey - 1; - } - uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); @@ -2809,14 +2799,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - qTrace("tsdb/read-table-data: %p, close reader", reader); - pInfo->base.dataReader = NULL; return pBlock; } - pAPI->tsdReader.tsdReaderClose(source->dataReader); - source->dataReader = NULL; - pInfo->base.dataReader = NULL; blockDataDestroy(source->inputBlock); source->inputBlock = NULL; return NULL; @@ -2870,32 +2855,18 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - pInfo->base.dataReader = NULL; - // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result - // pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); - int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize); - if (kWay >= 128) { - kWay = 128; - } else if (kWay <= 2) { - kWay = 2; - } else { - int i = 2; - while (i * 2 <= kWay) i = i * 2; - kWay = i; - } + pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); - pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); // one table has one data block int32_t numOfTable = tableEndIdx - tableStartIdx + 1; - pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond)); for (int32_t i = 0; i < numOfTable; ++i) { STableMergeScanSortSourceParam param = {0}; @@ -2904,10 +2875,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); taosArrayPush(pInfo->sortSourceParams, ¶m); - - SQueryTableDataCond cond; - dumpQueryTableCond(&pInfo->base.cond, &cond); - taosArrayPush(pInfo->queryConds, &cond); } for (int32_t i = 0; i < numOfTable; ++i) { @@ -2932,8 +2899,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t numOfTable = taosArrayGetSize(pInfo->queryConds); - SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; @@ -2941,24 +2906,15 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; - for (int32_t i = 0; i < numOfTable; ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->sortSourceParams); ++i) { STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); blockDataDestroy(param->inputBlock); - pAPI->tsdReader.tsdReaderClose(param->dataReader); - param->dataReader = NULL; } taosArrayClear(pInfo->sortSourceParams); tsortDestroySortHandle(pInfo->pSortHandle); pInfo->pSortHandle = NULL; - for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) { - SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i); - taosMemoryFree(cond->colList); - } - taosArrayDestroy(pInfo->queryConds); - pInfo->queryConds = NULL; - resetLimitInfoForNextGroup(&pInfo->limitInfo); return TSDB_CODE_SUCCESS; } @@ -3056,13 +3012,11 @@ void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->base.cond); - int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds); + int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams); for (int32_t i = 0; i < numOfTable; i++) { STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i); blockDataDestroy(p->inputBlock); - pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader); - p->dataReader = NULL; } pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); @@ -3072,12 +3026,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { - SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); - taosMemoryFree(pCond->colList); - } - - taosArrayDestroy(pTableScanInfo->queryConds); destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); @@ -3143,6 +3091,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->base.scanFlag = MAIN_SCAN; pInfo->base.readHandle = *readHandle; + pInfo->readIdx = -1; + pInfo->base.limitInfo.limit.limit = -1; pInfo->base.limitInfo.slimit.limit = -1; pInfo->base.pTableListInfo = pTableListInfo; @@ -3573,6 +3523,4 @@ static void destoryTableCountScanOperator(void* param) { taosArrayDestroy(pTableCountScanInfo->stbUidList); taosMemoryFreeClear(param); -} - -// clang-format on +} \ No newline at end of file diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index d26db6536f..c8266ea0d9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -769,6 +769,61 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) { return pgSize; } +static int32_t createPageBuf(SSortHandle* pHandle) { + if (pHandle->pBuf == NULL) { + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_DISKSPACE; + qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir); + return terrno; + } + + int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, + "tableBlocksBuf", tsTempDir); + dBufSetPrintInfo(pHandle->pBuf); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + return 0; +} + +static int32_t addDataBlockToPageBuf(SSortHandle * pHandle, SSDataBlock* pDataBlock, SArray* aPgId) { + int32_t start = 0; + while (start < pDataBlock->info.rows) { + int32_t stop = 0; + blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize); + SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); + if (p == NULL) { + taosArrayDestroy(aPgId); + return terrno; + } + + int32_t pageId = -1; + void* pPage = getNewBufPage(pHandle->pBuf, &pageId); + if (pPage == NULL) { + taosArrayDestroy(aPgId); + blockDataDestroy(p); + return terrno; + } + + taosArrayPush(aPgId, &pageId); + + int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); + ASSERT(size <= getBufPageSize(pHandle->pBuf)); + + blockDataToBuf(pPage, p); + + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + + blockDataDestroy(p); + start = stop + 1; + } + + blockDataCleanup(pDataBlock); + return 0; +} + static int32_t createInitialSources(SSortHandle* pHandle) { size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; int32_t code = 0; @@ -875,6 +930,35 @@ static int32_t createInitialSources(SSortHandle* pHandle) { code = doAddToBuf(pHandle->pDataBlock, pHandle); } } + } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + + // pHandle->numOfPages = 1024; //todo check sortbufsize + createPageBuf(pHandle); + + for (int i = 0; i < nSrc; ++i) { + SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); + + SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + while (pBlk != NULL) { + addDataBlockToPageBuf(pHandle, pBlk, aPgId); + pBlk = pHandle->fetchfp(pSrc->param); + } + SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); + code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pBlock, &pHandle->sourceId, aPgId); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(aExtSrc); + return code; + } + } + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); + taosArrayDestroy(aExtSrc); + + pHandle->type = SORT_SINGLESOURCE_SORT; + } return code; -- GitLab