提交 0b4d0122 编写于 作者: H Haojun Liao

[td-11818] Refactor and add error check.

上级 c57e99e4
...@@ -52,8 +52,12 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); ...@@ -52,8 +52,12 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(const SSDataBlock* pBlock);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -22,28 +22,25 @@ extern "C" { ...@@ -22,28 +22,25 @@ extern "C" {
typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param); typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param);
typedef struct SLoserTreeNode { typedef struct SMultiwayMergeTreeInfo {
int32_t index; int32_t numOfSources;
void *pData; // TODO remove it? int32_t totalSources;
} SLoserTreeNode;
typedef struct SLoserTreeInfo {
int32_t numOfEntries;
int32_t totalEntries;
__merge_compare_fn_t comparFn; __merge_compare_fn_t comparFn;
void * param; void * param;
SLoserTreeNode *pNode; struct STreeNode *pNode;
} SLoserTreeInfo; } SMultiwayMergeTreeInfo;
int32_t tLoserTreeCreate(SLoserTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
void tLoserTreeInit(SLoserTreeInfo *pTree); void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
void tLoserTreeAdjust(SLoserTreeInfo *pTree, int32_t idx); void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);
void tLoserTreeRebuild(SLoserTreeInfo *pTree); void tMergeTreePrint(const SMultiwayMergeTreeInfo *pTree);
void tLoserTreeDisplay(SLoserTreeInfo *pTree); int32_t tMergeTreeGetChosenIndex(const SMultiwayMergeTreeInfo* pTree);
int32_t tMergeTreeAdjustIndex(const SMultiwayMergeTreeInfo* pTree);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -47,7 +47,7 @@ typedef struct SFilePage { ...@@ -47,7 +47,7 @@ typedef struct SFilePage {
* @param handle * @param handle
* @return * @return
*/ */
int32_t createDiskbasedResultBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); int32_t createDiskbasedBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
/** /**
* *
...@@ -113,14 +113,14 @@ void destroyResultBuf(SDiskbasedBuf* pResultBuf); ...@@ -113,14 +113,14 @@ void destroyResultBuf(SDiskbasedBuf* pResultBuf);
* @param pList * @param pList
* @return * @return
*/ */
struct SPageInfo* getLastPageInfo(SIDList pList); SPageInfo* getLastPageInfo(SIDList pList);
/** /**
* *
* @param pPgInfo * @param pPgInfo
* @return * @return
*/ */
int32_t getPageId(const struct SPageInfo* pPgInfo); int32_t getPageId(const SPageInfo* pPgInfo);
/** /**
* Return the buffer page size. * Return the buffer page size.
......
...@@ -359,17 +359,16 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) { ...@@ -359,17 +359,16 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) {
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize) { int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize) {
ASSERT(pBlock != NULL && stopIndex != NULL); ASSERT(pBlock != NULL && stopIndex != NULL);
int32_t size = 0;
int32_t numOfCols = pBlock->info.numOfCols; int32_t numOfCols = pBlock->info.numOfCols;
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
size_t headerSize = sizeof(int32_t); size_t headerSize = sizeof(int32_t);
size_t colHeaderSize = sizeof(int32_t) * numOfCols;
// TODO speedup by checking if the whole page can fit in firstly. // TODO speedup by checking if the whole page can fit in firstly.
if (!hasVarCol) { if (!hasVarCol) {
size_t rowSize = blockDataGetRowSize(pBlock); size_t rowSize = blockDataGetRowSize(pBlock);
int32_t capacity = ((pageSize - headerSize) / (rowSize * 8 + 1)) * 8; int32_t capacity = ((pageSize - headerSize - colHeaderSize) / (rowSize * 8 + 1)) * 8;
*stopIndex = startIndex + capacity; *stopIndex = startIndex + capacity;
if (*stopIndex >= numOfRows) { if (*stopIndex >= numOfRows) {
...@@ -379,7 +378,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd ...@@ -379,7 +378,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
// iterate the rows that can be fit in this buffer page // iterate the rows that can be fit in this buffer page
size += headerSize; int32_t size = (headerSize + colHeaderSize);
for(int32_t j = startIndex; j < numOfRows; ++j) { for(int32_t j = startIndex; j < numOfRows; ++j) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
...@@ -423,6 +422,10 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 ...@@ -423,6 +422,10 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
} }
SSDataBlock* pDst = calloc(1, sizeof(SSDataBlock)); SSDataBlock* pDst = calloc(1, sizeof(SSDataBlock));
if (pDst == NULL) {
return NULL;
}
pDst->info = pBlock->info; pDst->info = pBlock->info;
pDst->info.rows = 0; pDst->info.rows = 0;
...@@ -472,7 +475,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 ...@@ -472,7 +475,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
* @param pBlock * @param pBlock
* @return * @return
*/ */
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { // TODO add the column length!! int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
// write the number of rows // write the number of rows
...@@ -516,21 +519,9 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -516,21 +519,9 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
size_t metaSize = pBlock->info.rows * sizeof(int32_t); size_t metaSize = pBlock->info.rows * sizeof(int32_t);
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
char* p = realloc(pCol->varmeta.offset, metaSize);
if (p == NULL) {
// TODO handle error
}
pCol->varmeta.offset = (int32_t*)p;
memcpy(pCol->varmeta.offset, pStart, metaSize); memcpy(pCol->varmeta.offset, pStart, metaSize);
pStart += metaSize; pStart += metaSize;
} else { } else {
char* p = realloc(pCol->nullbitmap, BitmapLen(pBlock->info.rows));
if (p == NULL) {
// TODO handle error
}
pCol->nullbitmap = p;
memcpy(pCol->nullbitmap, pStart, BitmapLen(pBlock->info.rows)); memcpy(pCol->nullbitmap, pStart, BitmapLen(pBlock->info.rows));
pStart += BitmapLen(pBlock->info.rows); pStart += BitmapLen(pBlock->info.rows);
} }
...@@ -538,13 +529,26 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -538,13 +529,26 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
int32_t colLength = *(int32_t*) pStart; int32_t colLength = *(int32_t*) pStart;
pStart += sizeof(int32_t); pStart += sizeof(int32_t);
if (pCol->pData == NULL) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
pCol->pData = malloc(pCol->info.bytes * 4096); // TODO refactor the memory mgmt if (pCol->varmeta.allocLen < colLength) {
char* tmp = realloc(pCol->pData, colLength);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCol->pData = tmp;
pCol->varmeta.allocLen = colLength;
}
pCol->varmeta.length = colLength;
ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen);
} }
memcpy(pCol->pData, pStart, colLength); memcpy(pCol->pData, pStart, colLength);
pStart += colLength; pStart += colLength;
} }
return TSDB_CODE_SUCCESS;
} }
size_t blockDataGetRowSize(const SSDataBlock* pBlock) { size_t blockDataGetRowSize(const SSDataBlock* pBlock) {
...@@ -759,3 +763,51 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs ...@@ -759,3 +763,51 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld\n", p1-p0, p2 - p1, p3 - p2, p4-p3); printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld\n", p1-p0, p2 - p1, p3 - p2, p4-p3);
destroyTupleIndex(index); destroyTupleIndex(index);
} }
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) {
pDataBlock->info.rows = 0;
if (hasVarCol) {
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(p->info.type)) {
p->varmeta.length = 0;
}
}
}
}
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(p->info.type)) {
char* tmp = realloc(p->varmeta.offset, sizeof(int32_t) * numOfRows);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->varmeta.offset = (int32_t*)tmp;
memset(p->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
p->varmeta.length = 0;
p->varmeta.allocLen = 0;
tfree(p->pData);
} else {
char* tmp = realloc(p->nullbitmap, BitmapLen(numOfRows));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->nullbitmap = tmp;
memset(p->nullbitmap, 0, BitmapLen(numOfRows));
tmp = realloc(p->pData, numOfRows * p->info.bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->pData = tmp;
}
}
}
\ No newline at end of file
...@@ -2171,8 +2171,8 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu ...@@ -2171,8 +2171,8 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualTables; sup.numOfTables = numOfQualTables;
SLoserTreeInfo* pTree = NULL; SMultiwayMergeTreeInfo* pTree = NULL;
uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
cleanBlockOrderSupporter(&sup, numOfTables); cleanBlockOrderSupporter(&sup, numOfTables);
return TSDB_CODE_TDB_OUT_OF_MEMORY; return TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -2181,7 +2181,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu ...@@ -2181,7 +2181,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
int32_t numOfTotal = 0; int32_t numOfTotal = 0;
while (numOfTotal < cnt) { while (numOfTotal < cnt) {
int32_t pos = pTree->pNode[0].index; int32_t pos = tMergeTreeGetChosenIndex(pTree);
int32_t index = sup.blockIndexArray[pos]++; int32_t index = sup.blockIndexArray[pos]++;
STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos]; STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
...@@ -2192,7 +2192,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu ...@@ -2192,7 +2192,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1; sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
} }
tLoserTreeAdjust(pTree, pos + sup.numOfTables); tMergeTreeAdjust(pTree, tMergeTreeAdjustIndex(pTree));
} }
/* /*
......
...@@ -558,17 +558,18 @@ typedef struct SMultiwayMergeInfo { ...@@ -558,17 +558,18 @@ typedef struct SMultiwayMergeInfo {
} SMultiwayMergeInfo; } SMultiwayMergeInfo;
typedef struct SOrderOperatorInfo { typedef struct SOrderOperatorInfo {
int32_t sourceId; int32_t sourceId;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* orderInfo; // SArray<SBlockOrderInfo> SArray *orderInfo; // SArray<SBlockOrderInfo>
SSDataBlock* pDataBlock; SSDataBlock *pDataBlock;
bool nullFirst; // null value is put in the front bool nullFirst; // null value is put in the front
bool hasVarCol; // has variable length column, such as binary/varchar/nchar bool hasVarCol; // has variable length column, such as binary/varchar/nchar
SDiskbasedBuf* pSortInternalBuf; int32_t numOfSources;
int32_t numOfSources; int32_t numOfCompleted;
int32_t numOfCompleted; SDiskbasedBuf *pSortInternalBuf;
SLoserTreeInfo *pMergeTree; SMultiwayMergeTreeInfo *pMergeTree;
SArray *pSources; // SArray<SExternalMemSource*> SArray *pSources; // SArray<SExternalMemSource*>
int32_t capacity;
} SOrderOperatorInfo; } SOrderOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
......
...@@ -533,7 +533,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -533,7 +533,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t *posList = NULL; int32_t *posList = NULL;
SLoserTreeInfo *pTree = NULL; SMultiwayMergeTreeInfo *pTree = NULL;
STableQueryInfo **pTableQueryInfoList = NULL; STableQueryInfo **pTableQueryInfoList = NULL;
size_t size = taosArrayGetSize(pTableList); size_t size = taosArrayGetSize(pTableList);
...@@ -566,7 +566,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -566,7 +566,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order}; SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order};
int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY; code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end; goto _end;
...@@ -576,7 +576,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -576,7 +576,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int64_t startt = taosGetTimestampMs(); int64_t startt = taosGetTimestampMs();
while (1) { while (1) {
int32_t tableIndex = pTree->pNode[0].index; int32_t tableIndex = tMergeTreeGetChosenIndex(pTree);
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo; SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]); SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
...@@ -612,7 +612,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -612,7 +612,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
} }
} }
tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries); tMergeTreeAdjust(pTree, tMergeTreeAdjustIndex(pTree));
} }
int64_t endt = taosGetTimestampMs(); int64_t endt = taosGetTimestampMs();
......
...@@ -49,7 +49,7 @@ typedef struct SHistogramInfo { ...@@ -49,7 +49,7 @@ typedef struct SHistogramInfo {
SHistBin* elems; SHistBin* elems;
#else #else
tSkipList* pList; tSkipList* pList;
SLoserTreeInfo* pLoserTree; SMultiwayMergeTreeInfo* pLoserTree;
int32_t maxIndex; int32_t maxIndex;
bool ordered; bool ordered;
#endif #endif
......
...@@ -117,14 +117,14 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -117,14 +117,14 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
if ((*pHisto)->ordered) { if ((*pHisto)->ordered) {
int32_t lastIndex = (*pHisto)->maxIndex; int32_t lastIndex = (*pHisto)->maxIndex;
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree;
(*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].pData = pResNode; (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].pData = pResNode;
pEntry1->index = (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].index; pEntry1->index = (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].index;
// update the loser tree // update the loser tree
if ((*pHisto)->ordered) { if ((*pHisto)->ordered) {
tLoserTreeAdjust(pTree, pEntry1->index + pTree->numOfEntries); tMergeTreeAdjust(pTree, pEntry1->index + pTree->numOfEntries);
} }
tSkipListKey kx = tSkipListKey kx =
...@@ -142,10 +142,10 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -142,10 +142,10 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
SHistBin* pPrevEntry = (SHistBin*)pResNode->pBackward[0]->pData; SHistBin* pPrevEntry = (SHistBin*)pResNode->pBackward[0]->pData;
pPrevEntry->delta = val - pPrevEntry->val; pPrevEntry->delta = val - pPrevEntry->val;
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree;
if ((*pHisto)->ordered) { if ((*pHisto)->ordered) {
tLoserTreeAdjust(pTree, pPrevEntry->index + pTree->numOfEntries); tMergeTreeAdjust(pTree, pPrevEntry->index + pTree->numOfEntries);
tLoserTreeDisplay(pTree); tMergeTreePrint(pTree);
} }
} }
...@@ -155,7 +155,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -155,7 +155,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
if (!(*pHisto)->ordered) { if (!(*pHisto)->ordered) {
SSkipListPrint((*pHisto)->pList, 1); SSkipListPrint((*pHisto)->pList, 1);
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree;
tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0]; tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0];
tSkipListNode* p1 = pHead; tSkipListNode* p1 = pHead;
...@@ -183,13 +183,13 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -183,13 +183,13 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
pTree->pNode[i].index = -1; pTree->pNode[i].index = -1;
} }
tLoserTreeDisplay(pTree); tMergeTreePrint(pTree);
for (int32_t i = pTree->totalEntries - 1; i >= pTree->numOfEntries; i--) { for (int32_t i = pTree->totalEntries - 1; i >= pTree->numOfEntries; i--) {
tLoserTreeAdjust(pTree, i); tMergeTreeAdjust(pTree, i);
} }
tLoserTreeDisplay(pTree); tMergeTreePrint(pTree);
(*pHisto)->ordered = true; (*pHisto)->ordered = true;
} }
...@@ -219,7 +219,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -219,7 +219,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
pPrevEntry->delta = pEntry->val - pPrevEntry->val; pPrevEntry->delta = pEntry->val - pPrevEntry->val;
} }
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree;
if (pNextEntry->index != -1) { if (pNextEntry->index != -1) {
(*pHisto)->maxIndex = pNextEntry->index; (*pHisto)->maxIndex = pNextEntry->index;
...@@ -230,12 +230,12 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -230,12 +230,12 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
printf("disappear index is:%d\n", f); printf("disappear index is:%d\n", f);
} }
tLoserTreeAdjust(pTree, pEntry->index + pTree->numOfEntries); tMergeTreeAdjust(pTree, pEntry->index + pTree->numOfEntries);
// remove the next node in skiplist // remove the next node in skiplist
tSkipListRemoveNode((*pHisto)->pList, pNext); tSkipListRemoveNode((*pHisto)->pList, pNext);
SSkipListPrint((*pHisto)->pList, 1); SSkipListPrint((*pHisto)->pList, 1);
tLoserTreeDisplay((*pHisto)->pLoserTree); tMergeTreePrint((*pHisto)->pLoserTree);
} else { // add to heap } else { // add to heap
if (pResNode->pForward[0] != NULL) { if (pResNode->pForward[0] != NULL) {
pEntry1->delta = ((SHistBin*)pResNode->pForward[0]->pData)->val - val; pEntry1->delta = ((SHistBin*)pResNode->pForward[0]->pData)->val - val;
......
...@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, ...@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket); resetSlotInfo(pBucket);
int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir); int32_t ret = createDiskbasedBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir);
if (ret != 0) { if (ret != 0) {
tMemBucketDestroy(pBucket); tMemBucketDestroy(pBucket);
return NULL; return NULL;
......
...@@ -17,79 +17,76 @@ ...@@ -17,79 +17,76 @@
#include "tlosertree.h" #include "tlosertree.h"
#include "ulog.h" #include "ulog.h"
// set initial value for loser tree typedef struct STreeNode {
void tLoserTreeInit(SLoserTreeInfo* pTree) { int32_t index;
assert((pTree->totalEntries & 0x01) == 0 && (pTree->numOfEntries << 1 == pTree->totalEntries)); void *pData; // TODO remove it?
} STreeNode;
for (int32_t i = 0; i < pTree->totalEntries; ++i) { // Set the initial value of the multiway merge tree.
if (i < pTree->numOfEntries) { static void tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) {
assert((pTree->totalSources & 0x01) == 0 && (pTree->numOfSources << 1 == pTree->totalSources));
for (int32_t i = 0; i < pTree->totalSources; ++i) {
if (i < pTree->numOfSources) {
pTree->pNode[i].index = -1; pTree->pNode[i].index = -1;
} else { } else {
pTree->pNode[i].index = i - pTree->numOfEntries; pTree->pNode[i].index = i - pTree->numOfSources;
} }
} }
} }
/* int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, void* param, __merge_compare_fn_t compareFn) {
* display whole loser tree on screen for debug purpose only.
*/
void tLoserTreeDisplay(SLoserTreeInfo* pTree) {
printf("the value of loser tree:\t");
for (int32_t i = 0; i < pTree->totalEntries; ++i) printf("%d\t", pTree->pNode[i].index);
printf("\n");
}
int32_t tLoserTreeCreate(SLoserTreeInfo** pTree, uint32_t numOfSources, void* param, __merge_compare_fn_t compareFn) {
int32_t totalEntries = numOfSources << 1u; int32_t totalEntries = numOfSources << 1u;
*pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries); SMultiwayMergeTreeInfo* pTreeInfo = (SMultiwayMergeTreeInfo*)calloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries);
if ((*pTree) == NULL) { if ((*pTree) == NULL) {
uError("allocate memory for loser-tree failed. reason:%s", strerror(errno)); uError("allocate memory for loser-tree failed. reason:%s", strerror(errno));
return -1; return -1;
} }
(*pTree)->pNode = (SLoserTreeNode*)(((char*)(*pTree)) + sizeof(SLoserTreeInfo)); pTreeInfo->pNode = (STreeNode*)(((char*)(*pTree)) + sizeof(SMultiwayMergeTreeInfo));
(*pTree)->numOfEntries = numOfSources; pTreeInfo->numOfSources = numOfSources;
(*pTree)->totalEntries = totalEntries; pTreeInfo->totalSources = totalEntries;
(*pTree)->param = param; pTreeInfo->param = param;
(*pTree)->comparFn = compareFn; pTreeInfo->comparFn = compareFn;
// set initial value for loser tree // set initial value for loser tree
tLoserTreeInit(*pTree); tMergeTreeInit(pTreeInfo);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("the initial value of loser tree:\n"); printf("the initial value of loser tree:\n");
tLoserTreeDisplay(*pTree); tLoserTreeDisplaypTreeInfo;
#endif #endif
for (int32_t i = totalEntries - 1; i >= numOfSources; i--) { for (int32_t i = totalEntries - 1; i >= numOfSources; i--) {
tLoserTreeAdjust(*pTree, i); tMergeTreeAdjust(pTreeInfo, i);
} }
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
printf("after adjust:\n"); printf("after adjust:\n");
tLoserTreeDisplay(*pTree); tLoserTreeDisplaypTreeInfo;
printf("initialize local reducer completed!\n"); printf("initialize local reducer completed!\n");
#endif #endif
*pTree = pTreeInfo;
return 0; return 0;
} }
void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
assert(idx <= pTree->totalEntries - 1 && idx >= pTree->numOfEntries && pTree->totalEntries >= 2); assert(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2);
if (pTree->totalEntries == 2) { if (pTree->totalSources == 2) {
pTree->pNode[0].index = 0; pTree->pNode[0].index = 0;
pTree->pNode[1].index = 0; pTree->pNode[1].index = 0;
return; return;
} }
int32_t parentId = idx >> 1; int32_t parentId = idx >> 1;
SLoserTreeNode kLeaf = pTree->pNode[idx]; STreeNode kLeaf = pTree->pNode[idx];
while (parentId > 0) { while (parentId > 0) {
SLoserTreeNode* pCur = &pTree->pNode[parentId]; STreeNode* pCur = &pTree->pNode[parentId];
if (pCur->index == -1) { if (pCur->index == -1) {
pTree->pNode[parentId] = kLeaf; pTree->pNode[parentId] = kLeaf;
return; return;
...@@ -97,7 +94,7 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { ...@@ -97,7 +94,7 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param); int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param);
if (ret < 0) { if (ret < 0) {
SLoserTreeNode t = pTree->pNode[parentId]; STreeNode t = pTree->pNode[parentId];
pTree->pNode[parentId] = kLeaf; pTree->pNode[parentId] = kLeaf;
kLeaf = t; kLeaf = t;
} }
...@@ -111,11 +108,31 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { ...@@ -111,11 +108,31 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
} }
} }
void tLoserTreeRebuild(SLoserTreeInfo* pTree) { void tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) {
assert((pTree->totalEntries & 0x1) == 0); assert((pTree->totalSources & 0x1) == 0);
tLoserTreeInit(pTree); tMergeTreeInit(pTree);
for (int32_t i = pTree->totalEntries - 1; i >= pTree->numOfEntries; i--) { for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) {
tLoserTreeAdjust(pTree, i); tMergeTreeAdjust(pTree, i);
} }
} }
/*
* display whole loser tree on screen for debug purpose only.
*/
void tMergeTreePrint(const SMultiwayMergeTreeInfo* pTree) {
printf("the value of loser tree:\t");
for (int32_t i = 0; i < pTree->totalSources; ++i) {
printf("%d\t", pTree->pNode[i].index);
}
printf("\n");
}
int32_t tMergeTreeGetChosenIndex(const SMultiwayMergeTreeInfo* pTree) {
return pTree->pNode[0].index;
}
int32_t tMergeTreeAdjustIndex(const SMultiwayMergeTreeInfo* pTree) {
return tMergeTreeGetChosenIndex(pTree) + pTree->numOfSources;
}
...@@ -55,7 +55,7 @@ typedef struct SDiskbasedBuf { ...@@ -55,7 +55,7 @@ typedef struct SDiskbasedBuf {
SDiskbasedBufStatis statis; SDiskbasedBufStatis statis;
} SDiskbasedBuf; } SDiskbasedBuf;
int32_t createDiskbasedResultBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { int32_t createDiskbasedBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
*pResultBuf = calloc(1, sizeof(SDiskbasedBuf)); *pResultBuf = calloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pResBuf = *pResultBuf; SDiskbasedBuf* pResBuf = *pResultBuf;
...@@ -492,13 +492,13 @@ void destroyResultBuf(SDiskbasedBuf* pResultBuf) { ...@@ -492,13 +492,13 @@ void destroyResultBuf(SDiskbasedBuf* pResultBuf) {
tfree(pResultBuf); tfree(pResultBuf);
} }
struct SPageInfo* getLastPageInfo(SIDList pList) { SPageInfo* getLastPageInfo(SIDList pList) {
size_t size = taosArrayGetSize(pList); size_t size = taosArrayGetSize(pList);
SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1); SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
return pPgInfo; return pPgInfo;
} }
int32_t getPageId(const struct SPageInfo* pPgInfo) { int32_t getPageId(const SPageInfo* pPgInfo) {
ASSERT(pPgInfo != NULL); ASSERT(pPgInfo != NULL);
return pPgInfo->pageId; return pPgInfo->pageId;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册