提交 10b9968d 编写于 作者: H Haojun Liao

[td-225] fix bugs in resbuf

上级 8e97949a
...@@ -49,20 +49,19 @@ typedef struct SDiskbasedResultBuf { ...@@ -49,20 +49,19 @@ typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage; int32_t numOfRowsPerPage;
int32_t numOfPages; int32_t numOfPages;
int64_t totalBufSize; int64_t totalBufSize;
// int32_t fd; int64_t diskFileSize; // disk file size
FILE* file; FILE* file;
int32_t allocateId; // allocated page id int32_t allocateId; // allocated page id
// int32_t incStep; // minimum allocated pages
void* pBuf; // mmap buffer pointer
char* path; // file path char* path; // file path
int32_t pageSize; // current used page size int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* idsTable; // id hash table SHashObj* groupSet; // id hash table
SHashObj* all; SHashObj* all;
SList* pPageList; SList* lruList;
void* handle; // for debug purpose void* handle; // for debug purpose
void* emptyDummyIdList; // dummy id list void* emptyDummyIdList; // dummy id list
bool comp; bool comp; // compressed before flushed to disk
void* assistBuf; // assistant buffer for compress data
SArray* pFree; // free area in file SArray* pFree; // free area in file
int32_t nextPos; // next page flush position int32_t nextPos; // next page flush position
} SDiskbasedResultBuf; } SDiskbasedResultBuf;
...@@ -95,7 +94,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 ...@@ -95,7 +94,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
* @param pResultBuf * @param pResultBuf
* @return * @return
*/ */
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf); size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf);
/** /**
* *
...@@ -113,6 +112,11 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); ...@@ -113,6 +112,11 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
*/ */
tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
/**
* release the referenced buf pages
* @param pResultBuf
* @param page
*/
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page);
/** /**
...@@ -120,14 +124,14 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); ...@@ -120,14 +124,14 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page);
* @param pResultBuf * @param pResultBuf
* @return * @return
*/ */
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf); size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf);
/** /**
* get the number of groups in the result buffer * get the number of groups in the result buffer
* @param pResultBuf * @param pResultBuf
* @return * @return
*/ */
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf); size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf);
/** /**
* destroy result buffer * destroy result buffer
......
#include "qResultbuf.h" #include "qResultbuf.h"
#include <stddef.h> #include <stddef.h>
#include <tscompression.h>
#include "hash.h" #include "hash.h"
#include "qExtbuffer.h" #include "qExtbuffer.h"
#include "queryLog.h" #include "queryLog.h"
...@@ -24,11 +25,13 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu ...@@ -24,11 +25,13 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
pResBuf->totalBufSize = pResBuf->numOfPages * pagesize; pResBuf->totalBufSize = pResBuf->numOfPages * pagesize;
pResBuf->allocateId = -1; pResBuf->allocateId = -1;
pResBuf->pPageList = tdListNew(POINTER_BYTES); pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table // init id hash table
pResBuf->idsTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
pResBuf->comp = true;
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
getTmpfilePath("qbuf", path); getTmpfilePath("qbuf", path);
...@@ -43,25 +46,28 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu ...@@ -43,25 +46,28 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
#define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages) #define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages)
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize) #define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) { static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
// pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666); pResultBuf->file = fopen(pResultBuf->path, "wb+");
pResultBuf->file = fopen(pResultBuf->path, "w");
if (pResultBuf->file == NULL) { if (pResultBuf->file == NULL) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno)); qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static char* doCompressData(void* data, int32_t srcSize, int32_t *dst) { // do nothing static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, bool comp, void* assistBuf) { // do nothing
*dst = srcSize; if (!comp) {
*dst = srcSize;
return data;
}
*dst = tsCompressString(data, srcSize, 1, assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);
memcpy(data, assistBuf, *dst);
return data; return data;
} }
...@@ -89,57 +95,64 @@ static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t si ...@@ -89,57 +95,64 @@ static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t si
} }
} }
static void doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
assert(T_REF_VAL_GET(pg) == 0); assert(T_REF_VAL_GET(pg) == 0 && pg->pData != NULL);
int32_t size = -1; int32_t size = -1;
char* t = doCompressData(pg->pData + POINTER_BYTES, pResultBuf->pageSize, &size); char* t = doCompressData(pg->pData + POINTER_BYTES, pResultBuf->pageSize, &size, pResultBuf->comp, pResultBuf->assistBuf);
pg->info.length = size;
// this page is flushed to disk for the first time // this page is flushed to disk for the first time
if (pg->info.offset == -1) { if (pg->info.offset == -1) {
int32_t offset = allocatePositionInFile(pResultBuf, size); pg->info.offset = allocatePositionInFile(pResultBuf, size);
pResultBuf->nextPos += size; pResultBuf->nextPos += size;
fseek(pResultBuf->file, offset, SEEK_SET); fseek(pResultBuf->file, pg->info.offset, SEEK_SET);
fwrite(t, size, 1, pResultBuf->file); int32_t ret = fwrite(t, 1, size, pResultBuf->file);
UNUSED(ret);
} else { } else {
if (pg->info.length < size) { // length becomes greater, current space is not enough, allocate new place. if (pg->info.length < size) { // length becomes greater, current space is not enough, allocate new place.
//1. add current space to free list //1. add current space to free list
taosArrayPush(pResultBuf->pFree, &pg->info); taosArrayPush(pResultBuf->pFree, &pg->info);
//2. allocate new position, and update the info //2. allocate new position, and update the info
int32_t offset = allocatePositionInFile(pResultBuf, size); pg->info.offset = allocatePositionInFile(pResultBuf, size);
pResultBuf->nextPos += size; pResultBuf->nextPos += size;
//3. write to disk. //3. write to disk.
fseek(pResultBuf->file, offset, SEEK_SET); fseek(pResultBuf->file, pg->info.offset, SEEK_SET);
fwrite(t, size, 1, pResultBuf->file); fwrite(t, size, 1, pResultBuf->file);
} }
} }
char* ret = pg->pData;
pg->pData = NULL;
return ret;
} }
static int32_t flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages); assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages);
if (pResultBuf->pBuf == NULL) { if (pResultBuf->file == NULL) {
assert(pResultBuf->file == NULL);
if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) { if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
return ret; terrno = ret;
return NULL;
} }
} }
doFlushPageToDisk(pResultBuf, pg); return doFlushPageToDisk(pResultBuf, pg);
return TSDB_CODE_SUCCESS;
} }
#define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages) #define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages)
static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(taosHashGet(pResultBuf->idsTable, (const char*) &groupId, sizeof(int32_t)) == NULL); assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL);
SArray* pa = taosArrayInit(1, sizeof(SPageInfo)); SArray* pa = taosArrayInit(1, sizeof(SPageInfo));
int32_t ret = taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); int32_t ret = taosHashPut(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES);
assert(ret == 0); assert(ret == 0);
return pa; return pa;
...@@ -148,7 +161,7 @@ static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { ...@@ -148,7 +161,7 @@ static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) { static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
SIDList list = NULL; SIDList list = NULL;
char** p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id if (p == NULL) { // it is a new group id
list = addNewGroup(pResultBuf, groupId); list = addNewGroup(pResultBuf, groupId);
} else { } else {
...@@ -162,10 +175,13 @@ static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, ...@@ -162,10 +175,13 @@ static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId,
} }
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
char* allocPg = NULL;
if (NO_AVAILABLE_PAGES(pResultBuf)) { if (NO_AVAILABLE_PAGES(pResultBuf)) {
// get the last page in linked list // get the last page in linked list
SListIter iter = {0}; SListIter iter = {0};
tdListInitIter(pResultBuf->pPageList, &iter, TD_LIST_BACKWARD); tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD);
SListNode* pn = NULL; SListNode* pn = NULL;
while((pn = tdListNext(&iter)) != NULL) { while((pn = tdListNext(&iter)) != NULL) {
...@@ -183,11 +199,12 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 ...@@ -183,11 +199,12 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev, qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev,
pResultBuf->inMemPages, pResultBuf->pageSize); pResultBuf->inMemPages, pResultBuf->pageSize);
} else { } else {
tdListPopNode(pResultBuf->pPageList, pn); tdListPopNode(pResultBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**) pn->data; SPageInfo* d = *(SPageInfo**) pn->data;
tfree(pn); tfree(pn);
if (flushPageToDisk(pResultBuf, d) != TSDB_CODE_SUCCESS) { allocPg = flushPageToDisk(pResultBuf, d);
if (allocPg == NULL) {
return NULL; return NULL;
} }
} }
...@@ -200,14 +217,19 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 ...@@ -200,14 +217,19 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
SPageInfo* pi = registerPage(pResultBuf, groupId, *pageId); SPageInfo* pi = registerPage(pResultBuf, groupId, *pageId);
// add to LRU list // add to LRU list
assert(listNEles(pResultBuf->pPageList) < pResultBuf->inMemPages); assert(listNEles(pResultBuf->lruList) < pResultBuf->inMemPages);
tdListPrepend(pResultBuf->pPageList, &pi); tdListPrepend(pResultBuf->lruList, &pi);
// add to hash map // add to hash map
taosHashPut(pResultBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); taosHashPut(pResultBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
// allocate buf // allocate buf
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES); if (allocPg == NULL) {
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES);
} else {
pi->pData = allocPg;
}
pResultBuf->totalBufSize += pResultBuf->pageSize; pResultBuf->totalBufSize += pResultBuf->pageSize;
T_REF_INC(pi); // add ref count T_REF_INC(pi); // add ref count
...@@ -231,41 +253,47 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { ...@@ -231,41 +253,47 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
SListNode* pnode = NULL; // todo speed up SListNode* pnode = NULL; // todo speed up
SListIter iter = {0}; SListIter iter = {0};
tdListInitIter(pResultBuf->pPageList, &iter, TD_LIST_FORWARD); tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_FORWARD);
while((pnode = tdListNext(&iter)) != NULL) { while((pnode = tdListNext(&iter)) != NULL) {
SPageInfo** pInfo = (SPageInfo**) pnode->data; SPageInfo** pInfo = (SPageInfo**) pnode->data;
// remove it and add it into the front of linked-list // remove it and add it into the front of linked-list
if ((*pInfo)->pageId == id) { if ((*pInfo)->pageId == id) {
tdListPopNode(pResultBuf->pPageList, pnode); tdListPopNode(pResultBuf->lruList, pnode);
tdListPrependNode(pResultBuf->pPageList, pnode); tdListPrependNode(pResultBuf->lruList, pnode);
T_REF_INC(*(SPageInfo**)pnode->data); T_REF_INC(*(SPageInfo**)pnode->data);
return ((*(SPageInfo**)pnode->data)->pData + POINTER_BYTES); return ((*(SPageInfo**)pnode->data)->pData + POINTER_BYTES);
} }
} }
} else { // not in memory } else { // not in memory
// choose the be flushed page assert((*pi)->pData == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0);
// get the last page in linked list
// choose the be flushed page: get the last page in linked list
SListIter iter1 = {0}; SListIter iter1 = {0};
tdListInitIter(pResultBuf->pPageList, &iter1, TD_LIST_BACKWARD); tdListInitIter(pResultBuf->lruList, &iter1, TD_LIST_BACKWARD);
SListNode* pn = NULL; SListNode* pn = NULL;
while((pn = tdListNext(&iter1)) != NULL) { while((pn = tdListNext(&iter1)) != NULL) {
assert(pn != NULL); assert(pn != NULL);
if (T_REF_VAL_GET(*(SPageInfo**)pn->data) == 0) { if (T_REF_VAL_GET(*(SPageInfo**)(pn->data)) == 0) {
break; break;
} }
} }
// all pages are referenced by user, try to allocate new space // all pages are referenced by user, try to allocate new space
if (pn == NULL) { if (pn == NULL) {
int32_t prev = pResultBuf->inMemPages;
pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5; pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5;
assert(0);
return NULL; qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev,
pResultBuf->inMemPages, pResultBuf->pageSize);
(*pi)->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES);
} else { } else {
tdListPopNode(pResultBuf->pPageList, pn); tdListPopNode(pResultBuf->lruList, pn);
if (flushPageToDisk(pResultBuf, *(SPageInfo**)pn->data) != TSDB_CODE_SUCCESS) { if (flushPageToDisk(pResultBuf, *(SPageInfo**)pn->data) != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -273,15 +301,23 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { ...@@ -273,15 +301,23 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
char* buf = (*(SPageInfo**)pn->data)->pData; char* buf = (*(SPageInfo**)pn->data)->pData;
(*(SPageInfo**)pn->data)->pData = NULL; (*(SPageInfo**)pn->data)->pData = NULL;
// load file in disk
fseek(pResultBuf->file, (*pi)->info.offset, SEEK_SET);
fread(buf, (*pi)->info.length, 1, pResultBuf->file);
(*pi)->pData = buf; (*pi)->pData = buf;
((void**)((*pi)->pData))[0] = (*pi);
tfree(pn); tfree(pn);
return (*pi)->pData;
} }
// load file in disk
int32_t ret = fseek(pResultBuf->file, (*pi)->info.offset, SEEK_SET);
ret = fread((*pi)->pData + POINTER_BYTES, 1, (*pi)->info.length, pResultBuf->file);
if (ret != (*pi)->info.length) {
terrno = errno;
return NULL;
}
// todo do decomp
return (*pi)->pData + POINTER_BYTES;
} }
return NULL; return NULL;
...@@ -297,12 +333,16 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) { ...@@ -297,12 +333,16 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) {
T_REF_DEC(ppi); T_REF_DEC(ppi);
} }
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); }
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL); assert(pResultBuf != NULL);
char** p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id if (p == NULL) { // it is a new group id
return pResultBuf->emptyDummyIdList; return pResultBuf->emptyDummyIdList;
} else { } else {
...@@ -320,7 +360,6 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { ...@@ -320,7 +360,6 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf)); pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
fclose(pResultBuf->file); fclose(pResultBuf->file);
pResultBuf->pBuf = NULL;
} else { } else {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle, qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle,
pResultBuf->totalBufSize); pResultBuf->totalBufSize);
...@@ -329,19 +368,25 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { ...@@ -329,19 +368,25 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
unlink(pResultBuf->path); unlink(pResultBuf->path);
tfree(pResultBuf->path); tfree(pResultBuf->path);
SHashMutableIterator* iter = taosHashCreateIter(pResultBuf->idsTable); SHashMutableIterator* iter = taosHashCreateIter(pResultBuf->groupSet);
while(taosHashIterNext(iter)) { while(taosHashIterNext(iter)) {
SArray** p = (SArray**) taosHashIterGet(iter); SArray** p = (SArray**) taosHashIterGet(iter);
size_t n = taosArrayGetSize(*p);
for(int32_t i = 0; i < n; ++i) {
SPageInfo* pi = taosArrayGet(*p, i);
tfree(pi->pData);
}
taosArrayDestroy(*p); taosArrayDestroy(*p);
} }
taosHashDestroyIter(iter); taosHashDestroyIter(iter);
tdListFree(pResultBuf->pPageList); tdListFree(pResultBuf->lruList);
taosArrayDestroy(pResultBuf->emptyDummyIdList); taosArrayDestroy(pResultBuf->emptyDummyIdList);
taosHashCleanup(pResultBuf->idsTable); taosHashCleanup(pResultBuf->groupSet);
taosHashCleanup(pResultBuf->all); taosHashCleanup(pResultBuf->all);
tfree(pResultBuf->assistBuf);
tfree(pResultBuf); tfree(pResultBuf);
} }
......
...@@ -29,28 +29,80 @@ void simpleTest() { ...@@ -29,28 +29,80 @@ void simpleTest() {
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t = getResBufPage(pResultBuf, pageId); tFilePage* t = getResBufPage(pResultBuf, pageId);
assert(t == pBufPage1); ASSERT_TRUE(t == pBufPage1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId); tFilePage* t1 = getResBufPage(pResultBuf, pageId);
assert(t1 == pBufPage2); ASSERT_TRUE(t1 == pBufPage2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId); tFilePage* t2 = getResBufPage(pResultBuf, pageId);
assert(t2 == pBufPage3); ASSERT_TRUE(t2 == pBufPage3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId); tFilePage* t3 = getResBufPage(pResultBuf, pageId);
assert(t3 == pBufPage4); ASSERT_TRUE(t3 == pBufPage4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId); tFilePage* t4 = getResBufPage(pResultBuf, pageId);
assert(t4 == pBufPage5); ASSERT_TRUE(t4 == pBufPage5);
destroyResultBuf(pResultBuf, NULL);
}
void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL);
int32_t pageId = 0;
int32_t writePageId = 0;
int32_t groupId = 0;
int32_t nx = 12345;
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
*(int32_t*)(pBufPage->data) = nx;
writePageId = pageId;
releaseResBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4);
// flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 5);
destroyResultBuf(pResultBuf, NULL); destroyResultBuf(pResultBuf, NULL);
} }
} // namespace } // namespace
TEST(testCase, resultBufferTest) { TEST(testCase, resultBufferTest) {
srand(time(NULL));
simpleTest(); simpleTest();
writeDownTest();
} }
#include "taosdef.h" #include "taosdef.h"
#include "tcompare.h" #include "tcompare.h"
#include <tarray.h> #include "tarray.h"
#include "tutil.h" #include "tutil.h"
int32_t compareInt32Val(const void *pLeft, const void *pRight) { int32_t compareInt32Val(const void *pLeft, const void *pRight) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册