提交 39c06f13 编写于 作者: H Haojun Liao

[td-11818] refactor sort, and sort operator.

上级 d11c44fc
......@@ -18,9 +18,6 @@ typedef struct SBlockOrderInfo {
int32_t order;
int32_t colIndex;
SColumnInfoData *pColData;
// int32_t type;
// int32_t bytes;
// bool hasNull;
} SBlockOrderInfo;
int taosGetFqdnPortFromEp(const char *ep, SEp *pEp);
......@@ -93,6 +90,8 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols);
size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
......
......@@ -157,7 +157,7 @@ void setBufPageDirty(SFilePage* pPageInfo, bool dirty);
* Print the statistics when closing this buffer
* @param pBuf
*/
void printStatisBeforeClose(SDiskbasedBuf* pBuf);
void setPrintStatis(SDiskbasedBuf* pBuf);
/**
* return buf statistics.
......
......@@ -411,7 +411,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
return pDst;
}
/**
*
* +------------------+---------------+--------------------+
......@@ -522,6 +521,22 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t);
}
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols) {
SSchema* pSchema = calloc(pBlock->info.numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
pSchema[i].bytes = pColInfoData->info.bytes;
pSchema[i].type = pColInfoData->info.type;
pSchema[i].colId = pColInfoData->info.colId;
}
if (numOfCols != NULL) {
*numOfCols = pBlock->info.numOfCols;
}
return pSchema;
}
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL);
double rowSize = 0;
......
......@@ -15,6 +15,7 @@
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H
#include "tsort.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -557,10 +558,6 @@ typedef struct SSortMergeOperatorInfo {
char** currentGroupColData;
SArray* udfInfo;
int32_t numOfSources;
// char** prevRow;
// int32_t resultRowFactor;
// bool multiGroupResults;
// bool hasGroupColData;
} SSortMergeOperatorInfo;
typedef struct SMsortComparParam {
......@@ -571,19 +568,16 @@ typedef struct SMsortComparParam {
} SMsortComparParam;
typedef struct SOrderOperatorInfo {
int32_t sourceId;
uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock;
bool hasVarCol; // has variable length column, such as binary/varchar/nchar
int32_t numOfCompleted;
SDiskbasedBuf *pSortInternalBuf;
SMultiwayMergeTreeInfo *pMergeTree;
SArray *pSources; // SArray<SExternalMemSource*>
SArray *orderInfo;
bool nullFirst;
SSortHandle *pSortHandle;
int32_t bufPageSize;
int32_t numOfRowsInRes;
SMsortComparParam cmpParam;
// TODO extact struct
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSORT_H
#define TDENGINE_TSORT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "common.h"
#include "os.h"
enum {
SORT_MULTIWAY_MERGE = 0x1,
SORT_SINGLESOURCE = 0x2,
};
typedef struct SMultiMergeSource {
int32_t type;
int32_t rowIndex;
SSDataBlock *pBlock;
} SMultiMergeSource;
typedef struct SExternalMemSource {
SMultiMergeSource src;
SArray* pageIdList;
int32_t pageIndex;
} SExternalMemSource;
typedef struct SOperatorSource {
SMultiMergeSource src;
void* param;
} SOperatorSource;
typedef struct SSortHandle SSortHandle;
typedef struct STupleHandle STupleHandle;
typedef SSDataBlock* (*_sort_fetch_block_fn_t)(void* param);
typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* param);
/**
*
* @param type
* @return
*/
SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr);
/**
*
* @param pSortHandle
*/
void destroySortHandle(SSortHandle* pSortHandle);
/**
*
* @param pHandle
* @return
*/
int32_t sortOpen(SSortHandle* pHandle);
/**
*
* @param pHandle
* @return
*/
int32_t sortClose(SSortHandle* pHandle);
/**
*
* @return
*/
int32_t setFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp);
/**
*
* @param pHandle
* @param fp
* @return
*/
int32_t setComparFn(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
/**
*
* @param pHandle
* @param pSource
* @return success or failed
*/
int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource);
/**
*
* @param pHandle
* @return
*/
STupleHandle* sortNextTuple(SSortHandle* pHandle);
/**
*
* @param pHandle
* @param colIndex
* @return
*/
bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex);
/**
*
* @param pHandle
* @param colIndex
* @return
*/
void* sortGetValue(STupleHandle* pVHandle, int32_t colIndex);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSORT_H
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tep.h>
#include <tsort.h>
#include "exception.h"
#include "os.h"
#include "parser.h"
......@@ -5598,7 +5599,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
return pOrderColumns;
}
static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortMergeOperatorInfo *pInfo = (SSortMergeOperatorInfo*) param;
destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->groupColumnList);
......@@ -5632,11 +5633,21 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortMergeOperatorInfo* pInfo = pOperator->info;
SMsortComparParam resultParam = {.orderInfo = pInfo->orderInfo};
SArray* pSource = taosArrayInit(4, POINTER_BYTES);
for(int32_t i = 0; i < pInfo->numOfSources; ++i) {
SSDataBlock* pBlock = pOperator->pDownstream[i]->exec(pOperator->pDownstream[i], newgroup);
// TODO set the order input sources.
// doAddNewOperatorSource(pSource, pBlock, pInfo->binfo.capacity);
}
// sortComparInit(&resultParam, pSource, 0, pInfo->numOfSources - 1, pOperator);
// int32_t code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn);
// if (code != TSDB_CODE_SUCCESS) {
// longjmp(pTaskInfo->env, code);
// }
return NULL;
}
......@@ -5679,24 +5690,17 @@ SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doSortedMerge;
pOperator->cleanupFn = destroyGlobalAggOperatorInfo;
pOperator->cleanupFn = destroySortMergeOperatorInfo;
appendDownstream(pOperator, downstream);
return pOperator;
}
typedef struct SExternalMemSource {
SArray* pageIdList;
int32_t pageIndex;
int32_t rowIndex;
SSDataBlock *pBlock;
} SExternalMemSource;
int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight;
SMsortComparParam *pParam = (SMsortComparParam *)param;
SMsortComparParam *pParam = (SMsortComparParam *)param;
SArray *pInfo = pParam->orderInfo;
......@@ -5704,16 +5708,16 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
SExternalMemSource* pRightSource = pParam->pSources[pRightIdx];
// this input is exhausted, set the special value to denote this
if (pLeftSource->rowIndex == -1) {
if (pLeftSource->src.rowIndex == -1) {
return 1;
}
if (pRightSource->rowIndex == -1) {
if (pRightSource->src.rowIndex == -1) {
return -1;
}
SSDataBlock* pLeftBlock = pLeftSource->pBlock;
SSDataBlock* pRightBlock = pRightSource->pBlock;
SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
SSDataBlock* pRightBlock = pRightSource->src.pBlock;
for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
......@@ -5722,13 +5726,13 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->rowIndex, pLeftBlock->pBlockAgg);
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
}
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex);
bool rightNull = false;
if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->rowIndex, pRightBlock->pBlockAgg);
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
}
if (leftNull && rightNull) {
......@@ -5743,8 +5747,8 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
return pParam->nullFirst? -1:1;
}
void* left1 = colDataGet(pLeftColInfoData, pLeftSource->rowIndex);
void* right1 = colDataGet(pRightColInfoData, pRightSource->rowIndex);
void* left1 = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex);
switch(pLeftColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
......@@ -5767,137 +5771,108 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
}
}
static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwayMergeTreeInfo *pTree, SOrderOperatorInfo* pInfo) {
/*
* load a new SDataBlock into memory of a given intermediate data-set source,
* since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
*/
if (pSource->rowIndex >= pSource->pBlock->info.rows) {
pSource->rowIndex = 0;
pSource->pageIndex += 1;
if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
pInfo->numOfCompleted += 1;
pSource->rowIndex = -1;
pSource->pageIndex = -1;
pSource->pBlock = blockDataDestroy(pSource->pBlock);
} else {
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(pSource->pBlock, pPage->data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
releaseBufPage(pInfo->pSortInternalBuf, pPage);
}
}
/*
* Adjust loser tree otherwise, according to new candidate data
* if the loser tree is rebuild completed, we do not need to adjust
*/
int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);
#ifdef _DEBUG_VIEW
printf("before adjust:\t");
tMergeTreePrint(pTree);
#endif
tMergeTreeAdjust(pTree, leafNodeIndex);
#ifdef _DEBUG_VIEW
printf("\nafter adjust:\t");
tMergeTreePrint(pTree);
#endif
}
static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHandle) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
bool isNull = sortIsValueNull(pTupleHandle, i);
if (isNull) {
colDataAppend(pColInfo, pBlock->info.rows, NULL, true);
} else {
char* pData = colDataGet(pSrcColInfo, *rowIndex);
char* pData = sortGetValue(pTupleHandle, i);
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
}
}
pBlock->info.rows += 1;
*rowIndex += 1;
}
static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, int32_t numOfCols) {
SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource));
if (pSource == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) {
int32_t numOfCols = pDataBlock->info.numOfCols;
pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId);
pSource->pBlock = calloc(1, sizeof(SSDataBlock));
pSource->pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pSource->pBlock->info.numOfCols = numOfCols;
SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
colInfo.info = p->info;
taosArrayPush(pSource->pBlock->pDataBlock, &colInfo);
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
taosArrayPush(pAllSources, &pSource);
pInfo->sourceId += 1;
int32_t rowSize = blockDataGetSerialRowSize(pSource->pBlock);
int32_t numOfRows = (getBufPageSize(pInfo->pSortInternalBuf) - blockDataGetSerialMetaSize(pInfo->pDataBlock))/rowSize;
return blockDataEnsureCapacity(pSource->pBlock, numOfRows);
return pBlock;
}
void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, SArray* pSources, jmp_buf env) {
int32_t start = 0;
while(start < pInfo->pDataBlock->info.rows) {
int32_t stop = 0;
blockDataSplitRows(pInfo->pDataBlock, pInfo->hasVarCol, start, &stop, getBufPageSize(pInfo->pSortInternalBuf));
SSDataBlock* p = blockDataExtractBlock(pInfo->pDataBlock, start, stop - start + 1);
if (p == NULL) {
longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t pageId = -1;
SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId);
if (pPage == NULL) {
assert(0);
longjmp(env, terrno);
}
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pInfo->pSortInternalBuf));
blockDataToBuf(pPage->data, p);
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pSortInternalBuf, pPage);
blockDataDestroy(p);
start = stop + 1;
}
//static int32_t doAddNewExternalMemSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, SSDataBlock* pBlock) {
// SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource));
// if (pSource == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId);
// pSource->src.pBlock = pBlock;
//
// taosArrayPush(pAllSources, &pSource);
//
// pInfo->sourceId += 1;
//
// int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
// int32_t numOfRows = (getBufPageSize(pInfo->pSortInternalBuf) - blockDataGetSerialMetaSize(pInfo->pDataBlock))/rowSize;
// return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
//}
int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
//static int32_t doAddNewOperatorSource(SArray* pAllSources, SSDataBlock* pBlock, int32_t capacity) {
// SOperatorSource* pSource = calloc(1, sizeof(SOperatorSource));
// if (pSource == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// pSource->src.pBlock = pBlock;
// taosArrayPush(pAllSources, &pSource);
//
// return blockDataEnsureCapacity(pSource->src.pBlock, capacity);
//}
int32_t code = doAddNewSource(pInfo, pSources, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
longjmp(env, code);
}
}
//void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, SArray* pSources, jmp_buf env) {
// int32_t start = 0;
//
// while(start < pInfo->pDataBlock->info.rows) {
// int32_t stop = 0;
// blockDataSplitRows(pInfo->pDataBlock, pInfo->hasVarCol, start, &stop, getBufPageSize(pInfo->pSortInternalBuf));
// SSDataBlock* p = blockDataExtractBlock(pInfo->pDataBlock, start, stop - start + 1);
// if (p == NULL) {
// longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY);
// }
//
// int32_t pageId = -1;
// SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId);
// if (pPage == NULL) {
// assert(0);
// longjmp(env, terrno);
// }
//
// int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t);
// assert(size <= getBufPageSize(pInfo->pSortInternalBuf));
//
// blockDataToBuf(pPage->data, p);
//
// setBufPageDirty(pPage, true);
// releaseBufPage(pInfo->pSortInternalBuf, pPage);
//
// blockDataDestroy(p);
// start = stop + 1;
// }
//
// int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
// blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
//
// SSDataBlock* pBlock = createDataBlock(pInfo->pDataBlock);
// int32_t code = doAddNewExternalMemSource(pInfo, pSources, pBlock);
// if (code != TSDB_CODE_SUCCESS) {
// longjmp(env, code);
// }
//}
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SDiskbasedBuf* pBuf) {
cmpParam->pSources = taosArrayGet(pSources, startIndex);
......@@ -5908,7 +5883,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
SPageInfo* pPgInfo = *(SPageInfo**) taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data);
int32_t code = blockDataFromBuf(cmpParam->pSources[i]->src.pBlock, pPage->data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -5922,31 +5897,23 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
blockDataDestroy(pSource->pBlock);
blockDataDestroy(pSource->src.pBlock);
tfree(pSource);
}
cmpParam->numOfSources = 0;
}
static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo, SMsortComparParam* cmpParam, int32_t capacity) {
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SOrderOperatorInfo* pInfo, int32_t capacity) {
blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
while(1) {
if (cmpParam->numOfSources == pInfo->numOfCompleted) {
break;
}
int32_t index = tMergeTreeGetChosenIndex(pInfo->pMergeTree);
SExternalMemSource *pSource = (*cmpParam).pSources[index];
appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex);
int32_t code = adjustMergeTreeForNextTuple(pSource, pInfo->pMergeTree, pInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
STupleHandle* pTupleHandle = sortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(pInfo->pDataBlock, pTupleHandle);
if (pInfo->pDataBlock->info.rows >= capacity) {
return pInfo->pDataBlock;
}
......@@ -5955,106 +5922,10 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
static int32_t doInternalSort(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo) {
size_t numOfSources = taosArrayGetSize(pInfo->pSources);
// Calculate the I/O counts to complete the data sort.
double sortCount = floorl(log2(numOfSources) / log2(getNumOfInMemBufPages(pInfo->pSortInternalBuf)));
pInfo->totalElapsed = taosGetTimestampUs() - pInfo->startTs;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64,
GET_TASKID(pTaskInfo), (int32_t) (sortCount + 1), getTotalBufSize(pInfo->pSortInternalBuf), pInfo->sortElapsed,
pInfo->totalElapsed);
size_t pgSize = getBufPageSize(pInfo->pSortInternalBuf);
int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pInfo->pDataBlock))/ blockDataGetSerialRowSize(pInfo->pDataBlock);
blockDataEnsureCapacity(pInfo->pDataBlock, numOfRows);
size_t numOfSorted = taosArrayGetSize(pInfo->pSources);
for(int32_t t = 0; t < sortCount; ++t) {
int64_t st = taosGetTimestampUs();
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
SMsortComparParam resultParam = {.orderInfo = pInfo->cmpParam.orderInfo};
int32_t numOfInputSources = getNumOfInMemBufPages(pInfo->pSortInternalBuf);
int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
for(int32_t i = 0; i < sortGroup; ++i) {
pInfo->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
if (end > numOfSorted - 1) {
end = numOfSorted - 1;
}
pInfo->cmpParam.numOfSources = end - i * numOfInputSources + 1;
int32_t code = sortComparInit(&pInfo->cmpParam, pInfo->pSources, i * numOfInputSources, end, pInfo->pSortInternalBuf);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
while (1) {
SSDataBlock* pDataBlock = getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, numOfRows);
if (pDataBlock == NULL) {
break;
}
int32_t pageId = -1;
SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId);
if (pPage == NULL) {
assert(0);
longjmp(pTaskInfo->env, terrno);
}
int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pInfo->pSortInternalBuf));
blockDataToBuf(pPage->data, pDataBlock);
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pSortInternalBuf, pPage);
blockDataClearup(pDataBlock, pInfo->hasVarCol);
}
tMergeTreeDestroy(pInfo->pMergeTree);
pInfo->numOfCompleted = 0;
code = doAddNewSource(pInfo, pResList, pInfo->pDataBlock->info.numOfCols);
if (code != 0) {
longjmp(pTaskInfo->env, code);
}
}
sortComparClearup(&pInfo->cmpParam);
taosArrayClear(pInfo->pSources);
taosArrayAddAll(pInfo->pSources, pResList);
taosArrayDestroy(pResList);
pInfo->cmpParam = resultParam;
numOfSorted = taosArrayGetSize(pInfo->pSources);
int64_t el = taosGetTimestampUs() - st;
pInfo->totalElapsed += el;
SDiskbasedBufStatis statis = getDBufStatis(pInfo->pSortInternalBuf);
qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", GET_TASKID(pTaskInfo), t + 1, el, statis.loadBytes/1024.0,
statis.flushBytes/1024.0);
}
pInfo->cmpParam.numOfSources = taosArrayGetSize(pInfo->pSources);
return 0;
static SSDataBlock* loadNextDataBlock(void* param) {
bool newgroup = false;
SOperatorInfo* pOperator = (SOperatorInfo*) param;
return pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], &newgroup);
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
......@@ -6068,74 +5939,30 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo, pInfo->numOfRowsInRes);
}
int64_t st = taosGetTimestampUs();
while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) {
break;
}
int32_t code = blockDataMerge(pInfo->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
}
size_t size = blockDataGetSize(pInfo->pDataBlock);
if (size > pInfo->sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst);
int64_t el = taosGetTimestampUs() - p;
pInfo->sortElapsed += el;
addToDiskbasedBuf(pInfo, pInfo->pSources, pTaskInfo->env);
}
}
if (pInfo->pDataBlock->info.rows > 0) {
// Perform the in-memory sort and then flush data in the buffer into disk.
blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst);
// All sorted data are resident in memory, external memory sort is not needed.
// Return to the upstream operator directly
if (isAllDataInMemBuf(pInfo->pSortInternalBuf)) {
pOperator->status = OP_EXEC_DONE;
return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock;
}
SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE, pInfo->bufPageSize,
numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)");
addToDiskbasedBuf(pInfo, pInfo->pSources, pTaskInfo->env);
}
doInternalSort(pTaskInfo, pInfo);
int32_t code = blockDataEnsureCapacity(pInfo->pDataBlock, pInfo->numOfRowsInRes);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
tfree(p);
setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
setComparFn(pInfo->pSortHandle, msortComparFn);
int32_t numOfSources = taosArrayGetSize(pInfo->pSources);
ASSERT(numOfSources <= getNumOfInMemBufPages(pInfo->pSortInternalBuf));
code = sortComparInit(&pInfo->cmpParam, pInfo->pSources, 0, numOfSources - 1, pInfo->pSortInternalBuf);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
sortAddSource(pInfo->pSortHandle, pOperator);
code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn);
// TODO set error code;
int32_t code = sortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
longjmp(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo, pInfo->numOfRowsInRes);
}
static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
......@@ -6176,8 +6003,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes);
pInfo->pSources = taosArrayInit(4, POINTER_BYTES);
pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal);
pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
......@@ -6187,8 +6013,8 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
}
}
int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->sortBufSize, 1, "/tmp/");
if (pInfo->pSources == NULL || code != 0 || pInfo->cmpParam.orderInfo == NULL || pInfo->pDataBlock == NULL) {
// int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->sortBufSize, 1, "/tmp/");
if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) {
tfree(pOperator);
destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo));
tfree(pInfo);
......@@ -7236,10 +7062,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
taosArrayDestroy(pInfo->cmpParam.orderInfo);
destroyResultBuf(pInfo->pSortInternalBuf);
tMergeTreeDestroy(pInfo->pMergeTree);
taosArrayDestroy(pInfo->orderInfo);
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common.h"
#include "query.h"
#include "tsort.h"
#include "tep.h"
#include "tcfg.h"
#include "tlosertree.h"
#include "tpagedbuf.h"
#include "tutil.h"
typedef struct SMsortComparParam {
void **pSources;
int32_t numOfSources;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst;
} SMsortComparParam;
typedef struct STupleHandle {
SSDataBlock* pBlock;
int32_t rowIndex;
} STupleHandle;
typedef struct SSortHandle {
int32_t type;
int32_t pageSize;
int32_t numOfPages;
SDiskbasedBuf *pBuf;
SArray *pOrderInfo;
bool nullFirst;
bool hasVarCol;
SArray *pSources;
SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp;
_sort_merge_compar_fn_t comparFn;
void *pParam;
SMultiwayMergeTreeInfo *pMergeTree;
int32_t numOfCols;
int64_t startTs;
uint64_t sortElapsed;
uint64_t totalElapsed;
int32_t sourceId;
SSDataBlock *pDataBlock;
SMsortComparParam cmpParam;
int32_t numOfCompletedSources;
bool opened;
const char *idStr;
bool inMemSort;
bool needAdjust;
STupleHandle tupleHandle;
} SSortHandle;
SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = pSchema[i].type;
colInfo.info.bytes = pSchema[i].bytes;
colInfo.info.colId = pSchema[i].colId;
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
return pBlock;
}
/**
*
* @param type
* @return
*/
SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) {
SSortHandle* pSortHandle = calloc(1, sizeof(SSortHandle));
pSortHandle->type = type;
pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages;
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->pSources = taosArrayInit(4, POINTER_BYTES);
pSortHandle->pOrderInfo = pOrderInfo;
pSortHandle->nullFirst = nullFirst;
pSortHandle->cmpParam.orderInfo = pOrderInfo;
pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols);
if (idstr != NULL) {
pSortHandle->idStr = strdup(idstr);
}
return pSortHandle;
}
void destroySortHandle(SSortHandle* pSortHandle) {
sortClose(pSortHandle);
taosArrayDestroy(pSortHandle->pSources);
if (pSortHandle->pMergeTree != NULL) {
tMergeTreeDestroy(pSortHandle->pMergeTree);
}
tfree(pSortHandle->idStr);
tfree(pSortHandle);
}
int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource) {
if (pSortHandle->type == SORT_SINGLESOURCE) {
pSortHandle->pParam = pSource;
} else {
taosArrayPush(pSortHandle->pOrderedSource, &pSource);
}
}
static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) {
int32_t numOfCols = pDataBlock->info.numOfCols;
SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
colInfo.info = p->info;
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
return pBlock;
}
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) {
SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource));
if (pSource == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
pSource->pageIdList = getDataBufPagesIdList(pBuf, (*sourceId));
pSource->src.pBlock = pBlock;
taosArrayPush(pAllSources, &pSource);
(*sourceId) += 1;
int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(pBlock))/rowSize;
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t start = 0;
if (pHandle->pBuf == NULL) {
int32_t code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
while(start < pDataBlock->info.rows) {
int32_t stop = 0;
blockDataSplitRows(pDataBlock, pHandle->hasVarCol, start, &stop, pHandle->pageSize);
SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
if (p == NULL) {
return terrno;
}
int32_t pageId = -1;
SFilePage* pPage = getNewDataBuf(pHandle->pBuf, pHandle->sourceId, &pageId);
if (pPage == NULL) {
return terrno;
}
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage->data, p);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
blockDataDestroy(p);
start = stop + 1;
}
blockDataClearup(pDataBlock, pHandle->hasVarCol);
SSDataBlock* pBlock = createDataBlock(pDataBlock);
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
cmpParam->pSources = taosArrayGet(pSources, startIndex);
cmpParam->numOfSources = (endIndex - startIndex + 1);
int32_t code = 0;
if (pHandle->type == SORT_SINGLESOURCE) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
code = blockDataFromBuf(pSource->src.pBlock, pPage->data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
releaseBufPage(pHandle->pBuf, pPage);
}
} else {
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SOperatorSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
}
}
return code;
}
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
blockDataDestroy(pSource->src.pBlock);
tfree(pSource);
}
cmpParam->numOfSources = 0;
}
static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
if (isNull) {
colDataAppend(pColInfo, pBlock->info.rows, NULL, true);
} else {
char* pData = colDataGet(pSrcColInfo, *rowIndex);
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
}
}
pBlock->info.rows += 1;
*rowIndex += 1;
}
static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) {
/*
* load a new SDataBlock into memory of a given intermediate data-set source,
* since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
*/
if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
pSource->src.rowIndex = 0;
pSource->pageIndex += 1;
if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
} else {
if (pHandle->type == SORT_SINGLESOURCE) {
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage->data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
releaseBufPage(pHandle->pBuf, pPage);
} else {
pSource->src.pBlock = pHandle->fetchfp(((SOperatorSource*)pSource)->param);
if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
}
}
}
}
/*
* Adjust loser tree otherwise, according to new candidate data
* if the loser tree is rebuild completed, we do not need to adjust
*/
int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);
#ifdef _DEBUG_VIEW
printf("before adjust:\t");
tMergeTreePrint(pTree);
#endif
tMergeTreeAdjust(pTree, leafNodeIndex);
#ifdef _DEBUG_VIEW
printf("\nafter adjust:\t");
tMergeTreePrint(pTree);
#endif
}
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
blockDataClearup(pHandle->pDataBlock, pHandle->hasVarCol);
while(1) {
if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
break;
}
int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
SExternalMemSource *pSource = (*cmpParam).pSources[index];
appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex);
int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
if (pHandle->pDataBlock->info.rows >= capacity) {
return pHandle->pDataBlock;
}
}
return (pHandle->pDataBlock->info.rows > 0)? pHandle->pDataBlock:NULL;
}
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight;
SMsortComparParam *pParam = (SMsortComparParam *)param;
SArray *pInfo = pParam->orderInfo;
SExternalMemSource* pLeftSource = pParam->pSources[pLeftIdx];
SExternalMemSource* pRightSource = pParam->pSources[pRightIdx];
// this input is exhausted, set the special value to denote this
if (pLeftSource->src.rowIndex == -1) {
return 1;
}
if (pRightSource->src.rowIndex == -1) {
return -1;
}
SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
SSDataBlock* pRightBlock = pRightSource->src.pBlock;
for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex);
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
}
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex);
bool rightNull = false;
if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
}
if (leftNull && rightNull) {
continue; // continue to next slot
}
if (rightNull) {
return pParam->nullFirst? 1:-1;
}
if (leftNull) {
return pParam->nullFirst? -1:1;
}
void* left1 = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex);
switch(pLeftColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t leftv = *(int32_t*)left1;
int32_t rightv = *(int32_t*)right1;
if (leftv == rightv) {
break;
} else {
if (pOrder->order == TSDB_ORDER_ASC) {
return leftv < rightv? -1 : 1;
} else {
return leftv < rightv? 1 : -1;
}
}
}
default:
assert(0);
}
}
}
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
// Calculate the I/O counts to complete the data sort.
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64,
pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed);
size_t pgSize = pHandle->pageSize;
int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pHandle->pDataBlock))/ blockDataGetSerialRowSize(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
for(int32_t t = 0; t < sortPass; ++t) {
int64_t st = taosGetTimestampUs();
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
int32_t numOfInputSources = pHandle->numOfPages;
int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
for(int32_t i = 0; i < sortGroup; ++i) {
pHandle->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
if (end > numOfSorted - 1) {
end = numOfSorted - 1;
}
pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;
int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
while (1) {
SSDataBlock* pDataBlock = getSortedBlockData(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) {
break;
}
int32_t pageId = -1;
SFilePage* pPage = getNewDataBuf(pHandle->pBuf, pHandle->sourceId, &pageId);
if (pPage == NULL) {
return terrno;
}
int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage->data, pDataBlock);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
blockDataClearup(pDataBlock, pHandle->hasVarCol);
}
tMergeTreeDestroy(pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = createDataBlock(pHandle->pDataBlock);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId);
if (code != 0) {
return code;
}
}
sortComparClearup(&pHandle->cmpParam);
taosArrayClear(pHandle->pOrderedSource);
taosArrayAddAll(pHandle->pOrderedSource, pResList);
taosArrayDestroy(pResList);
numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
int64_t el = taosGetTimestampUs() - st;
pHandle->totalElapsed += el;
SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el, statis.loadBytes/1024.0,
statis.flushBytes/1024.0);
if (pHandle->type == SORT_MULTIWAY_MERGE) {
pHandle->type = SORT_SINGLESOURCE;
pHandle->comparFn = msortComparFn;
}
}
pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
return 0;
}
int32_t sortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
if (pHandle->type == SORT_SINGLESOURCE) {
if (pHandle->pParam == NULL) {
qError("%s sort source not set yet", pHandle->idStr);
return -1;
}
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(pHandle->pParam);
if (pBlock == NULL) {
break;
}
if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createDataBlock(pBlock);
}
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != 0) {
return code;
}
size_t size = blockDataGetSize(pHandle->pDataBlock);
if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
if (pHandle->pDataBlock->info.rows > 0) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk.
blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst);
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize) {
pHandle->cmpParam.numOfSources = 1;
pHandle->inMemSort = true;
pHandle->tupleHandle.rowIndex = -1;
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return 0;
} else {
doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
} else {
// do nothing
}
// do internal sort
int32_t code = doInternalMergeSort(pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
int32_t sortClose(SSortHandle* pHandle) {
// do nothing
}
int32_t setFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) {
pHandle->fetchfp = fp;
}
int32_t setComparFn(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
pHandle->comparFn = fp;
}
STupleHandle* sortNextTuple(SSortHandle* pHandle) {
if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
return NULL;
}
if (pHandle->inMemSort) {
pHandle->tupleHandle.rowIndex += 1;
if (pHandle->tupleHandle.rowIndex == pHandle->pDataBlock->info.rows) {
pHandle->numOfCompletedSources = 1;
return NULL;
}
return &pHandle->tupleHandle;
}
int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
SExternalMemSource *pSource = pHandle->cmpParam.pSources[index];
if (pHandle->needAdjust) {
int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
}
if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
return NULL;
}
index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
pSource = pHandle->cmpParam.pSources[index];
assert(pSource->src.pBlock != NULL);
pHandle->tupleHandle.rowIndex = pSource->src.rowIndex;
pHandle->tupleHandle.pBlock = pSource->src.pBlock;
pHandle->needAdjust = true;
pSource->src.rowIndex += 1;
return &pHandle->tupleHandle;
}
bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex) {
return false;
}
void* sortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
return colDataGet(pColInfo, pVHandle->rowIndex);
}
......@@ -121,6 +121,7 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
#if 0
TEST(testCase, build_executor_tree_Test) {
const char* msg = "{\n"
"\t\"Id\":\t{\n"
......@@ -330,7 +331,7 @@ TEST(testCase, external_sort_Test) {
}
}
printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf);
// setPrintStatis(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf);
int64_t s2 = taosGetTimestampUs();
printf("total:%ld\n", s2 - s1);
......@@ -341,4 +342,7 @@ TEST(testCase, external_sort_Test) {
taosArrayDestroy(pExprInfo);
taosArrayDestroy(pOrderVal);
}
#endif
#pragma GCC diagnostic pop
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include <gtest/gtest.h>
#include <tglobal.h>
#include <tsort.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "executor.h"
#include "stub.h"
#include "taos.h"
#include "tdef.h"
#include "tep.h"
#include "trpc.h"
#include "tvariant.h"
namespace {
typedef struct {
int32_t startVal;
int32_t count;
int32_t pageRows;
} _info;
SSDataBlock* getSingleColDummyBlock(void* param) {
_info* pInfo = (_info*) param;
if (--pInfo->count < 0) {
return NULL;
}
SSDataBlock* pBlock = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = static_cast<char*>(calloc(pInfo->pageRows, sizeof(int32_t)));
colInfo.nullbitmap = static_cast<char*>(calloc(1, (pInfo->pageRows + 7) / 8));
taosArrayPush(pBlock->pDataBlock, &colInfo);
for (int32_t i = 0; i < pInfo->pageRows; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
int32_t v = ++pInfo->startVal;
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false);
}
pBlock->info.rows = pInfo->pageRows;
pBlock->info.numOfCols = 1;
return pBlock;
}
int32_t docomp(const void* p1, const void* p2, void* param) {
int32_t pLeftIdx = *(int32_t *)p1;
int32_t pRightIdx = *(int32_t *)p2;
SMsortComparParam *pParam = (SMsortComparParam *)param;
SOperatorSource** px = reinterpret_cast<SOperatorSource**>(pParam->pSources);
SArray *pInfo = pParam->orderInfo;
SOperatorSource* pLeftSource = px[pLeftIdx];
SOperatorSource* pRightSource = px[pRightIdx];
// this input is exhausted, set the special value to denote this
if (pLeftSource->src.rowIndex == -1) {
return 1;
}
if (pRightSource->src.rowIndex == -1) {
return -1;
}
SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
SSDataBlock* pRightBlock = pRightSource->src.pBlock;
for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex);
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
}
SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex);
bool rightNull = false;
if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
}
if (leftNull && rightNull) {
continue; // continue to next slot
}
if (rightNull) {
return pParam->nullFirst? 1:-1;
}
if (leftNull) {
return pParam->nullFirst? -1:1;
}
void* left1 = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex);
switch(pLeftColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t leftv = *(int32_t*)left1;
int32_t rightv = *(int32_t*)right1;
if (leftv == rightv) {
break;
} else {
if (pOrder->order == TSDB_ORDER_ASC) {
return leftv < rightv? -1 : 1;
} else {
return leftv < rightv? 1 : -1;
}
}
}
default:
assert(0);
}
}
}
} // namespace
//TEST(testCase, inMem_sort_Test) {
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o);
//
// int32_t numOfRows = 1000;
// SBlockOrderInfo oi = {0};
// oi.order = TSDB_ORDER_ASC;
// oi.colIndex = 0;
// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
// taosArrayPush(orderInfo, &oi);
//
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE, 1024, 5, "test_abc");
// setFetchRawDataFp(phandle, getSingleColDummyBlock);
// sortAddSource(phandle, &numOfRows);
//
// int32_t code = sortOpen(phandle);
// int32_t row = 1;
//
// while(1) {
// STupleHandle* pTupleHandle = sortNextTuple(phandle);
// if (pTupleHandle == NULL) {
// break;
// }
//
// void* v = sortGetValue(pTupleHandle, 0);
// printf("%d: %d\n", row++, *(int32_t*) v);
//
// }
// destroySortHandle(phandle);
//}
//
//TEST(testCase, external_mem_sort_Test) {
// totalcount = 50;
// startVal = 100000;
//
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o);
//
// int32_t numOfRows = 1000;
// SBlockOrderInfo oi = {0};
// oi.order = TSDB_ORDER_ASC;
// oi.colIndex = 0;
// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
// taosArrayPush(orderInfo, &oi);
//
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE, 1024, 5, "test_abc");
// setFetchRawDataFp(phandle, getSingleColDummyBlock);
// sortAddSource(phandle, &numOfRows);
//
// int32_t code = sortOpen(phandle);
// int32_t row = 1;
//
// while(1) {
// STupleHandle* pTupleHandle = sortNextTuple(phandle);
// if (pTupleHandle == NULL) {
// break;
// }
//
// void* v = sortGetValue(pTupleHandle, 0);
// printf("%d: %d\n", row++, *(int32_t*) v);
//
// }
// destroySortHandle(phandle);
//}
TEST(testCase, ordered_merge_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
int32_t numOfRows = 1000;
SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4};
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTIWAY_MERGE, 1024, 5, &s, 1,"test_abc");
setFetchRawDataFp(phandle, getSingleColDummyBlock);
setComparFn(phandle, docomp);
for(int32_t i = 0; i < 10; ++i) {
SOperatorSource* p = static_cast<SOperatorSource*>(calloc(1, sizeof(SOperatorSource)));
_info* c = static_cast<_info*>(calloc(1, sizeof(_info)));
c->count = 1;
c->pageRows = 1000;
c->startVal = 0;
p->param = c;
sortAddSource(phandle, p);
}
int32_t code = sortOpen(phandle);
int32_t row = 1;
while(1) {
STupleHandle* pTupleHandle = sortNextTuple(phandle);
if (pTupleHandle == NULL) {
break;
}
void* v = sortGetValue(pTupleHandle, 0);
printf("%d: %d\n", row++, *(int32_t*) v);
}
destroySortHandle(phandle);
}
#pragma GCC diagnostic pop
......@@ -53,7 +53,7 @@ typedef struct SDiskbasedBuf {
static void printStatisData(const SDiskbasedBuf* pBuf);
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
*pBuf = calloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pResBuf = *pBuf;
......@@ -569,7 +569,7 @@ void setBufPageDirty(SFilePage* pPage, bool dirty) {
ppi->dirty = dirty;
}
void printStatisBeforeClose(SDiskbasedBuf* pBuf) {
void setPrintStatis(SDiskbasedBuf* pBuf) {
pBuf->printStatis = true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册