提交 655dbb47 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 60d9acb7
...@@ -141,7 +141,6 @@ typedef struct SqlFunctionCtx { ...@@ -141,7 +141,6 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
int32_t curBufPage; int32_t curBufPage;
bool increase;
bool isStream; bool isStream;
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
......
...@@ -67,10 +67,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); ...@@ -67,10 +67,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
/** /**
* *
* @param pBuf * @param pBuf
* @param groupId
* @return * @return
*/ */
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId); SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf);
/** /**
* get the specified buffer page by id * get the specified buffer page by id
...@@ -101,13 +100,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi); ...@@ -101,13 +100,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi);
*/ */
size_t getTotalBufSize(const SDiskbasedBuf* pBuf); size_t getTotalBufSize(const SDiskbasedBuf* pBuf);
/**
* get the number of groups in the result buffer
* @param pBuf
* @return
*/
size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf);
/** /**
* destroy result buffer * destroy result buffer
* @param pBuf * @param pBuf
......
...@@ -888,6 +888,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte ...@@ -888,6 +888,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter->numOfBlocks = numOfBlocks; pBlockIter->numOfBlocks = numOfBlocks;
taosArrayClear(pBlockIter->blockList); taosArrayClear(pBlockIter->blockList);
pBlockIter->pTableMap = pReader->status.pTableMap;
// access data blocks according to the offset of each block in asc/desc order. // access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
......
...@@ -1216,7 +1216,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, ...@@ -1216,7 +1216,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->start.key = INT64_MIN; pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN; pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pExpr->base.numOfParams; pCtx->numOfParams = pExpr->base.numOfParams;
pCtx->increase = false;
pCtx->isStream = false; pCtx->isStream = false;
pCtx->param = pFunct->pParam; pCtx->param = pFunct->pParam;
......
...@@ -184,7 +184,7 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int ...@@ -184,7 +184,7 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int
// in the first scan, new space needed for results // in the first scan, new space needed for results
int32_t pageId = -1; int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId); SIDList list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) { if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId); pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
...@@ -299,7 +299,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes ...@@ -299,7 +299,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
// in the first scan, new space needed for results // in the first scan, new space needed for results
int32_t pageId = -1; int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf, tid); SIDList list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) { if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tid, &pageId); pData = getNewBufPage(pResultBuf, tid, &pageId);
...@@ -1565,16 +1565,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS ...@@ -1565,16 +1565,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
if (pCtx[j].increase) { for (int32_t k = 0; k < pRow->numOfRows; ++k) {
int64_t ts = *(int64_t*)in; colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
ts++;
}
} else {
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
} }
} }
} }
......
...@@ -637,6 +637,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -637,6 +637,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows,
numOfExprs); numOfExprs);
...@@ -1808,7 +1809,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt ...@@ -1808,7 +1809,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
void increaseTs(SqlFunctionCtx* pCtx) { void increaseTs(SqlFunctionCtx* pCtx) {
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) { if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) {
pCtx[0].increase = true; // pCtx[0].increase = true;
} }
} }
......
...@@ -97,7 +97,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page ...@@ -97,7 +97,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
return pSortHandle; return pSortHandle;
} }
static int32_t sortComparClearup(SMsortComparParam* cmpParam) { static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE SSortSource* pSource = cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE
blockDataDestroy(pSource->src.pBlock); blockDataDestroy(pSource->src.pBlock);
...@@ -134,15 +134,14 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) { ...@@ -134,15 +134,14 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) { static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId, SArray* pPageIdList) {
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) { if (pSource == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
pSource->pageIdList = getDataBufPagesIdList(pBuf, (*sourceId));
pSource->src.pBlock = pBlock; pSource->src.pBlock = pBlock;
pSource->pageIdList = pPageIdList;
taosArrayPush(pAllSources, &pSource); taosArrayPush(pAllSources, &pSource);
(*sourceId) += 1; (*sourceId) += 1;
...@@ -171,6 +170,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -171,6 +170,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
} }
} }
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while(start < pDataBlock->info.rows) { while(start < pDataBlock->info.rows) {
int32_t stop = 0; int32_t stop = 0;
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize); blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
...@@ -186,6 +186,8 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -186,6 +186,8 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return terrno; return terrno;
} }
taosArrayPush(pPageIdList, &pageId);
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf)); assert(size <= getBufPageSize(pHandle->pBuf));
...@@ -201,7 +203,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -201,7 +203,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false); SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
} }
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) { static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) {
...@@ -502,6 +504,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -502,6 +504,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return code; return code;
} }
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) { while (1) {
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
...@@ -514,6 +517,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -514,6 +517,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return terrno; return terrno;
} }
taosArrayPush(pPageIdList, &pageId);
int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t); int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf)); assert(size <= getBufPageSize(pHandle->pBuf));
...@@ -525,12 +530,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -525,12 +530,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
} }
sortComparClearup(&pHandle->cmpParam); sortComparCleanup(&pHandle->cmpParam);
tMergeTreeDestroy(pHandle->pMergeTree); tMergeTreeDestroy(pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0; pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId); code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != 0) { if (code != 0) {
return code; return code;
} }
......
...@@ -51,20 +51,20 @@ struct tMemBucket; ...@@ -51,20 +51,20 @@ struct tMemBucket;
typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value); typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value);
typedef struct tMemBucket { typedef struct tMemBucket {
int16_t numOfSlots; int16_t numOfSlots;
int16_t type; int16_t type;
int16_t bytes; int16_t bytes;
int32_t total; int32_t total;
int32_t elemPerPage; // number of elements for each object int32_t elemPerPage; // number of elements for each object
int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result
int32_t bufPageSize; // disk page size int32_t bufPageSize; // disk page size
MinMaxEntry range; // value range MinMaxEntry range; // value range
int32_t times; // count that has been checked for deciding the correct data value buckets. int32_t times; // count that has been checked for deciding the correct data value buckets.
__compar_fn_t comparFn; __compar_fn_t comparFn;
tMemBucketSlot* pSlots;
tMemBucketSlot * pSlots; SDiskbasedBuf* pBuffer;
SDiskbasedBuf *pBuffer; __perc_hash_func_t hashFunc;
__perc_hash_func_t hashFunc; SHashObj* groupPagesMap; // disk page map for different groups;
} tMemBucket; } tMemBucket;
tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval); tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval);
......
...@@ -33,7 +33,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) ...@@ -33,7 +33,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
SFilePage *buffer = (SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage)); SFilePage *buffer = (SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage));
int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times); int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times);
SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
int32_t offset = 0; int32_t offset = 0;
for(int32_t i = 0; i < list->size; ++i) { for(int32_t i = 0; i < list->size; ++i) {
...@@ -97,11 +97,11 @@ double findOnlyResult(tMemBucket *pMemBucket) { ...@@ -97,11 +97,11 @@ double findOnlyResult(tMemBucket *pMemBucket) {
} }
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times);
SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); SArray* list = *(SArray**)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
assert(list->size == 1); assert(list->size == 1);
struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0); int32_t* pageId = taosArrayGet(list, 0);
SFilePage* pPage = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); SFilePage* pPage = getBufPage(pMemBucket->pBuffer, *pageId);
assert(pPage->num == 1); assert(pPage->num == 1);
double v = 0; double v = 0;
...@@ -233,7 +233,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, ...@@ -233,7 +233,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
pBucket->times = 1; pBucket->times = 1;
pBucket->maxCapacity = 200000; pBucket->maxCapacity = 200000;
pBucket->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) { if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) {
// qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); // qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval);
taosMemoryFree(pBucket); taosMemoryFree(pBucket);
...@@ -280,8 +280,16 @@ void tMemBucketDestroy(tMemBucket *pBucket) { ...@@ -280,8 +280,16 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return; return;
} }
void* p = taosHashIterate(pBucket->groupPagesMap, NULL);
while(p) {
SArray** p1 = p;
p = taosHashIterate(pBucket->groupPagesMap, p);
taosArrayDestroy(*p1);
}
destroyDiskbasedBuf(pBucket->pBuffer); destroyDiskbasedBuf(pBucket->pBuffer);
taosMemoryFreeClear(pBucket->pSlots); taosMemoryFreeClear(pBucket->pSlots);
taosHashCleanup(pBucket->groupPagesMap);
taosMemoryFreeClear(pBucket); taosMemoryFreeClear(pBucket);
} }
...@@ -357,8 +365,16 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { ...@@ -357,8 +365,16 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = NULL; pSlot->info.data = NULL;
} }
SArray* pPageIdList = (SArray*)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
if (pPageIdList == NULL) {
SArray* pList = taosArrayInit(4, sizeof(int32_t));
taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES);
pPageIdList = pList;
}
pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId); pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId);
pSlot->info.pageId = pageId; pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId);
} }
memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes); memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes);
...@@ -476,7 +492,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) ...@@ -476,7 +492,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
resetSlotInfo(pMemBucket); resetSlotInfo(pMemBucket);
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1);
SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
assert(list->size > 0); assert(list->size > 0);
for (int32_t f = 0; f < list->size; ++f) { for (int32_t f = 0; f < list->size; ++f) {
......
...@@ -33,7 +33,7 @@ struct SDiskbasedBuf { ...@@ -33,7 +33,7 @@ struct SDiskbasedBuf {
int32_t pageSize; // current used page size int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory int32_t inMemPages; // numOfPages that are allocated in memory
SList* freePgList; // free page list SList* freePgList; // free page list
SHashObj* groupSet; // id hash table, todo remove it SArray* pIdList; // page id list
SHashObj* all; SHashObj* all;
SList* lruList; SList* lruList;
void* emptyDummyIdList; // dummy id list void* emptyDummyIdList; // dummy id list
...@@ -241,26 +241,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -241,26 +241,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return 0; return 0;
} }
static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) { static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) {
assert(taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)) == NULL);
SArray* pa = taosArrayInit(1, POINTER_BYTES);
int32_t ret = taosHashPut(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES);
assert(ret == 0);
return pa;
}
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) {
SIDList list = NULL;
char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
list = addNewGroup(pBuf, groupId);
} else {
list = (SIDList)(*p);
}
pBuf->numOfPages += 1; pBuf->numOfPages += 1;
SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
...@@ -273,7 +254,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag ...@@ -273,7 +254,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
ppi->pn = NULL; ppi->pn = NULL;
ppi->dirty = false; ppi->dirty = false;
return *(SPageInfo**)taosArrayPush(list, &ppi); return *(SPageInfo**)taosArrayPush(pBuf->pIdList, &ppi);
} }
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
...@@ -372,7 +353,8 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem ...@@ -372,7 +353,8 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
// init id hash table // init id hash table
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pPBuf->groupSet = taosHashInit(10, fn, true, false); pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES
pPBuf->all = taosHashInit(10, fn, true, false); pPBuf->all = taosHashInit(10, fn, true, false);
...@@ -415,7 +397,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { ...@@ -415,7 +397,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
*pageId = (++pBuf->allocateId); *pageId = (++pBuf->allocateId);
// register page id info // register page id info
pi = registerPage(pBuf, groupId, *pageId); pi = registerPage(pBuf, *pageId);
// add to hash map // add to hash map
taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
...@@ -516,19 +498,11 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { ...@@ -516,19 +498,11 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
pBuf->statis.releasePages += 1; pBuf->statis.releasePages += 1;
} }
size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); }
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; }
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) { SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf) {
assert(pBuf != NULL); ASSERT(pBuf != NULL);
return pBuf->pIdList;
char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
return pBuf->emptyDummyIdList;
} else {
return (SArray*)(*p);
}
} }
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
...@@ -568,26 +542,21 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { ...@@ -568,26 +542,21 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
taosRemoveFile(pBuf->path); taosRemoveFile(pBuf->path);
taosMemoryFreeClear(pBuf->path); taosMemoryFreeClear(pBuf->path);
SArray** p = taosHashIterate(pBuf->groupSet, NULL); size_t n = taosArrayGetSize(pBuf->pIdList);
while (p) { for (int32_t i = 0; i < n; ++i) {
size_t n = taosArrayGetSize(*p); SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
for (int32_t i = 0; i < n; ++i) { taosMemoryFreeClear(pi->pData);
SPageInfo* pi = taosArrayGetP(*p, i); taosMemoryFreeClear(pi);
taosMemoryFreeClear(pi->pData);
taosMemoryFreeClear(pi);
}
taosArrayDestroy(*p);
p = taosHashIterate(pBuf->groupSet, p);
} }
taosArrayDestroy(pBuf->pIdList);
tdListFree(pBuf->lruList); tdListFree(pBuf->lruList);
tdListFree(pBuf->freePgList); tdListFree(pBuf->freePgList);
taosArrayDestroy(pBuf->emptyDummyIdList); taosArrayDestroy(pBuf->emptyDummyIdList);
taosArrayDestroy(pBuf->pFree); taosArrayDestroy(pBuf->pFree);
taosHashCleanup(pBuf->groupSet);
taosHashCleanup(pBuf->all); taosHashCleanup(pBuf->all);
taosMemoryFreeClear(pBuf->id); taosMemoryFreeClear(pBuf->id);
...@@ -662,25 +631,21 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { ...@@ -662,25 +631,21 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
} }
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) { void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
SArray** p = taosHashIterate(pBuf->groupSet, NULL); size_t n = taosArrayGetSize(pBuf->pIdList);
while (p) { for (int32_t i = 0; i < n; ++i) {
size_t n = taosArrayGetSize(*p); SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
for (int32_t i = 0; i < n; ++i) { taosMemoryFreeClear(pi->pData);
SPageInfo* pi = taosArrayGetP(*p, i); taosMemoryFreeClear(pi);
taosMemoryFreeClear(pi->pData);
taosMemoryFreeClear(pi);
}
taosArrayDestroy(*p);
p = taosHashIterate(pBuf->groupSet, p);
} }
taosArrayDestroy(pBuf->pIdList);
tdListEmpty(pBuf->lruList); tdListEmpty(pBuf->lruList);
tdListEmpty(pBuf->freePgList); tdListEmpty(pBuf->freePgList);
taosArrayClear(pBuf->emptyDummyIdList); taosArrayClear(pBuf->emptyDummyIdList);
taosArrayClear(pBuf->pFree); taosArrayClear(pBuf->pFree);
taosHashClear(pBuf->groupSet);
taosHashClear(pBuf->all); taosHashClear(pBuf->all);
pBuf->numOfPages = 0; // all pages are in buffer in the first place pBuf->numOfPages = 0; // all pages are in buffer in the first place
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册