From ddab83a2baa9a7a467bd701bef851e1e92373681 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 21 Mar 2023 11:42:46 +0800 Subject: [PATCH] enh: optimize table merge scan performance --- source/libs/executor/inc/tsort.h | 3 ++- source/libs/executor/src/scanoperator.c | 29 ++++++++++++------------- source/libs/executor/src/tsort.c | 22 ++++++++++++++----- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index cff568aebc..d51a24bb43 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -44,7 +44,8 @@ typedef struct SSortSource { void* param; bool onlyRef; }; - + int64_t fetchUs; + int64_t fetchNum; } SSortSource; typedef struct SMsortComparParam { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 29a23cd90b..5db1e00410 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -43,7 +43,8 @@ typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; uint64_t uid; - SSDataBlock* inputBlock; + SSDataBlock* inputBlock; + STsdbReader* dataReader; } STableMergeScanSortSourceParam; static bool processBlockWithProbability(const SSampleExecInfo* pInfo); @@ -2586,6 +2587,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t readIdx = source->readerIdx; SSDataBlock* pBlock = source->inputBlock; + int32_t code = 0; SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); @@ -2593,12 +2595,14 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - int32_t code = - tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); - if (code != 0) { - T_LONG_JMP(pTaskInfo->env, code); + if (NULL == source->dataReader) { + code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo)); + if (code != 0) { + T_LONG_JMP(pTaskInfo->env, code); + } } - + + pInfo->base.dataReader = source->dataReader; STsdbReader* reader = pInfo->base.dataReader; qTrace("tsdb/read-table-data: %p, enter next reader", reader); while (tsdbNextDataBlock(reader)) { @@ -2637,14 +2641,10 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; qTrace("tsdb/read-table-data: %p, close reader", reader); - tsdbReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; return pBlock; } - qDebug("8"); - - tsdbReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; return NULL; } @@ -2759,6 +2759,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { for (int32_t i = 0; i < numOfTable; ++i) { STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); blockDataDestroy(param->inputBlock); + tsdbReaderClose(param->dataReader); + param->dataReader = NULL; } taosArrayClear(pInfo->sortSourceParams); @@ -2871,15 +2873,14 @@ void destroyTableMergeScanOperatorInfo(void* param) { for (int32_t i = 0; i < numOfTable; i++) { STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i); blockDataDestroy(p->inputBlock); + tsdbReaderClose(p->dataReader); + p->dataReader = NULL; } taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; - for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); taosMemoryFree(pCond->colList); @@ -2896,8 +2897,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->pSortInfo); cleanupExprSupp(&pTableScanInfo->base.pseudoSup); - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); taosMemoryFreeClear(param); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 291b0cf515..6c8e581b3f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -108,12 +108,18 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { return TSDB_CODE_SUCCESS; } -void tsortClearOrderdSource(SArray* pOrderedSource) { +void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) { for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) { SSortSource** pSource = taosArrayGet(pOrderedSource, i); if (NULL == *pSource) { continue; } + + if (fetchUs) { + *fetchUs += (*pSource)->fetchUs; + *fetchNum += (*pSource)->fetchNum; + } + // release pageIdList if ((*pSource)->pageIdList) { taosArrayDestroy((*pSource)->pageIdList); @@ -147,7 +153,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { taosMemoryFreeClear(pSortHandle->idStr); blockDataDestroy(pSortHandle->pDataBlock); - tsortClearOrderdSource(pSortHandle->pOrderedSource); + int64_t fetchUs = 0, fetchNum = 0; + tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum); + qError("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr); + taosArrayDestroy(pSortHandle->pOrderedSource); taosMemoryFreeClear(pSortHandle); } @@ -307,7 +316,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 } int64_t et = taosGetTimestampUs(); - qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr); + qError("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr); } return code; @@ -365,7 +374,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT releaseBufPage(pHandle->pBuf, pPage); } } else { + int64_t st = taosGetTimestampUs(); pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param); + pSource->fetchUs += taosGetTimestampUs() - st; + pSource->fetchNum++; if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; @@ -602,7 +614,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } } - tsortClearOrderdSource(pHandle->pOrderedSource); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, pResList); taosArrayDestroy(pResList); @@ -644,7 +656,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { SSortSource* source = *pSource; *pSource = NULL; - tsortClearOrderdSource(pHandle->pOrderedSource); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); while (1) { SSDataBlock* pBlock = pHandle->fetchfp(source->param); -- GitLab