提交 797af537 编写于 作者: H Haojun Liao

enh(query): improve the multi-way merge performance.

上级 5a3e4dc1
...@@ -209,7 +209,7 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { ...@@ -209,7 +209,7 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
} }
} }
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -318,7 +318,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -318,7 +318,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
...@@ -581,13 +581,10 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { ...@@ -581,13 +581,10 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs; pOperator->cost.openCost = taosGetTimestampUs() - startTs;
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
tsem_post(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -559,7 +559,7 @@ typedef struct SMultiwayMergeOperatorInfo { ...@@ -559,7 +559,7 @@ typedef struct SMultiwayMergeOperatorInfo {
STupleHandle* prefetchedTuple; STupleHandle* prefetchedTuple;
} SMultiwayMergeOperatorInfo; } SMultiwayMergeOperatorInfo;
int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
SMultiwayMergeOperatorInfo* pInfo = pOperator->info; SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -580,6 +580,7 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { ...@@ -580,6 +580,7 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[i]; ps->param = pOperator->pDownstream[i];
ps->onlyRef = true; ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps); tsortAddSource(pInfo->pSortHandle, ps);
} }
...@@ -792,7 +793,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -792,7 +793,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo); destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo);
code = appendDownstream(pOperator, downStreams, numStreams); code = appendDownstream(pOperator, downStreams, numStreams);
......
...@@ -34,14 +34,12 @@ struct SSortHandle { ...@@ -34,14 +34,12 @@ struct SSortHandle {
int32_t pageSize; int32_t pageSize;
int32_t numOfPages; int32_t numOfPages;
SDiskbasedBuf* pBuf; SDiskbasedBuf* pBuf;
SArray* pSortInfo;
SArray* pSortInfo; SArray* pOrderedSource;
SArray* pOrderedSource; int32_t loops;
uint64_t sortElapsed;
int32_t loops; int64_t startTs;
uint64_t sortElapsed; uint64_t totalElapsed;
int64_t startTs;
uint64_t totalElapsed;
int32_t sourceId; int32_t sourceId;
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
...@@ -99,9 +97,9 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page ...@@ -99,9 +97,9 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
} }
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
// NOTICE: pSource may be, if it is SORT_MULTISOURCE_MERGE
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = SSortSource* pSource = cmpParam->pSources[i];
cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE
blockDataDestroy(pSource->src.pBlock); blockDataDestroy(pSource->src.pBlock);
taosMemoryFreeClear(pSource); taosMemoryFreeClear(pSource);
} }
...@@ -228,15 +226,15 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -228,15 +226,15 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList); return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
} }
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) { static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
pSource->src.rowIndex = -1; pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources; ++pHandle->numOfCompletedSources;
} }
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
SSortHandle* pHandle) { SSortHandle* pHandle) {
cmpParam->pSources = taosArrayGet(pSources, startIndex); pParam->pSources = taosArrayGet(pSources, startIndex);
cmpParam->numOfSources = (endIndex - startIndex + 1); pParam->numOfSources = (endIndex - startIndex + 1);
int32_t code = 0; int32_t code = 0;
...@@ -244,7 +242,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -244,7 +242,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
if (pHandle->pBuf == NULL) { if (pHandle->pBuf == NULL) {
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
code = TSDB_CODE_NO_AVAIL_DISK; code = TSDB_CODE_NO_AVAIL_DISK;
qError("Sort compare init failed since %s", terrstr(code)); qError("Sort compare init failed since %s, %s", terrstr(code), pHandle->idStr);
return code; return code;
} }
...@@ -257,12 +255,12 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -257,12 +255,12 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
} }
if (pHandle->type == SORT_SINGLESOURCE_SORT) { if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { for (int32_t i = 0; i < pParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i]; SSortSource* pSource = pParam->pSources[i];
// set current source is done // set current source is done
if (taosArrayGetSize(pSource->pageIdList) == 0) { if (taosArrayGetSize(pSource->pageIdList) == 0) {
setCurrentSourceIsDone(pSource, pHandle); setCurrentSourceDone(pSource, pHandle);
continue; continue;
} }
...@@ -277,15 +275,21 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -277,15 +275,21 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} }
} else { } else {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { qDebug("start init for the multiway merge sort, %s", pHandle->idStr);
SSortSource* pSource = cmpParam->pSources[i]; int64_t st = taosGetTimestampUs();
for (int32_t i = 0; i < pParam->numOfSources; ++i) {
SSortSource* pSource = pParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param); pSource->src.pBlock = pHandle->fetchfp(pSource->param);
// set current source is done // set current source is done
if (pSource->src.pBlock == NULL) { if (pSource->src.pBlock == NULL) {
setCurrentSourceIsDone(pSource, pHandle); setCurrentSourceDone(pSource, pHandle);
} }
} }
int64_t et = taosGetTimestampUs();
qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
} }
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册