提交 9a9ea692 编写于 作者: H Haojun Liao

[td-225] update resbuf

上级 f72183fe
......@@ -20,42 +20,56 @@
extern "C" {
#endif
#include <tlist.h>
#include "hash.h"
#include "os.h"
#include "qExtbuffer.h"
#include "tlockfree.h"
typedef struct SArray* SIDList;
typedef struct SPageInfo {
int32_t pageId;
typedef struct SPageDiskInfo {
int32_t offset;
int32_t lengthOnDisk;
int32_t length;
} SPageDiskInfo;
typedef struct SPageInfo {
int32_t pageId;
SPageDiskInfo info;
void* pData;
T_REF_DECLARE();
} SPageInfo;
typedef struct SFreeListItem {
int32_t offset;
int32_t len;
} SFreeListItem;
typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
int32_t fd;
// FILE* file;
// int32_t fd;
FILE* file;
int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages
// int32_t incStep; // minimum allocated pages
void* pBuf; // mmap buffer pointer
char* path; // file path
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* idsTable; // id hash table
SIDList list; // for each id, there is a page id list
void* iBuf; // inmemory buf
SHashObj* all;
SList* pPageList;
void* handle; // for debug purpose
void* emptyDummyIdList; // dummy id list
bool comp;
SArray* pFree; // free area in file
int32_t nextPos; // next page flush position
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
#define DEFAULT_INMEM_BUF_PAGES 10
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
/**
* create disk-based result buffer
......@@ -65,7 +79,7 @@ typedef struct SDiskbasedResultBuf {
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
int32_t inMemPages, const void* handle);
int32_t inMemPages, const void* handle);
/**
*
......@@ -97,13 +111,10 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id
* @return
*/
static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
if (id < pResultBuf->inMemPages) {
return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize);
} else {
return (tFilePage*) ((char*) pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize);
}
}
tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page);
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
......
#include "qResultbuf.h"
#include <stddef.h>
#include "hash.h"
#include "qExtbuffer.h"
#include "queryLog.h"
......@@ -14,29 +15,26 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
}
pResBuf->pageSize = pagesize;
pResBuf->numOfPages = inMemPages; // all pages are in buffer in the first place
pResBuf->numOfPages = 0; // all pages are in buffer in the first place
pResBuf->inMemPages = inMemPages;
assert(inMemPages <= numOfPages);
pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
pResBuf->totalBufSize = pResBuf->numOfPages * pagesize;
pResBuf->incStep = 4;
pResBuf->allocateId = -1;
// todo opt perf by on demand create in memory buffer
pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
pResBuf->pPageList = tdListNew(POINTER_BYTES);
// init id hash table
pResBuf->idsTable = taosHashInit(numOfPages, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
pResBuf->idsTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
char path[PATH_MAX] = {0};
getTmpfilePath("qbuf", path);
pResBuf->path = strdup(path);
pResBuf->fd = FD_INITIALIZER;
pResBuf->pBuf = NULL;
pResBuf->file = NULL;
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle,
......@@ -53,133 +51,258 @@ int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->tota
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
// pResultBuf->file = fopen(pResultBuf->path, "w");
if (!FD_VALID(pResultBuf->fd)) {
// pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
pResultBuf->file = fopen(pResultBuf->path, "w");
if (pResultBuf->file == NULL) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
return TSDB_CODE_SUCCESS;
}
assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
pResultBuf->numOfPages += pResultBuf->incStep;
static char* doCompressData(void* data, int32_t srcSize, int32_t *dst) { // do nothing
*dst = srcSize;
return data;
}
int32_t ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t size) {
if (pResultBuf->pFree == NULL) {
return pResultBuf->nextPos;
} else { //todo speed up the search procedure
size_t num = taosArrayGetSize(pResultBuf->pFree);
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
int32_t offset = -1;
if (pResultBuf->pBuf == MAP_FAILED) {
qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
for(int32_t i = 0; i < num; ++i) {
SFreeListItem* pi = taosArrayGet(pResultBuf->pFree, i);
if (pi->len >= size) {
offset = pi->offset;
pi->offset += size;
pi->len -= size;
pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
return TSDB_CODE_SUCCESS;
return offset;
}
}
// no available recycle space, allocate new area in file
return pResultBuf->nextPos;
}
}
static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNumOfPages) {
assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize);
int32_t ret = TSDB_CODE_SUCCESS;
static void doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
assert(T_REF_VAL_GET(pg) == 0);
if (pResultBuf->pBuf == NULL) {
assert(!FD_VALID(pResultBuf->fd));
int32_t size = -1;
char* t = doCompressData(pg->pData + POINTER_BYTES, pResultBuf->pageSize, &size);
if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
return ret;
}
// this page is flushed to disk for the first time
if (pg->info.offset == -1) {
int32_t offset = allocatePositionInFile(pResultBuf, size);
pResultBuf->nextPos += size;
fseek(pResultBuf->file, offset, SEEK_SET);
fwrite(t, size, 1, pResultBuf->file);
} else {
ret = munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->numOfPages += incNumOfPages;
/*
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
return TSDB_CODE_QRY_NO_DISKSPACE;
if (pg->info.length < size) { // length becomes greater, current space is not enough, allocate new place.
//1. add current space to free list
taosArrayPush(pResultBuf->pFree, &pg->info);
//2. allocate new position, and update the info
int32_t offset = allocatePositionInFile(pResultBuf, size);
pResultBuf->nextPos += size;
//3. write to disk.
fseek(pResultBuf->file, offset, SEEK_SET);
fwrite(t, size, 1, pResultBuf->file);
}
}
}
pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
static int32_t flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
int32_t ret = TSDB_CODE_SUCCESS;
assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages);
if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
if (pResultBuf->pBuf == NULL) {
assert(pResultBuf->file == NULL);
if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
return ret;
}
}
doFlushPageToDisk(pResultBuf, pg);
return TSDB_CODE_SUCCESS;
}
#define NO_AVAILABLE_PAGES(_b) ((_b)->allocateId == (_b)->numOfPages - 1)
#define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages)
static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL);
static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(taosHashGet(pResultBuf->idsTable, (const char*) &groupId, sizeof(int32_t)) == NULL);
char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
SArray* pa = taosArrayInit(1, sizeof(SPageInfo));
int32_t ret = taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES);
assert(ret == 0);
return pa;
}
static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
SIDList list = NULL;
char** p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
return -1;
list = addNewGroup(pResultBuf, groupId);
} else {
list = (SIDList) (*p);
}
int32_t slot = GET_INT32_VAL(p);
assert(slot >= 0 && slot < taosHashGetSize(pResultBuf->idsTable));
pResultBuf->numOfPages += 1;
return slot;
SPageInfo ppi = { .info = PAGE_INFO_INITIALIZER, .pageId = pageId, };
return taosArrayPush(list, &ppi);
}
static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
if (NO_AVAILABLE_PAGES(pResultBuf)) {
// get the last page in linked list
SListIter iter = {0};
tdListInitIter(pResultBuf->pPageList, &iter, TD_LIST_BACKWARD);
SListNode* pn = NULL;
while((pn = tdListNext(&iter)) != NULL) {
assert(pn != NULL);
if (T_REF_VAL_GET(*(SPageInfo**)pn->data) == 0) {
break;
}
}
// all pages are referenced by user, try to allocate new space
if (pn == NULL) {
int32_t prev = pResultBuf->inMemPages;
pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5;
qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev,
pResultBuf->inMemPages, pResultBuf->pageSize);
} else {
tdListPopNode(pResultBuf->pPageList, pn);
if (flushPageToDisk(pResultBuf, *(SPageInfo**)pn->data) != TSDB_CODE_SUCCESS) {
return NULL;
}
}
}
SArray* pa = taosArrayInit(1, sizeof(int32_t));
taosArrayPush(pResultBuf->list, &pa);
// register new id in this group
*pageId = (++pResultBuf->allocateId);
assert(taosArrayGetSize(pResultBuf->list) == taosHashGetSize(pResultBuf->idsTable));
return num;
}
// register page id info
SPageInfo* pi = registerPage(pResultBuf, groupId, *pageId);
static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
slot = addNewGroupId(pResultBuf, groupId);
}
// add to LRU list
assert(listNEles(pResultBuf->pPageList) < pResultBuf->inMemPages);
tdListPrepend(pResultBuf->pPageList, &pi);
// add to hash map
taosHashPut(pResultBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
// allocate buf
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES);
pResultBuf->totalBufSize += pResultBuf->pageSize;
T_REF_INC(pi); // add ref count
((void**)pi->pData)[0] = pi;
SIDList pList = taosArrayGetP(pResultBuf->list, slot);
taosArrayPush(pList, &pageId);
return pi->pData + POINTER_BYTES;
}
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
if (NO_AVAILABLE_PAGES(pResultBuf)) {
if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
assert(pResultBuf != NULL && id >= 0);
SPageInfo** pi = taosHashGet(pResultBuf->all, &id, sizeof(int32_t));
assert(pi != NULL && *pi != NULL);
if ((*pi)->pData != NULL) { // it is in memory
// no need to update the LRU list
if (pResultBuf->numOfPages == 1) {
return (*pi)->pData + POINTER_BYTES;
}
SListNode* pnode = NULL; // todo speed up
SListIter iter = {0};
tdListInitIter(pResultBuf->pPageList, &iter, TD_LIST_FORWARD);
while((pnode = tdListNext(&iter)) != NULL) {
SPageInfo** pInfo = (SPageInfo**) pnode->data;
// remove it and add it into the front of linked-list
if ((*pInfo)->pageId == id) {
tdListPopNode(pResultBuf->pPageList, pnode);
tdListPrependNode(pResultBuf->pPageList, pnode);
T_REF_INC(*(SPageInfo**)pnode->data);
return ((*(SPageInfo**)pnode->data)->pData + POINTER_BYTES);
}
}
} else { // not in memory
// choose the be flushed page
// get the last page in linked list
SListIter iter1 = {0};
tdListInitIter(pResultBuf->pPageList, &iter1, TD_LIST_BACKWARD);
SListNode* pn = NULL;
while((pn = tdListNext(&iter1)) != NULL) {
assert(pn != NULL);
if (T_REF_VAL_GET(*(SPageInfo**)pn->data) == 0) {
break;
}
}
// all pages are referenced by user, try to allocate new space
if (pn == NULL) {
pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5;
assert(0);
return NULL;
} else {
tdListPopNode(pResultBuf->pPageList, pn);
if (flushPageToDisk(pResultBuf, *(SPageInfo**)pn->data) != TSDB_CODE_SUCCESS) {
return NULL;
}
char* buf = (*(SPageInfo**)pn->data)->pData;
(*(SPageInfo**)pn->data)->pData = NULL;
// load file in disk
fseek(pResultBuf->file, (*pi)->info.offset, SEEK_SET);
fread(buf, (*pi)->info.length, 1, pResultBuf->file);
(*pi)->pData = buf;
return (*pi)->pData;
}
}
// register new id in this group
*pageId = (++pResultBuf->allocateId);
registerPageId(pResultBuf, groupId, *pageId);
return NULL;
}
// clear memory for the new page
tFilePage* page = getResBufPage(pResultBuf, *pageId);
memset(page, 0, pResultBuf->pageSize);
return page;
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) {
assert(pResultBuf != NULL && page != NULL);
char* p = (char*) page - POINTER_BYTES;
SPageInfo* ppi = ((SPageInfo**) p)[0];
assert(T_REF_VAL_GET(ppi) > 0);
T_REF_DEC(ppi);
}
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
assert(pResultBuf != NULL);
char** p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
return pResultBuf->emptyDummyIdList;
} else {
return taosArrayGetP(pResultBuf->list, slot);
return (SArray*) (*p);
}
}
......@@ -188,12 +311,11 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
return;
}
if (FD_VALID(pResultBuf->fd)) {
if (pResultBuf->file != NULL) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
close(pResultBuf->fd);
munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
fclose(pResultBuf->file);
pResultBuf->pBuf = NULL;
} else {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle,
......@@ -203,17 +325,16 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
unlink(pResultBuf->path);
tfree(pResultBuf->path);
size_t size = taosArrayGetSize(pResultBuf->list);
for (int32_t i = 0; i < size; ++i) {
SArray* pa = taosArrayGetP(pResultBuf->list, i);
taosArrayDestroy(pa);
}
// size_t size = taosArrayGetSize(pResultBuf->list);
// for (int32_t i = 0; i < size; ++i) {
// SArray* pa = taosArrayGetP(pResultBuf->list, i);
// taosArrayDestroy(pa);
// }
taosArrayDestroy(pResultBuf->list);
tdListFree(pResultBuf->pPageList);
taosArrayDestroy(pResultBuf->emptyDummyIdList);
taosHashCleanup(pResultBuf->idsTable);
tfree(pResultBuf->iBuf);
tfree(pResultBuf);
}
......
......@@ -18,13 +18,35 @@ void simpleTest() {
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getNumOfRowsPerPage(pResultBuf), (16384L - sizeof(int64_t))/64);
ASSERT_EQ(getResBufSize(pResultBuf), 1000*16384L);
ASSERT_EQ(getResBufSize(pResultBuf), 1024);
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
releaseResBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t = getResBufPage(pResultBuf, pageId);
assert(t == pBufPage1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
assert(t1 == pBufPage2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
assert(t2 == pBufPage3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
assert(t3 == pBufPage4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
assert(t4 == pBufPage5);
destroyResultBuf(pResultBuf, NULL);
}
} // namespace
......
......@@ -76,6 +76,7 @@ int tdListPrepend(SList *list, void *data) {
SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize);
if (node == NULL) return -1;
node->next = node->prev = NULL;
memcpy((void *)(node->data), data, list->eleSize);
tdListPrependNode(list, node);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册