提交 6c1f75fe 编写于 作者: H Haojun Liao

[td-11818] refactor tsort.

上级 17ead5ed
......@@ -551,7 +551,9 @@ typedef struct SDistinctOperatorInfo {
} SDistinctOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
SSDataBlock *pDataBlock;
SOptrBasicInfo binfo;
// SSDataBlock *pDataBlock;
bool hasVarCol;
SArray *orderInfo; // SArray<SBlockOrderInfo>
......@@ -563,6 +565,11 @@ typedef struct SSortedMergeOperatorInfo {
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t numOfRowsInRes;
char** prevRow;
int32_t resultRowFactor;
bool multiGroupResults;
bool hasGroupColData;
} SSortedMergeOperatorInfo;
typedef struct SOrderOperatorInfo {
......
......@@ -24,8 +24,8 @@ extern "C" {
#include "os.h"
enum {
SORT_MULTIWAY_MERGE = 0x1,
SORT_SINGLESOURCE = 0x2,
SORT_MULTISOURCE_MERGE = 0x1,
SORT_SINGLESOURCE_SORT = 0x2,
};
typedef struct SMultiMergeSource {
......@@ -40,10 +40,10 @@ typedef struct SExternalMemSource {
int32_t pageIndex;
} SExternalMemSource;
typedef struct SOperatorSource {
typedef struct SGenericSource {
SMultiMergeSource src;
void* param;
} SOperatorSource;
void *param;
} SGenericSource;
typedef struct SMsortComparParam {
void **pSources;
......
......@@ -5603,7 +5603,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param;
taosArrayDestroy(pInfo->orderInfo);
destroySortHandle(pInfo->pSortHandle);
blockDataDestroy(pInfo->pDataBlock);
blockDataDestroy(pInfo->binfo.pRes);
}
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -5624,6 +5624,7 @@ static SExprInfo* exprArrayDup(SArray* pExprInfo) {
return p;
}
// TODO merge aggregate super table
static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHandle) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
......@@ -5674,21 +5675,20 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortedMergeOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->numOfRowsInRes);
}
SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL);
SSchema* p = blockDataExtractSchema(pInfo->binfo.pRes, NULL);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTIWAY_MERGE, pInfo->bufPageSize,
numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)");
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
numOfBufPage, p, pInfo->binfo.pRes->info.numOfCols, "GET_TASKID(pTaskInfo)");
tfree(p);
setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SOperatorSource* ps = calloc(1, sizeof(SOperatorSource));
SGenericSource* ps = calloc(1, sizeof(SGenericSource));
ps->param = pOperator->pDownstream[i];
sortAddSource(pInfo->pSortHandle, ps);
}
......@@ -5698,7 +5698,7 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->numOfRowsInRes);
}
static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
......@@ -5729,18 +5729,22 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
int32_t numOfOutput = taosArrayGetSize(pExprInfo);
pInfo->binfo.capacity = 4096;
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize);
// pInfo->resultRowFactor =
// (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false));
pInfo->sortBufSize = 1024 * 16; // 1MB
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes);
pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes);
pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);
int32_t numOfRows = 1;
......@@ -5749,6 +5753,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doSortedMerge;
......@@ -5775,13 +5780,15 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE, pInfo->bufPageSize,
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize,
numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)");
tfree(p);
setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
sortAddSource(pInfo->pSortHandle, pOperator);
SGenericSource* ps = calloc(1, sizeof(SGenericSource));
ps->param = pOperator;
sortAddSource(pInfo->pSortHandle, ps);
// TODO set error code;
int32_t code = sortOpen(pInfo->pSortHandle);
......
......@@ -38,8 +38,6 @@ typedef struct SSortHandle {
SArray *pOrderInfo;
bool nullFirst;
bool hasVarCol;
SArray *pSources; // TODO refactor, remove it
SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp;
......@@ -96,7 +94,6 @@ SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type,
pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages;
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->pSources = taosArrayInit(4, POINTER_BYTES);
pSortHandle->pOrderInfo = pOrderInfo;
pSortHandle->nullFirst = nullFirst;
pSortHandle->cmpParam.orderInfo = pOrderInfo;
......@@ -113,8 +110,6 @@ SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type,
void destroySortHandle(SSortHandle* pSortHandle) {
sortClose(pSortHandle);
taosArrayDestroy(pSortHandle->pSources);
if (pSortHandle->pMergeTree != NULL) {
tMergeTreeDestroy(pSortHandle->pMergeTree);
}
......@@ -125,11 +120,7 @@ void destroySortHandle(SSortHandle* pSortHandle) {
}
int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource) {
if (pSortHandle->type == SORT_SINGLESOURCE) {
pSortHandle->pParam = pSource;
} else {
taosArrayPush(pSortHandle->pOrderedSource, &pSource);
}
taosArrayPush(pSortHandle->pOrderedSource, &pSource);
}
static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) {
......@@ -220,7 +211,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
int32_t code = 0;
if (pHandle->type == SORT_SINGLESOURCE) {
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
......@@ -244,7 +235,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
}
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SOperatorSource* pSource = cmpParam->pSources[i];
SGenericSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
}
}
......@@ -296,7 +287,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
} else {
if (pHandle->type == SORT_SINGLESOURCE) {
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
......@@ -307,7 +298,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
releaseBufPage(pHandle->pBuf, pPage);
} else {
pSource->src.pBlock = pHandle->fetchfp(((SOperatorSource*)pSource)->param);
pSource->src.pBlock = pHandle->fetchfp(((SGenericSource*)pSource)->param);
if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
......@@ -530,8 +521,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el, statis.loadBytes/1024.0,
statis.flushBytes/1024.0);
if (pHandle->type == SORT_MULTIWAY_MERGE) {
pHandle->type = SORT_SINGLESOURCE;
if (pHandle->type == SORT_MULTISOURCE_MERGE) {
pHandle->type = SORT_SINGLESOURCE_SORT;
pHandle->comparFn = msortComparFn;
}
}
......@@ -540,26 +531,15 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return 0;
}
int32_t sortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
if (pHandle->type == SORT_SINGLESOURCE) {
if (pHandle->pParam == NULL) {
qError("%s sort source not set yet", pHandle->idStr);
return -1;
}
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SGenericSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
taosArrayClear(pHandle->pOrderedSource);
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(pHandle->pParam);
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (pBlock == NULL) {
break;
}
......@@ -604,12 +584,31 @@ int32_t sortOpen(SSortHandle* pHandle) {
doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
} else {
// do nothing
tfree(source);
}
return TSDB_CODE_SUCCESS;
}
int32_t sortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
int32_t code = createInitialSortedMultiSources(pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// do internal sort
int32_t code = doInternalMergeSort(pHandle);
code = doInternalMergeSort(pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -356,7 +356,7 @@ TEST(testCase, external_sort_Test) {
taosArrayDestroy(pExprInfo);
taosArrayDestroy(pOrderVal);
}
#endif
TEST(testCase, sorted_merge_Test) {
srand(time(NULL));
......@@ -424,5 +424,5 @@ TEST(testCase, sorted_merge_Test) {
taosArrayDestroy(pExprInfo);
taosArrayDestroy(pOrderVal);
}
#endif
#pragma GCC diagnostic pop
......@@ -76,12 +76,12 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
int32_t pRightIdx = *(int32_t *)p2;
SMsortComparParam *pParam = (SMsortComparParam *)param;
SOperatorSource** px = reinterpret_cast<SOperatorSource**>(pParam->pSources);
SGenericSource** px = reinterpret_cast<SGenericSource**>(pParam->pSources);
SArray *pInfo = pParam->orderInfo;
SOperatorSource* pLeftSource = px[pLeftIdx];
SOperatorSource* pRightSource = px[pRightIdx];
SGenericSource* pLeftSource = px[pLeftIdx];
SGenericSource* pRightSource = px[pRightIdx];
// this input is exhausted, set the special value to denote this
if (pLeftSource->src.rowIndex == -1) {
......@@ -162,7 +162,7 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
// taosArrayPush(orderInfo, &oi);
//
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE, 1024, 5, "test_abc");
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, "test_abc");
// setFetchRawDataFp(phandle, getSingleColDummyBlock);
// sortAddSource(phandle, &numOfRows);
//
......@@ -182,42 +182,50 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
// destroySortHandle(phandle);
//}
//
//TEST(testCase, external_mem_sort_Test) {
// totalcount = 50;
// startVal = 100000;
//
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o);
//
TEST(testCase, external_mem_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
// int32_t numOfRows = 1000;
// SBlockOrderInfo oi = {0};
// oi.order = TSDB_ORDER_ASC;
// oi.colIndex = 0;
// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
// taosArrayPush(orderInfo, &oi);
//
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE, 1024, 5, "test_abc");
// setFetchRawDataFp(phandle, getSingleColDummyBlock);
// sortAddSource(phandle, &numOfRows);
//
// int32_t code = sortOpen(phandle);
// int32_t row = 1;
//
// while(1) {
// STupleHandle* pTupleHandle = sortNextTuple(phandle);
// if (pTupleHandle == NULL) {
// break;
// }
//
// void* v = sortGetValue(pTupleHandle, 0);
// printf("%d: %d\n", row++, *(int32_t*) v);
//
// }
// destroySortHandle(phandle);
//}
SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, };
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
setFetchRawDataFp(phandle, getSingleColDummyBlock);
_info* pInfo = (_info*) calloc(1, sizeof(_info));
pInfo->startVal = 100000;
pInfo->pageRows = 1000;
pInfo->count = 50;
SGenericSource* ps = static_cast<SGenericSource*>(calloc(1, sizeof(SGenericSource)));
ps->param = pInfo;
sortAddSource(phandle, ps);
int32_t code = sortOpen(phandle);
int32_t row = 1;
while(1) {
STupleHandle* pTupleHandle = sortNextTuple(phandle);
if (pTupleHandle == NULL) {
break;
}
void* v = sortGetValue(pTupleHandle, 0);
printf("%d: %d\n", row++, *(int32_t*) v);
}
destroySortHandle(phandle);
}
//TEST(testCase, ordered_merge_sort_Test) {
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
......@@ -234,7 +242,7 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
// taosArrayPush(orderInfo, &oi);
//
// SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4};
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTIWAY_MERGE, 1024, 5, &s, 1,"test_abc");
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc");
// setFetchRawDataFp(phandle, getSingleColDummyBlock);
// setComparFn(phandle, docomp);
//
......
......@@ -562,7 +562,7 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
}
void setBufPageDirty(SFilePage* pPage, bool dirty) {
int32_t offset = offsetof(SPageInfo, pData); // todo extract method
int32_t offset = offsetof(SPageInfo, pData);
char* p = (char*)pPage - offset;
SPageInfo* ppi = ((SPageInfo**) p)[0];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册