diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index d32caed77ba1cac0ca1a76739153463e14f8eacd..78be665ae989f20659d63c88d4d8d3bb7e1461e7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -59,12 +59,15 @@ struct SMemTable { #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) +#define SL_MOVE_BACKWARD 0x1 +#define SL_MOVE_FROM_POS 0x2 + static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); static int memDataPCmprFn(const void *p1, const void *p2); static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); -static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward); +static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward); static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, SVSubmitBlk *pSubmitBlk); @@ -308,7 +311,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, n += tGetTSRow(p + n, &row.tsRow); ASSERT(n <= pSubmitBlk->nData); - memDataMovePosTo(pMemData, pos, &row, 0); + memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_BACKWARD); code = memDataDoPut(pMemData, pos, &row, 0); if (code) { goto _exit; @@ -321,7 +324,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, ASSERT(n <= pSubmitBlk->nData); // TODO - memDataMovePosTo(pMemData, pos, &row, 1); + memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_FROM_POS); code = memDataDoPut(pMemData, pos, &row, 1); if (code) { goto _exit; @@ -334,38 +337,23 @@ _exit: return code; } -static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward) { +static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) { SMemSkipListNode *px; SMemSkipListNode *pn; - TSDBKEY *pKey; + TSDBKEY *pTKey; int c; + int backward = flags & SL_MOVE_BACKWARD; + int fromPos = flags & SL_MOVE_FROM_POS; - if (forward) { - px = pMemData->sl.pHead; - for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { - pn = SL_NODE_FORWARD(px, iLevel); - while (pn != pMemData->sl.pTail) { - pKey = (TSDBKEY *)SL_NODE_DATA(pn); - - c = tsdbKeyCmprFn(pKey, pRow); - if (pKey >= 0) { - break; - } else { - px = pn; - pn = SL_NODE_FORWARD(px, iLevel); - } - } - pos[iLevel] = px; - } - } else { + if (backward) { px = pMemData->sl.pTail; for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { if (iLevel < pMemData->sl.level) { pn = SL_NODE_BACKWARD(px, iLevel); while (pn != pMemData->sl.pHead) { - pKey = (TSDBKEY *)SL_NODE_DATA(pn); + pTKey = (TSDBKEY *)SL_NODE_DATA(pn); - c = tsdbKeyCmprFn(pKey, pRow); + c = tsdbKeyCmprFn(pTKey, pKey); if (c <= 0) { break; } else { @@ -376,6 +364,23 @@ static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW } pos[iLevel] = px; } + } else { + px = pMemData->sl.pHead; + for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { + pn = SL_NODE_FORWARD(px, iLevel); + while (pn != pMemData->sl.pTail) { + pTKey = (TSDBKEY *)SL_NODE_DATA(pn); + + c = tsdbKeyCmprFn(pTKey, pKey); + if (pTKey >= 0) { + break; + } else { + px = pn; + pn = SL_NODE_FORWARD(px, iLevel); + } + } + pos[iLevel] = px; + } } }