提交 ae9698e1 编写于 作者: H Hongze Cheng

feat: vnode multi-version

上级 8bf48907
...@@ -43,6 +43,8 @@ typedef struct SMemTable SMemTable; ...@@ -43,6 +43,8 @@ typedef struct SMemTable SMemTable;
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy2(SMemTable *pMemTable); void tsdbMemTableDestroy2(SMemTable *pMemTable);
int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
// tsdbMemTable ================ // tsdbMemTable ================
typedef struct STsdbRow STsdbRow; typedef struct STsdbRow STsdbRow;
......
...@@ -87,6 +87,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p ...@@ -87,6 +87,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbBegin(STsdb *pTsdb) { int tsdbBegin(STsdb *pTsdb) {
if (!pTsdb) return 0; if (!pTsdb) return 0;
...@@ -100,57 +101,6 @@ int tsdbBegin(STsdb *pTsdb) { ...@@ -100,57 +101,6 @@ int tsdbBegin(STsdb *pTsdb) {
return 0; return 0;
} }
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet = {0};
STsdbFS *pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
level = tsdbGetFidLevel(pSet->fid, pRtn);
if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
return -1;
}
tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
} else {
// On a correct level
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
return -1;
}
}
return 0;
}
int tsdbPrepareCommit(STsdb *pTsdb) {
if (pTsdb->mem == NULL) return 0;
ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
return 0;
}
int tsdbCommit(STsdb *pRepo) { int tsdbCommit(STsdb *pRepo) {
SCommitH commith = {0}; SCommitH commith = {0};
SDFileSet *pSet = NULL; SDFileSet *pSet = NULL;
...@@ -223,6 +173,57 @@ int tsdbCommit(STsdb *pRepo) { ...@@ -223,6 +173,57 @@ int tsdbCommit(STsdb *pRepo) {
return 0; return 0;
} }
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet = {0};
STsdbFS *pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
level = tsdbGetFidLevel(pSet->fid, pRtn);
if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
return -1;
}
tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
} else {
// On a correct level
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
return -1;
}
}
return 0;
}
int tsdbPrepareCommit(STsdb *pTsdb) {
if (pTsdb->mem == NULL) return 0;
ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
return 0;
}
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now; TSKEY minKey, midKey, maxKey, now;
...@@ -543,8 +544,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -543,8 +544,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
return -1; return -1;
} }
tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet), tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo),
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); TSDB_FSET_FID(pSet), TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
} else { } else {
pCommith->isRFileSet = false; pCommith->isRFileSet = false;
} }
......
...@@ -64,9 +64,10 @@ static int memDataPCmprFn(const void *p1, const void *p2); ...@@ -64,9 +64,10 @@ static int memDataPCmprFn(const void *p1, const void *p2);
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos); static void memDataGetPosToPut(SMemData *pMemData, SMemSkipListNode **backward, TSDBROW *pRow);
static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward, static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t isForward);
SMemSkipListNode **pos); static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version,
SVSubmitBlk *pSubmitBlk);
// SMemTable ============================================== // SMemTable ==============================================
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
...@@ -123,30 +124,11 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit ...@@ -123,30 +124,11 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit
} }
// do insert // do insert
int32_t nt; code = tsdbInsertTableDataImpl(pMemTable, pMemData, version, pSubmitBlk);
int32_t n = 0;
uint8_t *p = pSubmitBlk->pData;
int32_t nRow = 0;
SMemSkipListNode *pos[SL_MAX_LEVEL] = {0};
for (int8_t iLevel = 0; iLevel < SL_MAX_LEVEL; iLevel++) {
pos[iLevel] = pMemData->sl.pTail;
}
while (n < pSubmitBlk->nData) {
nt = tGetTSRow(p + n, &row.tsRow);
n += nt;
ASSERT(n <= pSubmitBlk->nData);
memDataMovePos(pMemData, &row, nRow ? 1 : 0, pos);
code = memDataPutRow(pTsdb->pVnode->inUse, pMemData, &row, nRow ? 1 : 0, pos);
if (code) { if (code) {
goto _err; goto _err;
} }
nRow++;
}
return code; return code;
_err: _err:
...@@ -310,87 +292,79 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { ...@@ -310,87 +292,79 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
return level; return level;
} }
static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos) { static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version,
TSDBKEY *pKey; SVSubmitBlk *pSubmitBlk) {
int c; int32_t code = 0;
int32_t n = 0;
uint8_t *p = pSubmitBlk->pData;
int32_t nRow = 0;
TSDBROW row = {.version = version};
if (isForward) { SMemSkipListNode *backward[SL_MAX_LEVEL];
// TODO SMemSkipListNode *forward[SL_MAX_LEVEL];
} else {
SMemSkipListNode *px = pMemData->sl.pTail;
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { ASSERT(pSubmitBlk->nData);
if (iLevel < pMemData->sl.level) {
SMemSkipListNode *p = SL_NODE_BACKWARD(px, iLevel);
while (p != pMemData->sl.pHead) { // backward put first data
pKey = (TSDBKEY *)SL_NODE_DATA(p); n += tGetTSRow(p + n, &row.tsRow);
ASSERT(n <= pSubmitBlk->nData);
c = tsdbKeyCmprFn(pKey, pRow); memDataGetPosToPut(pMemData, backward, &row);
if (c <= 0) { code = memDataDoPut(pMemData, backward, &row, 0);
break; if (code) {
} else { goto _exit;
px = p;
p = SL_NODE_BACKWARD(px, iLevel);
}
} }
nRow++;
pos[iLevel] = px; // forward put rest
} while (n < pSubmitBlk->nData) {
} n += tGetTSRow(p + n, &row.tsRow);
ASSERT(n <= pSubmitBlk->nData);
// TODO
nRow++;
} }
_exit:
return code;
} }
static void memMovePosFrom(SMemData *pMemData, SMemSkipListNode *pNode, TSDBROW *pRow, int8_t isForward, static void memDataGetPosToPut(SMemData *pMemData, SMemSkipListNode **backward, TSDBROW *pRow) {
SMemSkipListNode **pos) { SMemSkipListNode *px;
SMemSkipListNode *px = pNode; SMemSkipListNode *pn;
TSDBKEY *pKey; TSDBKEY *pKey;
SMemSkipListNode *p;
int c; int c;
if (isForward) { for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel - 1; iLevel++) {
} else { backward[iLevel] = pMemData->sl.pTail;
ASSERT(pNode != pMemData->sl.pHead); }
if (0) {
} else {
px = pMemData->sl.pTail;
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
p = SL_NODE_BACKWARD(px, iLevel); if (iLevel < pMemData->sl.level) {
while (p != pMemData->sl.pHead) { pn = SL_NODE_BACKWARD(px, iLevel);
pKey = (TSDBKEY *)SL_NODE_DATA(p); while (pn != pMemData->sl.pHead) {
pKey = (TSDBKEY *)SL_NODE_DATA(pn);
c = tsdbKeyCmprFn(pKey, pRow); c = tsdbKeyCmprFn(pKey, pRow);
if (c <= 0) { if (c <= 0) {
break; break;
} else { } else {
px = p; px = pn;
p = SL_NODE_BACKWARD(px, iLevel); pn = SL_NODE_BACKWARD(px, iLevel);
} }
} }
}
pos[iLevel] = px; backward[iLevel] = px;
} }
} }
} }
static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward, static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t isForward) {
SMemSkipListNode **pos) {
int32_t code = 0; int32_t code = 0;
int8_t level;
SMemSkipListNode *pNode;
level = tsdbMemSkipListRandLevel(&pMemData->sl);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// do the read put
if (isForward) {
// TODO
} else {
// TODO // TODO
}
_exit:
return code; return code;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册