提交 bb6cbf5f 编写于 作者: H Haojun Liao

[td-225]

上级 b04e3da3
......@@ -691,9 +691,15 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pModel = createColumnModel(pSchema, size, capacity);
int32_t pg = DEFAULT_PAGE_SIZE;
int32_t overhead = sizeof(tFilePage);
while((pg - overhead) < pModel->rowSize * 2) {
pg *= 2;
}
size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
for (int32_t i = 0; i < numOfSubs; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
......
......@@ -28,9 +28,9 @@ extern "C" {
#include "tdataformat.h"
#include "talgo.h"
#define DEFAULT_PAGE_SIZE (1024L*64) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
#define DEFAULT_PAGE_SIZE (4096L) // 16k larger than the SHistoInfo
typedef enum EXT_BUFFER_FLUSH_MODEL {
/*
......@@ -126,7 +126,7 @@ typedef struct tExtMemBuffer {
* @param pModel
* @return
*/
tExtMemBuffer *createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnModel *pModel);
tExtMemBuffer *createExtMemBuffer(int32_t inMemSize, int32_t elemSize, int32_t pagesize, SColumnModel *pModel);
/**
*
......
......@@ -52,7 +52,6 @@ typedef struct SResultBufStatis {
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
int32_t fileSize;
} SResultBufStatis;
typedef struct SDiskbasedResultBuf {
......@@ -68,29 +67,31 @@ typedef struct SDiskbasedResultBuf {
SHashObj* groupSet; // id hash table
SHashObj* all;
SList* lruList;
void* handle; // for debug purpose
void* emptyDummyIdList; // dummy id list
bool comp; // compressed before flushed to disk
void* assistBuf; // assistant buffer for compress data
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk
int32_t nextPos; // next page flush position
const void* handle; // for debug purpose
SResultBufStatis statis;
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
#define DEFAULT_INTERN_BUF_PAGE_SIZE (4096L)
#define DEFAULT_INMEM_BUF_PAGES 10
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
/**
* create disk-based result buffer
* @param pResultBuf
* @param size
* @param rowSize
* @param pagesize
* @param inMemPages
* @param handle
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
int32_t inMemPages, const void* handle);
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle);
/**
*
......@@ -131,15 +132,13 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
*/
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page);
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi);
/**
*
* @param pResultBuf
* @param id
* @return
* @param pi
*/
//tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi);
/**
* get the total buffer size in the format of disk file
......@@ -159,7 +158,7 @@ size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf);
* destroy result buffer
* @param pResultBuf
*/
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle);
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf);
/**
*
......
......@@ -1621,7 +1621,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pFillInfo = taosDestoryFillInfo(pRuntimeEnv->pFillInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
......@@ -4242,10 +4242,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
int32_t ps = DEFAULT_PAGE_SIZE;
int32_t rowsize = 0;
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
int32_t TWOMB = 1024*1024*2;
if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) {
// int32_t numOfPages = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, 2, rowsize, ps, 2, pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4273,8 +4273,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
int32_t numOfResultRows = getInitialPageNum(pQInfo);
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfResultRows, rowsize, ps, numOfResultRows, pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -28,10 +28,10 @@
/*
* SColumnModel is deeply copy
*/
tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnModel *pModel) {
tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, int32_t pagesize, SColumnModel *pModel) {
tExtMemBuffer* pMemBuffer = (tExtMemBuffer *)calloc(1, sizeof(tExtMemBuffer));
pMemBuffer->pageSize = DEFAULT_PAGE_SIZE;
pMemBuffer->pageSize = pagesize;
pMemBuffer->inMemCapacity = ALIGN8(inMemSize) / pMemBuffer->pageSize;
pMemBuffer->nElemSize = elemSize;
......
......@@ -535,7 +535,7 @@ void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows) {
if (pSeg->pBuffer[slotIdx] == NULL) {
pSeg->pBuffer[slotIdx] = createExtMemBuffer(pBucket->numOfTotalPages * pBucket->pageSize, pBucket->nElemSize,
pBucket->pOrderDesc->pColumnModel);
pBucket->pageSize, pBucket->pOrderDesc->pColumnModel);
pSeg->pBuffer[slotIdx]->flushModel = SINGLE_APPEND_MODEL;
pBucket->pOrderDesc->pColumnModel->capacity = pSeg->pBuffer[slotIdx]->numOfElemsPerPage;
}
......
......@@ -8,10 +8,10 @@
#define GET_DATA_PAYLOAD(_p) ((_p)->pData + POINTER_BYTES)
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
int32_t pagesize, int32_t inMemPages, const void* handle) {
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle) {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
SDiskbasedResultBuf* pResBuf = *pResultBuf;
if (pResBuf == NULL) {
return TSDB_CODE_COM_OUT_OF_MEMORY;
......@@ -19,20 +19,22 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
pResBuf->pageSize = pagesize;
pResBuf->numOfPages = 0; // all pages are in buffer in the first place
pResBuf->inMemPages = inMemPages;
pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit.
pResBuf->totalBufSize = pResBuf->numOfPages * pagesize;
pResBuf->allocateId = -1;
pResBuf->comp = true;
pResBuf->handle = handle;
assert(inMemPages <= numOfPages);
// at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2);
pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table
pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
char path[PATH_MAX] = {0};
getTmpfilePath("qbuf", path);
......@@ -47,9 +49,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
return TSDB_CODE_SUCCESS;
}
#define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages)
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static int32_t createDiskFile(SDiskbasedResultBuf* pResultBuf) {
pResultBuf->file = fopen(pResultBuf->path, "wb+");
if (pResultBuf->file == NULL) {
......@@ -384,18 +383,18 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId)
}
}
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
if (pResultBuf == NULL) {
return;
}
if (pResultBuf->file != NULL) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%"PRId64, pResultBuf->handle,
pResultBuf->totalBufSize, pResultBuf->path, pResultBuf->diskFileSize);
fclose(pResultBuf->file);
} else {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle,
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle,
pResultBuf->totalBufSize);
}
......
......@@ -47,7 +47,7 @@ void simpleTest() {
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage5);
destroyResultBuf(pResultBuf, NULL);
destroyResultBuf(pResultBuf);
}
void writeDownTest() {
......@@ -94,7 +94,7 @@ void writeDownTest() {
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 5);
destroyResultBuf(pResultBuf, NULL);
destroyResultBuf(pResultBuf);
}
void recyclePageTest() {
......@@ -148,7 +148,7 @@ void recyclePageTest() {
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 6);
destroyResultBuf(pResultBuf, NULL);
destroyResultBuf(pResultBuf);
}
} // namespace
......
......@@ -381,7 +381,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
}
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
if (pCacheObj == NULL || data == NULL || *data == NULL) return NULL;
if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册