提交 af0180f2 编写于 作者: H Haojun Liao

[td-13039] add linear hash.

上级 0c5a8dc3
......@@ -43,8 +43,8 @@ extern "C" {
typedef struct SArray {
size_t size;
size_t capacity;
size_t elemSize;
uint32_t capacity;
uint32_t elemSize;
void* pData;
} SArray;
......
......@@ -30,7 +30,6 @@ typedef struct SPageInfo SPageInfo;
typedef struct SDiskbasedBuf SDiskbasedBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define DEFAULT_PAGE_SIZE (16384L)
typedef struct SFilePage {
int64_t num;
......@@ -64,7 +63,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
* @param pageId
* @return
*/
SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
/**
*
......@@ -80,7 +79,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId);
* @param id
* @return
*/
SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id);
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id);
/**
* release the referenced buf pages
......@@ -151,19 +150,27 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf);
* @param pPageInfo
* @param dirty
*/
void setBufPageDirty(SFilePage* pPageInfo, bool dirty);
void setBufPageDirty(void* pPageInfo, bool dirty);
/**
* Print the statistics when closing this buffer
* @param pBuf
*/
void setPrintStatis(SDiskbasedBuf* pBuf);
void dBufSetPrintInfo(SDiskbasedBuf* pBuf);
/**
* return buf statistics.
* Return buf statistics.
* @param pBuf
* @return
*/
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
/**
* Print the buffer statistics information
* @param pBuf
*/
void dBufPrintStatis(const SDiskbasedBuf* pBuf);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TLINEARHASH_H
#define TDENGINE_TLINEARHASH_H
#ifdef __cplusplus
extern "C" {
#endif
#include "thash.h"
/** Values accepted by the `type` argument of tHashPrint(). */
enum {
LINEAR_HASH_STATIS = 0x1,
LINEAR_HASH_DATA = 0x2,
};
/** Opaque handle of a linear hash table; the layout is private to the implementation file. */
typedef struct SLHashObj SLHashObj;
/**
 * Create a linear hash table backed by a disk-based paged buffer.
 * @param inMemPages        number of pages kept in memory (the in-memory budget is inMemPages * pageSize)
 * @param pageSize          size of one buffer page, in bytes
 * @param fn                hash function applied to the keys
 * @param numOfTuplePerPage expected number of tuples per page, used to decide when a bucket is split
 * @return the new table, or NULL on failure with terrno set
 */
SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage);
/**
 * Destroy the table together with its paged buffer.
 * @return always NULL
 */
void* tHashCleanup(SLHashObj* pHashObj);
/**
 * Store a copy of the key (keyLen bytes) and of the data (size bytes) in the table.
 * NOTE(review): confirm the return-code contract against the implementation.
 */
int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size);
/**
 * Look up a key.
 * @return pointer to the stored data bytes inside a buffer page, or NULL when the key is absent
 */
char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen);
/**
 * Remove the entry stored under the given key.
 * NOTE(review): confirm the return-code contract against the implementation.
 */
int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen);
/**
 * Print a summary of the table to stdout.
 * @param type one of LINEAR_HASH_STATIS / LINEAR_HASH_DATA
 */
void tHashPrint(const SLHashObj* pHashObj, int32_t type);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TLINEARHASH_H
......@@ -705,7 +705,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes
SIDList list = getDataBufPagesIdList(pResultBuf, tid);
if (taosArrayGetSize(list) == 0) {
pData = getNewDataBuf(pResultBuf, tid, &pageId);
pData = getNewBufPage(pResultBuf, tid, &pageId);
} else {
SPageInfo* pi = getLastPageInfo(list);
pData = getBufPage(pResultBuf, getPageId(pi));
......@@ -714,7 +714,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes
if (pData->num + size > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi);
pData = getNewDataBuf(pResultBuf, tid, &pageId);
pData = getNewBufPage(pResultBuf, tid, &pageId);
if (pData != NULL) {
assert(pData->num == 0); // number of elements must be 0 for new allocated buffer
}
......@@ -4603,7 +4603,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
}
int32_t ps = DEFAULT_PAGE_SIZE;
int32_t ps = 4096;
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize);
int32_t TENMB = 1024*1024*10;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tlinearhash.h"
#include "tcfg.h"
#include "taoserror.h"
#include "tpagedbuf.h"
#define LHASH_CAP_RATIO 0.85
// One hash bucket. The bucket descriptor itself is always located in memory; the tuples
// live in the buffer pages whose ids are recorded in pPageIdList.
typedef struct SLHashBucket {
SArray *pPageIdList;  // int32_t page ids, in allocation order; new tuples are appended to the last page
int32_t size; // the number of elements stored in this bucket
} SLHashBucket;
// The linear hash table: a growable array of buckets plus the paged buffer that holds the tuples.
typedef struct SLHashObj {
SDiskbasedBuf *pBuf;        // paged buffer that stores every bucket page
_hash_fn_t hashFn;          // hash function applied to the keys
int32_t tuplesPerPage;      // expected tuples per page; drives the bucket split threshold
SLHashBucket **pBucket; // bucket pointer array; entries [0, numOfBuckets) are valid
int32_t numOfAlloc; // number of allocated bucket ptr slots
int32_t bits; // the number of low-order hash bits used to select a bucket
int32_t numOfBuckets; // the number of buckets currently in use
int64_t size; // the total number of items stored in the table
} SLHashObj;
/**
 * The data struct for each hash node. Nodes are stored back to back in a page:
 * +-----------+-------+--------+
 * | SLHashNode|  key  |  data  |
 * +-----------+-------+--------+
 * keyLen/dataLen give the byte length of the two variable-sized parts that follow the header.
 */
typedef struct SLHashNode {
  int32_t keyLen;
  int32_t dataLen;
} SLHashNode;

/*
 * Accessors for a serialized node. Every macro casts its argument to SLHashNode* itself, so all
 * three accept any pointer to the start of a node (SLHashNode*, char*, void*).
 */
#define GET_LHASH_NODE_KEY(_n)  (((char*)(_n)) + sizeof(SLHashNode))
#define GET_LHASH_NODE_DATA(_n) (((char*)(_n)) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen)
#define GET_LHASH_NODE_LEN(_n)  (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen)
static int32_t doAddNewBucket(SLHashObj* pHashObj);
/**
 * Map a hash value to a bucket id by keeping only its `bits` low-order bits.
 * With bits == 0 the mask is empty and the result is always bucket 0.
 */
static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) {
  const unsigned long lowBitsMask = (1ul << bits) - 1;
  return hashv & lowBitsMask;
}
/**
 * Compute the fallback bucket for a bucket id that has not been created yet: the id with its
 * highest used bit (bit `bits - 1`) subtracted. The result must name an existing bucket.
 */
static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) {
  const unsigned long topBit = 1ul << (bits - 1);
  int32_t fallbackId = bucketId - topBit;

  ASSERT(fallbackId < numOfBuckets);
  return fallbackId;
}
/**
 * Create a linear hash table backed by a disk-based paged buffer.
 *
 * @param inMemPages        number of pages kept in memory (budget is inMemPages * pageSize bytes)
 * @param pageSize          size of one buffer page, in bytes
 * @param fn                hash function applied to the keys
 * @param numOfTuplePerPage expected tuples per page, used for the split threshold
 * @return the new table, or NULL on failure with terrno set to the error code
 */
SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage) {
  SLHashObj* pHashObj = calloc(1, sizeof(SLHashObj));
  if (pHashObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, "/tmp");
  if (code != 0) {
    // NOTE(review): whether createDiskbasedBuf leaves a partially built buffer behind on failure
    // is not visible from here; only the handle allocated above is released.
    tfree(pHashObj);
    terrno = code;
    return NULL;
  }

  /**
   * The number of bits in the hash value, which is used to decide the exact bucket where the object
   * should be located in. The initial value is 0.
   */
  pHashObj->bits = 0;
  pHashObj->hashFn = fn;
  pHashObj->tuplesPerPage = numOfTuplePerPage;

  pHashObj->numOfAlloc = 4;  // initial allocated array list
  pHashObj->pBucket = calloc(pHashObj->numOfAlloc, POINTER_BYTES);
  if (pHashObj->pBucket == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = doAddNewBucket(pHashObj);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pHashObj;

_error:
  // Release everything built so far, including a bucket that was stored in its slot before the
  // failure was detected.
  if (pHashObj->pBucket != NULL) {
    for (int32_t i = 0; i < pHashObj->numOfAlloc; ++i) {
      if (pHashObj->pBucket[i] == NULL) {
        continue;
      }

      if (pHashObj->pBucket[i]->pPageIdList != NULL) {
        taosArrayDestroy(pHashObj->pBucket[i]->pPageIdList);
      }
      tfree(pHashObj->pBucket[i]);
    }
    tfree(pHashObj->pBucket);
  }

  destroyDiskbasedBuf(pHashObj->pBuf);
  tfree(pHashObj);
  terrno = code;
  return NULL;
}
/**
 * Destroy a linear hash table: the paged buffer, every bucket descriptor and the table itself.
 * A NULL handle is accepted and ignored.
 *
 * @param pHashObj table created by tHashInit(), or NULL
 * @return always NULL, so callers can write `p = tHashCleanup(p);`
 */
void* tHashCleanup(SLHashObj* pHashObj) {
  if (pHashObj == NULL) {
    return NULL;
  }

  destroyDiskbasedBuf(pHashObj->pBuf);

  for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) {
    taosArrayDestroy(pHashObj->pBucket[i]->pPageIdList);
    tfree(pHashObj->pBucket[i]);
  }

  tfree(pHashObj->pBucket);
  tfree(pHashObj);
  return NULL;
}
/**
 * Serialize one tuple at p as: [int32 keyLen][int32 dataLen][key bytes][data bytes].
 *
 * Nodes are packed back to back with arbitrary key/data lengths, so p is not guaranteed to be
 * 4-byte aligned; the two length fields are therefore written with memcpy instead of a store
 * through a cast int32_t pointer.
 *
 * @param p      destination; must have room for 2 * sizeof(int32_t) + keyLen + size bytes
 * @param key    key bytes
 * @param keyLen length of the key, in bytes
 * @param data   value bytes
 * @param size   length of the value, in bytes
 */
static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) {
  memcpy(p, &keyLen, sizeof(keyLen));
  p += sizeof(keyLen);

  memcpy(p, &size, sizeof(size));
  p += sizeof(size);

  memcpy(p, key, keyLen);
  p += keyLen;

  memcpy(p, data, size);
}
/**
 * Append one key/value tuple to a bucket. The tuple goes to the bucket's last page; when it does
 * not fit there, a new overflow page is allocated and chained to the bucket.
 *
 * @param index bucket id, kept for tracing only
 * @return TSDB_CODE_SUCCESS, or an error code when no overflow page could be obtained
 */
static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen,
                             const void* data, int32_t size) {
  (void)index;

  int32_t    pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList);
  SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);
  ASSERT(pPage != NULL);

  // The SFilePage header is stored inside the page payload, so only
  // (page size - sizeof(SFilePage)) bytes are available for tuples.
  size_t pageSize = getBufPageSize(pHashObj->pBuf);
  size_t nodeSize = sizeof(SLHashNode) + keyLen + size;
  ASSERT(sizeof(SFilePage) + nodeSize <= pageSize);

  if (sizeof(SFilePage) + pPage->num + nodeSize > pageSize) {
    releaseBufPage(pHashObj->pBuf, pPage);

    // allocate the overflow buffer page to hold this k/v.
    int32_t    newPageId = -1;
    SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId);
    if (pNewPage == NULL) {
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }

    taosArrayPush(pBucket->pPageIdList, &newPageId);

    doCopyObject(pNewPage->data, key, keyLen, data, size);
    pNewPage->num = nodeSize;

    setBufPageDirty(pNewPage, true);
    releaseBufPage(pHashObj->pBuf, pNewPage);
  } else {
    // put to current buf page
    doCopyObject(pPage->data + pPage->num, key, keyLen, data, size);
    pPage->num += nodeSize;

    setBufPageDirty(pPage, true);
    releaseBufPage(pHashObj->pBuf, pPage);
  }

  pBucket->size += 1;
  return TSDB_CODE_SUCCESS;
}

/**
 * Delete one node from a page by sliding the following nodes of that page over it.
 * The page is marked dirty and the bucket's element count is decremented.
 */
// TODO merge the fragments on multiple pages to recycle the empty disk page ASAP
static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) {
  ASSERT(pPage != NULL && pNode != NULL);

  int32_t len = GET_LHASH_NODE_LEN(pNode);
  char*   pNext = (char*)pNode + len;
  char*   pEnd = pPage->data + pPage->num;
  memmove(pNode, pNext, (pEnd - pNext));

  // A page whose num drops to 0 is empty and could be recycled in the future.
  pPage->num -= len;

  setBufPageDirty(pPage, true);
  pBucket->size -= 1;
}

/**
 * Append one empty bucket (with its first page) to the table, growing the bucket pointer array
 * when needed. The table is left unchanged when an error is returned.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t doAddNewBucket(SLHashObj* pHashObj) {
  if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) {
    int32_t newLen = pHashObj->numOfAlloc * 1.25;
    if (newLen == pHashObj->numOfAlloc) {
      newLen += 4;
    }

    char* p = realloc(pHashObj->pBucket, POINTER_BYTES * newLen);
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Clear every unused slot; the length is in bytes, hence the POINTER_BYTES factor.
    memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, POINTER_BYTES * (size_t)(newLen - pHashObj->numOfBuckets));

    pHashObj->pBucket = (SLHashBucket**)p;
    pHashObj->numOfAlloc = newLen;
  }

  SLHashBucket* pBucket = calloc(1, sizeof(SLHashBucket));
  if (pBucket == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t));
  if (pBucket->pPageIdList == NULL) {
    tfree(pBucket);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t    pageId = -1;
  SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId);
  if (p == NULL) {
    taosArrayDestroy(pBucket->pPageIdList);
    tfree(pBucket);
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
  }

  releaseBufPage(pHashObj->pBuf, p);
  taosArrayPush(pBucket->pPageIdList, &pageId);

  // Publish the bucket only once it is fully built.
  pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket;
  pHashObj->numOfBuckets += 1;
  return TSDB_CODE_SUCCESS;
}

/**
 * Number of hash bits needed to address numOfBuckets buckets, i.e. ceil(log2(numOfBuckets)),
 * computed with integers so that exact powers of two cannot be rounded up by floating point.
 */
static int32_t doGetNumOfBits(int32_t numOfBuckets) {
  int32_t bits = 0;
  while ((1ul << bits) < (unsigned long)numOfBuckets) {
    bits += 1;
  }
  return bits;
}

/**
 * Redistribute the tuples of the bucket paired with newBucketId: every tuple whose hash (under
 * the current number of bits) now selects the new bucket is moved there.
 *
 * @return TSDB_CODE_SUCCESS or an error code from the page buffer
 */
static int32_t doSplitBucket(SLHashObj* pHashObj, int32_t newBucketId) {
  int32_t splitBucketId = (1ul << (pHashObj->bits - 1)) ^ newBucketId;

  SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId];
  SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];

  // load all data in this bucket and check if the data needs to relocated into the new bucket
  for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) {
    int32_t    pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i);
    SFilePage* p = getBufPage(pHashObj->pBuf, pageId);
    if (p == NULL) {
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }

    char* pStart = p->data;
    while (pStart - p->data < p->num) {
      SLHashNode* pNode = (SLHashNode*)pStart;
      char*       k = GET_LHASH_NODE_KEY(pNode);

      int32_t hashv = pHashObj->hashFn(k, pNode->keyLen);
      int32_t v1 = doGetBucketIdFromHashVal(hashv, pHashObj->bits);

      if (v1 == splitBucketId) {  // stays where it is, step to the next node
        pStart += GET_LHASH_NODE_LEN(pStart);
        continue;
      }

      // place it into the new bucket
      ASSERT(v1 == newBucketId);
      int32_t code = doAddToBucket(pHashObj, pNewBucket, newBucketId, k, pNode->keyLen, GET_LHASH_NODE_DATA(pNode),
                                   pNode->dataLen);
      if (code != TSDB_CODE_SUCCESS) {
        releaseBufPage(pHashObj->pBuf, p);
        return code;
      }

      // The following nodes slide down onto pStart, so pStart must not advance here.
      doRemoveFromBucket(p, pNode, pBucket);
    }

    releaseBufPage(pHashObj->pBuf, p);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Insert a key/value pair. Existing entries with the same key are not looked up or replaced.
 * When the load exceeds LHASH_CAP_RATIO of the expected capacity, one bucket is added and the
 * bucket paired with it is split.
 *
 * @param key    key bytes, must not be NULL
 * @param keyLen key length in bytes
 * @param data   value bytes
 * @param size   value length in bytes
 * @return TSDB_CODE_SUCCESS, or an error code. An error reported by the split step means the
 *         pair itself has already been stored.
 */
int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
  ASSERT(pHashObj != NULL && key != NULL);

  // With zero hash bits there is only bucket 0 and the hash value is not needed.
  int32_t bucketId = 0;
  if (pHashObj->bits > 0) {
    int32_t hashVal = pHashObj->hashFn(key, keyLen);
    bucketId = doGetBucketIdFromHashVal(hashVal, pHashObj->bits);

    if (bucketId >= pHashObj->numOfBuckets) {  // no matched bucket exists, find the candidate bucket
      bucketId = doGetAlternativeBucketId(bucketId, pHashObj->bits, pHashObj->numOfBuckets);
    }
  }

  int32_t code = doAddToBucket(pHashObj, pHashObj->pBucket[bucketId], bucketId, key, keyLen, data, size);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pHashObj->size += 1;

  if ((pHashObj->numOfBuckets * LHASH_CAP_RATIO * pHashObj->tuplesPerPage) >= pHashObj->size) {
    return TSDB_CODE_SUCCESS;
  }

  // Too many records, needs to bucket split
  int32_t newBucketId = pHashObj->numOfBuckets;
  code = doAddNewBucket(pHashObj);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t numOfBits = doGetNumOfBits(pHashObj->numOfBuckets);
  if (numOfBits > pHashObj->bits) {
    ASSERT(numOfBits == pHashObj->bits + 1);
    pHashObj->bits = numOfBits;
  }

  return doSplitBucket(pHashObj, newBucketId);
}
/**
 * Look up a key and return a pointer to its stored data bytes.
 *
 * The bucket is selected from the hash value; every page of that bucket is scanned node by node
 * until a node with the same key length and key bytes is found.
 *
 * NOTE(review): the page is released before the pointer into it is returned, so the pointer
 * presumably stays valid only until the buffer reuses that page - confirm with the paged buffer.
 *
 * @return pointer to the data of the first matching node, or NULL when the key is absent
 */
char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
  ASSERT(pHashObj != NULL && key != NULL && keyLen > 0);

  int32_t hashv = pHashObj->hashFn(key, keyLen);
  int32_t slot = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
  if (slot >= pHashObj->numOfBuckets) {
    // The addressed bucket has not been created yet; its tuples live in the paired bucket.
    slot = doGetAlternativeBucketId(slot, pHashObj->bits, pHashObj->numOfBuckets);
  }

  SLHashBucket* pTarget = pHashObj->pBucket[slot];

  for (int32_t idx = 0; idx < taosArrayGetSize(pTarget->pPageIdList); ++idx) {
    int32_t    pageId = *(int32_t*)taosArrayGet(pTarget->pPageIdList, idx);
    SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);

    for (char* cursor = pPage->data; cursor - pPage->data < pPage->num; cursor += GET_LHASH_NODE_LEN(cursor)) {
      SLHashNode* pNode = (SLHashNode*)cursor;
      char*       nodeKey = GET_LHASH_NODE_KEY(pNode);

      if (!(pNode->keyLen == keyLen && (memcmp(key, nodeKey, keyLen) == 0))) {
        continue;
      }

      releaseBufPage(pHashObj->pBuf, pPage);
      return GET_LHASH_NODE_DATA(pNode);
    }

    releaseBufPage(pHashObj->pBuf, pPage);
  }

  return NULL;
}
/**
 * Remove the first entry stored under the given key.
 *
 * The bucket is located the same way as in tHashGet(); the matching node is deleted from its page
 * and both the bucket and the table element counts are decremented. Buckets are never merged back.
 *
 * @param key    key bytes, must not be NULL
 * @param keyLen key length in bytes, must be > 0
 * @return TSDB_CODE_SUCCESS when an entry was removed, -1 when the key is not present
 *         NOTE(review): -1 for "not found" is an assumed convention - confirm with the callers.
 */
int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) {
  ASSERT(pHashObj != NULL && key != NULL && keyLen > 0);

  int32_t hashv = pHashObj->hashFn(key, keyLen);
  int32_t bucketId = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
  if (bucketId >= pHashObj->numOfBuckets) {
    bucketId = doGetAlternativeBucketId(bucketId, pHashObj->bits, pHashObj->numOfBuckets);
  }

  SLHashBucket* pBucket = pHashObj->pBucket[bucketId];

  for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) {
    int32_t    pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i);
    SFilePage* p = getBufPage(pHashObj->pBuf, pageId);
    if (p == NULL) {
      continue;
    }

    char* pStart = p->data;
    while (pStart - p->data < p->num) {
      SLHashNode* pNode = (SLHashNode*)pStart;
      char*       k = GET_LHASH_NODE_KEY(pNode);

      if ((size_t)pNode->keyLen == keyLen && memcmp(key, k, keyLen) == 0) {
        doRemoveFromBucket(p, pNode, pBucket);  // also marks the page dirty
        releaseBufPage(pHashObj->pBuf, p);

        pHashObj->size -= 1;
        return TSDB_CODE_SUCCESS;
      }

      pStart += GET_LHASH_NODE_LEN(pStart);
    }

    releaseBufPage(pHashObj->pBuf, p);
  }

  return -1;
}
/**
 * Print a summary of the table to stdout and enable statistics printing on its paged buffer.
 *
 * @param type LINEAR_HASH_DATA walks the buckets (the per-bucket output is currently disabled);
 *             any other value prints the paged-buffer statistics.
 */
void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
  printf("==================== linear hash ====================\n");
  // size is an int64_t: print it through long long so the format matches on every platform.
  printf("total bucket:%d, size:%lld, ratio:%.2f\n", pHashObj->numOfBuckets, (long long)pHashObj->size,
         LHASH_CAP_RATIO);

  dBufSetPrintInfo(pHashObj->pBuf);

  if (type == LINEAR_HASH_DATA) {
    for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) {
      // printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
      //        (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
    }
  } else {
    dBufPrintStatis(pHashObj->pBuf);
  }
}
\ No newline at end of file
......@@ -147,7 +147,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
if (pHandle->pBuf == NULL) {
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
setPrintStatis(pHandle->pBuf);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -162,7 +162,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
}
int32_t pageId = -1;
SFilePage* pPage = getNewDataBuf(pHandle->pBuf, pHandle->sourceId, &pageId);
SFilePage* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
if (pPage == NULL) {
return terrno;
}
......@@ -211,7 +211,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
setPrintStatis(pHandle->pBuf);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -462,7 +462,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
}
int32_t pageId = -1;
SFilePage* pPage = getNewDataBuf(pHandle->pBuf, pHandle->sourceId, &pageId);
SFilePage* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
if (pPage == NULL) {
return terrno;
}
......
......@@ -357,7 +357,7 @@ TEST(testCase, external_sort_Test) {
taosArrayDestroy(pOrderVal);
}
#endif
TEST(testCase, sorted_merge_Test) {
srand(time(NULL));
......@@ -430,4 +430,6 @@ TEST(testCase, sorted_merge_Test) {
taosArrayDestroy(pExprInfo);
taosArrayDestroy(pOrderVal);
}
#endif
#pragma GCC diagnostic pop
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <iostream>
#include "executorimpl.h"
#include "tlinearhash.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
// Smoke test for the linear hash: insert 5000 int keys (each key is also its own value) into
// small 72-byte pages, then look up the first 100 keys and report any that are missing.
// The disabled section below runs a comparable workload against the regular in-memory hash.
TEST(testCase, linear_hash_Tests) {
  srand(time(NULL));

  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
#if 1
  SLHashObj* pHashObj = tHashInit(100, 64 + 8, fn, 4);

  int32_t key = 0;
  while (key < 5000) {
    tHashPut(pHashObj, &key, sizeof(key), &key, sizeof(key));
    ++key;
  }

  tHashPrint(pHashObj, LINEAR_HASH_STATIS);

  for (int32_t probe = 0; probe < 100; ++probe) {
    char* v = tHashGet(pHashObj, &probe, sizeof(probe));
    if (v == NULL) {
      printf("failed to found key:%d in hash\n", probe);
    }
  }

  tHashPrint(pHashObj, LINEAR_HASH_DATA);
  tHashCleanup(pHashObj);
#endif

#if 0
  SHashObj* pHashObj = taosHashInit(1000, fn, false, HASH_NO_LOCK);
  for(int32_t i = 0; i < 500000; ++i) {
    taosHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
  }

  for(int32_t i = 0; i < 10000; ++i) {
    void* v = taosHashGet(pHashObj, &i, sizeof(i));
  }

  taosHashCleanup(pHashObj);
#endif
}
\ No newline at end of file
......@@ -221,7 +221,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
}
pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT;
pBucket->bufPageSize = DEFAULT_PAGE_SIZE * 4; // 4k per page
pBucket->bufPageSize = 16384 * 4; // 16k per page
pBucket->type = dataType;
pBucket->bytes = nElemSize;
......@@ -347,7 +347,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = NULL;
}
pSlot->info.data = getNewDataBuf(pBucket->pBuffer, groupId, &pageId);
pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId);
pSlot->info.pageId = pageId;
}
......
......@@ -18,7 +18,7 @@ typedef struct SPageDiskInfo {
int32_t length;
} SPageDiskInfo;
typedef struct SPageInfo {
struct SPageInfo {
SListNode* pn; // point to list node
void* pData;
int64_t offset;
......@@ -26,9 +26,9 @@ typedef struct SPageInfo {
int32_t length:30;
bool used:1; // set current page is in used
bool dirty:1; // set current buffer page is dirty or not
} SPageInfo;
};
typedef struct SDiskbasedBuf {
struct SDiskbasedBuf {
int32_t numOfPages;
int64_t totalBufSize;
uint64_t fileSize; // disk file size
......@@ -49,9 +49,7 @@ typedef struct SDiskbasedBuf {
uint64_t qId; // for debug purpose
bool printStatis; // Print statistics info when closing this buffer.
SDiskbasedBufStatis statis;
} SDiskbasedBuf;
static void printStatisData(const SDiskbasedBuf* pBuf);
};
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
*pBuf = calloc(1, sizeof(SDiskbasedBuf));
......@@ -121,7 +119,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba
return data;
}
*dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize+sizeof(SFilePage), ONE_STAGE_COMP, NULL, 0);
*dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
if (*dst > 0) {
memcpy(data, pBuf->assistBuf, *dst);
}
......@@ -151,14 +149,23 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
}
}
/**
* +--------------------------+-------------------+--------------+
* | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
* +--------------------------+-------------------+--------------+
* @param pBuf
* @param pg
* @return
*/
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
assert(!pg->used && pg->pData != NULL);
int32_t size = -1;
char* t = NULL;
if (pg->offset == -1 || pg->dirty) {
SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg);
t = doCompressData(pPage->data, pBuf->pageSize, &size, pBuf);
void* payload = GET_DATA_PAYLOAD(pg);
t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
assert(size >= 0);
}
// this page is flushed to disk for the first time
......@@ -217,10 +224,14 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
pBuf->statis.flushBytes += size;
pBuf->statis.flushPages += 1;
} else {
size = pg->length;
}
assert(size >= 0);
char* pDataBuf = pg->pData;
memset(pDataBuf, 0, pBuf->pageSize + sizeof(SFilePage));
memset(pDataBuf, 0, pBuf->pageSize);
pg->pData = NULL; // this means the data is not in buffer
pg->length = size;
......@@ -251,8 +262,8 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return ret;
}
SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg);
ret = (int32_t)fread(pPage->data, 1, pg->length, pBuf->file);
void* pPage = (void*) GET_DATA_PAYLOAD(pg);
ret = (int32_t)fread(pPage, 1, pg->length, pBuf->file);
if (ret != pg->length) {
ret = TAOS_SYSTEM_ERROR(errno);
return ret;
......@@ -262,7 +273,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
pBuf->statis.loadPages += 1;
int32_t fullSize = 0;
doDecompressData(pPage->data, pg->length, &fullSize, pBuf);
doDecompressData(pPage, pg->length, &fullSize, pBuf);
return 0;
}
......@@ -288,7 +299,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
pBuf->numOfPages += 1;
SPageInfo* ppi = malloc(sizeof(SPageInfo));//{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL};
SPageInfo* ppi = malloc(sizeof(SPageInfo));
ppi->pageId = pageId;
ppi->pData = NULL;
......@@ -302,6 +313,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
SListIter iter = {0};
tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);
SListNode* pn = NULL;
......@@ -313,6 +325,8 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
if (!pageInfo->used) {
break;
} else {
printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty);
}
}
......@@ -360,10 +374,10 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) {
}
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
return pageSize + POINTER_BYTES + 2 + sizeof(SFilePage);
return pageSize + POINTER_BYTES + 2;
}
SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
pBuf->statis.getPages += 1;
char* availablePage = NULL;
......@@ -379,6 +393,10 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId)
// register new id in this group
*pageId = (++pBuf->allocateId);
if (*pageId == 11) {
printf("page is allocated, id:%d\n", *pageId);
}
// register page id info
SPageInfo* pi = registerPage(pBuf, groupId, *pageId);
......@@ -404,7 +422,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId)
return (void *)(GET_DATA_PAYLOAD(pi));
}
SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
assert(pBuf != NULL && id >= 0);
pBuf->statis.getPages += 1;
......@@ -493,7 +511,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
return;
}
printStatisData(pBuf);
dBufPrintStatis(pBuf);
if (pBuf->file != NULL) {
uDebug("Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f Kb, %"PRIx64"\n",
......@@ -561,7 +579,7 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
return pBuf->fileSize == 0;
}
void setBufPageDirty(SFilePage* pPage, bool dirty) {
void setBufPageDirty(void* pPage, bool dirty) {
int32_t offset = offsetof(SPageInfo, pData);
char* p = (char*)pPage - offset;
......@@ -569,7 +587,7 @@ void setBufPageDirty(SFilePage* pPage, bool dirty) {
ppi->dirty = dirty;
}
void setPrintStatis(SDiskbasedBuf* pBuf) {
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
pBuf->printStatis = true;
}
......@@ -577,7 +595,7 @@ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
return pBuf->statis;
}
void printStatisData(const SDiskbasedBuf* pBuf) {
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
if (!pBuf->printStatis) {
return;
}
......
......@@ -18,7 +18,7 @@ void simpleTest() {
int32_t pageId = 0;
int32_t groupId = 0;
SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage = getNewBufPage(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getTotalBufSize(pResultBuf), 1024);
......@@ -29,24 +29,24 @@ void simpleTest() {
releaseBufPage(pResultBuf, pBufPage);
SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage1 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t == pBufPage1);
SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage2 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage2);
SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage3 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage3);
SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage4 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage4);
SFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage5 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage5);
......@@ -62,29 +62,29 @@ void writeDownTest() {
int32_t groupId = 0;
int32_t nx = 12345;
SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage = getNewBufPage(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
*(int32_t*)(pBufPage->data) = nx;
writePageId = pageId;
releaseBufPage(pResultBuf, pBufPage);
SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage1 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage2 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage3 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage4 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
......@@ -109,32 +109,32 @@ void recyclePageTest() {
int32_t groupId = 0;
int32_t nx = 12345;
SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage = getNewBufPage(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
releaseBufPage(pResultBuf, pBufPage);
SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage1 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage2 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage3 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage4 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseBufPage(pResultBuf, t4);
SFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* pBufPage5 = getNewBufPage(pResultBuf, groupId, &pageId);
SFilePage* t5 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册