提交 68fc2acc 编写于 作者: H Hongze Cheng

refact

上级 5ec83cbf
...@@ -15,10 +15,11 @@ ...@@ -15,10 +15,11 @@
#include "tsdb.h" #include "tsdb.h"
typedef struct SMemTable SMemTable; typedef struct SMemTable SMemTable;
typedef struct SMemData SMemData; typedef struct SMemData SMemData;
typedef struct SMemSkipList SMemSkipList; typedef struct SMemSkipList SMemSkipList;
typedef struct SMemSkipListNode SMemSkipListNode; typedef struct SMemSkipListNode SMemSkipListNode;
typedef struct SMemSkipListCurosr SMemSkipListCurosr;
struct SMemTable { struct SMemTable {
STsdb *pTsdb; STsdb *pTsdb;
...@@ -57,6 +58,20 @@ struct SMemData { ...@@ -57,6 +58,20 @@ struct SMemData {
SMemSkipList sl; SMemSkipList sl;
}; };
struct SMemSkipListCurosr {
SMemSkipList *pSl;
SMemSkipListNode *pNodeC;
};
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_HEAD_NODE(sl) ((sl)->pHead)
#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
// SMemTable // SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
SMemTable *pMemTb = NULL; SMemTable *pMemTb = NULL;
...@@ -96,17 +111,18 @@ int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) { ...@@ -96,17 +111,18 @@ int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) {
} }
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) { int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
SMemData *pMemData; SMemData *pMemData;
STsdb *pTsdb = pMemTb->pTsdb; STsdb *pTsdb = pMemTb->pTsdb;
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
SVBufPool *pPool = pVnode->inUse; SVBufPool *pPool = pVnode->inUse;
int32_t hash; int32_t hash;
int32_t tlen; int32_t tlen;
uint8_t buf[16]; uint8_t buf[16];
int32_t rlen; int32_t rlen;
const uint8_t *p; const uint8_t *p;
SMemSkipListNode *pSlNode; SMemSkipListNode *pSlNode;
const STSRow *pTSRow; const STSRow *pTSRow;
SMemSkipListCurosr slc = {0};
// search hash // search hash
hash = (pSubmitBlk->suid + pSubmitBlk->uid) % pMemTb->nBucket; hash = (pSubmitBlk->suid + pSubmitBlk->uid) % pMemTb->nBucket;
...@@ -116,7 +132,11 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -116,7 +132,11 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
// create pMemData if need // create pMemData if need
if (pMemData == NULL) { if (pMemData == NULL) {
pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData)); int8_t maxLevel = pVnode->config.tsdbCfg.slLevel;
int32_t tsize = sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2;
SMemSkipListNode *pHead, *pTail;
pMemData = vnodeBufPoolMalloc(pPool, tsize);
if (pMemData == NULL) { if (pMemData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -131,45 +151,73 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -131,45 +151,73 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
pMemData->maxVer = -1; pMemData->maxVer = -1;
pMemData->nRows = 0; pMemData->nRows = 0;
pMemData->sl.seed = taosRand(); pMemData->sl.seed = taosRand();
pMemData->sl.maxLevel = pVnode->config.tsdbCfg.slLevel; pMemData->sl.maxLevel = maxLevel;
pMemData->sl.level = 0; pMemData->sl.level = 0;
pMemData->sl.size = 0; pMemData->sl.size = 0;
pHead = SL_HEAD_NODE(&pMemData->sl);
pTail = SL_TAIL_NODE(&pMemData->sl);
pHead->level = maxLevel;
pTail->level = maxLevel;
for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
SL_NODE_FORWARD(pHead, iLevel) = pTail;
SL_NODE_FORWARD(pTail, iLevel) = pHead;
}
// add to MemTable // add to MemTable
hash = (pMemData->suid + pMemData->uid) % pMemTb->nBucket; hash = (pMemData->suid + pMemData->uid) % pMemTb->nBucket;
pMemData->pHashNext = pMemTb->pBuckets[hash]; pMemData->pHashNext = pMemTb->pBuckets[hash];
pMemTb->pBuckets[hash] = pMemData; pMemTb->pBuckets[hash] = pMemData;
pMemTb->nHash++;
} }
// loop to insert data to skiplist // loop to insert data to skiplist
#if 0
tsdbMemSkipListCursorOpen(&slc, &pMemData->sl);
p = pSubmitBlk->pData; p = pSubmitBlk->pData;
for (;;) { for (;;) {
if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break; if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break;
// p = tGetLen(p, &rlen); const uint8_t *pt = p;
pTSRow = (STSRow *)p; p = tGetBinary(p, &pTSRow, &rlen);
p += rlen;
if (pTSRow == NULL) break;
// check the row (todo) // check the row (todo)
// move the cursor to position to write (todo) // move the cursor to position to write (todo)
int32_t c;
// insert the row tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
int8_t level = 1; ASSERT(c);
tlen = 0; // sizeof(int64_t) + tsdbPutLen(rlen) + rlen;
pSlNode = vnodeBufPoolMalloc(pPool, tlen); // encode row
if (pSlNode == NULL) { int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
ASSERT(0); int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
} pSlNode = vnodeBufPoolMalloc(pPool, tsize);
pSlNode->level = level;
uint8_t *pData = SL_NODE_DATA(pSlNode);
*(int64_t *)pData = version;
pData += sizeof(version);
memcpy(pData, pt, p - pt);
// insert row
tsdbMemSkipListCursorPut(&slc, pSlNode);
// update status
if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts;
if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts;
} }
tsdbMemSkipListCursorClose(&slc);
#endif
return 0; if (pMemData->minVer == -1) pMemData->minVer = version;
} if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
static void tsdbEncodeRow(int64_t version, int32_t rlen, const STSRow *pRow) {} if (pMemTb->minKey < pMemData->minKey) pMemTb->minKey = pMemData->minKey;
if (pMemTb->maxKey < pMemData->maxKey) pMemTb->maxKey = pMemData->maxKey;
if (pMemTb->minVer == -1) pMemTb->minVer = version;
if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
static void tsdbDecodeRow(int64_t *version, int32_t *rlen, const STSRow **ppRow) {} return 0;
}
// SMemData // SMemData
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册