提交 9fe400f2 编写于 作者: H Hongze Cheng

more

上级 341a46e4
...@@ -59,12 +59,15 @@ struct SMemTable { ...@@ -59,12 +59,15 @@ struct SMemTable {
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) #define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData);
static int memDataPCmprFn(const void *p1, const void *p2); static int memDataPCmprFn(const void *p1, const void *p2);
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward); static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward); static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward);
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version,
SVSubmitBlk *pSubmitBlk); SVSubmitBlk *pSubmitBlk);
...@@ -308,7 +311,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, ...@@ -308,7 +311,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData,
n += tGetTSRow(p + n, &row.tsRow); n += tGetTSRow(p + n, &row.tsRow);
ASSERT(n <= pSubmitBlk->nData); ASSERT(n <= pSubmitBlk->nData);
memDataMovePosTo(pMemData, pos, &row, 0); memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_BACKWARD);
code = memDataDoPut(pMemData, pos, &row, 0); code = memDataDoPut(pMemData, pos, &row, 0);
if (code) { if (code) {
goto _exit; goto _exit;
...@@ -321,7 +324,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, ...@@ -321,7 +324,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData,
ASSERT(n <= pSubmitBlk->nData); ASSERT(n <= pSubmitBlk->nData);
// TODO // TODO
memDataMovePosTo(pMemData, pos, &row, 1); memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_FROM_POS);
code = memDataDoPut(pMemData, pos, &row, 1); code = memDataDoPut(pMemData, pos, &row, 1);
if (code) { if (code) {
goto _exit; goto _exit;
...@@ -334,44 +337,46 @@ _exit: ...@@ -334,44 +337,46 @@ _exit:
return code; return code;
} }
static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward) { static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
SMemSkipListNode *px; SMemSkipListNode *px;
SMemSkipListNode *pn; SMemSkipListNode *pn;
TSDBKEY *pKey; TSDBKEY *pTKey;
int c; int c;
int backward = flags & SL_MOVE_BACKWARD;
int fromPos = flags & SL_MOVE_FROM_POS;
if (forward) { if (backward) {
px = pMemData->sl.pHead; px = pMemData->sl.pTail;
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_FORWARD(px, iLevel); if (iLevel < pMemData->sl.level) {
while (pn != pMemData->sl.pTail) { pn = SL_NODE_BACKWARD(px, iLevel);
pKey = (TSDBKEY *)SL_NODE_DATA(pn); while (pn != pMemData->sl.pHead) {
pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
c = tsdbKeyCmprFn(pKey, pRow); c = tsdbKeyCmprFn(pTKey, pKey);
if (pKey >= 0) { if (c <= 0) {
break; break;
} else { } else {
px = pn; px = pn;
pn = SL_NODE_FORWARD(px, iLevel); pn = SL_NODE_BACKWARD(px, iLevel);
}
} }
} }
pos[iLevel] = px; pos[iLevel] = px;
} }
} else { } else {
px = pMemData->sl.pTail; px = pMemData->sl.pHead;
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
if (iLevel < pMemData->sl.level) { pn = SL_NODE_FORWARD(px, iLevel);
pn = SL_NODE_BACKWARD(px, iLevel); while (pn != pMemData->sl.pTail) {
while (pn != pMemData->sl.pHead) { pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
pKey = (TSDBKEY *)SL_NODE_DATA(pn);
c = tsdbKeyCmprFn(pKey, pRow); c = tsdbKeyCmprFn(pTKey, pKey);
if (c <= 0) { if (pTKey >= 0) {
break; break;
} else { } else {
px = pn; px = pn;
pn = SL_NODE_BACKWARD(px, iLevel); pn = SL_NODE_FORWARD(px, iLevel);
}
} }
} }
pos[iLevel] = px; pos[iLevel] = px;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册