From 655dbb47c197ed685b4ef2543fbfd8d4457fd337 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Aug 2022 19:19:48 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- include/libs/function/function.h | 1 - include/util/tpagedbuf.h | 10 +-- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 + source/libs/executor/src/executil.c | 1 - source/libs/executor/src/executorimpl.c | 16 +--- source/libs/executor/src/timewindowoperator.c | 3 +- source/libs/executor/src/tsort.c | 19 +++-- source/libs/function/inc/tpercentile.h | 28 +++---- source/libs/function/src/tpercentile.c | 28 +++++-- source/util/src/tpagedbuf.c | 81 ++++++------------- 10 files changed, 79 insertions(+), 109 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index faf35bf03c..c8db01625e 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -141,7 +141,6 @@ typedef struct SqlFunctionCtx { struct SSDataBlock *pSrcBlock; struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity int32_t curBufPage; - bool increase; bool isStream; char udfName[TSDB_FUNC_NAME_LEN]; diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index ef266068cb..57a489c0dd 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -67,10 +67,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); /** * * @param pBuf - * @param groupId * @return */ -SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId); +SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf); /** * get the specified buffer page by id @@ -101,13 +100,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi); */ size_t getTotalBufSize(const SDiskbasedBuf* pBuf); -/** - * get the number of groups in the result buffer - * @param pBuf - * @return - */ -size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf); - /** * destroy result buffer * @param pBuf diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 9aeec57948..be8e809a72 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -888,6 +888,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte pBlockIter->numOfBlocks = numOfBlocks; taosArrayClear(pBlockIter->blockList); + pBlockIter->pTableMap = pReader->status.pTableMap; // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b89579a017..4b018f81ef 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1216,7 +1216,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; pCtx->numOfParams = pExpr->base.numOfParams; - pCtx->increase = false; pCtx->isStream = false; pCtx->param = pFunct->pParam; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index dc9217bf65..cf6940c52a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -184,7 +184,7 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int // in the first scan, new space needed for results int32_t pageId = -1; - SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId); + SIDList list = getDataBufPagesIdList(pResultBuf); if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, tableGroupId, &pageId); @@ -299,7 +299,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes // in the first scan, new space needed for results int32_t pageId = -1; - SIDList list = getDataBufPagesIdList(pResultBuf, tid); + SIDList list = getDataBufPagesIdList(pResultBuf); if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, tid, &pageId); @@ -1565,16 +1565,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - if (pCtx[j].increase) { - int64_t ts = *(int64_t*)in; - for (int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes); - ts++; - } - } else { - for (int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - } + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); } } } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b6e31f9a31..4909c8d387 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -637,6 +637,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); @@ -1808,7 +1809,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt void increaseTs(SqlFunctionCtx* pCtx) { if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) { - pCtx[0].increase = true; +// pCtx[0].increase = true; } } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 48af951773..fc411e850a 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -97,7 +97,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page return pSortHandle; } -static int32_t sortComparClearup(SMsortComparParam* cmpParam) { +static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE blockDataDestroy(pSource->src.pBlock); @@ -134,15 +134,14 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) { return TSDB_CODE_SUCCESS; } -static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) { +static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId, SArray* pPageIdList) { SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); if (pSource == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pSource->pageIdList = getDataBufPagesIdList(pBuf, (*sourceId)); pSource->src.pBlock = pBlock; - + pSource->pageIdList = pPageIdList; taosArrayPush(pAllSources, &pSource); (*sourceId) += 1; @@ -171,6 +170,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { } } + SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while(start < pDataBlock->info.rows) { int32_t stop = 0; blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize); @@ -186,6 +186,8 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { return terrno; } + taosArrayPush(pPageIdList, &pageId); + int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); assert(size <= getBufPageSize(pHandle->pBuf)); @@ -201,7 +203,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { blockDataCleanup(pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false); - return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); + return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList); } static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) { @@ -502,6 +504,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return code; } + SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { @@ -514,6 +517,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return terrno; } + taosArrayPush(pPageIdList, &pageId); + int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t); assert(size <= getBufPageSize(pHandle->pBuf)); @@ -525,12 +530,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { blockDataCleanup(pDataBlock); } - sortComparClearup(&pHandle->cmpParam); + sortComparCleanup(&pHandle->cmpParam); tMergeTreeDestroy(pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); - code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId); + code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); if (code != 0) { return code; } diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index dfb52f7694..554f9e567f 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -51,20 +51,20 @@ struct tMemBucket; typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value); typedef struct tMemBucket { - int16_t numOfSlots; - int16_t type; - int16_t bytes; - int32_t total; - int32_t elemPerPage; // number of elements for each object - int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result - int32_t bufPageSize; // disk page size - MinMaxEntry range; // value range - int32_t times; // count that has been checked for deciding the correct data value buckets. - __compar_fn_t comparFn; - - tMemBucketSlot * pSlots; - SDiskbasedBuf *pBuffer; - __perc_hash_func_t hashFunc; + int16_t numOfSlots; + int16_t type; + int16_t bytes; + int32_t total; + int32_t elemPerPage; // number of elements for each object + int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result + int32_t bufPageSize; // disk page size + MinMaxEntry range; // value range + int32_t times; // count that has been checked for deciding the correct data value buckets. + __compar_fn_t comparFn; + tMemBucketSlot* pSlots; + SDiskbasedBuf* pBuffer; + __perc_hash_func_t hashFunc; + SHashObj* groupPagesMap; // disk page map for different groups; } tMemBucket; tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval); diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 517253dc01..e6da187a35 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -33,7 +33,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) SFilePage *buffer = (SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage)); int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times); - SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); int32_t offset = 0; for(int32_t i = 0; i < list->size; ++i) { @@ -97,11 +97,11 @@ double findOnlyResult(tMemBucket *pMemBucket) { } int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times); - SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + SArray* list = *(SArray**)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); assert(list->size == 1); - struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0); - SFilePage* pPage = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); + int32_t* pageId = taosArrayGet(list, 0); + SFilePage* pPage = getBufPage(pMemBucket->pBuffer, *pageId); assert(pPage->num == 1); double v = 0; @@ -233,7 +233,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, pBucket->times = 1; pBucket->maxCapacity = 200000; - + pBucket->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) { // qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); taosMemoryFree(pBucket); @@ -280,8 +280,16 @@ void tMemBucketDestroy(tMemBucket *pBucket) { return; } + void* p = taosHashIterate(pBucket->groupPagesMap, NULL); + while(p) { + SArray** p1 = p; + p = taosHashIterate(pBucket->groupPagesMap, p); + taosArrayDestroy(*p1); + } + destroyDiskbasedBuf(pBucket->pBuffer); taosMemoryFreeClear(pBucket->pSlots); + taosHashCleanup(pBucket->groupPagesMap); taosMemoryFreeClear(pBucket); } @@ -357,8 +365,16 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data = NULL; } + SArray* pPageIdList = (SArray*)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId)); + if (pPageIdList == NULL) { + SArray* pList = taosArrayInit(4, sizeof(int32_t)); + taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES); + pPageIdList = pList; + } + pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId); pSlot->info.pageId = pageId; + taosArrayPush(pPageIdList, &pageId); } memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes); @@ -476,7 +492,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) resetSlotInfo(pMemBucket); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); - SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); assert(list->size > 0); for (int32_t f = 0; f < list->size; ++f) { diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index fbade7b074..4710af3da1 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -33,7 +33,7 @@ struct SDiskbasedBuf { int32_t pageSize; // current used page size int32_t inMemPages; // numOfPages that are allocated in memory SList* freePgList; // free page list - SHashObj* groupSet; // id hash table, todo remove it + SArray* pIdList; // page id list SHashObj* all; SList* lruList; void* emptyDummyIdList; // dummy id list @@ -241,26 +241,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return 0; } -static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) { - assert(taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)) == NULL); - - SArray* pa = taosArrayInit(1, POINTER_BYTES); - int32_t ret = taosHashPut(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); - assert(ret == 0); - - return pa; -} - -static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) { - SIDList list = NULL; - - char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); - if (p == NULL) { // it is a new group id - list = addNewGroup(pBuf, groupId); - } else { - list = (SIDList)(*p); - } - +static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) { pBuf->numOfPages += 1; SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); @@ -273,7 +254,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag ppi->pn = NULL; ppi->dirty = false; - return *(SPageInfo**)taosArrayPush(list, &ppi); + return *(SPageInfo**)taosArrayPush(pBuf->pIdList, &ppi); } static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { @@ -372,7 +353,8 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem // init id hash table _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); - pPBuf->groupSet = taosHashInit(10, fn, true, false); + pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES); + pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES pPBuf->all = taosHashInit(10, fn, true, false); @@ -415,7 +397,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { *pageId = (++pBuf->allocateId); // register page id info - pi = registerPage(pBuf, groupId, *pageId); + pi = registerPage(pBuf, *pageId); // add to hash map taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); @@ -516,19 +498,11 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { pBuf->statis.releasePages += 1; } -size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); } - size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } -SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) { - assert(pBuf != NULL); - - char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); - if (p == NULL) { // it is a new group id - return pBuf->emptyDummyIdList; - } else { - return (SArray*)(*p); - } +SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf) { + ASSERT(pBuf != NULL); + return pBuf->pIdList; } void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { @@ -568,26 +542,21 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { taosRemoveFile(pBuf->path); taosMemoryFreeClear(pBuf->path); - SArray** p = taosHashIterate(pBuf->groupSet, NULL); - while (p) { - size_t n = taosArrayGetSize(*p); - for (int32_t i = 0; i < n; ++i) { - SPageInfo* pi = taosArrayGetP(*p, i); - taosMemoryFreeClear(pi->pData); - taosMemoryFreeClear(pi); - } - - taosArrayDestroy(*p); - p = taosHashIterate(pBuf->groupSet, p); + size_t n = taosArrayGetSize(pBuf->pIdList); + for (int32_t i = 0; i < n; ++i) { + SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i); + taosMemoryFreeClear(pi->pData); + taosMemoryFreeClear(pi); } + taosArrayDestroy(pBuf->pIdList); + tdListFree(pBuf->lruList); tdListFree(pBuf->freePgList); taosArrayDestroy(pBuf->emptyDummyIdList); taosArrayDestroy(pBuf->pFree); - taosHashCleanup(pBuf->groupSet); taosHashCleanup(pBuf->all); taosMemoryFreeClear(pBuf->id); @@ -662,25 +631,21 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { } void clearDiskbasedBuf(SDiskbasedBuf* pBuf) { - SArray** p = taosHashIterate(pBuf->groupSet, NULL); - while (p) { - size_t n = taosArrayGetSize(*p); - for (int32_t i = 0; i < n; ++i) { - SPageInfo* pi = taosArrayGetP(*p, i); - taosMemoryFreeClear(pi->pData); - taosMemoryFreeClear(pi); - } - taosArrayDestroy(*p); - p = taosHashIterate(pBuf->groupSet, p); + size_t n = taosArrayGetSize(pBuf->pIdList); + for (int32_t i = 0; i < n; ++i) { + SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i); + taosMemoryFreeClear(pi->pData); + taosMemoryFreeClear(pi); } + taosArrayDestroy(pBuf->pIdList); + tdListEmpty(pBuf->lruList); tdListEmpty(pBuf->freePgList); taosArrayClear(pBuf->emptyDummyIdList); taosArrayClear(pBuf->pFree); - taosHashClear(pBuf->groupSet); taosHashClear(pBuf->all); pBuf->numOfPages = 0; // all pages are in buffer in the first place -- GitLab