diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 80fc82d90b1856753c4c88268c650147fed9e124..bf76b8cbe8a9fe31ef139db1a8b6f266a4b13c3b 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -691,9 +691,15 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pModel = createColumnModel(pSchema, size, capacity); + int32_t pg = DEFAULT_PAGE_SIZE; + int32_t overhead = sizeof(tFilePage); + while((pg - overhead) < pModel->rowSize * 2) { + pg *= 2; + } + size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; for (int32_t i = 0; i < numOfSubs; ++i) { - (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); + (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel); (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; } diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index 2cbef2b1bea66654b262a3fb37061477969960f2..9823e7d6ce1c8ca947edd0040088ae121e8421ae 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -28,9 +28,9 @@ extern "C" { #include "tdataformat.h" #include "talgo.h" -#define DEFAULT_PAGE_SIZE (1024L*64) // 16k larger than the SHistoInfo -#define MAX_TMPFILE_PATH_LENGTH PATH_MAX +#define MAX_TMPFILE_PATH_LENGTH PATH_MAX #define INITIAL_ALLOCATION_BUFFER_SIZE 64 +#define DEFAULT_PAGE_SIZE (4096L) // 16k larger than the SHistoInfo typedef enum EXT_BUFFER_FLUSH_MODEL { /* @@ -126,7 +126,7 @@ typedef struct tExtMemBuffer { * @param pModel * @return */ -tExtMemBuffer *createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnModel *pModel); +tExtMemBuffer *createExtMemBuffer(int32_t inMemSize, int32_t elemSize, int32_t pagesize, SColumnModel *pModel); /** * diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index ba446b46277f6892421ec290cb377cb0c5b76025..ac907cfee677bab2d9ebcca730dfd795c22c825a 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -52,7 +52,6 @@ typedef struct SResultBufStatis { int32_t getPages; int32_t releasePages; int32_t flushPages; - int32_t fileSize; } SResultBufStatis; typedef struct SDiskbasedResultBuf { @@ -68,29 +67,31 @@ typedef struct SDiskbasedResultBuf { SHashObj* groupSet; // id hash table SHashObj* all; SList* lruList; - void* handle; // for debug purpose void* emptyDummyIdList; // dummy id list - bool comp; // compressed before flushed to disk - void* assistBuf; // assistant buffer for compress data + void* assistBuf; // assistant buffer for compress/decompress data SArray* pFree; // free area in file + bool comp; // compressed before flushed to disk int32_t nextPos; // next page flush position + const void* handle; // for debug purpose SResultBufStatis statis; } SDiskbasedResultBuf; -#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) +#define DEFAULT_INTERN_BUF_PAGE_SIZE (4096L) #define DEFAULT_INMEM_BUF_PAGES 10 #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} /** * create disk-based result buffer * @param pResultBuf - * @param size * @param rowSize + * @param pagesize + * @param inMemPages + * @param handle * @return */ -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize, - int32_t inMemPages, const void* handle); +int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize, + int32_t inMemBufSize, const void* handle); /** * @@ -131,15 +132,13 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); */ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); -void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi); - /** * * @param pResultBuf - * @param id - * @return + * @param pi */ -//tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); +void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi); + /** * get the total buffer size in the format of disk file @@ -159,7 +158,7 @@ size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf); * destroy result buffer * @param pResultBuf */ -void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle); +void destroyResultBuf(SDiskbasedResultBuf* pResultBuf); /** * diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9d873dc95f65c6a12cb3afe928224c7a66ad6eda..ee24365d87d403e6289591ffb9da4fe7325bef38 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1621,7 +1621,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pFillInfo = taosDestoryFillInfo(pRuntimeEnv->pFillInfo); - destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo); + destroyResultBuf(pRuntimeEnv->pResultBuf); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); @@ -4242,10 +4242,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo int32_t ps = DEFAULT_PAGE_SIZE; int32_t rowsize = 0; getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); + int32_t TWOMB = 1024*1024*2; if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) { -// int32_t numOfPages = getInitialPageNum(pQInfo); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, 2, rowsize, ps, 2, pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4273,8 +4273,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo } else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { int32_t numOfResultRows = getInitialPageNum(pQInfo); getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); - - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfResultRows, rowsize, ps, numOfResultRows, pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index 69c5f0e24fe6361d41953c35fce1380b97d4e752..fb57f71199d8688fa8503790dab5551e7105ca3a 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -28,10 +28,10 @@ /* * SColumnModel is deeply copy */ -tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnModel *pModel) { +tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, int32_t pagesize, SColumnModel *pModel) { tExtMemBuffer* pMemBuffer = (tExtMemBuffer *)calloc(1, sizeof(tExtMemBuffer)); - pMemBuffer->pageSize = DEFAULT_PAGE_SIZE; + pMemBuffer->pageSize = pagesize; pMemBuffer->inMemCapacity = ALIGN8(inMemSize) / pMemBuffer->pageSize; pMemBuffer->nElemSize = elemSize; diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index c4490a01e79408ec2bf09527049082f9c05566c8..85e45e46b3eee0beef2babc893234aa0a5a0818f 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -535,7 +535,7 @@ void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows) { if (pSeg->pBuffer[slotIdx] == NULL) { pSeg->pBuffer[slotIdx] = createExtMemBuffer(pBucket->numOfTotalPages * pBucket->pageSize, pBucket->nElemSize, - pBucket->pOrderDesc->pColumnModel); + pBucket->pageSize, pBucket->pOrderDesc->pColumnModel); pSeg->pBuffer[slotIdx]->flushModel = SINGLE_APPEND_MODEL; pBucket->pOrderDesc->pColumnModel->capacity = pSeg->pBuffer[slotIdx]->numOfElemsPerPage; } diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index fbb5f116e6253c3d96aa4392fc2985ec1427cb21..33ae93f434db8ae28a9a001bfd351b7a954c79b4 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -8,31 +8,33 @@ #define GET_DATA_PAYLOAD(_p) ((_p)->pData + POINTER_BYTES) -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, - int32_t pagesize, int32_t inMemPages, const void* handle) { - +int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize, + int32_t inMemBufSize, const void* handle) { *pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf)); + SDiskbasedResultBuf* pResBuf = *pResultBuf; if (pResBuf == NULL) { return TSDB_CODE_COM_OUT_OF_MEMORY; } pResBuf->pageSize = pagesize; - pResBuf->numOfPages = 0; // all pages are in buffer in the first place - pResBuf->inMemPages = inMemPages; + pResBuf->numOfPages = 0; // all pages are in buffer in the first place + pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit. pResBuf->totalBufSize = pResBuf->numOfPages * pagesize; pResBuf->allocateId = -1; pResBuf->comp = true; + pResBuf->handle = handle; - assert(inMemPages <= numOfPages); + // at least more than 2 pages must be in memory + assert(inMemBufSize >= pagesize * 2); pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize; pResBuf->lruList = tdListNew(POINTER_BYTES); // init id hash table - pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); - pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES + pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); char path[PATH_MAX] = {0}; getTmpfilePath("qbuf", path); @@ -47,9 +49,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu return TSDB_CODE_SUCCESS; } -#define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages) -#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize) - static int32_t createDiskFile(SDiskbasedResultBuf* pResultBuf) { pResultBuf->file = fopen(pResultBuf->path, "wb+"); if (pResultBuf->file == NULL) { @@ -384,18 +383,18 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) } } -void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { +void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { if (pResultBuf == NULL) { return; } if (pResultBuf->file != NULL) { - qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle, - pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf)); + qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%"PRId64, pResultBuf->handle, + pResultBuf->totalBufSize, pResultBuf->path, pResultBuf->diskFileSize); fclose(pResultBuf->file); } else { - qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle, + qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle, pResultBuf->totalBufSize); } diff --git a/src/query/tests/resultBufferTest.cpp b/src/query/tests/resultBufferTest.cpp index 3171a7b322f4d98fb1a5e8ad009e3baf5adca2b4..3b74bf1b643d127ab10591242182ccbf414b0c3d 100644 --- a/src/query/tests/resultBufferTest.cpp +++ b/src/query/tests/resultBufferTest.cpp @@ -47,7 +47,7 @@ void simpleTest() { tFilePage* t4 = getResBufPage(pResultBuf, pageId); ASSERT_TRUE(t4 == pBufPage5); - destroyResultBuf(pResultBuf, NULL); + destroyResultBuf(pResultBuf); } void writeDownTest() { @@ -94,7 +94,7 @@ void writeDownTest() { SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); ASSERT_EQ(taosArrayGetSize(pa), 5); - destroyResultBuf(pResultBuf, NULL); + destroyResultBuf(pResultBuf); } void recyclePageTest() { @@ -148,7 +148,7 @@ void recyclePageTest() { SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); ASSERT_EQ(taosArrayGetSize(pa), 6); - destroyResultBuf(pResultBuf, NULL); + destroyResultBuf(pResultBuf); } } // namespace diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 7c2a6b32191666f096b3131dd2bd6060ada48c34..4058bf1672d7929c0ef2b0b28c1a7a46b654dae6 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -381,7 +381,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { } void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) { - if (pCacheObj == NULL || data == NULL || *data == NULL) return NULL; + if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL; size_t offset = offsetof(SCacheDataNode, data); SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);