提交 48753063 编写于 作者: H Hongze Cheng

feat: tsdb multi-version 1

上级 3b8e93f4
......@@ -39,35 +39,46 @@ typedef struct SDelOp SDelOp;
static int tsdbKeyCmprFn(const void *p1, const void *p2);
// tsdbMemTable ================
typedef struct STbData STbData;
typedef struct SMemTable SMemTable;
typedef struct SMergeInfo SMergeInfo;
typedef struct STable STable;
int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable);
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
// tsdbMemTable ==============================================================================================
typedef struct STbData STbData;
typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct SMergeInfo SMergeInfo;
typedef struct STable STable;
// SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable);
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
// STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow);
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
// tsdbMemTable2.c ==============================================================================================
typedef struct SMemTable2 SMemTable2;
typedef struct SMemData SMemData;
typedef struct SMemDataIter SMemDataIter;
// typedef struct SMemTable2 SMemTable2;
// typedef struct SMemData SMemData;
// typedef struct SMemDataIter SMemDataIter;
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable);
void tsdbMemTableDestroy2(SMemTable2 *pMemTable);
int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
// int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable);
// void tsdbMemTableDestroy2(SMemTable2 *pMemTable);
// int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
// int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
/* SMemDataIter */
void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter);
bool tsdbMemDataIterNext(SMemDataIter *pIter);
void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow);
// /* SMemDataIter */
// void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter);
// bool tsdbMemDataIterNext(SMemDataIter *pIter);
// void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow);
// tsdbCommit2.c ==============================================================================================
int32_t tsdbBegin2(STsdb *pTsdb);
int32_t tsdbCommit2(STsdb *pTsdb);
// // tsdbCommit2.c ==============================================================================================
// int32_t tsdbBegin2(STsdb *pTsdb);
// int32_t tsdbCommit2(STsdb *pTsdb);
// tsdbFile.c ==============================================================================================
typedef int32_t TSDB_FILE_T;
......@@ -211,29 +222,39 @@ struct TSDBKEY {
TSKEY ts;
};
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
SMemSkipListNode *forwards[0];
};
typedef struct SMemSkipList {
uint32_t seed;
int64_t size;
int8_t maxLevel;
int8_t level;
SMemSkipListNode *pHead;
SMemSkipListNode *pTail;
} SMemSkipList;
struct STbData {
tb_uid_t suid;
tb_uid_t uid;
TSDBKEY minKey;
TSDBKEY maxKey;
SDelOp *pHead;
SDelOp *pTail;
int64_t nrows;
SSkipList *pData;
tb_uid_t suid;
tb_uid_t uid;
TSDBKEY minKey;
TSDBKEY maxKey;
SDelOp *pHead;
SDelOp *pTail;
SMemSkipList sl;
};
struct SMemTable {
STsdb *pTsdb;
int32_t nRef;
SRWLatch latch;
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t nRow;
int64_t nDelOp;
SDelOp *pHead;
SDelOp *pTail;
SSkipList *pSlIdx; // SSkiplist<STbData>
SHashObj *pHashIdx;
SRWLatch latch;
STsdb *pTsdb;
int32_t nRef;
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t nRow;
int64_t nDelOp;
SArray *aTbData; // SArray<STbData>
};
struct STsdbFSMeta {
......@@ -656,7 +677,7 @@ struct SFSIter {
struct TSDBROW {
int64_t version;
STSRow2 tsRow;
STSRow *pTSRow;
};
struct TABLEID {
......@@ -709,16 +730,6 @@ static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
return 0;
}
typedef struct SMemSkipListNode SMemSkipListNode;
typedef struct SMemSkipList {
uint32_t seed;
int32_t size;
int8_t maxLevel;
int8_t level;
SMemSkipListNode *pHead;
SMemSkipListNode *pTail;
} SMemSkipList;
struct SMemData {
tb_uid_t suid;
tb_uid_t uid;
......@@ -730,13 +741,19 @@ struct SMemData {
};
struct SMemDataIter {
SMemData *pMemData;
STbData *pMemData;
int8_t backward;
TSDBROW *pRow;
SMemSkipListNode *pNode; // current node
TSDBROW row;
};
struct STbDataIter {
STbData *pTbData;
int8_t backward;
SMemSkipListNode *pNode;
};
#endif
#ifdef __cplusplus
......
......@@ -116,7 +116,9 @@ int tsdbBegin(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
......
......@@ -58,26 +58,26 @@ typedef struct {
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static void tsdbStartCommit(STsdb *pRepo);
static void tsdbEndCommit(STsdb *pTsdb, int eno);
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbNextCommitFid(SCommitH *pCommith);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static int tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitH *pCommih);
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static void tsdbStartCommit(STsdb *pRepo);
static void tsdbEndCommit(STsdb *pTsdb, int eno);
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbNextCommitFid(SCommitH *pCommith);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static int32_t tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitH *pCommih);
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock);
......@@ -453,11 +453,32 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
return 0;
}
static int tsdbCreateCommitIters(SCommitH *pCommith) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SMemTable *pMem = pRepo->imem;
static int32_t tsdbCreateCommitIters(SCommitH *pCommith) {
int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SMemTable *pMem = pRepo->imem;
SCommitIter *pCommitIter;
pCommith->niters = taosArrayGetSize(pMem->aTbData);
pCommith->iters = (SCommitIter *)taosMemoryCalloc(pCommith->niters, sizeof(SCommitIter));
if (pCommith->iters == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
for (int32_t iIter = 0; iIter < pCommith->niters; iIter++) {
pCommitIter = (SCommitIter *)taosArrayGetP(pMem->aTbData, iIter);
// TODO
// pCommitIter->pIter =
}
return code;
_err:
return code;
#if 0
SSkipListIterator *pSlIter;
SCommitIter *pCommitIter;
SSkipListNode *pNode;
STbData *pTbData;
STSchema *pTSchema = NULL;
......@@ -495,8 +516,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
}
}
tSkipListDestroyIter(pSlIter);
return 0;
#endif
}
static void tsdbDestroyCommitIters(SCommitH *pCommith) {
......
......@@ -15,11 +15,6 @@
#include "tsdb.h"
struct SMemSkipListNode {
int8_t level;
SMemSkipListNode *forwards[0];
};
typedef struct {
tb_uid_t uid;
STSchema *pTSchema;
......
......@@ -67,15 +67,16 @@ enum {
};
typedef struct STableCheckInfo {
uint64_t tableId;
TSKEY lastKey;
SBlockInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks
uint8_t chosen : 2; // indicate which iterator should move forward
bool initBuf : 1; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator
uint64_t suid;
uint64_t tableId;
TSKEY lastKey;
SBlockInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks
uint8_t chosen : 2; // indicate which iterator should move forward
bool initBuf : 1; // whether to initialize the in-memory skip list iterator or not
STbDataIter* iter; // mem buffer skip list iterator
STbDataIter* iiter; // imem buffer skip list iterator
} STableCheckInfo;
typedef struct STableBlockInfo {
......@@ -265,8 +266,8 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
pCheckInfo->lastKey = pTsdbReadHandle->window.skey;
pCheckInfo->iter = tSkipListDestroyIter(pCheckInfo->iter);
pCheckInfo->iiter = tSkipListDestroyIter(pCheckInfo->iiter);
pCheckInfo->iter = tsdbTbDataIterDestroy(pCheckInfo->iter);
pCheckInfo->iiter = tsdbTbDataIterDestroy(pCheckInfo->iiter);
pCheckInfo->initBuf = false;
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
......@@ -752,23 +753,21 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
pCheckInfo->initBuf = true;
int32_t order = pHandle->order;
STbData** pMem = NULL;
STbData** pIMem = NULL;
STbData* pMem = NULL;
STbData* pIMem = NULL;
TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey);
if (pHandle->pTsdb->mem != NULL) {
pMem = taosHashGet(pHandle->pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pMem);
if (pMem != NULL) {
pCheckInfo->iter =
tSkipListCreateIterFromVal((*pMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, 0, &pCheckInfo->iter);
}
}
if (pHandle->pTsdb->imem != NULL) {
pIMem = taosHashGet(pHandle->pTsdb->imem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pIMem);
if (pIMem != NULL) {
pCheckInfo->iiter =
tSkipListCreateIterFromVal((*pIMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, 0, &pCheckInfo->iiter);
}
}
......@@ -777,22 +776,21 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
return false;
}
bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter));
bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter));
bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterNext(pCheckInfo->iter));
bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterNext(pCheckInfo->iiter));
if (memEmpty && imemEmpty) { // buffer is empty
return false;
}
if (!memEmpty) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
assert(node != NULL);
TSDBROW row;
STSRow* row = (STSRow*)SL_GET_NODE_DATA(node);
TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer
tsdbTbDataIterGet(pCheckInfo->iter, &row);
TSKEY key = row.pTSRow->ts; // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
pHandle, pCheckInfo->tableId, key, order, (*pMem)->minKey.ts, (*pMem)->maxKey.ts, pCheckInfo->lastKey,
(*pMem)->nrows, pHandle->idStr);
pHandle, pCheckInfo->tableId, key, order, pMem->minKey.ts, pMem->maxKey.ts, pCheckInfo->lastKey,
pMem->sl.size, pHandle->idStr);
if (ASCENDING_TRAVERSE(order)) {
assert(pCheckInfo->lastKey <= key);
......@@ -805,15 +803,14 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
}
if (!imemEmpty) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
assert(node != NULL);
TSDBROW row;
STSRow* row = (STSRow*)SL_GET_NODE_DATA(node);
TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer
tsdbTbDataIterGet(pCheckInfo->iter, &row);
TSKEY key = row.pTSRow->ts; // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
pHandle, pCheckInfo->tableId, key, order, (*pIMem)->minKey.ts, (*pIMem)->maxKey.ts, pCheckInfo->lastKey,
(*pIMem)->nrows, pHandle->idStr);
pHandle, pCheckInfo->tableId, key, order, pIMem->minKey.ts, pIMem->maxKey.ts, pCheckInfo->lastKey,
pIMem->sl.size, pHandle->idStr);
if (ASCENDING_TRAVERSE(order)) {
assert(pCheckInfo->lastKey <= key);
......@@ -828,31 +825,23 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
}
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
tSkipListDestroyIter(pCheckInfo->iter);
tSkipListDestroyIter(pCheckInfo->iiter);
tsdbTbDataIterDestroy(pCheckInfo->iter);
tsdbTbDataIterDestroy(pCheckInfo->iiter);
}
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
TSDBROW row = {0};
STSRow *rmem = NULL, *rimem = NULL;
if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) {
rmem = (STSRow*)SL_GET_NODE_DATA(node);
// TODO: filter max version
// if (TD_ROW_VER(rmem) > maxVer) {
// rmem = NULL;
// }
if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
rmem = row.pTSRow;
}
}
if (pCheckInfo->iiter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) {
rimem = (STSRow*)SL_GET_NODE_DATA(node);
// TODO: filter max version
// if (TD_ROW_VER(rimem) > maxVer) {
// rimem = NULL;
// }
if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
rimem = row.pTSRow;
}
}
......@@ -889,7 +878,7 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
tSkipListIterNext(pCheckInfo->iter);
tsdbTbDataIterNext(pCheckInfo->iter);
}
return r1;
} else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
......@@ -903,28 +892,17 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
TDRowVerT maxVer) {
TSDBROW row;
STSRow *rmem = NULL, *rimem = NULL;
if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) {
rmem = (STSRow*)SL_GET_NODE_DATA(node);
#if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rmem) > maxVer) {
rmem = NULL;
}
#endif
if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
rmem = row.pTSRow;
}
}
if (pCheckInfo->iiter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) {
rimem = (STSRow*)SL_GET_NODE_DATA(node);
#if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rimem) > maxVer) {
rimem = NULL;
}
#endif
if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
rimem = row.pTSRow;
}
}
......@@ -966,7 +944,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
*extraRow = rimem;
return rmem;
} else {
tSkipListIterNext(pCheckInfo->iter);
tsdbTbDataIterNext(pCheckInfo->iter);
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
return rimem;
}
......@@ -995,7 +973,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
bool hasNext = false;
if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
if (pCheckInfo->iter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iter);
hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
}
if (hasNext) {
......@@ -1003,11 +981,11 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
}
if (pCheckInfo->iiter != NULL) {
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
return tsdbTbDataIterGet(pCheckInfo->iiter, NULL);
}
} else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
if (pCheckInfo->iiter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iiter);
hasNext = tsdbTbDataIterNext(pCheckInfo->iiter);
}
if (hasNext) {
......@@ -1015,14 +993,14 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
}
if (pCheckInfo->iter != NULL) {
return tSkipListIterGet(pCheckInfo->iter) != NULL;
return tsdbTbDataIterGet(pCheckInfo->iter, NULL);
}
} else {
if (pCheckInfo->iter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iter);
hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
}
if (pCheckInfo->iiter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iiter) || hasNext;
hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext;
}
}
......
......@@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
SSubmitBlkRsp r = {0};
tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &r) < 0) {
if (tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r) < 0) {
return -1;
}
......
......@@ -779,7 +779,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
}
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
submitBlkRsp.code = terrno;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册