提交 b6593d9a 编写于 作者: H Hongze Cheng

refact: tsdb

上级 1088e463
......@@ -22,15 +22,16 @@ typedef struct SMemSkipListNode SMemSkipListNode;
typedef struct SMemSkipListCurosr SMemSkipListCurosr;
struct SMemTable {
STsdb *pTsdb;
TSKEY minKey;
TSKEY maxKey;
int64_t minVer;
int64_t maxVer;
int64_t nRows;
int32_t nHash;
int32_t nBucket;
SMemData **pBuckets;
STsdb *pTsdb;
TSKEY minKey;
TSKEY maxKey;
int64_t minVer;
int64_t maxVer;
int64_t nRows;
int32_t nHash;
int32_t nBucket;
SMemData **pBuckets;
SMemSkipListCurosr *pSlc;
};
struct SMemSkipListNode {
......@@ -60,9 +61,15 @@ struct SMemData {
struct SMemSkipListCurosr {
SMemSkipList *pSl;
SMemSkipListNode *pNodeC;
SMemSkipListNode *forwards[];
};
typedef struct {
int64_t version;
uint32_t szRow;
const STSRow *pRow;
} STsdbRow;
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET))
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
......@@ -76,7 +83,13 @@ struct SMemSkipListCurosr {
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
static int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow);
static int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow);
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc);
static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc);
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl);
static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode);
// SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
......@@ -102,6 +115,11 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
taosMemoryFree(pMemTb);
return -1;
}
if (tsdbMemSkipListCursorCreate(pTsdb->pVnode->config.tsdbCfg.slLevel, &pMemTb->pSlc) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pMemTb->pBuckets);
taosMemoryFree(pMemTb);
}
*ppMemTb = pMemTb;
return 0;
......@@ -110,6 +128,7 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) {
if (pMemTb) {
// loop to destroy the contents (todo)
tsdbMemSkipListCursorDestroy(pMemTb->pSlc);
taosMemoryFree(pMemTb->pBuckets);
taosMemoryFree(pMemTb);
}
......@@ -177,52 +196,47 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
}
// do insert data to SMemData
SMemSkipListCurosr slc = {0};
const STSRow *pRow;
uint32_t szRow;
SDecoder decoder = {0};
STsdbRow tRow = {.version = version};
SEncoder ec = {0};
SDecoder dc = {0};
tDecoderInit(&decoder, pSubmitBlk->pData, pSubmitBlk->nData);
tDecoderInit(&dc, pSubmitBlk->pData, pSubmitBlk->nData);
tsdbMemSkipListCursorInit(pMemTb->pSlc, &pMemData->sl);
for (;;) {
if (tDecodeIsEnd(&decoder)) break;
if (tDecodeIsEnd(&dc)) break;
if (tDecodeBinary(&decoder, (const uint8_t **)&pRow, &szRow) < 0) {
if (tDecodeBinary(&dc, (const uint8_t **)&tRow.pRow, &tRow.szRow) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// check the row (todo)
// // move the cursor to position to write (todo)
// int32_t c;
// tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
// ASSERT(c);
// move cursor
// encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (0 /*todo*/);
SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize);
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize;
int32_t ret;
tEncodeSize(tsdbEncodeRow, &tRow, tsize, ret);
SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level));
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pNode->level = level;
tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize);
ret = tsdbEncodeRow(&ec, &tRow);
ASSERT(ret == 0);
tEncoderClear(&ec);
// uint8_t *pData = SL_NODE_DATA(pSlNode);
// *(int64_t *)pData = version;
// pData += sizeof(version);
// memcpy(pData, pt, p - pt);
// // insert row
// tsdbMemSkipListCursorPut(&slc, pSlNode);
// put the node
tsdbMemSkipListCursorPut(pMemTb->pSlc, pNode);
// update status
if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts;
if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts;
if (tRow.pRow->ts < pMemData->minKey) pMemData->minKey = tRow.pRow->ts;
if (tRow.pRow->ts > pMemData->maxKey) pMemData->maxKey = tRow.pRow->ts;
}
tDecoderClear(&decoder);
// tsdbMemSkipListCursorClose(&slc);
tDecoderClear(&dc);
// update status
if (pMemData->minVer == -1) pMemData->minVer = version;
......@@ -236,7 +250,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
return 0;
}
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
int8_t level = 1;
int8_t tlevel;
const uint32_t factor = 4;
......@@ -249,4 +263,51 @@ static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
}
return level;
}
static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) {
if (tEncodeI64(pEncoder, pRow->version) < 0) return -1;
if (tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) {
if (tDecodeI64(pDecoder, &pRow->version) < 0) return -1;
if (tDecodeBinary(pDecoder, (const uint8_t **)&pRow->pRow, &pRow->szRow) < 0) return -1;
return 0;
}
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc) {
*ppSlc = (SMemSkipListCurosr *)taosMemoryCalloc(1, sizeof(**ppSlc) + sizeof(SMemSkipListNode *) * maxLevel);
if (*ppSlc == NULL) {
return -1;
}
return 0;
}
static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc) { taosMemoryFree(pSlc); }
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) {
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
pSlc->pSl = pSl;
for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
pSlc->forwards[iLevel] = pHead;
}
}
static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode) {
SMemSkipList *pSl = pSlc->pSl;
SMemSkipListNode *pNodeNext;
for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) {
// todo
ASSERT(0);
}
if (pSl->level < pNode->level) {
pSl->level = pNode->level;
}
pSl->size += 1;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册