diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 72ccd5adc6ee835e9e25f9b9b8b2511cad797910..17f6c97ea16223d692d4b8d06c00856ab477d09b 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2131,6 +2131,11 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { } bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + if (pResInfo == NULL) { + return true; + } + STopBotInfo *pTopBotInfo = getTopBotOutputInfo(pCtx); // required number of results are not reached, continue load data block diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 92fe078c3f8921cfb3875bfb9c64169502791a62..a18f8e27318c0194398f5b8d4231a2120b269e8d 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -42,8 +42,8 @@ typedef struct SSqlGroupbyExpr { } SSqlGroupbyExpr; typedef struct SPosInfo { - int16_t pageId; - int16_t rowId; + int32_t pageId; + int32_t rowId; } SPosInfo; typedef struct SWindowStatus { diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index b57c48933f0878ca86384219b1c7f66446db28eb..2cbef2b1bea66654b262a3fb37061477969960f2 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -28,7 +28,7 @@ extern "C" { #include "tdataformat.h" #include "talgo.h" -#define DEFAULT_PAGE_SIZE (1024L*4) // 16k larger than the SHistoInfo +#define DEFAULT_PAGE_SIZE (1024L*64) // 16k larger than the SHistoInfo #define MAX_TMPFILE_PATH_LENGTH PATH_MAX #define INITIAL_ALLOCATION_BUFFER_SIZE 64 diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index da5f46b5e9fed356d27da7248733c2421ba3404c..ba446b46277f6892421ec290cb377cb0c5b76025 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -34,10 +34,11 @@ typedef struct SPageDiskInfo { } SPageDiskInfo; typedef struct SPageInfo { + SListNode* pn; // point to list node int32_t pageId; SPageDiskInfo info; void* pData; - T_REF_DECLARE(); + bool used; // set current page is in used } SPageInfo; typedef struct SFreeListItem { @@ -45,6 +46,15 @@ typedef struct SFreeListItem { int32_t len; } SFreeListItem; +typedef struct SResultBufStatis { + int32_t flushBytes; + int32_t loadBytes; + int32_t getPages; + int32_t releasePages; + int32_t flushPages; + int32_t fileSize; +} SResultBufStatis; + typedef struct SDiskbasedResultBuf { int32_t numOfRowsPerPage; int32_t numOfPages; @@ -64,6 +74,8 @@ typedef struct SDiskbasedResultBuf { void* assistBuf; // assistant buffer for compress data SArray* pFree; // free area in file int32_t nextPos; // next page flush position + + SResultBufStatis statis; } SDiskbasedResultBuf; #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) @@ -119,6 +131,16 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); */ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); +void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi); + +/** + * + * @param pResultBuf + * @param id + * @return + */ +//tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); + /** * get the total buffer size in the format of disk file * @param pResultBuf @@ -144,7 +166,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle); * @param pList * @return */ -int32_t getLastPageId(SIDList pList); +SPageInfo* getLastPageInfo(SIDList pList); #ifdef __cplusplus } diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 7119cb75fe9f6604e42a96406ff631840477d5f7..ed7c7e8845317ac754758b8553e3f65ba8da4544 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -45,13 +45,14 @@ bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize); -static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult) { +static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult, + tFilePage* page) { assert(pResult != NULL && pRuntimeEnv != NULL); SQuery *pQuery = pRuntimeEnv->pQuery; - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId); - int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery); +// tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId); + int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery); return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + pQuery->pSelectExpr[columnIndex].bytes * realRowId; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 30d6cc288f452962f4a296989f59d38c7bfb5813..9d873dc95f65c6a12cb3afe928224c7a66ad6eda 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -221,7 +221,7 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) { } static int32_t getGroupResultId(int32_t groupIndex) { - int32_t base = 200000; + int32_t base = 20000000; return base + (groupIndex * 10000); } @@ -478,10 +478,14 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult if (taosArrayGetSize(list) == 0) { pData = getNewDataBuf(pResultBuf, sid, &pageId); } else { - pageId = getLastPageId(list); - pData = getResBufPage(pResultBuf, pageId); + SPageInfo* pi = getLastPageInfo(list); + pData = getResBufPage(pResultBuf, pi->pageId); + pageId = pi->pageId; if (pData->num >= numOfRowsPerPage) { + // release current page first, and prepare the next one + releaseResBufPageInfo(pResultBuf, pi); + pData = getNewDataBuf(pResultBuf, sid, &pageId); if (pData != NULL) { assert(pData->num == 0); // number of elements must be 0 for new allocated buffer @@ -497,6 +501,8 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer pWindowRes->pos.pageId = pageId; pWindowRes->pos.rowId = pData->num++; + + assert(pWindowRes->pos.pageId >= 0); } return 0; @@ -2111,9 +2117,6 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, } if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { -#if defined(_DEBUG_VIEW) - qDebug("QInfo:%p block discarded by per-filter", GET_QINFO_ADDR(pRuntimeEnv)); -#endif // current block has been discard due to filter applied pRuntimeEnv->summary.discardBlocks += 1; qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), @@ -2446,6 +2449,8 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].base.functionId; if (!mergeFlag) { @@ -2458,7 +2463,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes pCtx[i].hasNull = true; pCtx[i].nStartQueryTimestamp = timestamp; - pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes); + pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); // in case of tag column, the tag information should be extracted from input buffer if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) { @@ -2615,14 +2620,16 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos); + tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pos.pageId); - char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1); + char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1); TSKEY leftTimestamp = GET_INT64_VAL(b1); SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos); + tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pos.pageId); - char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2); + char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2); TSKEY rightTimestamp = GET_INT64_VAL(b2); if (leftTimestamp == rightTimestamp) { @@ -2685,35 +2692,26 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { int32_t id = getGroupResultId(pQInfo->groupIndex - 1); SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id); - int32_t total = 0; int32_t size = taosArrayGetSize(list); - for (int32_t i = 0; i < size; ++i) { - int32_t* pgId = taosArrayGet(list, i); - tFilePage *pData = getResBufPage(pResultBuf, *pgId); - total += pData->num; - } - - int32_t rows = total; int32_t offset = 0; for (int32_t j = 0; j < size; ++j) { - int32_t* pgId = taosArrayGet(list, j); - tFilePage *pData = getResBufPage(pResultBuf, *pgId); + SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j); + tFilePage *pData = getResBufPage(pResultBuf, pi->pageId); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; char * pDest = pQuery->sdata[i]->data; - - memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->num, - bytes * pData->num); + memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->num, bytes * pData->num); } +// rows += pData->num; offset += pData->num; } assert(pQuery->rec.rows == 0); - pQuery->rec.rows += rows; + pQuery->rec.rows += offset; pQInfo->offset += 1; } @@ -2777,7 +2775,6 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { assert(pQInfo->numOfGroupResultPages == 0); return 0; } else if (numOfTables == 1) { // no need to merge results since only one table in each group - } SCompSupporter cs = {pTableList, posList, pQInfo}; @@ -2802,8 +2799,9 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; SWindowResult * pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId); - char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes); + char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page); TSKEY ts = GET_INT64_VAL(b); assert(ts == pWindowRes->window.skey); @@ -3517,9 +3515,11 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); + pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult, page); int32_t functionId = pQuery->pSelectExpr[i].base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { @@ -3542,6 +3542,8 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group + tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -3550,7 +3552,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * continue; } - pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); + pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult, bufPage); pCtx->currentStage = 0; int32_t functionId = pCtx->functionId; @@ -3713,11 +3715,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ pQInfo->groupIndex += 1; } + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pos.pageId); + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t size = pRuntimeEnv->pCtx[j].outputBytes; char *out = pQuery->sdata[j]->data + numOfResult * size; - char *in = getPosInResultPage(pRuntimeEnv, j, &result[i]); + char *in = getPosInResultPage(pRuntimeEnv, j, &result[i], page); memcpy(out, in + oldOffset * size, size * numOfRowsToCopy); } @@ -4240,8 +4244,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) { - int32_t numOfPages = getInitialPageNum(pQInfo); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfPages, rowsize, ps, numOfPages, pQInfo); +// int32_t numOfPages = getInitialPageNum(pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, 2, rowsize, ps, 2, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index fa2bb814f2b0dfab94e1e43796b34bc42e486476..fbb5f116e6253c3d96aa4392fc2985ec1427cb21 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -1,11 +1,13 @@ #include "qResultbuf.h" -#include -#include +#include "stddef.h" +#include "tscompression.h" #include "hash.h" #include "qExtbuffer.h" #include "queryLog.h" #include "taoserror.h" +#define GET_DATA_PAYLOAD(_p) ((_p)->pData + POINTER_BYTES) + int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize, int32_t inMemPages, const void* handle) { @@ -15,23 +17,22 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu return TSDB_CODE_COM_OUT_OF_MEMORY; } - pResBuf->pageSize = pagesize; - pResBuf->numOfPages = 0; // all pages are in buffer in the first place - pResBuf->inMemPages = inMemPages; + pResBuf->pageSize = pagesize; + pResBuf->numOfPages = 0; // all pages are in buffer in the first place + pResBuf->inMemPages = inMemPages; + pResBuf->totalBufSize = pResBuf->numOfPages * pagesize; + pResBuf->allocateId = -1; + pResBuf->comp = true; + assert(inMemPages <= numOfPages); pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize; - - pResBuf->totalBufSize = pResBuf->numOfPages * pagesize; - pResBuf->allocateId = -1; - pResBuf->lruList = tdListNew(POINTER_BYTES); // init id hash table pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES - pResBuf->comp = true; char path[PATH_MAX] = {0}; getTmpfilePath("qbuf", path); @@ -49,7 +50,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu #define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages) #define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize) -static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) { +static int32_t createDiskFile(SDiskbasedResultBuf* pResultBuf) { pResultBuf->file = fopen(pResultBuf->path, "wb+"); if (pResultBuf->file == NULL) { qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno)); @@ -59,15 +60,27 @@ static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) { return TSDB_CODE_SUCCESS; } -static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, bool comp, void* assistBuf) { // do nothing - if (!comp) { +static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedResultBuf* pResultBuf) { // do nothing + if (!pResultBuf->comp) { *dst = srcSize; return data; } - *dst = tsCompressString(data, srcSize, 1, assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0); + *dst = tsCompressString(data, srcSize, 1, pResultBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0); - memcpy(data, assistBuf, *dst); + memcpy(data, pResultBuf->assistBuf, *dst); + return data; +} + +static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedResultBuf* pResultBuf) { // do nothing + if (!pResultBuf->comp) { + *dst = srcSize; + return data; + } + + *dst = tsDecompressString(data, srcSize, 1, pResultBuf->assistBuf, pResultBuf->pageSize, ONE_STAGE_COMP, NULL, 0); + + memcpy(data, pResultBuf->assistBuf, *dst); return data; } @@ -96,11 +109,10 @@ static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t si } static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { - assert(T_REF_VAL_GET(pg) == 0 && pg->pData != NULL); + assert(!pg->used && pg->pData != NULL); int32_t size = -1; - char* t = doCompressData(pg->pData + POINTER_BYTES, pResultBuf->pageSize, &size, pResultBuf->comp, pResultBuf->assistBuf); - pg->info.length = size; + char* t = doCompressData(GET_DATA_PAYLOAD(pg), pResultBuf->pageSize, &size, pResultBuf); // this page is flushed to disk for the first time if (pg->info.offset == -1) { @@ -108,26 +120,30 @@ static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { pResultBuf->nextPos += size; fseek(pResultBuf->file, pg->info.offset, SEEK_SET); - int32_t ret = fwrite(t, 1, size, pResultBuf->file); - - UNUSED(ret); + /*int32_t ret =*/ fwrite(t, 1, size, pResultBuf->file); } else { - if (pg->info.length < size) { // length becomes greater, current space is not enough, allocate new place. - //1. add current space to free list + // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing + if (pg->info.length < size) { + // 1. add current space to free list taosArrayPush(pResultBuf->pFree, &pg->info); - //2. allocate new position, and update the info + // 2. allocate new position, and update the info pg->info.offset = allocatePositionInFile(pResultBuf, size); pResultBuf->nextPos += size; - - //3. write to disk. - fseek(pResultBuf->file, pg->info.offset, SEEK_SET); - fwrite(t, size, 1, pResultBuf->file); } + + //3. write to disk. + fseek(pResultBuf->file, pg->info.offset, SEEK_SET); + fwrite(t, size, 1, pResultBuf->file); } char* ret = pg->pData; + memset(ret, 0, pResultBuf->pageSize); + pg->pData = NULL; + pg->info.length = size; + + pResultBuf->statis.flushBytes += pg->info.length; return ret; } @@ -137,7 +153,7 @@ static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages); if (pResultBuf->file == NULL) { - if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) { + if ((ret = createDiskFile(pResultBuf)) != TSDB_CODE_SUCCESS) { terrno = ret; return NULL; } @@ -146,12 +162,29 @@ static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { return doFlushPageToDisk(pResultBuf, pg); } +// load file block data in disk +static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { + int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); + ret = fread(GET_DATA_PAYLOAD(pg), 1, pg->info.length, pResultBuf->file); + if (ret != pg->info.length) { + terrno = errno; + return NULL; + } + + pResultBuf->statis.loadBytes += pg->info.length; + + int32_t fullSize = 0; + doDecompressData(GET_DATA_PAYLOAD(pg), pg->info.length, &fullSize, pResultBuf); + + return GET_DATA_PAYLOAD(pg); +} + #define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages) static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL); - SArray* pa = taosArrayInit(1, sizeof(SPageInfo)); + SArray* pa = taosArrayInit(1, POINTER_BYTES); int32_t ret = taosHashPut(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); assert(ret == 0); @@ -170,44 +203,79 @@ static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, pResultBuf->numOfPages += 1; - SPageInfo ppi = { .info = PAGE_INFO_INITIALIZER, .pageId = pageId, }; - return taosArrayPush(list, &ppi); + SPageInfo* ppi = malloc(sizeof(SPageInfo));//{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL}; + ppi->info = PAGE_INFO_INITIALIZER; + ppi->pageId = pageId; + ppi->pData = NULL; + ppi->pn = NULL; + ppi->used = true; + + return *(SPageInfo**) taosArrayPush(list, &ppi); } -tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { - char* allocPg = NULL; +static SListNode* getEldestUnrefedPage(SDiskbasedResultBuf* pResultBuf) { + SListIter iter = {0}; + tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD); - if (NO_AVAILABLE_PAGES(pResultBuf)) { + SListNode* pn = NULL; + while((pn = tdListNext(&iter)) != NULL) { + assert(pn != NULL); - // get the last page in linked list - SListIter iter = {0}; - tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD); + SPageInfo* pageInfo = *(SPageInfo**) pn->data; + assert(pageInfo->pageId >= 0 && pageInfo->pn == pn); - SListNode* pn = NULL; - while((pn = tdListNext(&iter)) != NULL) { - assert(pn != NULL); - if (T_REF_VAL_GET(*(SPageInfo**)pn->data) == 0) { - break; - } + if (!pageInfo->used) { + break; } + } - // all pages are referenced by user, try to allocate new space - if (pn == NULL) { - int32_t prev = pResultBuf->inMemPages; - pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5; + return pn; +} + +static char* evicOneDataPage(SDiskbasedResultBuf* pResultBuf) { + char* bufPage = NULL; + SListNode* pn = getEldestUnrefedPage(pResultBuf); - qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev, + // all pages are referenced by user, try to allocate new space + if (pn == NULL) { + int32_t prev = pResultBuf->inMemPages; + pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5; + + qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev, pResultBuf->inMemPages, pResultBuf->pageSize); - } else { - tdListPopNode(pResultBuf->lruList, pn); - SPageInfo* d = *(SPageInfo**) pn->data; - tfree(pn); + } else { + pResultBuf->statis.flushPages += 1; + tdListPopNode(pResultBuf->lruList, pn); - allocPg = flushPageToDisk(pResultBuf, d); - if (allocPg == NULL) { - return NULL; - } - } + SPageInfo* d = *(SPageInfo**) pn->data; + assert(d->pn == pn); + + d->pn = NULL; + tfree(pn); + + bufPage = flushPageToDisk(pResultBuf, d); + } + + return bufPage; +} + +static void lruListPushFront(SList *pList, SPageInfo* pi) { + tdListPrepend(pList, &pi); + SListNode* front = tdListGetHead(pList); + pi->pn = front; +} + +static void lruListMoveToFront(SList *pList, SPageInfo* pi) { + tdListPopNode(pList, pi->pn); + tdListPrependNode(pList, pi->pn); +} + +tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { + pResultBuf->statis.getPages += 1; + + char* availablePage = NULL; + if (NO_AVAILABLE_PAGES(pResultBuf)) { + availablePage = evicOneDataPage(pResultBuf); } // register new id in this group @@ -216,111 +284,72 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 // register page id info SPageInfo* pi = registerPage(pResultBuf, groupId, *pageId); + assert(pResultBuf->inMemPages > 0); + // add to LRU list assert(listNEles(pResultBuf->lruList) < pResultBuf->inMemPages); - tdListPrepend(pResultBuf->lruList, &pi); + lruListPushFront(pResultBuf->lruList, pi); // add to hash map taosHashPut(pResultBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); // allocate buf - if (allocPg == NULL) { + if (availablePage == NULL) { pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES); } else { - pi->pData = allocPg; + pi->pData = availablePage; } pResultBuf->totalBufSize += pResultBuf->pageSize; - T_REF_INC(pi); // add ref count ((void**)pi->pData)[0] = pi; + pi->used = true; - return pi->pData + POINTER_BYTES; + return GET_DATA_PAYLOAD(pi); } tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { assert(pResultBuf != NULL && id >= 0); + pResultBuf->statis.getPages += 1; SPageInfo** pi = taosHashGet(pResultBuf->all, &id, sizeof(int32_t)); assert(pi != NULL && *pi != NULL); if ((*pi)->pData != NULL) { // it is in memory - // no need to update the LRU list + // no need to update the LRU list if only one page exists if (pResultBuf->numOfPages == 1) { - return (*pi)->pData + POINTER_BYTES; + (*pi)->used = true; + return GET_DATA_PAYLOAD(*pi); } - SListNode* pnode = NULL; // todo speed up - - SListIter iter = {0}; - tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_FORWARD); + SPageInfo** pInfo = (SPageInfo**) ((*pi)->pn->data); + assert(*pInfo == *pi); - while((pnode = tdListNext(&iter)) != NULL) { - SPageInfo** pInfo = (SPageInfo**) pnode->data; + lruListMoveToFront(pResultBuf->lruList, (*pi)); + (*pi)->used = true; - // remove it and add it into the front of linked-list - if ((*pInfo)->pageId == id) { - tdListPopNode(pResultBuf->lruList, pnode); - tdListPrependNode(pResultBuf->lruList, pnode); - T_REF_INC(*(SPageInfo**)pnode->data); + return GET_DATA_PAYLOAD(*pi); - return ((*(SPageInfo**)pnode->data)->pData + POINTER_BYTES); - } - } } else { // not in memory - assert((*pi)->pData == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0); - - // choose the be flushed page: get the last page in linked list - SListIter iter1 = {0}; - tdListInitIter(pResultBuf->lruList, &iter1, TD_LIST_BACKWARD); + assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0); - SListNode* pn = NULL; - while((pn = tdListNext(&iter1)) != NULL) { - assert(pn != NULL); - if (T_REF_VAL_GET(*(SPageInfo**)(pn->data)) == 0) { - break; - } + char* availablePage = NULL; + if (NO_AVAILABLE_PAGES(pResultBuf)) { + availablePage = evicOneDataPage(pResultBuf); } - // all pages are referenced by user, try to allocate new space - if (pn == NULL) { - int32_t prev = pResultBuf->inMemPages; - pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5; - - qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev, - pResultBuf->inMemPages, pResultBuf->pageSize); - + if (availablePage == NULL) { (*pi)->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES); } else { - tdListPopNode(pResultBuf->lruList, pn); - - if (flushPageToDisk(pResultBuf, *(SPageInfo**)pn->data) != TSDB_CODE_SUCCESS) { - return NULL; - } - - char* buf = (*(SPageInfo**)pn->data)->pData; - (*(SPageInfo**)pn->data)->pData = NULL; - - (*pi)->pData = buf; - - ((void**)((*pi)->pData))[0] = (*pi); - tfree(pn); - } - - // load file in disk - int32_t ret = fseek(pResultBuf->file, (*pi)->info.offset, SEEK_SET); - ret = fread((*pi)->pData + POINTER_BYTES, 1, (*pi)->info.length, pResultBuf->file); - if (ret != (*pi)->info.length) { - terrno = errno; - return NULL; + (*pi)->pData = availablePage; } - // todo do decomp + ((void**)((*pi)->pData))[0] = (*pi); - return (*pi)->pData + POINTER_BYTES; + lruListPushFront(pResultBuf->lruList, *pi); + loadPageFromDisk(pResultBuf, *pi); + return GET_DATA_PAYLOAD(*pi); } - - return NULL; } void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) { @@ -328,9 +357,14 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) { char* p = (char*) page - POINTER_BYTES; SPageInfo* ppi = ((SPageInfo**) p)[0]; + releaseResBufPageInfo(pResultBuf, ppi); +} - assert(T_REF_VAL_GET(ppi) > 0); - T_REF_DEC(ppi); +void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi) { + assert(pi->pData != NULL && pi->used); + + pi->used = false; + pResultBuf->statis.releasePages += 1; } size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } @@ -373,9 +407,11 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { SArray** p = (SArray**) taosHashIterGet(iter); size_t n = taosArrayGetSize(*p); for(int32_t i = 0; i < n; ++i) { - SPageInfo* pi = taosArrayGet(*p, i); + SPageInfo* pi = taosArrayGetP(*p, i); tfree(pi->pData); + tfree(pi); } + taosArrayDestroy(*p); } @@ -390,8 +426,8 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { tfree(pResultBuf); } -int32_t getLastPageId(SIDList pList) { +SPageInfo* getLastPageInfo(SIDList pList) { size_t size = taosArrayGetSize(pList); - return *(int32_t*) taosArrayGet(pList, size - 1); + return (SPageInfo*) taosArrayGetP(pList, size - 1); } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index be8447149331ecf3ede4c0873f6ced9c6c385022..8ddc2d3857ab6dccba482a8b6aa99dcd4d831932 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -233,11 +233,13 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow if (pWindowRes == NULL) { return; } - + + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId); + for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { SResultInfo *pResultInfo = &pWindowRes->resultInfo[i]; - char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes); + char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; memset(s, 0, size); @@ -274,8 +276,11 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen); // copy the output buffer data from src to dst, the position info keep unchanged - char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst); - char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src); + tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pos.pageId); + char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage); + + tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pos.pageId); + char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src, srcpage); size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; memcpy(dstBuf, srcBuf, s); diff --git a/src/query/tests/resultBufferTest.cpp b/src/query/tests/resultBufferTest.cpp index 53a05925c2c778ed7161a923b4fd26e3cabb815a..3171a7b322f4d98fb1a5e8ad009e3baf5adca2b4 100644 --- a/src/query/tests/resultBufferTest.cpp +++ b/src/query/tests/resultBufferTest.cpp @@ -94,7 +94,59 @@ void writeDownTest() { SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); ASSERT_EQ(taosArrayGetSize(pa), 5); + destroyResultBuf(pResultBuf, NULL); +} + +void recyclePageTest() { + SDiskbasedResultBuf* pResultBuf = NULL; + int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL); + + int32_t pageId = 0; + int32_t writePageId = 0; + int32_t groupId = 0; + int32_t nx = 12345; + + tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId); + ASSERT_TRUE(pBufPage != NULL); + releaseResBufPage(pResultBuf, pBufPage); + tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t1 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t1 == pBufPage1); + ASSERT_TRUE(pageId == 1); + + tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t2 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t2 == pBufPage2); + ASSERT_TRUE(pageId == 2); + + tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t3 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t3 == pBufPage3); + ASSERT_TRUE(pageId == 3); + + tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t4 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t4 == pBufPage4); + ASSERT_TRUE(pageId == 4); + releaseResBufPage(pResultBuf, t4); + releaseResBufPage(pResultBuf, t4); + + tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t5 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t5 == pBufPage5); + ASSERT_TRUE(pageId == 5); + + // flush the written page to disk, and read it out again + tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId); + *(int32_t*)(pBufPagex->data) = nx; + writePageId = pageId; // update the data + releaseResBufPage(pResultBuf, pBufPagex); + + tFilePage* pBufPagex1 = getResBufPage(pResultBuf, 1); + + SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); + ASSERT_EQ(taosArrayGetSize(pa), 6); destroyResultBuf(pResultBuf, NULL); } @@ -105,4 +157,5 @@ TEST(testCase, resultBufferTest) { srand(time(NULL)); simpleTest(); writeDownTest(); + recyclePageTest(); } diff --git a/src/util/inc/tlist.h b/src/util/inc/tlist.h index a4ed9311e24bf5098f9cca8c05edd0998b8d6253..e8380294da47de1c9eb65059263ac820f3d816be 100644 --- a/src/util/inc/tlist.h +++ b/src/util/inc/tlist.h @@ -55,6 +55,8 @@ int tdListPrepend(SList *list, void *data); int tdListAppend(SList *list, void *data); SListNode *tdListPopHead(SList *list); SListNode *tdListPopTail(SList *list); +SListNode *tdListGetHead(SList *list); +SListNode *tsListGetTail(SList *list); SListNode *tdListPopNode(SList *list, SListNode *node); void tdListMove(SList *src, SList *dst); void tdListDiscard(SList *list); diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index 93293b8b915c6b95dfa2cb112a5dc49ec9268dd4..8c2ad83de117aaf528565a711a2aa3732984e0a9 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -122,6 +122,22 @@ SListNode *tdListPopTail(SList *list) { return node; } +SListNode *tdListGetHead(SList *list) { + if (list == NULL || list->numOfEles == 0) { + return NULL; + } + + return list->head; +} + +SListNode *tsListGetTail(SList *list) { + if (list == NULL || list->numOfEles == 0) { + return NULL; + } + + return list->tail; +} + SListNode *tdListPopNode(SList *list, SListNode *node) { if (list->head == node) { list->head = node->next; diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index c6981d29022dca56198d6655379472c905984285..1e73893793a9f011f6753e5745226e664ca253f2 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -93,6 +93,8 @@ run general/parser/groupby.sim sleep 2000 run general/parser/tags_filter.sim sleep 2000 +run general/parser/topbot.sim +sleep 2000 run general/parser/union.sim sleep 2000 run general/parser/sliding.sim diff --git a/tests/script/general/parser/topbot.sim b/tests/script/general/parser/topbot.sim new file mode 100644 index 0000000000000000000000000000000000000000..a0c46dbc65bac39b33cd54f0c61ac3ba96bf5736 --- /dev/null +++ b/tests/script/general/parser/topbot.sim @@ -0,0 +1,74 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 0 +system sh/exec.sh -n dnode1 -s start +sleep 3000 +sql connect + +$dbPrefix = tb_db +$tbPrefix = tb_tb +$stbPrefix = tb_stb +$tbNum = 10 +$rowNum = 1000 +$totalNum = $tbNum * $rowNum +$loops = 200000 +$log = 10000 +$ts0 = 1537146000000 +$delta = 600000 +print ========== topbot.sim +$i = 0 +$db = $dbPrefix . $i +$stb = $stbPrefix . $i + +sql drop database $db -x step1 +step1: +sql create database $db cache 16 maxtables 200 +print ====== create tables +sql use $db +sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int) + +$i = 0 +$ts = $ts0 +$halfNum = $tbNum / 2 +while $i < $halfNum + $tbId = $i + $halfNum + $tb = $tbPrefix . $i + $tb1 = $tbPrefix . $tbId + sql create table $tb using $stb tags( $i ) + sql create table $tb1 using $stb tags( $tbId ) + + $x = 0 + while $x < $rowNum + $xs = $x * $delta + $ts = $ts0 + $xs + $c = $x / 10 + $c = $c * 10 + $c = $x - $c + $binary = 'binary . $c + $binary = $binary . ' + $nchar = 'nchar . $c + $nchar = $nchar . ' + sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar ) + sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar ) + $x = $x + 1 + endw + + $i = $i + 1 +endw +print ====== tables created + +sql use $db +##### select from table +print ====== select top/bot from table and check num of rows returned +sql select top(c1, 100) from tb_stb0 +if $row != 100 then + return -1 +endi + +sql select last(c2) from tb_tb9 +if $row != 1 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file