未验证 提交 7258ea92 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20576 from taosdata/fix/TS-2933

enh: optimize table merge scan performance
......@@ -44,7 +44,8 @@ typedef struct SSortSource {
void* param;
bool onlyRef;
};
int64_t fetchUs;
int64_t fetchNum;
} SSortSource;
typedef struct SMsortComparParam {
......
......@@ -31,6 +31,7 @@
#include "thash.h"
#include "ttypes.h"
#define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
......@@ -43,7 +44,9 @@ typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
uint64_t uid;
SSDataBlock* inputBlock;
SSDataBlock* inputBlock;
bool multiReader;
STsdbReader* dataReader;
} STableMergeScanSortSourceParam;
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
......@@ -2588,6 +2591,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
int32_t code = 0;
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
......@@ -2595,17 +2599,20 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
int32_t code =
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
if (NULL == source->dataReader || !source->multiReader) {
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
pInfo->base.dataReader = source->dataReader;
STsdbReader* reader = pInfo->base.dataReader;
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
......@@ -2639,14 +2646,18 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
qTrace("tsdb/read-table-data: %p, close reader", reader);
tsdbReaderClose(pInfo->base.dataReader);
if (!source->multiReader) {
tsdbReaderClose(pInfo->base.dataReader);
source->dataReader = NULL;
}
pInfo->base.dataReader = NULL;
return pBlock;
}
qDebug("8");
tsdbReaderClose(pInfo->base.dataReader);
if (!source->multiReader) {
tsdbReaderClose(pInfo->base.dataReader);
source->dataReader = NULL;
}
pInfo->base.dataReader = NULL;
return NULL;
}
......@@ -2718,6 +2729,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanSortSourceParam param = {0};
param.readerIdx = i;
param.pOperator = pOperator;
param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
......@@ -2761,6 +2773,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < numOfTable; ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock);
tsdbReaderClose(param->dataReader);
param->dataReader = NULL;
}
taosArrayClear(pInfo->sortSourceParams);
......@@ -2871,15 +2885,17 @@ void destroyTableMergeScanOperatorInfo(void* param) {
for (int32_t i = 0; i < numOfTable; i++) {
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
blockDataDestroy(p->inputBlock);
tsdbReaderClose(p->dataReader);
p->dataReader = NULL;
}
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
taosMemoryFree(pCond->colList);
......@@ -2896,8 +2912,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
taosArrayDestroy(pTableScanInfo->pSortInfo);
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
tsdbReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
taosMemoryFreeClear(param);
......
......@@ -108,12 +108,18 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
return TSDB_CODE_SUCCESS;
}
void tsortClearOrderdSource(SArray* pOrderedSource) {
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
if (NULL == *pSource) {
continue;
}
if (fetchUs) {
*fetchUs += (*pSource)->fetchUs;
*fetchNum += (*pSource)->fetchNum;
}
// release pageIdList
if ((*pSource)->pageIdList) {
taosArrayDestroy((*pSource)->pageIdList);
......@@ -147,7 +153,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock);
tsortClearOrderdSource(pSortHandle->pOrderedSource);
int64_t fetchUs = 0, fetchNum = 0;
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
qError("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
taosArrayDestroy(pSortHandle->pOrderedSource);
taosMemoryFreeClear(pSortHandle);
}
......@@ -307,7 +316,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
}
int64_t et = taosGetTimestampUs();
qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
qError("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
}
return code;
......@@ -365,7 +374,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
releaseBufPage(pHandle->pBuf, pPage);
}
} else {
int64_t st = taosGetTimestampUs();
pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
pSource->fetchUs += taosGetTimestampUs() - st;
pSource->fetchNum++;
if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
......@@ -602,7 +614,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
}
}
tsortClearOrderdSource(pHandle->pOrderedSource);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, pResList);
taosArrayDestroy(pResList);
......@@ -644,7 +656,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
SSortSource* source = *pSource;
*pSource = NULL;
tsortClearOrderdSource(pHandle->pOrderedSource);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册