未验证 提交 8bf48907 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #13421 from taosdata/feat/row_refact

feat: vnode multi-version
...@@ -46,6 +46,7 @@ void tTSchemaDestroy(STSchema *pTSchema); ...@@ -46,6 +46,7 @@ void tTSchemaDestroy(STSchema *pTSchema);
#define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1}) #define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1})
#define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)}) #define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)})
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow);
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow); int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow); void tTSRowFree(STSRow2 *pRow);
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
...@@ -54,24 +55,24 @@ int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow); ...@@ -54,24 +55,24 @@ int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow); int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow);
// STSRowBuilder // STSRowBuilder
#if 0 #define tsRowBuilderInit() ((STSRowBuilder){0})
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema); #define tsRowBuilderClear(B) \
void tTSRowBuilderClear(STSRowBuilder *pBuilder); do { \
void tTSRowBuilderReset(STSRowBuilder *pBuilder); if ((B)->pBuf) { \
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData); taosMemoryFree((B)->pBuf); \
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow); } \
#endif } while (0)
// STag // STag
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag); void tTagFree(STag *pTag);
bool tTagGet(const STag *pTag, STagVal *pTagVal); bool tTagGet(const STag *pTag, STagVal *pTagVal);
char* tTagValToData(const STagVal *pTagVal, bool isJson); char *tTagValToData(const STagVal *pTagVal, bool isJson);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(const STag *pTag, SArray **ppArray); int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
void debugCheckTags(STag *pTag); // TODO: remove void debugCheckTags(STag *pTag); // TODO: remove
// STRUCT ================= // STRUCT =================
struct STColumn { struct STColumn {
...@@ -106,17 +107,9 @@ struct STSRow2 { ...@@ -106,17 +107,9 @@ struct STSRow2 {
}; };
struct STSRowBuilder { struct STSRowBuilder {
STSchema *pTSchema; STSRow2 tsRow;
int32_t szBitMap1; int32_t szBuf;
int32_t szBitMap2; uint8_t *pBuf;
int32_t szKVBuf;
uint8_t *pKVBuf;
int32_t szTPBuf;
uint8_t *pTPBuf;
int32_t iCol;
int32_t vlenKV;
int32_t vlenTP;
STSRow2 row;
}; };
struct SValue { struct SValue {
...@@ -154,7 +147,7 @@ struct STagVal { ...@@ -154,7 +147,7 @@ struct STagVal {
}; };
int8_t type; int8_t type;
union { union {
int64_t i64; int64_t i64;
struct { struct {
uint32_t nData; uint32_t nData;
uint8_t *pData; uint8_t *pData;
...@@ -162,7 +155,7 @@ struct STagVal { ...@@ -162,7 +155,7 @@ struct STagVal {
}; };
}; };
#define TD_TAG_JSON ((int8_t)0x40) // distinguish JSON string and JSON value with the highest bit #define TD_TAG_JSON ((int8_t)0x40) // distinguish JSON string and JSON value with the highest bit
#define TD_TAG_LARGE ((int8_t)0x20) #define TD_TAG_LARGE ((int8_t)0x20)
struct STag { struct STag {
int8_t flags; int8_t flags;
...@@ -422,4 +415,3 @@ int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToM ...@@ -422,4 +415,3 @@ int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToM
#endif #endif
#endif /*_TD_COMMON_DATA_FORMAT_H_*/ #endif /*_TD_COMMON_DATA_FORMAT_H_*/
此差异已折叠。
...@@ -36,7 +36,6 @@ target_sources( ...@@ -36,7 +36,6 @@ target_sources(
# tsdb # tsdb
"src/tsdb/tsdbCommit.c" "src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCommit2.c"
"src/tsdb/tsdbFile.c" "src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c" "src/tsdb/tsdbFS.c"
"src/tsdb/tsdbOpen.c" "src/tsdb/tsdbOpen.c"
......
...@@ -88,6 +88,18 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i ...@@ -88,6 +88,18 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbBegin(STsdb *pTsdb) {
if (!pTsdb) return 0;
STsdbMemTable *pMem;
if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) {
return -1;
}
return 0;
}
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did; SDiskID did;
SDFileSet nSet = {0}; SDFileSet nSet = {0};
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
int tsdbBegin(STsdb *pTsdb) {
if (!pTsdb) return 0;
STsdbMemTable *pMem;
if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) {
return -1;
}
return 0;
}
...@@ -52,20 +52,21 @@ struct SMemTable { ...@@ -52,20 +52,21 @@ struct SMemTable {
SArray *pArray; // SArray<SMemData> SArray *pArray; // SArray<SMemData>
}; };
#define SL_MAX_LEVEL 5
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) #define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_HEAD_FORWARD(sl, l) SL_NODE_FORWARD((sl)->pHead, l)
#define SL_TAIL_BACKWARD(sl, l) SL_NODE_FORWARD((sl)->pTail, l)
static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData);
static int memDataPCmprFn(const void *p1, const void *p2); static int memDataPCmprFn(const void *p1, const void *p2);
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos);
static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
SMemSkipListNode **pos);
// SMemTable ============================================== // SMemTable ==============================================
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
...@@ -109,6 +110,7 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit ...@@ -109,6 +110,7 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit
TSDBROW row = {.version = version}; TSDBROW row = {.version = version};
ASSERT(pMemTable); ASSERT(pMemTable);
ASSERT(pSubmitBlk->nData > 0);
{ {
// check if table exists (todo) // check if table exists (todo)
...@@ -122,38 +124,29 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit ...@@ -122,38 +124,29 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit
// do insert // do insert
int32_t nt; int32_t nt;
uint8_t *pt;
int32_t n = 0; int32_t n = 0;
uint8_t *p = pSubmitBlk->pData; uint8_t *p = pSubmitBlk->pData;
SVBufPool *pPool = pTsdb->pVnode->inUse; int32_t nRow = 0;
int8_t level; SMemSkipListNode *pos[SL_MAX_LEVEL] = {0};
SMemSkipListNode *pNode;
for (int8_t iLevel = 0; iLevel < SL_MAX_LEVEL; iLevel++) {
pos[iLevel] = pMemData->sl.pTail;
}
while (n < pSubmitBlk->nData) { while (n < pSubmitBlk->nData) {
nt = tGetTSRow(p + n, &row.tsRow); nt = tGetTSRow(p + n, &row.tsRow);
n += nt; n += nt;
ASSERT(n <= pSubmitBlk->nData); ASSERT(n <= pSubmitBlk->nData);
// build the node memDataMovePos(pMemData, &row, nRow ? 1 : 0, pos);
level = tsdbMemSkipListRandLevel(&pMemData->sl); code = memDataPutRow(pTsdb->pVnode->inUse, pMemData, &row, nRow ? 1 : 0, pos);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + nt + sizeof(version)); if (code) {
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pNode->level = level;
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), &row);
// put the node (todo)
// set info nRow++;
if (tsdbKeyCmprFn(&row, &pMemData->minKey) < 0) pMemData->minKey = *(TSDBKEY *)&row;
if (tsdbKeyCmprFn(&row, &pMemData->maxKey) > 0) pMemData->maxKey = *(TSDBKEY *)&row;
} }
if (tsdbKeyCmprFn(&pMemTable->minKey, &pMemData->minKey) < 0) pMemTable->minKey = pMemData->minKey;
if (tsdbKeyCmprFn(&pMemTable->maxKey, &pMemData->maxKey) > 0) pMemTable->maxKey = pMemData->maxKey;
return code; return code;
_err: _err:
...@@ -227,7 +220,7 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui ...@@ -227,7 +220,7 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui
} }
// create // create
pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2); pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData) + SL_NODE_SIZE(maxLevel) * 2);
if (pMemData == NULL) { if (pMemData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -242,11 +235,15 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui ...@@ -242,11 +235,15 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui
pMemData->sl.maxLevel = maxLevel; pMemData->sl.maxLevel = maxLevel;
pMemData->sl.level = 0; pMemData->sl.level = 0;
pMemData->sl.pHead = (SMemSkipListNode *)&pMemData[1]; pMemData->sl.pHead = (SMemSkipListNode *)&pMemData[1];
pMemData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pMemData->sl.pHead, SL_NODE_HALF_SIZE(maxLevel)); pMemData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pMemData->sl.pHead, SL_NODE_SIZE(maxLevel));
pMemData->sl.pHead->level = maxLevel;
pMemData->sl.pTail->level = maxLevel;
for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) { for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) {
SL_HEAD_FORWARD(&pMemData->sl, iLevel) = pMemData->sl.pTail; SL_NODE_FORWARD(pMemData->sl.pHead, iLevel) = pMemData->sl.pTail;
SL_TAIL_BACKWARD(&pMemData->sl, iLevel) = pMemData->sl.pHead; SL_NODE_BACKWARD(pMemData->sl.pHead, iLevel) = NULL;
SL_NODE_BACKWARD(pMemData->sl.pTail, iLevel) = pMemData->sl.pHead;
SL_NODE_FORWARD(pMemData->sl.pTail, iLevel) = NULL;
} }
if (idx < 0) idx = 0; if (idx < 0) idx = 0;
...@@ -313,264 +310,87 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { ...@@ -313,264 +310,87 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
return level; return level;
} }
#if 0 //==================================================================================== static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos) {
TSDBKEY *pKey;
#define SL_MAX_LEVEL 5 int c;
struct SMemSkipListCurosr {
SMemSkipList *pSl;
SMemSkipListNode *pNodes[SL_MAX_LEVEL];
};
typedef struct {
int64_t version;
uint32_t szRow;
const STSRow *pRow;
} STsdbRow;
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET)) if (isForward) {
// TODO
#define SL_HEAD_NODE(sl) ((sl)->pHead) } else {
#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel)) SMemSkipListNode *px = pMemData->sl.pTail;
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l) for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
if (iLevel < pMemData->sl.level) {
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); SMemSkipListNode *p = SL_NODE_BACKWARD(px, iLevel);
static int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow);
static int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow); while (p != pMemData->sl.pHead) {
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc); pKey = (TSDBKEY *)SL_NODE_DATA(p);
static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc);
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl); c = tsdbKeyCmprFn(pKey, pRow);
static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode); if (c <= 0) {
static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags); break;
static void tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc); } else {
static void tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc); px = p;
static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc); p = SL_NODE_BACKWARD(px, iLevel);
static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc); }
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow); }
// SMemTable ======================== pos[iLevel] = px;
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) { }
SMemData *pMemData;
STsdb *pTsdb = pMemTb->pTsdb;
SVnode *pVnode = pTsdb->pVnode;
SVBufPool *pPool = pVnode->inUse;
tb_uid_t suid = pSubmitBlk->suid;
tb_uid_t uid = pSubmitBlk->uid;
int32_t iBucket;
// search SMemData by hash
iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) {
if (pMemData->suid == suid && pMemData->uid == uid) break;
}
// create pMemData if need
if (pMemData == NULL) {
int8_t maxLevel = pVnode->config.tsdbCfg.slLevel;
int32_t tsize = sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2;
SMemSkipListNode *pHead, *pTail;
pMemData = vnodeBufPoolMalloc(pPool, tsize);
if (pMemData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMemData->pHashNext = NULL;
pMemData->suid = suid;
pMemData->uid = uid;
pMemData->minKey = TSKEY_MAX;
pMemData->maxKey = TSKEY_MIN;
pMemData->minVer = -1;
pMemData->maxVer = -1;
pMemData->nRows = 0;
pMemData->sl.seed = taosRand();
pMemData->sl.maxLevel = maxLevel;
pMemData->sl.level = 0;
pMemData->sl.size = 0;
pHead = SL_HEAD_NODE(&pMemData->sl);
pTail = SL_TAIL_NODE(&pMemData->sl);
pHead->level = maxLevel;
pTail->level = maxLevel;
for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail;
SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead;
}
// add to hash
if (pMemTb->nHash >= pMemTb->nBucket) {
// rehash (todo)
}
iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
pMemData->pHashNext = pMemTb->pBuckets[iBucket];
pMemTb->pBuckets[iBucket] = pMemData;
pMemTb->nHash++;
// sort organize (todo)
}
// do insert data to SMemData
SMemSkipListNode *forwards[SL_MAX_LEVEL];
SMemSkipListNode *pNode;
int32_t iRow;
STsdbRow tRow = {.version = version};
SEncoder ec = {0};
SDecoder dc = {0};
tDecoderInit(&dc, pSubmitBlk->pData, pSubmitBlk->nData);
tsdbMemSkipListCursorInit(pMemTb->pSlc, &pMemData->sl);
for (iRow = 0;; iRow++) {
if (tDecodeIsEnd(&dc)) break;
// decode row
if (tDecodeBinary(&dc, (uint8_t **)&tRow.pRow, &tRow.szRow) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// move cursor
tsdbMemSkipListCursorMoveTo(pMemTb->pSlc, version, tRow.pRow->ts, 0);
// encode row
pNode = tsdbMemSkipListNodeCreate(pPool, &pMemData->sl, &tRow);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
// put the node
tsdbMemSkipListCursorPut(pMemTb->pSlc, pNode);
// update status
if (tRow.pRow->ts < pMemData->minKey) pMemData->minKey = tRow.pRow->ts;
if (tRow.pRow->ts > pMemData->maxKey) pMemData->maxKey = tRow.pRow->ts;
} }
tDecoderClear(&dc);
// update status
if (pMemData->minVer == -1) pMemData->minVer = version;
if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
if (pMemTb->minKey < pMemData->minKey) pMemTb->minKey = pMemData->minKey;
if (pMemTb->maxKey < pMemData->maxKey) pMemTb->maxKey = pMemData->maxKey;
if (pMemTb->minVer == -1) pMemTb->minVer = version;
if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
return 0;
} }
static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) { static void memMovePosFrom(SMemData *pMemData, SMemSkipListNode *pNode, TSDBROW *pRow, int8_t isForward,
if (tEncodeI64(pEncoder, pRow->version) < 0) return -1; SMemSkipListNode **pos) {
if (tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0) return -1; SMemSkipListNode *px = pNode;
return 0; TSDBKEY *pKey;
} SMemSkipListNode *p;
int c;
static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) { if (isForward) {
if (tDecodeI64(pDecoder, &pRow->version) < 0) return -1; } else {
if (tDecodeBinary(pDecoder, (uint8_t **)&pRow->pRow, &pRow->szRow) < 0) return -1; ASSERT(pNode != pMemData->sl.pHead);
return 0;
} for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
p = SL_NODE_BACKWARD(px, iLevel);
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc) { while (p != pMemData->sl.pHead) {
*ppSlc = (SMemSkipListCurosr *)taosMemoryCalloc(1, sizeof(**ppSlc) + sizeof(SMemSkipListNode *) * maxLevel); pKey = (TSDBKEY *)SL_NODE_DATA(p);
if (*ppSlc == NULL) {
return -1; c = tsdbKeyCmprFn(pKey, pRow);
} if (c <= 0) {
return 0; break;
} } else {
px = p;
static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc) { taosMemoryFree(pSlc); } p = SL_NODE_BACKWARD(px, iLevel);
}
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) { }
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
pSlc->pSl = pSl; pos[iLevel] = px;
// for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
// pSlc->forwards[iLevel] = pHead;
// }
}
static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode) {
SMemSkipList *pSl = pSlc->pSl;
SMemSkipListNode *pNodeNext;
for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) {
// todo
ASSERT(0);
}
if (pSl->level < pNode->level) {
pSl->level = pNode->level;
}
pSl->size += 1;
}
static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags) {
SMemSkipListNode **pForwards = NULL;
SMemSkipList *pSl = pSlc->pSl;
int8_t maxLevel = pSl->maxLevel;
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
SMemSkipListNode *pTail = SL_TAIL_NODE(pSl);
if (pSl->size == 0) {
for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
pForwards[iLevel] = pHead;
} }
} }
return 0;
}
static void tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc) {
SMemSkipList *pSl = pSlc->pSl;
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
pSlc->pNodes[iLevel] = pHead;
}
tsdbMemSkipListCursorMoveToNext(pSlc);
} }
static void tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc) { static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
SMemSkipList *pSl = pSlc->pSl; SMemSkipListNode **pos) {
SMemSkipListNode *pTail = SL_TAIL_NODE(pSl); int32_t code = 0;
int8_t level;
SMemSkipListNode *pNode;
for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) { level = tsdbMemSkipListRandLevel(&pMemData->sl);
pSlc->pNodes[iLevel] = pTail; pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
} }
tsdbMemSkipListCursorMoveToPrev(pSlc); // do the read put
} if (isForward) {
// TODO
static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc) { } else {
// TODO // TODO
return 0;
}
static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc) {
// TODO
return 0;
}
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow) {
int32_t tsize;
int32_t ret;
int8_t level = tsdbMemSkipListRandLevel(pSl);
SMemSkipListNode *pNode = NULL;
SEncoder ec = {0};
tEncodeSize(tsdbEncodeRow, pTRow, tsize, ret);
pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level));
if (pNode) {
pNode->level = level;
tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize);
tsdbEncodeRow(&ec, pTRow);
tEncoderClear(&ec);
} }
return pNode; _exit:
} return code;
#endif }
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册