提交 bbc03a17 编写于 作者: H Haojun Liao

[td-11818] refactor code.

上级 39c06f13
......@@ -25,7 +25,7 @@ extern "C" {
enum {
SORT_MULTIWAY_MERGE = 0x1,
SORT_SINGLESOURCE = 0x2,
SORT_SINGLESOURCE = 0x2,
};
typedef struct SMultiMergeSource {
......
......@@ -5804,106 +5804,6 @@ static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) {
return pBlock;
}
//static int32_t doAddNewExternalMemSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, SSDataBlock* pBlock) {
// SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource));
// if (pSource == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId);
// pSource->src.pBlock = pBlock;
//
// taosArrayPush(pAllSources, &pSource);
//
// pInfo->sourceId += 1;
//
// int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
// int32_t numOfRows = (getBufPageSize(pInfo->pSortInternalBuf) - blockDataGetSerialMetaSize(pInfo->pDataBlock))/rowSize;
// return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
//}
//static int32_t doAddNewOperatorSource(SArray* pAllSources, SSDataBlock* pBlock, int32_t capacity) {
// SOperatorSource* pSource = calloc(1, sizeof(SOperatorSource));
// if (pSource == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// pSource->src.pBlock = pBlock;
// taosArrayPush(pAllSources, &pSource);
//
// return blockDataEnsureCapacity(pSource->src.pBlock, capacity);
//}
//void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, SArray* pSources, jmp_buf env) {
// int32_t start = 0;
//
// while(start < pInfo->pDataBlock->info.rows) {
// int32_t stop = 0;
// blockDataSplitRows(pInfo->pDataBlock, pInfo->hasVarCol, start, &stop, getBufPageSize(pInfo->pSortInternalBuf));
// SSDataBlock* p = blockDataExtractBlock(pInfo->pDataBlock, start, stop - start + 1);
// if (p == NULL) {
// longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY);
// }
//
// int32_t pageId = -1;
// SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId);
// if (pPage == NULL) {
// assert(0);
// longjmp(env, terrno);
// }
//
// int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t);
// assert(size <= getBufPageSize(pInfo->pSortInternalBuf));
//
// blockDataToBuf(pPage->data, p);
//
// setBufPageDirty(pPage, true);
// releaseBufPage(pInfo->pSortInternalBuf, pPage);
//
// blockDataDestroy(p);
// start = stop + 1;
// }
//
// int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
// blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
//
// SSDataBlock* pBlock = createDataBlock(pInfo->pDataBlock);
// int32_t code = doAddNewExternalMemSource(pInfo, pSources, pBlock);
// if (code != TSDB_CODE_SUCCESS) {
// longjmp(env, code);
// }
//}
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SDiskbasedBuf* pBuf) {
cmpParam->pSources = taosArrayGet(pSources, startIndex);
cmpParam->numOfSources = (endIndex - startIndex + 1);
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**) taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(cmpParam->pSources[i]->src.pBlock, pPage->data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
releaseBufPage(pBuf, pPage);
}
return TSDB_CODE_SUCCESS;
}
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
blockDataDestroy(pSource->src.pBlock);
tfree(pSource);
}
cmpParam->numOfSources = 0;
}
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SOrderOperatorInfo* pInfo, int32_t capacity) {
blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
......@@ -5936,8 +5836,6 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOrderOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo, pInfo->numOfRowsInRes);
}
......@@ -6013,7 +5911,6 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
}
}
// int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->sortBufSize, 1, "/tmp/");
if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) {
tfree(pOperator);
destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo));
......
......@@ -176,6 +176,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
if (pHandle->pBuf == NULL) {
int32_t code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
setPrintStatis(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -239,6 +240,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
setPrintStatis(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -121,7 +121,6 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
#if 0
TEST(testCase, build_executor_tree_Test) {
const char* msg = "{\n"
"\t\"Id\":\t{\n"
......@@ -331,8 +330,6 @@ TEST(testCase, external_sort_Test) {
}
}
// setPrintStatis(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf);
int64_t s2 = taosGetTimestampUs();
printf("total:%ld\n", s2 - s1);
......@@ -343,6 +340,5 @@ TEST(testCase, external_sort_Test) {
taosArrayDestroy(pOrderVal);
}
#endif
#pragma GCC diagnostic pop
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册