提交 aced34c4 编写于 作者: wmmhello's avatar wmmhello

fix error in sort operation

上级 4bc0e507
...@@ -34,16 +34,17 @@ typedef struct SMultiMergeSource { ...@@ -34,16 +34,17 @@ typedef struct SMultiMergeSource {
SSDataBlock *pBlock; SSDataBlock *pBlock;
} SMultiMergeSource; } SMultiMergeSource;
typedef struct SExternalMemSource { typedef struct SSortSource {
SMultiMergeSource src; SMultiMergeSource src;
SArray* pageIdList; union{
int32_t pageIndex; struct{
} SExternalMemSource; SArray* pageIdList;
int32_t pageIndex;
typedef struct SGenericSource { };
SMultiMergeSource src; void *param;
void *param; };
} SGenericSource;
} SSortSource;
typedef struct SMsortComparParam { typedef struct SMsortComparParam {
void **pSources; void **pSources;
......
...@@ -5982,7 +5982,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5982,7 +5982,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[i]; ps->param = pOperator->pDownstream[i];
tsortAddSource(pInfo->pSortHandle, ps); tsortAddSource(pInfo->pSortHandle, ps);
} }
...@@ -6128,7 +6128,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -6128,7 +6128,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[0]; ps->param = pOperator->pDownstream[0];
tsortAddSource(pInfo->pSortHandle, ps); tsortAddSource(pInfo->pSortHandle, ps);
......
...@@ -113,7 +113,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page ...@@ -113,7 +113,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
static int32_t sortComparClearup(SMsortComparParam* cmpParam) { static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i]; SSortSource* pSource = cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE
blockDataDestroy(pSource->src.pBlock); blockDataDestroy(pSource->src.pBlock);
taosMemoryFreeClear(pSource); taosMemoryFreeClear(pSource);
} }
...@@ -132,7 +132,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { ...@@ -132,7 +132,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
taosMemoryFreeClear(pSortHandle->idStr); taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock); blockDataDestroy(pSortHandle->pDataBlock);
for (size_t i = 0; i < taosArrayGetSize(pSortHandle->pOrderedSource); i++){ for (size_t i = 0; i < taosArrayGetSize(pSortHandle->pOrderedSource); i++){
SExternalMemSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i); SSortSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i);
blockDataDestroy((*pSource)->src.pBlock); blockDataDestroy((*pSource)->src.pBlock);
taosMemoryFreeClear(*pSource); taosMemoryFreeClear(*pSource);
} }
...@@ -146,7 +146,7 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) { ...@@ -146,7 +146,7 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
} }
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) { static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) {
SExternalMemSource* pSource = taosMemoryCalloc(1, sizeof(SExternalMemSource)); SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) { if (pSource == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
...@@ -216,7 +216,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -216,7 +216,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
if (pHandle->type == SORT_SINGLESOURCE_SORT) { if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i]; SSortSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo)); void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
...@@ -238,7 +238,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -238,7 +238,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
} }
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SGenericSource* pSource = cmpParam->pSources[i]; SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param); pSource->src.pBlock = pHandle->fetchfp(pSource->param);
} }
} }
...@@ -265,7 +265,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou ...@@ -265,7 +265,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou
*rowIndex += 1; *rowIndex += 1;
} }
static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) { static int32_t adjustMergeTreeForNextTuple(SSortSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) {
/* /*
* load a new SDataBlock into memory of a given intermediate data-set source, * load a new SDataBlock into memory of a given intermediate data-set source,
* since it's last record in buffer has been chosen to be processed, as the winner of loser-tree * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
...@@ -292,7 +292,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa ...@@ -292,7 +292,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} }
} else { } else {
pSource->src.pBlock = pHandle->fetchfp(((SGenericSource*)pSource)->param); pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
if (pSource->src.pBlock == NULL) { if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1; (*numOfCompleted) += 1;
pSource->src.rowIndex = -1; pSource->src.rowIndex = -1;
...@@ -330,7 +330,7 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa ...@@ -330,7 +330,7 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa
int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree); int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
SExternalMemSource *pSource = (*cmpParam).pSources[index]; SSortSource *pSource = (*cmpParam).pSources[index];
appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex); appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex);
int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources); int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
...@@ -355,8 +355,8 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { ...@@ -355,8 +355,8 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
SArray *pInfo = pParam->orderInfo; SArray *pInfo = pParam->orderInfo;
SExternalMemSource* pLeftSource = pParam->pSources[pLeftIdx]; SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
SExternalMemSource* pRightSource = pParam->pSources[pRightIdx]; SSortSource* pRightSource = pParam->pSources[pRightIdx];
// this input is exhausted, set the special value to denote this // this input is exhausted, set the special value to denote this
if (pLeftSource->src.rowIndex == -1) { if (pLeftSource->src.rowIndex == -1) {
...@@ -484,6 +484,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -484,6 +484,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
} }
sortComparClearup(&pHandle->cmpParam);
tMergeTreeDestroy(pHandle->pMergeTree); tMergeTreeDestroy(pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0; pHandle->numOfCompletedSources = 0;
...@@ -494,8 +495,6 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -494,8 +495,6 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
} }
} }
sortComparClearup(&pHandle->cmpParam);
taosArrayClear(pHandle->pOrderedSource); taosArrayClear(pHandle->pOrderedSource);
taosArrayAddAll(pHandle->pOrderedSource, pResList); taosArrayAddAll(pHandle->pOrderedSource, pResList);
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
...@@ -523,7 +522,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -523,7 +522,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
if (pHandle->type == SORT_SINGLESOURCE_SORT) { if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SGenericSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
taosArrayClear(pHandle->pOrderedSource); taosArrayClear(pHandle->pOrderedSource);
while (1) { while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param); SSDataBlock* pBlock = pHandle->fetchfp(source->param);
...@@ -652,7 +651,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) { ...@@ -652,7 +651,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
} }
int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree); int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
SExternalMemSource *pSource = pHandle->cmpParam.pSources[index]; SSortSource *pSource = pHandle->cmpParam.pSources[index];
if (pHandle->needAdjust) { if (pHandle->needAdjust) {
int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources); int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
......
...@@ -138,12 +138,12 @@ int32_t docomp(const void* p1, const void* p2, void* param) { ...@@ -138,12 +138,12 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
int32_t pRightIdx = *(int32_t *)p2; int32_t pRightIdx = *(int32_t *)p2;
SMsortComparParam *pParam = (SMsortComparParam *)param; SMsortComparParam *pParam = (SMsortComparParam *)param;
SGenericSource** px = reinterpret_cast<SGenericSource**>(pParam->pSources); SSortSource** px = reinterpret_cast<SSortSource**>(pParam->pSources);
SArray *pInfo = pParam->orderInfo; SArray *pInfo = pParam->orderInfo;
SGenericSource* pLeftSource = px[pLeftIdx]; SSortSource* pLeftSource = px[pLeftIdx];
SGenericSource* pRightSource = px[pRightIdx]; SSortSource* pRightSource = px[pRightIdx];
// this input is exhausted, set the special value to denote this // this input is exhausted, set the special value to denote this
if (pLeftSource->src.rowIndex == -1) { if (pLeftSource->src.rowIndex == -1) {
...@@ -218,7 +218,7 @@ TEST(testCase, inMem_sort_Test) { ...@@ -218,7 +218,7 @@ TEST(testCase, inMem_sort_Test) {
pInfo->count = 6; pInfo->count = 6;
pInfo->type = TSDB_DATA_TYPE_USMALLINT; pInfo->type = TSDB_DATA_TYPE_USMALLINT;
SGenericSource* ps = static_cast<SGenericSource*>(taosMemoryCalloc(1, sizeof(SGenericSource))); SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
ps->param = pInfo; ps->param = pInfo;
tsortAddSource(phandle, ps); tsortAddSource(phandle, ps);
...@@ -301,7 +301,7 @@ TEST(testCase, external_mem_sort_Test) { ...@@ -301,7 +301,7 @@ TEST(testCase, external_mem_sort_Test) {
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc"); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
SGenericSource* ps = static_cast<SGenericSource*>(taosMemoryCalloc(1, sizeof(SGenericSource))); SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
ps->param = &pInfo[i]; ps->param = &pInfo[i];
tsortAddSource(phandle, ps); tsortAddSource(phandle, ps);
...@@ -369,10 +369,10 @@ TEST(testCase, ordered_merge_sort_Test) { ...@@ -369,10 +369,10 @@ TEST(testCase, ordered_merge_sort_Test) {
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
tsortSetComparFp(phandle, docomp); tsortSetComparFp(phandle, docomp);
SGenericSource* p[10] = {0}; SSortSource* p[10] = {0};
_info c[10] = {0}; _info c[10] = {0};
for(int32_t i = 0; i < 10; ++i) { for(int32_t i = 0; i < 10; ++i) {
p[i] = static_cast<SGenericSource*>(taosMemoryCalloc(1, sizeof(SGenericSource))); p[i] = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
c[i].count = 1; c[i].count = 1;
c[i].pageRows = 1000; c[i].pageRows = 1000;
c[i].startVal = i*1000; c[i].startVal = i*1000;
...@@ -396,9 +396,8 @@ TEST(testCase, ordered_merge_sort_Test) { ...@@ -396,9 +396,8 @@ TEST(testCase, ordered_merge_sort_Test) {
ASSERT_EQ(row++, *(int32_t*) v); ASSERT_EQ(row++, *(int32_t*) v);
} }
for(int32_t i = 0; i < 10; ++i) {
taosMemoryFree(p[i]); taosArrayDestroy(orderInfo);
}
tsortDestroySortHandle(phandle); tsortDestroySortHandle(phandle);
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册