提交 ca7e62d9 编写于 作者: S Shengliang Guan

pagedbuf

上级 6834200d
...@@ -13,24 +13,23 @@ ...@@ -13,24 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TPAGEDBUF_H #ifndef _TD_UTIL_PAGEDBUF_H_
#define TDENGINE_TPAGEDBUF_H #define _TD_UTIL_PAGEDBUF_H_
#include "thash.h"
#include "tlist.h"
#include "tlockfree.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "tlist.h" typedef struct SArray* SIDList;
#include "thash.h" typedef struct SPageInfo SPageInfo;
#include "os.h"
#include "tlockfree.h"
typedef struct SArray* SIDList;
typedef struct SPageInfo SPageInfo;
typedef struct SDiskbasedBuf SDiskbasedBuf; typedef struct SDiskbasedBuf SDiskbasedBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define DEFAULT_PAGE_SIZE (16384L) #define DEFAULT_PAGE_SIZE (16384L)
typedef struct SFilePage { typedef struct SFilePage {
int64_t num; int64_t num;
...@@ -55,7 +54,8 @@ typedef struct SDiskbasedBufStatis { ...@@ -55,7 +54,8 @@ typedef struct SDiskbasedBufStatis {
* @param handle * @param handle
* @return * @return
*/ */
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId,
const char* dir);
/** /**
* *
...@@ -168,4 +168,4 @@ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf); ...@@ -168,4 +168,4 @@ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
} }
#endif #endif
#endif // TDENGINE_TPAGEDBUF_H #endif // _TD_UTIL_PAGEDBUF_H_
#include "tlog.h" /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "taoserror.h" #include "taoserror.h"
#include "tcompression.h" #include "tcompression.h"
#include "thash.h" #include "thash.h"
#include "tlog.h"
#define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES) #define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef struct SFreeListItem { typedef struct SFreeListItem {
...@@ -13,62 +29,63 @@ typedef struct SFreeListItem { ...@@ -13,62 +29,63 @@ typedef struct SFreeListItem {
} SFreeListItem; } SFreeListItem;
typedef struct SPageDiskInfo { typedef struct SPageDiskInfo {
int64_t offset; int64_t offset;
int32_t length; int32_t length;
} SPageDiskInfo; } SPageDiskInfo;
typedef struct SPageInfo { typedef struct SPageInfo {
SListNode* pn; // point to list node SListNode* pn; // point to list node
void* pData; void* pData;
int64_t offset; int64_t offset;
int32_t pageId; int32_t pageId;
int32_t length:30; int32_t length : 30;
bool used:1; // set current page is in used bool used : 1; // set current page is in used
bool dirty:1; // set current buffer page is dirty or not bool dirty : 1; // set current buffer page is dirty or not
} SPageInfo; } SPageInfo;
typedef struct SDiskbasedBuf { typedef struct SDiskbasedBuf {
int32_t numOfPages; int32_t numOfPages;
int64_t totalBufSize; int64_t totalBufSize;
uint64_t fileSize; // disk file size uint64_t fileSize; // disk file size
TdFilePtr pFile; TdFilePtr pFile;
int32_t allocateId; // allocated page id int32_t allocateId; // allocated page id
char* path; // file path char* path; // file path
int32_t pageSize; // current used page size int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* groupSet; // id hash table SHashObj* groupSet; // id hash table
SHashObj* all; SHashObj* all;
SList* lruList; SList* lruList;
void* emptyDummyIdList; // dummy id list void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk bool comp; // compressed before flushed to disk
uint64_t nextPos; // next page flush position uint64_t nextPos; // next page flush position
uint64_t qId; // for debug purpose uint64_t qId; // for debug purpose
bool printStatis; // Print statistics info when closing this buffer. bool printStatis; // Print statistics info when closing this buffer.
SDiskbasedBufStatis statis; SDiskbasedBufStatis statis;
} SDiskbasedBuf; } SDiskbasedBuf;
static void printStatisData(const SDiskbasedBuf* pBuf); static void printStatisData(const SDiskbasedBuf* pBuf);
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId,
const char* dir) {
*pBuf = calloc(1, sizeof(SDiskbasedBuf)); *pBuf = calloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pResBuf = *pBuf; SDiskbasedBuf* pResBuf = *pBuf;
if (pResBuf == NULL) { if (pResBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pResBuf->pageSize = pagesize; pResBuf->pageSize = pagesize;
pResBuf->numOfPages = 0; // all pages are in buffer in the first place pResBuf->numOfPages = 0; // all pages are in buffer in the first place
pResBuf->totalBufSize = 0; pResBuf->totalBufSize = 0;
pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit. pResBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit.
pResBuf->allocateId = -1; pResBuf->allocateId = -1;
pResBuf->comp = true; pResBuf->comp = true;
pResBuf->pFile = NULL; pResBuf->pFile = NULL;
pResBuf->qId = qId; pResBuf->qId = qId;
pResBuf->fileSize = 0; pResBuf->fileSize = 0;
// at least more than 2 pages must be in memory // at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2); assert(inMemBufSize >= pagesize * 2);
...@@ -76,8 +93,8 @@ static void printStatisData(const SDiskbasedBuf* pBuf); ...@@ -76,8 +93,8 @@ static void printStatisData(const SDiskbasedBuf* pBuf);
pResBuf->lruList = tdListNew(POINTER_BYTES); pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table // init id hash table
pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
...@@ -86,8 +103,9 @@ static void printStatisData(const SDiskbasedBuf* pBuf); ...@@ -86,8 +103,9 @@ static void printStatisData(const SDiskbasedBuf* pBuf);
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pResBuf->pageSize, // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
// pResBuf->inMemPages, pResBuf->path); // pResBuf->pageSize,
// pResBuf->inMemPages, pResBuf->path);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -96,14 +114,14 @@ static int32_t createDiskFile(SDiskbasedBuf* pBuf) { ...@@ -96,14 +114,14 @@ static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
// pBuf->file = fopen(pBuf->path, "wb+"); // pBuf->file = fopen(pBuf->path, "wb+");
pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (pBuf->pFile == NULL) { if (pBuf->pFile == NULL) {
// qError("failed to create tmp file: %s on disk. %s", pBuf->path, strerror(errno)); // qError("failed to create tmp file: %s on disk. %s", pBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pBuf) { // do nothing static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) { // do nothing
if (!pBuf->comp) { if (!pBuf->comp) {
*dst = srcSize; *dst = srcSize;
return data; return data;
...@@ -115,13 +133,14 @@ static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbase ...@@ -115,13 +133,14 @@ static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbase
return data; return data;
} }
static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pBuf) { // do nothing static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) { // do nothing
if (!pBuf->comp) { if (!pBuf->comp) {
*dst = srcSize; *dst = srcSize;
return data; return data;
} }
*dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize+sizeof(SFilePage), ONE_STAGE_COMP, NULL, 0); *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize + sizeof(SFilePage), ONE_STAGE_COMP, NULL,
0);
if (*dst > 0) { if (*dst > 0) {
memcpy(data, pBuf->assistBuf, *dst); memcpy(data, pBuf->assistBuf, *dst);
} }
...@@ -135,7 +154,7 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { ...@@ -135,7 +154,7 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
int32_t offset = -1; int32_t offset = -1;
size_t num = taosArrayGetSize(pBuf->pFree); size_t num = taosArrayGetSize(pBuf->pFree);
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SFreeListItem* pi = taosArrayGet(pBuf->pFree, i); SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);
if (pi->len >= size) { if (pi->len >= size) {
offset = pi->offset; offset = pi->offset;
...@@ -157,7 +176,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -157,7 +176,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
int32_t size = -1; int32_t size = -1;
char* t = NULL; char* t = NULL;
if (pg->offset == -1 || pg->dirty) { if (pg->offset == -1 || pg->dirty) {
SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg); SFilePage* pPage = (SFilePage*)GET_DATA_PAYLOAD(pg);
t = doCompressData(pPage->data, pBuf->pageSize, &size, pBuf); t = doCompressData(pPage->data, pBuf->pageSize, &size, pBuf);
} }
...@@ -174,7 +193,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -174,7 +193,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return NULL; return NULL;
} }
ret = (int32_t) taosWriteFile(pBuf->pFile, t, size); ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
if (ret != size) { if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
...@@ -205,7 +224,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -205,7 +224,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return NULL; return NULL;
} }
ret = (int32_t) taosWriteFile(pBuf->pFile, t, size); ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
if (ret != size) { if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
...@@ -222,16 +241,16 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -222,16 +241,16 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
char* pDataBuf = pg->pData; char* pDataBuf = pg->pData;
memset(pDataBuf, 0, pBuf->pageSize + sizeof(SFilePage)); memset(pDataBuf, 0, pBuf->pageSize + sizeof(SFilePage));
pg->pData = NULL; // this means the data is not in buffer pg->pData = NULL; // this means the data is not in buffer
pg->length = size; pg->length = size;
pg->dirty = false; pg->dirty = false;
return pDataBuf; return pDataBuf;
} }
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
assert(((int64_t) pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages); assert(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);
if (pBuf->pFile == NULL) { if (pBuf->pFile == NULL) {
if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) { if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) {
...@@ -251,7 +270,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -251,7 +270,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return ret; return ret;
} }
SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg); SFilePage* pPage = (SFilePage*)GET_DATA_PAYLOAD(pg);
ret = (int32_t)taosReadFile(pBuf->pFile, pPage->data, pg->length); ret = (int32_t)taosReadFile(pBuf->pFile, pPage->data, pg->length);
if (ret != pg->length) { if (ret != pg->length) {
ret = TAOS_SYSTEM_ERROR(errno); ret = TAOS_SYSTEM_ERROR(errno);
...@@ -267,7 +286,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -267,7 +286,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
} }
static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) { static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) {
assert(taosHashGet(pBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL); assert(taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)) == NULL);
SArray* pa = taosArrayInit(1, POINTER_BYTES); SArray* pa = taosArrayInit(1, POINTER_BYTES);
int32_t ret = taosHashPut(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); int32_t ret = taosHashPut(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES);
...@@ -283,21 +302,21 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag ...@@ -283,21 +302,21 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
if (p == NULL) { // it is a new group id if (p == NULL) { // it is a new group id
list = addNewGroup(pBuf, groupId); list = addNewGroup(pBuf, groupId);
} else { } else {
list = (SIDList) (*p); list = (SIDList)(*p);
} }
pBuf->numOfPages += 1; pBuf->numOfPages += 1;
SPageInfo* ppi = malloc(sizeof(SPageInfo));//{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL}; SPageInfo* ppi = malloc(sizeof(SPageInfo)); //{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL};
ppi->pageId = pageId; ppi->pageId = pageId;
ppi->pData = NULL; ppi->pData = NULL;
ppi->offset = -1; ppi->offset = -1;
ppi->length = -1; ppi->length = -1;
ppi->used = true; ppi->used = true;
ppi->pn = NULL; ppi->pn = NULL;
return *(SPageInfo**) taosArrayPush(list, &ppi); return *(SPageInfo**)taosArrayPush(list, &ppi);
} }
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
...@@ -305,10 +324,10 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { ...@@ -305,10 +324,10 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD); tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);
SListNode* pn = NULL; SListNode* pn = NULL;
while((pn = tdListNext(&iter)) != NULL) { while ((pn = tdListNext(&iter)) != NULL) {
assert(pn != NULL); assert(pn != NULL);
SPageInfo* pageInfo = *(SPageInfo**) pn->data; SPageInfo* pageInfo = *(SPageInfo**)pn->data;
assert(pageInfo->pageId >= 0 && pageInfo->pn == pn); assert(pageInfo->pageId >= 0 && pageInfo->pn == pn);
if (!pageInfo->used) { if (!pageInfo->used) {
...@@ -320,7 +339,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { ...@@ -320,7 +339,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
} }
static char* evacOneDataPage(SDiskbasedBuf* pBuf) { static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
char* bufPage = NULL; char* bufPage = NULL;
SListNode* pn = getEldestUnrefedPage(pBuf); SListNode* pn = getEldestUnrefedPage(pBuf);
// all pages are referenced by user, try to allocate new space // all pages are referenced by user, try to allocate new space
...@@ -331,12 +350,12 @@ static char* evacOneDataPage(SDiskbasedBuf* pBuf) { ...@@ -331,12 +350,12 @@ static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
// increase by 50% of previous mem pages // increase by 50% of previous mem pages
pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f); pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f);
// qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev, // qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev,
// pBuf->inMemPages, pBuf->pageSize); // pBuf->inMemPages, pBuf->pageSize);
} else { } else {
tdListPopNode(pBuf->lruList, pn); tdListPopNode(pBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**) pn->data; SPageInfo* d = *(SPageInfo**)pn->data;
assert(d->pn == pn); assert(d->pn == pn);
d->pn = NULL; d->pn = NULL;
...@@ -348,13 +367,13 @@ static char* evacOneDataPage(SDiskbasedBuf* pBuf) { ...@@ -348,13 +367,13 @@ static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
return bufPage; return bufPage;
} }
static void lruListPushFront(SList *pList, SPageInfo* pi) { static void lruListPushFront(SList* pList, SPageInfo* pi) {
tdListPrepend(pList, &pi); tdListPrepend(pList, &pi);
SListNode* front = tdListGetHead(pList); SListNode* front = tdListGetHead(pList);
pi->pn = front; pi->pn = front;
} }
static void lruListMoveToFront(SList *pList, SPageInfo* pi) { static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
tdListPopNode(pList, pi->pn); tdListPopNode(pList, pi->pn);
tdListPrependNode(pList, pi->pn); tdListPrependNode(pList, pi->pn);
} }
...@@ -401,7 +420,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) ...@@ -401,7 +420,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId)
((void**)pi->pData)[0] = pi; ((void**)pi->pData)[0] = pi;
pi->used = true; pi->used = true;
return (void *)(GET_DATA_PAYLOAD(pi)); return (void*)(GET_DATA_PAYLOAD(pi));
} }
SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
...@@ -411,22 +430,22 @@ SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { ...@@ -411,22 +430,22 @@ SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
assert(pi != NULL && *pi != NULL); assert(pi != NULL && *pi != NULL);
if ((*pi)->pData != NULL) { // it is in memory if ((*pi)->pData != NULL) { // it is in memory
// no need to update the LRU list if only one page exists // no need to update the LRU list if only one page exists
if (pBuf->numOfPages == 1) { if (pBuf->numOfPages == 1) {
(*pi)->used = true; (*pi)->used = true;
return (void *)(GET_DATA_PAYLOAD(*pi)); return (void*)(GET_DATA_PAYLOAD(*pi));
} }
SPageInfo** pInfo = (SPageInfo**) ((*pi)->pn->data); SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data);
assert(*pInfo == *pi); assert(*pInfo == *pi);
lruListMoveToFront(pBuf->lruList, (*pi)); lruListMoveToFront(pBuf->lruList, (*pi));
(*pi)->used = true; (*pi)->used = true;
return (void *)(GET_DATA_PAYLOAD(*pi)); return (void*)(GET_DATA_PAYLOAD(*pi));
} else { // not in memory } else { // not in memory
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0); assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0);
char* availablePage = NULL; char* availablePage = NULL;
...@@ -453,16 +472,16 @@ SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { ...@@ -453,16 +472,16 @@ SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
return NULL; return NULL;
} }
return (void *)(GET_DATA_PAYLOAD(*pi)); return (void*)(GET_DATA_PAYLOAD(*pi));
} }
} }
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
assert(pBuf != NULL && page != NULL); assert(pBuf != NULL && page != NULL);
int32_t offset = offsetof(SPageInfo, pData); int32_t offset = offsetof(SPageInfo, pData);
char* p = page - offset; char* p = page - offset;
SPageInfo* ppi = ((SPageInfo**) p)[0]; SPageInfo* ppi = ((SPageInfo**)p)[0];
releaseBufPageInfo(pBuf, ppi); releaseBufPageInfo(pBuf, ppi);
} }
...@@ -484,7 +503,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) { ...@@ -484,7 +503,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) {
if (p == NULL) { // it is a new group id if (p == NULL) { // it is a new group id
return pBuf->emptyDummyIdList; return pBuf->emptyDummyIdList;
} else { } else {
return (SArray*) (*p); return (SArray*)(*p);
} }
} }
...@@ -496,30 +515,34 @@ void destroyResultBuf(SDiskbasedBuf* pBuf) { ...@@ -496,30 +515,34 @@ void destroyResultBuf(SDiskbasedBuf* pBuf) {
printStatisData(pBuf); printStatisData(pBuf);
if (pBuf->pFile != NULL) { if (pBuf->pFile != NULL) {
uDebug("Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f Kb, %"PRIx64"\n", uDebug(
pBuf->totalBufSize/1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
listNEles(pBuf->lruList), pBuf->fileSize/1024.0, pBuf->pageSize/1024.0f, pBuf->qId); "size:%.2f Kb, %" PRIx64 "\n",
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId);
taosCloseFile(&pBuf->pFile); taosCloseFile(&pBuf->pFile);
} else { } else {
uDebug("Paged buffer closed, total:%.2f Kb, no file created, %"PRIx64, pBuf->totalBufSize/1024.0, pBuf->qId); uDebug("Paged buffer closed, total:%.2f Kb, no file created, %" PRIx64, pBuf->totalBufSize / 1024.0, pBuf->qId);
} }
// print the statistics information // print the statistics information
{ {
SDiskbasedBufStatis *ps = &pBuf->statis; SDiskbasedBufStatis* ps = &pBuf->statis;
uDebug("Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n" uDebug(
, ps->getPages, ps->releasePages, ps->flushBytes/1024.0f, ps->flushPages, ps->loadBytes/1024.0f, ps->loadPages "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
, ps->loadBytes/(1024.0 * ps->loadPages)); "Kb\n",
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
} }
remove(pBuf->path); remove(pBuf->path);
tfree(pBuf->path); tfree(pBuf->path);
SArray** p = taosHashIterate(pBuf->groupSet, NULL); SArray** p = taosHashIterate(pBuf->groupSet, NULL);
while(p) { while (p) {
size_t n = taosArrayGetSize(*p); size_t n = taosArrayGetSize(*p);
for(int32_t i = 0; i < n; ++i) { for (int32_t i = 0; i < n; ++i) {
SPageInfo* pi = taosArrayGetP(*p, i); SPageInfo* pi = taosArrayGetP(*p, i);
tfree(pi->pData); tfree(pi->pData);
tfree(pi); tfree(pi);
...@@ -539,7 +562,7 @@ void destroyResultBuf(SDiskbasedBuf* pBuf) { ...@@ -539,7 +562,7 @@ void destroyResultBuf(SDiskbasedBuf* pBuf) {
} }
SPageInfo* getLastPageInfo(SIDList pList) { SPageInfo* getLastPageInfo(SIDList pList) {
size_t size = taosArrayGetSize(pList); size_t size = taosArrayGetSize(pList);
SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1); SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
return pPgInfo; return pPgInfo;
} }
...@@ -549,33 +572,23 @@ int32_t getPageId(const SPageInfo* pPgInfo) { ...@@ -549,33 +572,23 @@ int32_t getPageId(const SPageInfo* pPgInfo) {
return pPgInfo->pageId; return pPgInfo->pageId;
} }
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { return pBuf->pageSize; }
return pBuf->pageSize;
}
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) { int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) { return pBuf->inMemPages; }
return pBuf->inMemPages;
}
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { return pBuf->fileSize == 0; }
return pBuf->fileSize == 0;
}
void setBufPageDirty(SFilePage* pPage, bool dirty) { void setBufPageDirty(SFilePage* pPage, bool dirty) {
int32_t offset = offsetof(SPageInfo, pData); // todo extract method int32_t offset = offsetof(SPageInfo, pData); // todo extract method
char* p = (char*)pPage - offset; char* p = (char*)pPage - offset;
SPageInfo* ppi = ((SPageInfo**) p)[0]; SPageInfo* ppi = ((SPageInfo**)p)[0];
ppi->dirty = dirty; ppi->dirty = dirty;
} }
void printStatisBeforeClose(SDiskbasedBuf* pBuf) { void printStatisBeforeClose(SDiskbasedBuf* pBuf) { pBuf->printStatis = true; }
pBuf->printStatis = true;
}
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) { SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) { return pBuf->statis; }
return pBuf->statis;
}
void printStatisData(const SDiskbasedBuf* pBuf) { void printStatisData(const SDiskbasedBuf* pBuf) {
if (!pBuf->printStatis) { if (!pBuf->printStatis) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册