diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index b4c830370cecb05aa698052d09789afd7c7d0f02..da5f46b5e9fed356d27da7248733c2421ba3404c 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -49,20 +49,19 @@ typedef struct SDiskbasedResultBuf { int32_t numOfRowsPerPage; int32_t numOfPages; int64_t totalBufSize; -// int32_t fd; + int64_t diskFileSize; // disk file size FILE* file; int32_t allocateId; // allocated page id -// int32_t incStep; // minimum allocated pages - void* pBuf; // mmap buffer pointer char* path; // file path int32_t pageSize; // current used page size int32_t inMemPages; // numOfPages that are allocated in memory - SHashObj* idsTable; // id hash table + SHashObj* groupSet; // id hash table SHashObj* all; - SList* pPageList; + SList* lruList; void* handle; // for debug purpose void* emptyDummyIdList; // dummy id list - bool comp; + bool comp; // compressed before flushed to disk + void* assistBuf; // assistant buffer for compress data SArray* pFree; // free area in file int32_t nextPos; // next page flush position } SDiskbasedResultBuf; @@ -95,7 +94,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 * @param pResultBuf * @return */ -int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf); +size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf); /** * @@ -113,6 +112,11 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); */ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); +/** + * release the referenced buf pages + * @param pResultBuf + * @param page + */ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); /** @@ -120,14 +124,14 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); * @param pResultBuf * @return */ -int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf); +size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf); /** * get the number of groups in the result buffer * @param pResultBuf * @return */ -int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf); +size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf); /** * destroy result buffer diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index e54217f7e516f1f4dc7937e765673c2eeaae5cc9..fa2bb814f2b0dfab94e1e43796b34bc42e486476 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -1,5 +1,6 @@ #include "qResultbuf.h" #include +#include #include "hash.h" #include "qExtbuffer.h" #include "queryLog.h" @@ -24,11 +25,13 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu pResBuf->totalBufSize = pResBuf->numOfPages * pagesize; pResBuf->allocateId = -1; - pResBuf->pPageList = tdListNew(POINTER_BYTES); + pResBuf->lruList = tdListNew(POINTER_BYTES); // init id hash table - pResBuf->idsTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES + pResBuf->comp = true; char path[PATH_MAX] = {0}; getTmpfilePath("qbuf", path); @@ -43,25 +46,28 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu return TSDB_CODE_SUCCESS; } -int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); } - -int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; } - #define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages) #define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize) static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) { -// pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666); - pResultBuf->file = fopen(pResultBuf->path, "w"); + pResultBuf->file = fopen(pResultBuf->path, "wb+"); if (pResultBuf->file == NULL) { qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } + return TSDB_CODE_SUCCESS; } -static char* doCompressData(void* data, int32_t srcSize, int32_t *dst) { // do nothing - *dst = srcSize; +static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, bool comp, void* assistBuf) { // do nothing + if (!comp) { + *dst = srcSize; + return data; + } + + *dst = tsCompressString(data, srcSize, 1, assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0); + + memcpy(data, assistBuf, *dst); return data; } @@ -89,57 +95,64 @@ static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t si } } -static void doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { - assert(T_REF_VAL_GET(pg) == 0); +static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { + assert(T_REF_VAL_GET(pg) == 0 && pg->pData != NULL); int32_t size = -1; - char* t = doCompressData(pg->pData + POINTER_BYTES, pResultBuf->pageSize, &size); + char* t = doCompressData(pg->pData + POINTER_BYTES, pResultBuf->pageSize, &size, pResultBuf->comp, pResultBuf->assistBuf); + pg->info.length = size; // this page is flushed to disk for the first time if (pg->info.offset == -1) { - int32_t offset = allocatePositionInFile(pResultBuf, size); + pg->info.offset = allocatePositionInFile(pResultBuf, size); pResultBuf->nextPos += size; - fseek(pResultBuf->file, offset, SEEK_SET); - fwrite(t, size, 1, pResultBuf->file); + fseek(pResultBuf->file, pg->info.offset, SEEK_SET); + int32_t ret = fwrite(t, 1, size, pResultBuf->file); + + UNUSED(ret); } else { if (pg->info.length < size) { // length becomes greater, current space is not enough, allocate new place. //1. add current space to free list taosArrayPush(pResultBuf->pFree, &pg->info); //2. allocate new position, and update the info - int32_t offset = allocatePositionInFile(pResultBuf, size); + pg->info.offset = allocatePositionInFile(pResultBuf, size); pResultBuf->nextPos += size; //3. write to disk. - fseek(pResultBuf->file, offset, SEEK_SET); + fseek(pResultBuf->file, pg->info.offset, SEEK_SET); fwrite(t, size, 1, pResultBuf->file); } } + + char* ret = pg->pData; + pg->pData = NULL; + + return ret; } -static int32_t flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { +static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { int32_t ret = TSDB_CODE_SUCCESS; assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages); - if (pResultBuf->pBuf == NULL) { - assert(pResultBuf->file == NULL); + if (pResultBuf->file == NULL) { if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) { - return ret; + terrno = ret; + return NULL; } } - doFlushPageToDisk(pResultBuf, pg); - return TSDB_CODE_SUCCESS; + return doFlushPageToDisk(pResultBuf, pg); } #define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages) static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { - assert(taosHashGet(pResultBuf->idsTable, (const char*) &groupId, sizeof(int32_t)) == NULL); + assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL); SArray* pa = taosArrayInit(1, sizeof(SPageInfo)); - int32_t ret = taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); + int32_t ret = taosHashPut(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); assert(ret == 0); return pa; @@ -148,7 +161,7 @@ static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) { SIDList list = NULL; - char** p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); + char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); if (p == NULL) { // it is a new group id list = addNewGroup(pResultBuf, groupId); } else { @@ -162,10 +175,13 @@ static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, } tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { + char* allocPg = NULL; + if (NO_AVAILABLE_PAGES(pResultBuf)) { + // get the last page in linked list SListIter iter = {0}; - tdListInitIter(pResultBuf->pPageList, &iter, TD_LIST_BACKWARD); + tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD); SListNode* pn = NULL; while((pn = tdListNext(&iter)) != NULL) { @@ -183,11 +199,12 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev, pResultBuf->inMemPages, pResultBuf->pageSize); } else { - tdListPopNode(pResultBuf->pPageList, pn); + tdListPopNode(pResultBuf->lruList, pn); SPageInfo* d = *(SPageInfo**) pn->data; tfree(pn); - if (flushPageToDisk(pResultBuf, d) != TSDB_CODE_SUCCESS) { + allocPg = flushPageToDisk(pResultBuf, d); + if (allocPg == NULL) { return NULL; } } @@ -200,14 +217,19 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 SPageInfo* pi = registerPage(pResultBuf, groupId, *pageId); // add to LRU list - assert(listNEles(pResultBuf->pPageList) < pResultBuf->inMemPages); - tdListPrepend(pResultBuf->pPageList, &pi); + assert(listNEles(pResultBuf->lruList) < pResultBuf->inMemPages); + tdListPrepend(pResultBuf->lruList, &pi); // add to hash map taosHashPut(pResultBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); // allocate buf - pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES); + if (allocPg == NULL) { + pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES); + } else { + pi->pData = allocPg; + } + pResultBuf->totalBufSize += pResultBuf->pageSize; T_REF_INC(pi); // add ref count @@ -231,41 +253,47 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { SListNode* pnode = NULL; // todo speed up SListIter iter = {0}; - tdListInitIter(pResultBuf->pPageList, &iter, TD_LIST_FORWARD); + tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_FORWARD); while((pnode = tdListNext(&iter)) != NULL) { SPageInfo** pInfo = (SPageInfo**) pnode->data; // remove it and add it into the front of linked-list if ((*pInfo)->pageId == id) { - tdListPopNode(pResultBuf->pPageList, pnode); - tdListPrependNode(pResultBuf->pPageList, pnode); + tdListPopNode(pResultBuf->lruList, pnode); + tdListPrependNode(pResultBuf->lruList, pnode); T_REF_INC(*(SPageInfo**)pnode->data); return ((*(SPageInfo**)pnode->data)->pData + POINTER_BYTES); } } } else { // not in memory - // choose the be flushed page - // get the last page in linked list + assert((*pi)->pData == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0); + + // choose the be flushed page: get the last page in linked list SListIter iter1 = {0}; - tdListInitIter(pResultBuf->pPageList, &iter1, TD_LIST_BACKWARD); + tdListInitIter(pResultBuf->lruList, &iter1, TD_LIST_BACKWARD); SListNode* pn = NULL; while((pn = tdListNext(&iter1)) != NULL) { assert(pn != NULL); - if (T_REF_VAL_GET(*(SPageInfo**)pn->data) == 0) { + if (T_REF_VAL_GET(*(SPageInfo**)(pn->data)) == 0) { break; } } // all pages are referenced by user, try to allocate new space if (pn == NULL) { + int32_t prev = pResultBuf->inMemPages; pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5; - assert(0); - return NULL; + + qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev, + pResultBuf->inMemPages, pResultBuf->pageSize); + + (*pi)->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES); } else { - tdListPopNode(pResultBuf->pPageList, pn); + tdListPopNode(pResultBuf->lruList, pn); + if (flushPageToDisk(pResultBuf, *(SPageInfo**)pn->data) != TSDB_CODE_SUCCESS) { return NULL; } @@ -273,15 +301,23 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { char* buf = (*(SPageInfo**)pn->data)->pData; (*(SPageInfo**)pn->data)->pData = NULL; - // load file in disk - fseek(pResultBuf->file, (*pi)->info.offset, SEEK_SET); - fread(buf, (*pi)->info.length, 1, pResultBuf->file); - (*pi)->pData = buf; + ((void**)((*pi)->pData))[0] = (*pi); tfree(pn); - return (*pi)->pData; } + + // load file in disk + int32_t ret = fseek(pResultBuf->file, (*pi)->info.offset, SEEK_SET); + ret = fread((*pi)->pData + POINTER_BYTES, 1, (*pi)->info.length, pResultBuf->file); + if (ret != (*pi)->info.length) { + terrno = errno; + return NULL; + } + + // todo do decomp + + return (*pi)->pData + POINTER_BYTES; } return NULL; @@ -297,12 +333,16 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) { T_REF_DEC(ppi); } -int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } +size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } + +size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); } + +size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; } SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { assert(pResultBuf != NULL); - char** p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); + char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); if (p == NULL) { // it is a new group id return pResultBuf->emptyDummyIdList; } else { @@ -320,7 +360,6 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf)); fclose(pResultBuf->file); - pResultBuf->pBuf = NULL; } else { qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle, pResultBuf->totalBufSize); @@ -329,19 +368,25 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { unlink(pResultBuf->path); tfree(pResultBuf->path); - SHashMutableIterator* iter = taosHashCreateIter(pResultBuf->idsTable); + SHashMutableIterator* iter = taosHashCreateIter(pResultBuf->groupSet); while(taosHashIterNext(iter)) { SArray** p = (SArray**) taosHashIterGet(iter); + size_t n = taosArrayGetSize(*p); + for(int32_t i = 0; i < n; ++i) { + SPageInfo* pi = taosArrayGet(*p, i); + tfree(pi->pData); + } taosArrayDestroy(*p); } taosHashDestroyIter(iter); - tdListFree(pResultBuf->pPageList); + tdListFree(pResultBuf->lruList); taosArrayDestroy(pResultBuf->emptyDummyIdList); - taosHashCleanup(pResultBuf->idsTable); + taosHashCleanup(pResultBuf->groupSet); taosHashCleanup(pResultBuf->all); + tfree(pResultBuf->assistBuf); tfree(pResultBuf); } diff --git a/src/query/tests/resultBufferTest.cpp b/src/query/tests/resultBufferTest.cpp index 93c1a65218ebca1f47614428804ab4ae4a031ea8..53a05925c2c778ed7161a923b4fd26e3cabb815a 100644 --- a/src/query/tests/resultBufferTest.cpp +++ b/src/query/tests/resultBufferTest.cpp @@ -29,28 +29,80 @@ void simpleTest() { tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* t = getResBufPage(pResultBuf, pageId); - assert(t == pBufPage1); + ASSERT_TRUE(t == pBufPage1); tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* t1 = getResBufPage(pResultBuf, pageId); - assert(t1 == pBufPage2); + ASSERT_TRUE(t1 == pBufPage2); tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* t2 = getResBufPage(pResultBuf, pageId); - assert(t2 == pBufPage3); + ASSERT_TRUE(t2 == pBufPage3); tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* t3 = getResBufPage(pResultBuf, pageId); - assert(t3 == pBufPage4); + ASSERT_TRUE(t3 == pBufPage4); tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* t4 = getResBufPage(pResultBuf, pageId); - assert(t4 == pBufPage5); + ASSERT_TRUE(t4 == pBufPage5); + + destroyResultBuf(pResultBuf, NULL); +} + +void writeDownTest() { + SDiskbasedResultBuf* pResultBuf = NULL; + int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL); + + int32_t pageId = 0; + int32_t writePageId = 0; + int32_t groupId = 0; + int32_t nx = 12345; + + tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId); + ASSERT_TRUE(pBufPage != NULL); + + *(int32_t*)(pBufPage->data) = nx; + writePageId = pageId; + releaseResBufPage(pResultBuf, pBufPage); + + tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t1 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t1 == pBufPage1); + ASSERT_TRUE(pageId == 1); + + tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t2 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t2 == pBufPage2); + ASSERT_TRUE(pageId == 2); + + tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t3 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t3 == pBufPage3); + ASSERT_TRUE(pageId == 3); + + tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); + tFilePage* t4 = getResBufPage(pResultBuf, pageId); + ASSERT_TRUE(t4 == pBufPage4); + ASSERT_TRUE(pageId == 4); + releaseResBufPage(pResultBuf, t4); + + // flush the written page to disk, and read it out again + tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId); + ASSERT_EQ(*(int32_t*)pBufPagex->data, nx); + + SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); + ASSERT_EQ(taosArrayGetSize(pa), 5); + + destroyResultBuf(pResultBuf, NULL); } } // namespace + TEST(testCase, resultBufferTest) { + srand(time(NULL)); simpleTest(); + writeDownTest(); } diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index 889d38ff207315ed489f7877e3ee9f566a5cde41..95645882547110cb36fdc16eafe8cc7d1fe09bcf 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -1,6 +1,6 @@ #include "taosdef.h" #include "tcompare.h" -#include +#include "tarray.h" #include "tutil.h" int32_t compareInt32Val(const void *pLeft, const void *pRight) {